Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/base.py: 33%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

781 statements  

1# dialects/sqlite/base.py 

2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7# mypy: ignore-errors 

8 

9 

10r''' 

11.. dialect:: sqlite 

12 :name: SQLite 

13 :normal_support: 3.12+ 

14 :best_effort: 3.7.16+ 

15 

16.. _sqlite_datetime: 

17 

18Date and Time Types 

19------------------- 

20 

21SQLite does not have built-in DATE, TIME, or DATETIME types, and pysqlite does 

22not provide out of the box functionality for translating values between Python 

23`datetime` objects and a SQLite-supported format. SQLAlchemy's own 

24:class:`~sqlalchemy.types.DateTime` and related types provide date formatting 

25and parsing functionality when SQLite is used. The implementation classes are 

26:class:`_sqlite.DATETIME`, :class:`_sqlite.DATE` and :class:`_sqlite.TIME`. 

27These types represent dates and times as ISO formatted strings, which also 

28nicely support ordering. There's no reliance on typical "libc" internals for 

29these functions so historical dates are fully supported. 

30 

31Ensuring Text affinity 

32^^^^^^^^^^^^^^^^^^^^^^ 

33 

34The DDL rendered for these types is the standard ``DATE``, ``TIME`` 

35and ``DATETIME`` indicators. However, custom storage formats can also be 

36applied to these types. When the 

37storage format is detected as containing no alpha characters, the DDL for 

38these types is rendered as ``DATE_CHAR``, ``TIME_CHAR``, and ``DATETIME_CHAR``, 

39so that the column continues to have textual affinity. 

40 

41.. seealso:: 

42 

43 `Type Affinity <https://www.sqlite.org/datatype3.html#affinity>`_ - 

44 in the SQLite documentation 

45 

46.. _sqlite_autoincrement: 

47 

48SQLite Auto Incrementing Behavior 

49---------------------------------- 

50 

51Background on SQLite's autoincrement is at: https://sqlite.org/autoinc.html 

52 

53Key concepts: 

54 

55* SQLite has an implicit "auto increment" feature that takes place for any 

56 non-composite primary-key column that is specifically created using 

57 "INTEGER PRIMARY KEY" for the type + primary key. 

58 

59* SQLite also has an explicit "AUTOINCREMENT" keyword, that is **not** 

60 equivalent to the implicit autoincrement feature; this keyword is not 

61 recommended for general use. SQLAlchemy does not render this keyword 

62 unless a special SQLite-specific directive is used (see below). However, 

63 it still requires that the column's type is named "INTEGER". 

64 

65Using the AUTOINCREMENT Keyword 

66^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 

67 

68To specifically render the AUTOINCREMENT keyword on the primary key column 

69when rendering DDL, add the flag ``sqlite_autoincrement=True`` to the Table 

70construct:: 

71 

72 Table( 

73 "sometable", 

74 metadata, 

75 Column("id", Integer, primary_key=True), 

76 sqlite_autoincrement=True, 

77 ) 

78 

79Allowing autoincrement behavior SQLAlchemy types other than Integer/INTEGER 

80^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 

81 

82SQLite's typing model is based on naming conventions. Among other things, this 

83means that any type name which contains the substring ``"INT"`` will be 

84determined to be of "integer affinity". A type named ``"BIGINT"``, 

85``"SPECIAL_INT"`` or even ``"XYZINTQPR"``, will be considered by SQLite to be 

86of "integer" affinity. However, **the SQLite autoincrement feature, whether 

87implicitly or explicitly enabled, requires that the name of the column's type 

88is exactly the string "INTEGER"**. Therefore, if an application uses a type 

89like :class:`.BigInteger` for a primary key, on SQLite this type will need to 

90be rendered as the name ``"INTEGER"`` when emitting the initial ``CREATE 

91TABLE`` statement in order for the autoincrement behavior to be available. 

92 

93One approach to achieve this is to use :class:`.Integer` on SQLite 

94only using :meth:`.TypeEngine.with_variant`:: 

95 

96 table = Table( 

97 "my_table", 

98 metadata, 

99 Column( 

100 "id", 

101 BigInteger().with_variant(Integer, "sqlite"), 

102 primary_key=True, 

103 ), 

104 ) 

105 

106Another is to use a subclass of :class:`.BigInteger` that overrides its DDL 

107name to be ``INTEGER`` when compiled against SQLite:: 

108 

109 from sqlalchemy import BigInteger 

110 from sqlalchemy.ext.compiler import compiles 

111 

112 

113 class SLBigInteger(BigInteger): 

114 pass 

115 

116 

117 @compiles(SLBigInteger, "sqlite") 

118 def bi_c(element, compiler, **kw): 

119 return "INTEGER" 

120 

121 

122 @compiles(SLBigInteger) 

123 def bi_c(element, compiler, **kw): 

124 return compiler.visit_BIGINT(element, **kw) 

125 

126 

127 table = Table( 

128 "my_table", metadata, Column("id", SLBigInteger(), primary_key=True) 

129 ) 

130 

131.. seealso:: 

132 

133 :meth:`.TypeEngine.with_variant` 

134 

135 :ref:`sqlalchemy.ext.compiler_toplevel` 

136 

137 `Datatypes In SQLite Version 3 <https://sqlite.org/datatype3.html>`_ 

138 

139.. _sqlite_transactions: 

140 

141Transactions with SQLite and the sqlite3 driver 

142----------------------------------------------- 

143 

144As a file-based database, SQLite's approach to transactions differs from 

145traditional databases in many ways. Additionally, the ``sqlite3`` driver 

146standard with Python (as well as the async version ``aiosqlite`` which builds 

147on top of it) has several quirks, workarounds, and API features in the 

148area of transaction control, all of which generally need to be addressed when 

149constructing a SQLAlchemy application that uses SQLite. 

150 

151Legacy Transaction Mode with the sqlite3 driver 

152^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 

153 

154The most important aspect of transaction handling with the sqlite3 driver is 

155that it defaults (which will continue through Python 3.15 before being 

156removed in Python 3.16) to legacy transactional behavior which does 

157not strictly follow :pep:`249`. The way in which the driver diverges from the 

158PEP is that it does not "begin" a transaction automatically as dictated by 

159:pep:`249` except in the case of DML statements, e.g. INSERT, UPDATE, and 

160DELETE. Normally, :pep:`249` dictates that a BEGIN must be emitted upon 

161the first SQL statement of any kind, so that all subsequent operations will 

162be established within a transaction until ``connection.commit()`` has been 

163called. The ``sqlite3`` driver, in an effort to be easier to use in 

164highly concurrent environments, skips this step for DQL (e.g. SELECT) statements, 

165and also skips it for DDL (e.g. CREATE TABLE etc.) statements for more legacy 

166reasons. Statements such as SAVEPOINT are also skipped. 

167 

168In modern versions of the ``sqlite3`` driver as of Python 3.12, this legacy 

169mode of operation is referred to as 

170`"legacy transaction control" <https://docs.python.org/3/library/sqlite3.html#sqlite3-transaction-control-isolation-level>`_, and is in 

171effect by default due to the ``Connection.autocommit`` parameter being set to 

172the constant ``sqlite3.LEGACY_TRANSACTION_CONTROL``. Prior to Python 3.12, 

173the ``Connection.autocommit`` attribute did not exist. 

174 

175The implications of legacy transaction mode include: 

176 

177* **Incorrect support for transactional DDL** - statements like CREATE TABLE, ALTER TABLE, 

178 CREATE INDEX etc. will not automatically BEGIN a transaction if one were not 

179 started already, leading to the changes by each statement being 

180 "autocommitted" immediately unless BEGIN were otherwise emitted first. Very 

181 old (pre Python 3.6) versions of SQLite would also force a COMMIT for these 

182 operations even if a transaction were present, however this is no longer the 

183 case. 

184* **SERIALIZABLE behavior not fully functional** - SQLite's transaction isolation 

185 behavior is normally consistent with SERIALIZABLE isolation, as it is a file- 

186 based system that locks the database file entirely for write operations, 

187 preventing COMMIT until all reader transactions (and associated file locks) 

188 have completed. However, sqlite3's legacy transaction mode fails to emit BEGIN for SELECT 

189 statements, which causes these SELECT statements to no longer be "repeatable", 

190 failing one of the consistency guarantees of SERIALIZABLE. 

191* **Incorrect behavior for SAVEPOINT** - as the SAVEPOINT statement does not 

192 imply a BEGIN, a new SAVEPOINT emitted before a BEGIN will function on its 

193 own but fails to participate in the enclosing transaction, meaning a ROLLBACK 

194 of the transaction will not rollback elements that were part of a released 

195 savepoint. 

196 

197Legacy transaction mode first existed in order to faciliate working around 

198SQLite's file locks. Because SQLite relies upon whole-file locks, it is easy to 

199get "database is locked" errors, particularly when newer features like "write 

200ahead logging" are disabled. This is a key reason why ``sqlite3``'s legacy 

201transaction mode is still the default mode of operation; disabling it will 

202produce behavior that is more susceptible to locked database errors. However 

203note that **legacy transaction mode will no longer be the default** in a future 

204Python version (3.16 as of this writing). 

205 

206.. _sqlite_enabling_transactions: 

207 

208Enabling Non-Legacy SQLite Transactional Modes with the sqlite3 or aiosqlite driver 

209^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 

210 

211Current SQLAlchemy support allows either for setting the 

212``.Connection.autocommit`` attribute, most directly by using a 

213:func:`._sa.create_engine` parameter, or if on an older version of Python where 

214the attribute is not available, using event hooks to control the behavior of 

215BEGIN. 

216 

217* **Enabling modern sqlite3 transaction control via the autocommit connect parameter** (Python 3.12 and above) 

218 

219 To use SQLite in the mode described at `Transaction control via the autocommit attribute <https://docs.python.org/3/library/sqlite3.html#transaction-control-via-the-autocommit-attribute>`_, 

220 the most straightforward approach is to set the attribute to its recommended value 

221 of ``False`` at the connect level using :paramref:`_sa.create_engine.connect_args``:: 

222 

223 from sqlalchemy import create_engine 

224 

225 engine = create_engine( 

226 "sqlite:///myfile.db", connect_args={"autocommit": False} 

227 ) 

228 

229 This parameter is also passed through when using the aiosqlite driver:: 

230 

231 from sqlalchemy.ext.asyncio import create_async_engine 

232 

233 engine = create_async_engine( 

234 "sqlite+aiosqlite:///myfile.db", connect_args={"autocommit": False} 

235 ) 

236 

237 The parameter can also be set at the attribute level using the :meth:`.PoolEvents.connect` 

238 event hook, however this will only work for sqlite3, as aiosqlite does not yet expose this 

239 attribute on its ``Connection`` object:: 

240 

241 from sqlalchemy import create_engine, event 

242 

243 engine = create_engine("sqlite:///myfile.db") 

244 

245 

246 @event.listens_for(engine, "connect") 

247 def do_connect(dbapi_connection, connection_record): 

248 # enable autocommit=False mode 

249 dbapi_connection.autocommit = False 

250 

251* **Using SQLAlchemy to emit BEGIN in lieu of SQLite's transaction control** (all Python versions, sqlite3 and aiosqlite) 

252 

253 For older versions of ``sqlite3`` or for cross-compatiblity with older and 

254 newer versions, SQLAlchemy can also take over the job of transaction control. 

255 This is achieved by using the :meth:`.ConnectionEvents.begin` hook 

256 to emit the "BEGIN" command directly, while also disabling SQLite's control 

257 of this command using the :meth:`.PoolEvents.connect` event hook to set the 

258 ``Connection.isolation_level`` attribute to ``None``:: 

259 

260 

261 from sqlalchemy import create_engine, event 

262 

263 engine = create_engine("sqlite:///myfile.db") 

264 

265 

266 @event.listens_for(engine, "connect") 

267 def do_connect(dbapi_connection, connection_record): 

268 # disable sqlite3's emitting of the BEGIN statement entirely. 

269 dbapi_connection.isolation_level = None 

270 

271 

272 @event.listens_for(engine, "begin") 

273 def do_begin(conn): 

274 # emit our own BEGIN. sqlite3 still emits COMMIT/ROLLBACK correctly 

275 conn.exec_driver_sql("BEGIN") 

276 

277 When using the asyncio variant ``aiosqlite``, refer to ``engine.sync_engine`` 

278 as in the example below:: 

279 

280 from sqlalchemy import create_engine, event 

281 from sqlalchemy.ext.asyncio import create_async_engine 

282 

283 engine = create_async_engine("sqlite+aiosqlite:///myfile.db") 

284 

285 

286 @event.listens_for(engine.sync_engine, "connect") 

287 def do_connect(dbapi_connection, connection_record): 

288 # disable aiosqlite's emitting of the BEGIN statement entirely. 

289 dbapi_connection.isolation_level = None 

290 

291 

292 @event.listens_for(engine.sync_engine, "begin") 

293 def do_begin(conn): 

294 # emit our own BEGIN. aiosqlite still emits COMMIT/ROLLBACK correctly 

295 conn.exec_driver_sql("BEGIN") 

296 

297.. _sqlite_isolation_level: 

298 

299Using SQLAlchemy's Driver Level AUTOCOMMIT Feature with SQLite 

300^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 

301 

302SQLAlchemy has a comprehensive database isolation feature with optional 

303autocommit support that is introduced in the section :ref:`dbapi_autocommit`. 

304 

305For the ``sqlite3`` and ``aiosqlite`` drivers, SQLAlchemy only includes 

306built-in support for "AUTOCOMMIT". Note that this mode is currently incompatible 

307with the non-legacy isolation mode hooks documented in the previous 

308section at :ref:`sqlite_enabling_transactions`. 

309 

310To use the ``sqlite3`` driver with SQLAlchemy driver-level autocommit, 

311create an engine setting the :paramref:`_sa.create_engine.isolation_level` 

312parameter to "AUTOCOMMIT":: 

313 

314 eng = create_engine("sqlite:///myfile.db", isolation_level="AUTOCOMMIT") 

315 

316When using the above mode, any event hooks that set the sqlite3 ``Connection.autocommit`` 

317parameter away from its default of ``sqlite3.LEGACY_TRANSACTION_CONTROL`` 

318as well as hooks that emit ``BEGIN`` should be disabled. 

319 

320Additional Reading for SQLite / sqlite3 transaction control 

321^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 

322 

323Links with important information on SQLite, the sqlite3 driver, 

324as well as long historical conversations on how things got to their current state: 

325 

326* `Isolation in SQLite <https://www.sqlite.org/isolation.html>`_ - on the SQLite website 

327* `Transaction control <https://docs.python.org/3/library/sqlite3.html#transaction-control>`_ - describes the sqlite3 autocommit attribute as well 

328 as the legacy isolation_level attribute. 

329* `sqlite3 SELECT does not BEGIN a transaction, but should according to spec <https://github.com/python/cpython/issues/54133>`_ - imported Python standard library issue on github 

330* `sqlite3 module breaks transactions and potentially corrupts data <https://github.com/python/cpython/issues/54949>`_ - imported Python standard library issue on github 

331 

332 

333INSERT/UPDATE/DELETE...RETURNING 

334--------------------------------- 

335 

336The SQLite dialect supports SQLite 3.35's ``INSERT|UPDATE|DELETE..RETURNING`` 

337syntax. ``INSERT..RETURNING`` may be used 

338automatically in some cases in order to fetch newly generated identifiers in 

339place of the traditional approach of using ``cursor.lastrowid``, however 

340``cursor.lastrowid`` is currently still preferred for simple single-statement 

341cases for its better performance. 

342 

343To specify an explicit ``RETURNING`` clause, use the 

344:meth:`._UpdateBase.returning` method on a per-statement basis:: 

345 

346 # INSERT..RETURNING 

347 result = connection.execute( 

348 table.insert().values(name="foo").returning(table.c.col1, table.c.col2) 

349 ) 

350 print(result.all()) 

351 

352 # UPDATE..RETURNING 

353 result = connection.execute( 

354 table.update() 

355 .where(table.c.name == "foo") 

356 .values(name="bar") 

357 .returning(table.c.col1, table.c.col2) 

358 ) 

359 print(result.all()) 

360 

361 # DELETE..RETURNING 

362 result = connection.execute( 

363 table.delete() 

364 .where(table.c.name == "foo") 

365 .returning(table.c.col1, table.c.col2) 

366 ) 

367 print(result.all()) 

368 

369.. versionadded:: 2.0 Added support for SQLite RETURNING 

370 

371 

372.. _sqlite_foreign_keys: 

373 

374Foreign Key Support 

375------------------- 

376 

377SQLite supports FOREIGN KEY syntax when emitting CREATE statements for tables, 

378however by default these constraints have no effect on the operation of the 

379table. 

380 

381Constraint checking on SQLite has three prerequisites: 

382 

383* At least version 3.6.19 of SQLite must be in use 

384* The SQLite library must be compiled *without* the SQLITE_OMIT_FOREIGN_KEY 

385 or SQLITE_OMIT_TRIGGER symbols enabled. 

386* The ``PRAGMA foreign_keys = ON`` statement must be emitted on all 

387 connections before use -- including the initial call to 

388 :meth:`sqlalchemy.schema.MetaData.create_all`. 

389 

390SQLAlchemy allows for the ``PRAGMA`` statement to be emitted automatically for 

391new connections through the usage of events:: 

392 

393 from sqlalchemy.engine import Engine 

394 from sqlalchemy import event 

395 

396 

397 @event.listens_for(Engine, "connect") 

398 def set_sqlite_pragma(dbapi_connection, connection_record): 

399 # the sqlite3 driver will not set PRAGMA foreign_keys 

400 # if autocommit=False; set to True temporarily 

401 ac = dbapi_connection.autocommit 

402 dbapi_connection.autocommit = True 

403 

404 cursor = dbapi_connection.cursor() 

405 cursor.execute("PRAGMA foreign_keys=ON") 

406 cursor.close() 

407 

408 # restore previous autocommit setting 

409 dbapi_connection.autocommit = ac 

410 

411.. warning:: 

412 

413 When SQLite foreign keys are enabled, it is **not possible** 

414 to emit CREATE or DROP statements for tables that contain 

415 mutually-dependent foreign key constraints; 

416 to emit the DDL for these tables requires that ALTER TABLE be used to 

417 create or drop these constraints separately, for which SQLite has 

418 no support. 

419 

420.. seealso:: 

421 

422 `SQLite Foreign Key Support <https://www.sqlite.org/foreignkeys.html>`_ 

423 - on the SQLite web site. 

424 

425 :ref:`event_toplevel` - SQLAlchemy event API. 

426 

427 :ref:`use_alter` - more information on SQLAlchemy's facilities for handling 

428 mutually-dependent foreign key constraints. 

429 

430.. _sqlite_on_conflict_ddl: 

431 

432ON CONFLICT support for constraints 

433----------------------------------- 

434 

435.. seealso:: This section describes the :term:`DDL` version of "ON CONFLICT" for 

436 SQLite, which occurs within a CREATE TABLE statement. For "ON CONFLICT" as 

437 applied to an INSERT statement, see :ref:`sqlite_on_conflict_insert`. 

438 

439SQLite supports a non-standard DDL clause known as ON CONFLICT which can be applied 

440to primary key, unique, check, and not null constraints. In DDL, it is 

441rendered either within the "CONSTRAINT" clause or within the column definition 

442itself depending on the location of the target constraint. To render this 

443clause within DDL, the extension parameter ``sqlite_on_conflict`` can be 

444specified with a string conflict resolution algorithm within the 

445:class:`.PrimaryKeyConstraint`, :class:`.UniqueConstraint`, 

446:class:`.CheckConstraint` objects. Within the :class:`_schema.Column` object, 

447there 

448are individual parameters ``sqlite_on_conflict_not_null``, 

449``sqlite_on_conflict_primary_key``, ``sqlite_on_conflict_unique`` which each 

450correspond to the three types of relevant constraint types that can be 

451indicated from a :class:`_schema.Column` object. 

452 

453.. seealso:: 

454 

455 `ON CONFLICT <https://www.sqlite.org/lang_conflict.html>`_ - in the SQLite 

456 documentation 

457 

458The ``sqlite_on_conflict`` parameters accept a string argument which is just 

459the resolution name to be chosen, which on SQLite can be one of ROLLBACK, 

460ABORT, FAIL, IGNORE, and REPLACE. For example, to add a UNIQUE constraint 

461that specifies the IGNORE algorithm:: 

462 

463 some_table = Table( 

464 "some_table", 

465 metadata, 

466 Column("id", Integer, primary_key=True), 

467 Column("data", Integer), 

468 UniqueConstraint("id", "data", sqlite_on_conflict="IGNORE"), 

469 ) 

470 

471The above renders CREATE TABLE DDL as: 

472 

473.. sourcecode:: sql 

474 

475 CREATE TABLE some_table ( 

476 id INTEGER NOT NULL, 

477 data INTEGER, 

478 PRIMARY KEY (id), 

479 UNIQUE (id, data) ON CONFLICT IGNORE 

480 ) 

481 

482 

483When using the :paramref:`_schema.Column.unique` 

484flag to add a UNIQUE constraint 

485to a single column, the ``sqlite_on_conflict_unique`` parameter can 

486be added to the :class:`_schema.Column` as well, which will be added to the 

487UNIQUE constraint in the DDL:: 

488 

489 some_table = Table( 

490 "some_table", 

491 metadata, 

492 Column("id", Integer, primary_key=True), 

493 Column( 

494 "data", Integer, unique=True, sqlite_on_conflict_unique="IGNORE" 

495 ), 

496 ) 

497 

498rendering: 

499 

500.. sourcecode:: sql 

501 

502 CREATE TABLE some_table ( 

503 id INTEGER NOT NULL, 

504 data INTEGER, 

505 PRIMARY KEY (id), 

506 UNIQUE (data) ON CONFLICT IGNORE 

507 ) 

508 

509To apply the FAIL algorithm for a NOT NULL constraint, 

510``sqlite_on_conflict_not_null`` is used:: 

511 

512 some_table = Table( 

513 "some_table", 

514 metadata, 

515 Column("id", Integer, primary_key=True), 

516 Column( 

517 "data", Integer, nullable=False, sqlite_on_conflict_not_null="FAIL" 

518 ), 

519 ) 

520 

521this renders the column inline ON CONFLICT phrase: 

522 

523.. sourcecode:: sql 

524 

525 CREATE TABLE some_table ( 

526 id INTEGER NOT NULL, 

527 data INTEGER NOT NULL ON CONFLICT FAIL, 

528 PRIMARY KEY (id) 

529 ) 

530 

531 

532Similarly, for an inline primary key, use ``sqlite_on_conflict_primary_key``:: 

533 

534 some_table = Table( 

535 "some_table", 

536 metadata, 

537 Column( 

538 "id", 

539 Integer, 

540 primary_key=True, 

541 sqlite_on_conflict_primary_key="FAIL", 

542 ), 

543 ) 

544 

545SQLAlchemy renders the PRIMARY KEY constraint separately, so the conflict 

546resolution algorithm is applied to the constraint itself: 

547 

548.. sourcecode:: sql 

549 

550 CREATE TABLE some_table ( 

551 id INTEGER NOT NULL, 

552 PRIMARY KEY (id) ON CONFLICT FAIL 

553 ) 

554 

555.. _sqlite_on_conflict_insert: 

556 

557INSERT...ON CONFLICT (Upsert) 

558----------------------------- 

559 

560.. seealso:: This section describes the :term:`DML` version of "ON CONFLICT" for 

561 SQLite, which occurs within an INSERT statement. For "ON CONFLICT" as 

562 applied to a CREATE TABLE statement, see :ref:`sqlite_on_conflict_ddl`. 

563 

564From version 3.24.0 onwards, SQLite supports "upserts" (update or insert) 

565of rows into a table via the ``ON CONFLICT`` clause of the ``INSERT`` 

566statement. A candidate row will only be inserted if that row does not violate 

567any unique or primary key constraints. In the case of a unique constraint violation, a 

568secondary action can occur which can be either "DO UPDATE", indicating that 

569the data in the target row should be updated, or "DO NOTHING", which indicates 

570to silently skip this row. 

571 

572Conflicts are determined using columns that are part of existing unique 

573constraints and indexes. These constraints are identified by stating the 

574columns and conditions that comprise the indexes. 

575 

576SQLAlchemy provides ``ON CONFLICT`` support via the SQLite-specific 

577:func:`_sqlite.insert()` function, which provides 

578the generative methods :meth:`_sqlite.Insert.on_conflict_do_update` 

579and :meth:`_sqlite.Insert.on_conflict_do_nothing`: 

580 

581.. sourcecode:: pycon+sql 

582 

583 >>> from sqlalchemy.dialects.sqlite import insert 

584 

585 >>> insert_stmt = insert(my_table).values( 

586 ... id="some_existing_id", data="inserted value" 

587 ... ) 

588 

589 >>> do_update_stmt = insert_stmt.on_conflict_do_update( 

590 ... index_elements=["id"], set_=dict(data="updated value") 

591 ... ) 

592 

593 >>> print(do_update_stmt) 

594 {printsql}INSERT INTO my_table (id, data) VALUES (?, ?) 

595 ON CONFLICT (id) DO UPDATE SET data = ?{stop} 

596 

597 >>> do_nothing_stmt = insert_stmt.on_conflict_do_nothing(index_elements=["id"]) 

598 

599 >>> print(do_nothing_stmt) 

600 {printsql}INSERT INTO my_table (id, data) VALUES (?, ?) 

601 ON CONFLICT (id) DO NOTHING 

602 

603.. versionadded:: 1.4 

604 

605.. seealso:: 

606 

607 `Upsert 

608 <https://sqlite.org/lang_UPSERT.html>`_ 

609 - in the SQLite documentation. 

610 

611 

612Specifying the Target 

613^^^^^^^^^^^^^^^^^^^^^ 

614 

615Both methods supply the "target" of the conflict using column inference: 

616 

617* The :paramref:`_sqlite.Insert.on_conflict_do_update.index_elements` argument 

618 specifies a sequence containing string column names, :class:`_schema.Column` 

619 objects, and/or SQL expression elements, which would identify a unique index 

620 or unique constraint. 

621 

622* When using :paramref:`_sqlite.Insert.on_conflict_do_update.index_elements` 

623 to infer an index, a partial index can be inferred by also specifying the 

624 :paramref:`_sqlite.Insert.on_conflict_do_update.index_where` parameter: 

625 

626 .. sourcecode:: pycon+sql 

627 

628 >>> stmt = insert(my_table).values(user_email="a@b.com", data="inserted data") 

629 

630 >>> do_update_stmt = stmt.on_conflict_do_update( 

631 ... index_elements=[my_table.c.user_email], 

632 ... index_where=my_table.c.user_email.like("%@gmail.com"), 

633 ... set_=dict(data=stmt.excluded.data), 

634 ... ) 

635 

636 >>> print(do_update_stmt) 

637 {printsql}INSERT INTO my_table (data, user_email) VALUES (?, ?) 

638 ON CONFLICT (user_email) 

639 WHERE user_email LIKE '%@gmail.com' 

640 DO UPDATE SET data = excluded.data 

641 

642The SET Clause 

643^^^^^^^^^^^^^^^ 

644 

645``ON CONFLICT...DO UPDATE`` is used to perform an update of the already 

646existing row, using any combination of new values as well as values 

647from the proposed insertion. These values are specified using the 

648:paramref:`_sqlite.Insert.on_conflict_do_update.set_` parameter. This 

649parameter accepts a dictionary which consists of direct values 

650for UPDATE: 

651 

652.. sourcecode:: pycon+sql 

653 

654 >>> stmt = insert(my_table).values(id="some_id", data="inserted value") 

655 

656 >>> do_update_stmt = stmt.on_conflict_do_update( 

657 ... index_elements=["id"], set_=dict(data="updated value") 

658 ... ) 

659 

660 >>> print(do_update_stmt) 

661 {printsql}INSERT INTO my_table (id, data) VALUES (?, ?) 

662 ON CONFLICT (id) DO UPDATE SET data = ? 

663 

664.. warning:: 

665 

666 The :meth:`_sqlite.Insert.on_conflict_do_update` method does **not** take 

667 into account Python-side default UPDATE values or generation functions, 

668 e.g. those specified using :paramref:`_schema.Column.onupdate`. These 

669 values will not be exercised for an ON CONFLICT style of UPDATE, unless 

670 they are manually specified in the 

671 :paramref:`_sqlite.Insert.on_conflict_do_update.set_` dictionary. 

672 

673Updating using the Excluded INSERT Values 

674^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 

675 

676In order to refer to the proposed insertion row, the special alias 

677:attr:`~.sqlite.Insert.excluded` is available as an attribute on 

678the :class:`_sqlite.Insert` object; this object creates an "excluded." prefix 

679on a column, that informs the DO UPDATE to update the row with the value that 

680would have been inserted had the constraint not failed: 

681 

682.. sourcecode:: pycon+sql 

683 

684 >>> stmt = insert(my_table).values( 

685 ... id="some_id", data="inserted value", author="jlh" 

686 ... ) 

687 

688 >>> do_update_stmt = stmt.on_conflict_do_update( 

689 ... index_elements=["id"], 

690 ... set_=dict(data="updated value", author=stmt.excluded.author), 

691 ... ) 

692 

693 >>> print(do_update_stmt) 

694 {printsql}INSERT INTO my_table (id, data, author) VALUES (?, ?, ?) 

695 ON CONFLICT (id) DO UPDATE SET data = ?, author = excluded.author 

696 

697Additional WHERE Criteria 

698^^^^^^^^^^^^^^^^^^^^^^^^^ 

699 

700The :meth:`_sqlite.Insert.on_conflict_do_update` method also accepts 

701a WHERE clause using the :paramref:`_sqlite.Insert.on_conflict_do_update.where` 

702parameter, which will limit those rows which receive an UPDATE: 

703 

704.. sourcecode:: pycon+sql 

705 

706 >>> stmt = insert(my_table).values( 

707 ... id="some_id", data="inserted value", author="jlh" 

708 ... ) 

709 

710 >>> on_update_stmt = stmt.on_conflict_do_update( 

711 ... index_elements=["id"], 

712 ... set_=dict(data="updated value", author=stmt.excluded.author), 

713 ... where=(my_table.c.status == 2), 

714 ... ) 

715 >>> print(on_update_stmt) 

716 {printsql}INSERT INTO my_table (id, data, author) VALUES (?, ?, ?) 

717 ON CONFLICT (id) DO UPDATE SET data = ?, author = excluded.author 

718 WHERE my_table.status = ? 

719 

720 

721Skipping Rows with DO NOTHING 

722^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 

723 

724``ON CONFLICT`` may be used to skip inserting a row entirely 

725if any conflict with a unique constraint occurs; below this is illustrated 

726using the :meth:`_sqlite.Insert.on_conflict_do_nothing` method: 

727 

728.. sourcecode:: pycon+sql 

729 

730 >>> stmt = insert(my_table).values(id="some_id", data="inserted value") 

731 >>> stmt = stmt.on_conflict_do_nothing(index_elements=["id"]) 

732 >>> print(stmt) 

733 {printsql}INSERT INTO my_table (id, data) VALUES (?, ?) ON CONFLICT (id) DO NOTHING 

734 

735 

736If ``DO NOTHING`` is used without specifying any columns or constraint, 

737it has the effect of skipping the INSERT for any unique violation which 

738occurs: 

739 

740.. sourcecode:: pycon+sql 

741 

742 >>> stmt = insert(my_table).values(id="some_id", data="inserted value") 

743 >>> stmt = stmt.on_conflict_do_nothing() 

744 >>> print(stmt) 

745 {printsql}INSERT INTO my_table (id, data) VALUES (?, ?) ON CONFLICT DO NOTHING 

746 

747.. _sqlite_type_reflection: 

748 

749Type Reflection 

750--------------- 

751 

752SQLite types are unlike those of most other database backends, in that 

753the string name of the type usually does not correspond to a "type" in a 

754one-to-one fashion. Instead, SQLite links per-column typing behavior 

755to one of five so-called "type affinities" based on a string matching 

756pattern for the type. 

757 

758SQLAlchemy's reflection process, when inspecting types, uses a simple 

759lookup table to link the keywords returned to provided SQLAlchemy types. 

760This lookup table is present within the SQLite dialect as it is for all 

761other dialects. However, the SQLite dialect has a different "fallback" 

762routine for when a particular type name is not located in the lookup map; 

763it instead implements the SQLite "type affinity" scheme located at 

764https://www.sqlite.org/datatype3.html section 2.1. 

765 

766The provided typemap will make direct associations from an exact string 

767name match for the following types: 

768 

769:class:`_types.BIGINT`, :class:`_types.BLOB`, 

770:class:`_types.BOOLEAN`, :class:`_types.BOOLEAN`, 

771:class:`_types.CHAR`, :class:`_types.DATE`, 

772:class:`_types.DATETIME`, :class:`_types.FLOAT`, 

773:class:`_types.DECIMAL`, :class:`_types.FLOAT`, 

774:class:`_types.INTEGER`, :class:`_types.INTEGER`, 

775:class:`_types.NUMERIC`, :class:`_types.REAL`, 

776:class:`_types.SMALLINT`, :class:`_types.TEXT`, 

777:class:`_types.TIME`, :class:`_types.TIMESTAMP`, 

778:class:`_types.VARCHAR`, :class:`_types.NVARCHAR`, 

779:class:`_types.NCHAR` 

780 

781When a type name does not match one of the above types, the "type affinity" 

782lookup is used instead: 

783 

784* :class:`_types.INTEGER` is returned if the type name includes the 

785 string ``INT`` 

786* :class:`_types.TEXT` is returned if the type name includes the 

787 string ``CHAR``, ``CLOB`` or ``TEXT`` 

788* :class:`_types.NullType` is returned if the type name includes the 

789 string ``BLOB`` 

790* :class:`_types.REAL` is returned if the type name includes the string 

791 ``REAL``, ``FLOA`` or ``DOUB``. 

792* Otherwise, the :class:`_types.NUMERIC` type is used. 

793 

794.. _sqlite_partial_index: 

795 

796Partial Indexes 

797--------------- 

798 

799A partial index, e.g. one which uses a WHERE clause, can be specified 

800with the DDL system using the argument ``sqlite_where``:: 

801 

802 tbl = Table("testtbl", m, Column("data", Integer)) 

803 idx = Index( 

804 "test_idx1", 

805 tbl.c.data, 

806 sqlite_where=and_(tbl.c.data > 5, tbl.c.data < 10), 

807 ) 

808 

809The index will be rendered at create time as: 

810 

811.. sourcecode:: sql 

812 

813 CREATE INDEX test_idx1 ON testtbl (data) 

814 WHERE data > 5 AND data < 10 

815 

816.. _sqlite_dotted_column_names: 

817 

818Dotted Column Names 

819------------------- 

820 

821Using table or column names that explicitly have periods in them is 

822**not recommended**. While this is generally a bad idea for relational 

823databases in general, as the dot is a syntactically significant character, 

824the SQLite driver up until version **3.10.0** of SQLite has a bug which 

825requires that SQLAlchemy filter out these dots in result sets. 

826 

827The bug, entirely outside of SQLAlchemy, can be illustrated thusly:: 

828 

829 import sqlite3 

830 

831 assert sqlite3.sqlite_version_info < ( 

832 3, 

833 10, 

834 0, 

835 ), "bug is fixed in this version" 

836 

837 conn = sqlite3.connect(":memory:") 

838 cursor = conn.cursor() 

839 

840 cursor.execute("create table x (a integer, b integer)") 

841 cursor.execute("insert into x (a, b) values (1, 1)") 

842 cursor.execute("insert into x (a, b) values (2, 2)") 

843 

844 cursor.execute("select x.a, x.b from x") 

845 assert [c[0] for c in cursor.description] == ["a", "b"] 

846 

847 cursor.execute( 

848 """ 

849 select x.a, x.b from x where a=1 

850 union 

851 select x.a, x.b from x where a=2 

852 """ 

853 ) 

854 assert [c[0] for c in cursor.description] == ["a", "b"], [ 

855 c[0] for c in cursor.description 

856 ] 

857 

858The second assertion fails: 

859 

860.. sourcecode:: text 

861 

862 Traceback (most recent call last): 

863 File "test.py", line 19, in <module> 

864 [c[0] for c in cursor.description] 

865 AssertionError: ['x.a', 'x.b'] 

866 

867Where above, the driver incorrectly reports the names of the columns 

868including the name of the table, which is entirely inconsistent vs. 

869when the UNION is not present. 

870 

871SQLAlchemy relies upon column names being predictable in how they match 

872to the original statement, so the SQLAlchemy dialect has no choice but 

873to filter these out:: 

874 

875 

876 from sqlalchemy import create_engine 

877 

878 eng = create_engine("sqlite://") 

879 conn = eng.connect() 

880 

881 conn.exec_driver_sql("create table x (a integer, b integer)") 

882 conn.exec_driver_sql("insert into x (a, b) values (1, 1)") 

883 conn.exec_driver_sql("insert into x (a, b) values (2, 2)") 

884 

885 result = conn.exec_driver_sql("select x.a, x.b from x") 

886 assert result.keys() == ["a", "b"] 

887 

888 result = conn.exec_driver_sql( 

889 """ 

890 select x.a, x.b from x where a=1 

891 union 

892 select x.a, x.b from x where a=2 

893 """ 

894 ) 

895 assert result.keys() == ["a", "b"] 

896 

897Note that above, even though SQLAlchemy filters out the dots, *both 

898names are still addressable*:: 

899 

900 >>> row = result.first() 

901 >>> row["a"] 

902 1 

903 >>> row["x.a"] 

904 1 

905 >>> row["b"] 

906 1 

907 >>> row["x.b"] 

908 1 

909 

910Therefore, the workaround applied by SQLAlchemy only impacts 

911:meth:`_engine.CursorResult.keys` and :meth:`.Row.keys()` in the public API. In 

912the very specific case where an application is forced to use column names that 

913contain dots, and the functionality of :meth:`_engine.CursorResult.keys` and 

914:meth:`.Row.keys()` is required to return these dotted names unmodified, 

915the ``sqlite_raw_colnames`` execution option may be provided, either on a 

916per-:class:`_engine.Connection` basis:: 

917 

918 result = conn.execution_options(sqlite_raw_colnames=True).exec_driver_sql( 

919 """ 

920 select x.a, x.b from x where a=1 

921 union 

922 select x.a, x.b from x where a=2 

923 """ 

924 ) 

925 assert result.keys() == ["x.a", "x.b"] 

926 

927or on a per-:class:`_engine.Engine` basis:: 

928 

929 engine = create_engine( 

930 "sqlite://", execution_options={"sqlite_raw_colnames": True} 

931 ) 

932 

933When using the per-:class:`_engine.Engine` execution option, note that 

934**Core and ORM queries that use UNION may not function properly**. 

935 

936SQLite-specific table options 

937----------------------------- 

938 

939One option for CREATE TABLE is supported directly by the SQLite 

940dialect in conjunction with the :class:`_schema.Table` construct: 

941 

942* ``WITHOUT ROWID``:: 

943 

944 Table("some_table", metadata, ..., sqlite_with_rowid=False) 

945 

946* 

947 ``STRICT``:: 

948 

949 Table("some_table", metadata, ..., sqlite_strict=True) 

950 

951 .. versionadded:: 2.0.37 

952 

953.. seealso:: 

954 

955 `SQLite CREATE TABLE options 

956 <https://www.sqlite.org/lang_createtable.html>`_ 

957 

958.. _sqlite_include_internal: 

959 

960Reflecting internal schema tables 

961---------------------------------- 

962 

963Reflection methods that return lists of tables will omit so-called 

964"SQLite internal schema object" names, which are considered by SQLite 

965as any object name that is prefixed with ``sqlite_``. An example of 

966such an object is the ``sqlite_sequence`` table that's generated when 

967the ``AUTOINCREMENT`` column parameter is used. In order to return 

968these objects, the parameter ``sqlite_include_internal=True`` may be 

969passed to methods such as :meth:`_schema.MetaData.reflect` or 

970:meth:`.Inspector.get_table_names`. 

971 

972.. versionadded:: 2.0 Added the ``sqlite_include_internal=True`` parameter. 

973 Previously, these tables were not ignored by SQLAlchemy reflection 

974 methods. 

975 

976.. note:: 

977 

978 The ``sqlite_include_internal`` parameter does not refer to the 

979 "system" tables that are present in schemas such as ``sqlite_master``. 

980 

981.. seealso:: 

982 

983 `SQLite Internal Schema Objects <https://www.sqlite.org/fileformat2.html#intschema>`_ - in the SQLite 

984 documentation. 

985 

986''' # noqa 

987from __future__ import annotations 

988 

989import datetime 

990import numbers 

991import re 

992from typing import Optional 

993 

994from .json import JSON 

995from .json import JSONIndexType 

996from .json import JSONPathType 

997from ... import exc 

998from ... import schema as sa_schema 

999from ... import sql 

1000from ... import text 

1001from ... import types as sqltypes 

1002from ... import util 

1003from ...engine import default 

1004from ...engine import processors 

1005from ...engine import reflection 

1006from ...engine.reflection import ReflectionDefaults 

1007from ...sql import coercions 

1008from ...sql import compiler 

1009from ...sql import elements 

1010from ...sql import roles 

1011from ...sql import schema 

1012from ...types import BLOB # noqa 

1013from ...types import BOOLEAN # noqa 

1014from ...types import CHAR # noqa 

1015from ...types import DECIMAL # noqa 

1016from ...types import FLOAT # noqa 

1017from ...types import INTEGER # noqa 

1018from ...types import NUMERIC # noqa 

1019from ...types import REAL # noqa 

1020from ...types import SMALLINT # noqa 

1021from ...types import TEXT # noqa 

1022from ...types import TIMESTAMP # noqa 

1023from ...types import VARCHAR # noqa 

1024 

1025 

1026class _SQliteJson(JSON): 

1027 def result_processor(self, dialect, coltype): 

1028 default_processor = super().result_processor(dialect, coltype) 

1029 

1030 def process(value): 

1031 try: 

1032 return default_processor(value) 

1033 except TypeError: 

1034 if isinstance(value, numbers.Number): 

1035 return value 

1036 else: 

1037 raise 

1038 

1039 return process 

1040 

1041 

1042class _DateTimeMixin: 

1043 _reg = None 

1044 _storage_format = None 

1045 

1046 def __init__(self, storage_format=None, regexp=None, **kw): 

1047 super().__init__(**kw) 

1048 if regexp is not None: 

1049 self._reg = re.compile(regexp) 

1050 if storage_format is not None: 

1051 self._storage_format = storage_format 

1052 

1053 @property 

1054 def format_is_text_affinity(self): 

1055 """return True if the storage format will automatically imply 

1056 a TEXT affinity. 

1057 

1058 If the storage format contains no non-numeric characters, 

1059 it will imply a NUMERIC storage format on SQLite; in this case, 

1060 the type will generate its DDL as DATE_CHAR, DATETIME_CHAR, 

1061 TIME_CHAR. 

1062 

1063 """ 

1064 spec = self._storage_format % { 

1065 "year": 0, 

1066 "month": 0, 

1067 "day": 0, 

1068 "hour": 0, 

1069 "minute": 0, 

1070 "second": 0, 

1071 "microsecond": 0, 

1072 } 

1073 return bool(re.search(r"[^0-9]", spec)) 

1074 

1075 def adapt(self, cls, **kw): 

1076 if issubclass(cls, _DateTimeMixin): 

1077 if self._storage_format: 

1078 kw["storage_format"] = self._storage_format 

1079 if self._reg: 

1080 kw["regexp"] = self._reg 

1081 return super().adapt(cls, **kw) 

1082 

1083 def literal_processor(self, dialect): 

1084 bp = self.bind_processor(dialect) 

1085 

1086 def process(value): 

1087 return "'%s'" % bp(value) 

1088 

1089 return process 

1090 

1091 

1092class DATETIME(_DateTimeMixin, sqltypes.DateTime): 

1093 r"""Represent a Python datetime object in SQLite using a string. 

1094 

1095 The default string storage format is:: 

1096 

1097 "%(year)04d-%(month)02d-%(day)02d %(hour)02d:%(minute)02d:%(second)02d.%(microsecond)06d" 

1098 

1099 e.g.: 

1100 

1101 .. sourcecode:: text 

1102 

1103 2021-03-15 12:05:57.105542 

1104 

1105 The incoming storage format is by default parsed using the 

1106 Python ``datetime.fromisoformat()`` function. 

1107 

1108 .. versionchanged:: 2.0 ``datetime.fromisoformat()`` is used for default 

1109 datetime string parsing. 

1110 

1111 The storage format can be customized to some degree using the 

1112 ``storage_format`` and ``regexp`` parameters, such as:: 

1113 

1114 import re 

1115 from sqlalchemy.dialects.sqlite import DATETIME 

1116 

1117 dt = DATETIME( 

1118 storage_format=( 

1119 "%(year)04d/%(month)02d/%(day)02d %(hour)02d:%(minute)02d:%(second)02d" 

1120 ), 

1121 regexp=r"(\d+)/(\d+)/(\d+) (\d+)-(\d+)-(\d+)", 

1122 ) 

1123 

1124 :param truncate_microseconds: when ``True`` microseconds will be truncated 

1125 from the datetime. Can't be specified together with ``storage_format`` 

1126 or ``regexp``. 

1127 

1128 :param storage_format: format string which will be applied to the dict 

1129 with keys year, month, day, hour, minute, second, and microsecond. 

1130 

1131 :param regexp: regular expression which will be applied to incoming result 

1132 rows, replacing the use of ``datetime.fromisoformat()`` to parse incoming 

1133 strings. If the regexp contains named groups, the resulting match dict is 

1134 applied to the Python datetime() constructor as keyword arguments. 

1135 Otherwise, if positional groups are used, the datetime() constructor 

1136 is called with positional arguments via 

1137 ``*map(int, match_obj.groups(0))``. 

1138 

1139 """ # noqa 

1140 

1141 _storage_format = ( 

1142 "%(year)04d-%(month)02d-%(day)02d " 

1143 "%(hour)02d:%(minute)02d:%(second)02d.%(microsecond)06d" 

1144 ) 

1145 

1146 def __init__(self, *args, **kwargs): 

1147 truncate_microseconds = kwargs.pop("truncate_microseconds", False) 

1148 super().__init__(*args, **kwargs) 

1149 if truncate_microseconds: 

1150 assert "storage_format" not in kwargs, ( 

1151 "You can specify only " 

1152 "one of truncate_microseconds or storage_format." 

1153 ) 

1154 assert "regexp" not in kwargs, ( 

1155 "You can specify only one of " 

1156 "truncate_microseconds or regexp." 

1157 ) 

1158 self._storage_format = ( 

1159 "%(year)04d-%(month)02d-%(day)02d " 

1160 "%(hour)02d:%(minute)02d:%(second)02d" 

1161 ) 

1162 

1163 def bind_processor(self, dialect): 

1164 datetime_datetime = datetime.datetime 

1165 datetime_date = datetime.date 

1166 format_ = self._storage_format 

1167 

1168 def process(value): 

1169 if value is None: 

1170 return None 

1171 elif isinstance(value, datetime_datetime): 

1172 return format_ % { 

1173 "year": value.year, 

1174 "month": value.month, 

1175 "day": value.day, 

1176 "hour": value.hour, 

1177 "minute": value.minute, 

1178 "second": value.second, 

1179 "microsecond": value.microsecond, 

1180 } 

1181 elif isinstance(value, datetime_date): 

1182 return format_ % { 

1183 "year": value.year, 

1184 "month": value.month, 

1185 "day": value.day, 

1186 "hour": 0, 

1187 "minute": 0, 

1188 "second": 0, 

1189 "microsecond": 0, 

1190 } 

1191 else: 

1192 raise TypeError( 

1193 "SQLite DateTime type only accepts Python " 

1194 "datetime and date objects as input." 

1195 ) 

1196 

1197 return process 

1198 

1199 def result_processor(self, dialect, coltype): 

1200 if self._reg: 

1201 return processors.str_to_datetime_processor_factory( 

1202 self._reg, datetime.datetime 

1203 ) 

1204 else: 

1205 return processors.str_to_datetime 

1206 

1207 

1208class DATE(_DateTimeMixin, sqltypes.Date): 

1209 r"""Represent a Python date object in SQLite using a string. 

1210 

1211 The default string storage format is:: 

1212 

1213 "%(year)04d-%(month)02d-%(day)02d" 

1214 

1215 e.g.: 

1216 

1217 .. sourcecode:: text 

1218 

1219 2011-03-15 

1220 

1221 The incoming storage format is by default parsed using the 

1222 Python ``date.fromisoformat()`` function. 

1223 

1224 .. versionchanged:: 2.0 ``date.fromisoformat()`` is used for default 

1225 date string parsing. 

1226 

1227 

1228 The storage format can be customized to some degree using the 

1229 ``storage_format`` and ``regexp`` parameters, such as:: 

1230 

1231 import re 

1232 from sqlalchemy.dialects.sqlite import DATE 

1233 

1234 d = DATE( 

1235 storage_format="%(month)02d/%(day)02d/%(year)04d", 

1236 regexp=re.compile("(?P<month>\d+)/(?P<day>\d+)/(?P<year>\d+)"), 

1237 ) 

1238 

1239 :param storage_format: format string which will be applied to the 

1240 dict with keys year, month, and day. 

1241 

1242 :param regexp: regular expression which will be applied to 

1243 incoming result rows, replacing the use of ``date.fromisoformat()`` to 

1244 parse incoming strings. If the regexp contains named groups, the resulting 

1245 match dict is applied to the Python date() constructor as keyword 

1246 arguments. Otherwise, if positional groups are used, the date() 

1247 constructor is called with positional arguments via 

1248 ``*map(int, match_obj.groups(0))``. 

1249 

1250 """ 

1251 

1252 _storage_format = "%(year)04d-%(month)02d-%(day)02d" 

1253 

1254 def bind_processor(self, dialect): 

1255 datetime_date = datetime.date 

1256 format_ = self._storage_format 

1257 

1258 def process(value): 

1259 if value is None: 

1260 return None 

1261 elif isinstance(value, datetime_date): 

1262 return format_ % { 

1263 "year": value.year, 

1264 "month": value.month, 

1265 "day": value.day, 

1266 } 

1267 else: 

1268 raise TypeError( 

1269 "SQLite Date type only accepts Python " 

1270 "date objects as input." 

1271 ) 

1272 

1273 return process 

1274 

1275 def result_processor(self, dialect, coltype): 

1276 if self._reg: 

1277 return processors.str_to_datetime_processor_factory( 

1278 self._reg, datetime.date 

1279 ) 

1280 else: 

1281 return processors.str_to_date 

1282 

1283 

1284class TIME(_DateTimeMixin, sqltypes.Time): 

1285 r"""Represent a Python time object in SQLite using a string. 

1286 

1287 The default string storage format is:: 

1288 

1289 "%(hour)02d:%(minute)02d:%(second)02d.%(microsecond)06d" 

1290 

1291 e.g.: 

1292 

1293 .. sourcecode:: text 

1294 

1295 12:05:57.10558 

1296 

1297 The incoming storage format is by default parsed using the 

1298 Python ``time.fromisoformat()`` function. 

1299 

1300 .. versionchanged:: 2.0 ``time.fromisoformat()`` is used for default 

1301 time string parsing. 

1302 

1303 The storage format can be customized to some degree using the 

1304 ``storage_format`` and ``regexp`` parameters, such as:: 

1305 

1306 import re 

1307 from sqlalchemy.dialects.sqlite import TIME 

1308 

1309 t = TIME( 

1310 storage_format="%(hour)02d-%(minute)02d-%(second)02d-%(microsecond)06d", 

1311 regexp=re.compile("(\d+)-(\d+)-(\d+)-(?:-(\d+))?"), 

1312 ) 

1313 

1314 :param truncate_microseconds: when ``True`` microseconds will be truncated 

1315 from the time. Can't be specified together with ``storage_format`` 

1316 or ``regexp``. 

1317 

1318 :param storage_format: format string which will be applied to the dict 

1319 with keys hour, minute, second, and microsecond. 

1320 

1321 :param regexp: regular expression which will be applied to incoming result 

1322 rows, replacing the use of ``datetime.fromisoformat()`` to parse incoming 

1323 strings. If the regexp contains named groups, the resulting match dict is 

1324 applied to the Python time() constructor as keyword arguments. Otherwise, 

1325 if positional groups are used, the time() constructor is called with 

1326 positional arguments via ``*map(int, match_obj.groups(0))``. 

1327 

1328 """ 

1329 

1330 _storage_format = "%(hour)02d:%(minute)02d:%(second)02d.%(microsecond)06d" 

1331 

1332 def __init__(self, *args, **kwargs): 

1333 truncate_microseconds = kwargs.pop("truncate_microseconds", False) 

1334 super().__init__(*args, **kwargs) 

1335 if truncate_microseconds: 

1336 assert "storage_format" not in kwargs, ( 

1337 "You can specify only " 

1338 "one of truncate_microseconds or storage_format." 

1339 ) 

1340 assert "regexp" not in kwargs, ( 

1341 "You can specify only one of " 

1342 "truncate_microseconds or regexp." 

1343 ) 

1344 self._storage_format = "%(hour)02d:%(minute)02d:%(second)02d" 

1345 

1346 def bind_processor(self, dialect): 

1347 datetime_time = datetime.time 

1348 format_ = self._storage_format 

1349 

1350 def process(value): 

1351 if value is None: 

1352 return None 

1353 elif isinstance(value, datetime_time): 

1354 return format_ % { 

1355 "hour": value.hour, 

1356 "minute": value.minute, 

1357 "second": value.second, 

1358 "microsecond": value.microsecond, 

1359 } 

1360 else: 

1361 raise TypeError( 

1362 "SQLite Time type only accepts Python " 

1363 "time objects as input." 

1364 ) 

1365 

1366 return process 

1367 

1368 def result_processor(self, dialect, coltype): 

1369 if self._reg: 

1370 return processors.str_to_datetime_processor_factory( 

1371 self._reg, datetime.time 

1372 ) 

1373 else: 

1374 return processors.str_to_time 

1375 

1376 

1377colspecs = { 

1378 sqltypes.Date: DATE, 

1379 sqltypes.DateTime: DATETIME, 

1380 sqltypes.JSON: _SQliteJson, 

1381 sqltypes.JSON.JSONIndexType: JSONIndexType, 

1382 sqltypes.JSON.JSONPathType: JSONPathType, 

1383 sqltypes.Time: TIME, 

1384} 

1385 

1386ischema_names = { 

1387 "BIGINT": sqltypes.BIGINT, 

1388 "BLOB": sqltypes.BLOB, 

1389 "BOOL": sqltypes.BOOLEAN, 

1390 "BOOLEAN": sqltypes.BOOLEAN, 

1391 "CHAR": sqltypes.CHAR, 

1392 "DATE": sqltypes.DATE, 

1393 "DATE_CHAR": sqltypes.DATE, 

1394 "DATETIME": sqltypes.DATETIME, 

1395 "DATETIME_CHAR": sqltypes.DATETIME, 

1396 "DOUBLE": sqltypes.DOUBLE, 

1397 "DECIMAL": sqltypes.DECIMAL, 

1398 "FLOAT": sqltypes.FLOAT, 

1399 "INT": sqltypes.INTEGER, 

1400 "INTEGER": sqltypes.INTEGER, 

1401 "JSON": JSON, 

1402 "NUMERIC": sqltypes.NUMERIC, 

1403 "REAL": sqltypes.REAL, 

1404 "SMALLINT": sqltypes.SMALLINT, 

1405 "TEXT": sqltypes.TEXT, 

1406 "TIME": sqltypes.TIME, 

1407 "TIME_CHAR": sqltypes.TIME, 

1408 "TIMESTAMP": sqltypes.TIMESTAMP, 

1409 "VARCHAR": sqltypes.VARCHAR, 

1410 "NVARCHAR": sqltypes.NVARCHAR, 

1411 "NCHAR": sqltypes.NCHAR, 

1412} 

1413 

1414 

1415class SQLiteCompiler(compiler.SQLCompiler): 

1416 extract_map = util.update_copy( 

1417 compiler.SQLCompiler.extract_map, 

1418 { 

1419 "month": "%m", 

1420 "day": "%d", 

1421 "year": "%Y", 

1422 "second": "%S", 

1423 "hour": "%H", 

1424 "doy": "%j", 

1425 "minute": "%M", 

1426 "epoch": "%s", 

1427 "dow": "%w", 

1428 "week": "%W", 

1429 }, 

1430 ) 

1431 

1432 def visit_truediv_binary(self, binary, operator, **kw): 

1433 return ( 

1434 self.process(binary.left, **kw) 

1435 + " / " 

1436 + "(%s + 0.0)" % self.process(binary.right, **kw) 

1437 ) 

1438 

1439 def visit_now_func(self, fn, **kw): 

1440 return "CURRENT_TIMESTAMP" 

1441 

1442 def visit_localtimestamp_func(self, func, **kw): 

1443 return "DATETIME(CURRENT_TIMESTAMP, 'localtime')" 

1444 

1445 def visit_true(self, expr, **kw): 

1446 return "1" 

1447 

1448 def visit_false(self, expr, **kw): 

1449 return "0" 

1450 

1451 def visit_char_length_func(self, fn, **kw): 

1452 return "length%s" % self.function_argspec(fn) 

1453 

1454 def visit_aggregate_strings_func(self, fn, **kw): 

1455 return "group_concat%s" % self.function_argspec(fn) 

1456 

1457 def visit_cast(self, cast, **kwargs): 

1458 if self.dialect.supports_cast: 

1459 return super().visit_cast(cast, **kwargs) 

1460 else: 

1461 return self.process(cast.clause, **kwargs) 

1462 

1463 def visit_extract(self, extract, **kw): 

1464 try: 

1465 return "CAST(STRFTIME('%s', %s) AS INTEGER)" % ( 

1466 self.extract_map[extract.field], 

1467 self.process(extract.expr, **kw), 

1468 ) 

1469 except KeyError as err: 

1470 raise exc.CompileError( 

1471 "%s is not a valid extract argument." % extract.field 

1472 ) from err 

1473 

1474 def returning_clause( 

1475 self, 

1476 stmt, 

1477 returning_cols, 

1478 *, 

1479 populate_result_map, 

1480 **kw, 

1481 ): 

1482 kw["include_table"] = False 

1483 return super().returning_clause( 

1484 stmt, returning_cols, populate_result_map=populate_result_map, **kw 

1485 ) 

1486 

1487 def limit_clause(self, select, **kw): 

1488 text = "" 

1489 if select._limit_clause is not None: 

1490 text += "\n LIMIT " + self.process(select._limit_clause, **kw) 

1491 if select._offset_clause is not None: 

1492 if select._limit_clause is None: 

1493 text += "\n LIMIT " + self.process(sql.literal(-1)) 

1494 text += " OFFSET " + self.process(select._offset_clause, **kw) 

1495 else: 

1496 text += " OFFSET " + self.process(sql.literal(0), **kw) 

1497 return text 

1498 

1499 def for_update_clause(self, select, **kw): 

1500 # sqlite has no "FOR UPDATE" AFAICT 

1501 return "" 

1502 

1503 def update_from_clause( 

1504 self, update_stmt, from_table, extra_froms, from_hints, **kw 

1505 ): 

1506 kw["asfrom"] = True 

1507 return "FROM " + ", ".join( 

1508 t._compiler_dispatch(self, fromhints=from_hints, **kw) 

1509 for t in extra_froms 

1510 ) 

1511 

1512 def visit_is_distinct_from_binary(self, binary, operator, **kw): 

1513 return "%s IS NOT %s" % ( 

1514 self.process(binary.left), 

1515 self.process(binary.right), 

1516 ) 

1517 

1518 def visit_is_not_distinct_from_binary(self, binary, operator, **kw): 

1519 return "%s IS %s" % ( 

1520 self.process(binary.left), 

1521 self.process(binary.right), 

1522 ) 

1523 

1524 def visit_json_getitem_op_binary( 

1525 self, binary, operator, _cast_applied=False, **kw 

1526 ): 

1527 if ( 

1528 not _cast_applied 

1529 and binary.type._type_affinity is not sqltypes.JSON 

1530 ): 

1531 kw["_cast_applied"] = True 

1532 return self.process(sql.cast(binary, binary.type), **kw) 

1533 

1534 if binary.type._type_affinity is sqltypes.JSON: 

1535 expr = "JSON_QUOTE(JSON_EXTRACT(%s, %s))" 

1536 else: 

1537 expr = "JSON_EXTRACT(%s, %s)" 

1538 

1539 return expr % ( 

1540 self.process(binary.left, **kw), 

1541 self.process(binary.right, **kw), 

1542 ) 

1543 

1544 def visit_json_path_getitem_op_binary( 

1545 self, binary, operator, _cast_applied=False, **kw 

1546 ): 

1547 if ( 

1548 not _cast_applied 

1549 and binary.type._type_affinity is not sqltypes.JSON 

1550 ): 

1551 kw["_cast_applied"] = True 

1552 return self.process(sql.cast(binary, binary.type), **kw) 

1553 

1554 if binary.type._type_affinity is sqltypes.JSON: 

1555 expr = "JSON_QUOTE(JSON_EXTRACT(%s, %s))" 

1556 else: 

1557 expr = "JSON_EXTRACT(%s, %s)" 

1558 

1559 return expr % ( 

1560 self.process(binary.left, **kw), 

1561 self.process(binary.right, **kw), 

1562 ) 

1563 

1564 def visit_empty_set_op_expr(self, type_, expand_op, **kw): 

1565 # slightly old SQLite versions don't seem to be able to handle 

1566 # the empty set impl 

1567 return self.visit_empty_set_expr(type_) 

1568 

1569 def visit_empty_set_expr(self, element_types, **kw): 

1570 return "SELECT %s FROM (SELECT %s) WHERE 1!=1" % ( 

1571 ", ".join("1" for type_ in element_types or [INTEGER()]), 

1572 ", ".join("1" for type_ in element_types or [INTEGER()]), 

1573 ) 

1574 

1575 def visit_regexp_match_op_binary(self, binary, operator, **kw): 

1576 return self._generate_generic_binary(binary, " REGEXP ", **kw) 

1577 

1578 def visit_not_regexp_match_op_binary(self, binary, operator, **kw): 

1579 return self._generate_generic_binary(binary, " NOT REGEXP ", **kw) 

1580 

1581 def _on_conflict_target(self, clause, **kw): 

1582 if clause.inferred_target_elements is not None: 

1583 target_text = "(%s)" % ", ".join( 

1584 ( 

1585 self.preparer.quote(c) 

1586 if isinstance(c, str) 

1587 else self.process(c, include_table=False, use_schema=False) 

1588 ) 

1589 for c in clause.inferred_target_elements 

1590 ) 

1591 if clause.inferred_target_whereclause is not None: 

1592 target_text += " WHERE %s" % self.process( 

1593 clause.inferred_target_whereclause, 

1594 include_table=False, 

1595 use_schema=False, 

1596 literal_execute=True, 

1597 ) 

1598 

1599 else: 

1600 target_text = "" 

1601 

1602 return target_text 

1603 

1604 def visit_on_conflict_do_nothing(self, on_conflict, **kw): 

1605 target_text = self._on_conflict_target(on_conflict, **kw) 

1606 

1607 if target_text: 

1608 return "ON CONFLICT %s DO NOTHING" % target_text 

1609 else: 

1610 return "ON CONFLICT DO NOTHING" 

1611 

1612 def visit_on_conflict_do_update(self, on_conflict, **kw): 

1613 clause = on_conflict 

1614 

1615 target_text = self._on_conflict_target(on_conflict, **kw) 

1616 

1617 action_set_ops = [] 

1618 

1619 set_parameters = dict(clause.update_values_to_set) 

1620 # create a list of column assignment clauses as tuples 

1621 

1622 insert_statement = self.stack[-1]["selectable"] 

1623 cols = insert_statement.table.c 

1624 for c in cols: 

1625 col_key = c.key 

1626 

1627 if col_key in set_parameters: 

1628 value = set_parameters.pop(col_key) 

1629 elif c in set_parameters: 

1630 value = set_parameters.pop(c) 

1631 else: 

1632 continue 

1633 

1634 if ( 

1635 isinstance(value, elements.BindParameter) 

1636 and value.type._isnull 

1637 ): 

1638 value = value._with_binary_element_type(c.type) 

1639 value_text = self.process(value.self_group(), use_schema=False) 

1640 

1641 key_text = self.preparer.quote(c.name) 

1642 action_set_ops.append("%s = %s" % (key_text, value_text)) 

1643 

1644 # check for names that don't match columns 

1645 if set_parameters: 

1646 util.warn( 

1647 "Additional column names not matching " 

1648 "any column keys in table '%s': %s" 

1649 % ( 

1650 self.current_executable.table.name, 

1651 (", ".join("'%s'" % c for c in set_parameters)), 

1652 ) 

1653 ) 

1654 for k, v in set_parameters.items(): 

1655 key_text = ( 

1656 self.preparer.quote(k) 

1657 if isinstance(k, str) 

1658 else self.process(k, use_schema=False) 

1659 ) 

1660 value_text = self.process( 

1661 coercions.expect(roles.ExpressionElementRole, v), 

1662 use_schema=False, 

1663 ) 

1664 action_set_ops.append("%s = %s" % (key_text, value_text)) 

1665 

1666 action_text = ", ".join(action_set_ops) 

1667 if clause.update_whereclause is not None: 

1668 action_text += " WHERE %s" % self.process( 

1669 clause.update_whereclause, include_table=True, use_schema=False 

1670 ) 

1671 

1672 return "ON CONFLICT %s DO UPDATE SET %s" % (target_text, action_text) 

1673 

1674 def visit_bitwise_xor_op_binary(self, binary, operator, **kw): 

1675 # sqlite has no xor. Use "a XOR b" = "(a | b) - (a & b)". 

1676 kw["eager_grouping"] = True 

1677 or_ = self._generate_generic_binary(binary, " | ", **kw) 

1678 and_ = self._generate_generic_binary(binary, " & ", **kw) 

1679 return f"({or_} - {and_})" 

1680 

1681 

1682class SQLiteDDLCompiler(compiler.DDLCompiler): 

1683 def get_column_specification(self, column, **kwargs): 

1684 coltype = self.dialect.type_compiler_instance.process( 

1685 column.type, type_expression=column 

1686 ) 

1687 colspec = self.preparer.format_column(column) + " " + coltype 

1688 default = self.get_column_default_string(column) 

1689 if default is not None: 

1690 

1691 if not re.match(r"""^\s*[\'\"\(]""", default) and re.match( 

1692 r".*\W.*", default 

1693 ): 

1694 colspec += f" DEFAULT ({default})" 

1695 else: 

1696 colspec += f" DEFAULT {default}" 

1697 

1698 if not column.nullable: 

1699 colspec += " NOT NULL" 

1700 

1701 on_conflict_clause = column.dialect_options["sqlite"][ 

1702 "on_conflict_not_null" 

1703 ] 

1704 if on_conflict_clause is not None: 

1705 colspec += " ON CONFLICT " + on_conflict_clause 

1706 

1707 if column.primary_key: 

1708 if ( 

1709 column.autoincrement is True 

1710 and len(column.table.primary_key.columns) != 1 

1711 ): 

1712 raise exc.CompileError( 

1713 "SQLite does not support autoincrement for " 

1714 "composite primary keys" 

1715 ) 

1716 

1717 if ( 

1718 column.table.dialect_options["sqlite"]["autoincrement"] 

1719 and len(column.table.primary_key.columns) == 1 

1720 and issubclass(column.type._type_affinity, sqltypes.Integer) 

1721 and not column.foreign_keys 

1722 ): 

1723 colspec += " PRIMARY KEY" 

1724 

1725 on_conflict_clause = column.dialect_options["sqlite"][ 

1726 "on_conflict_primary_key" 

1727 ] 

1728 if on_conflict_clause is not None: 

1729 colspec += " ON CONFLICT " + on_conflict_clause 

1730 

1731 colspec += " AUTOINCREMENT" 

1732 

1733 if column.computed is not None: 

1734 colspec += " " + self.process(column.computed) 

1735 

1736 return colspec 

1737 

1738 def visit_primary_key_constraint(self, constraint, **kw): 

1739 # for columns with sqlite_autoincrement=True, 

1740 # the PRIMARY KEY constraint can only be inline 

1741 # with the column itself. 

1742 if len(constraint.columns) == 1: 

1743 c = list(constraint)[0] 

1744 if ( 

1745 c.primary_key 

1746 and c.table.dialect_options["sqlite"]["autoincrement"] 

1747 and issubclass(c.type._type_affinity, sqltypes.Integer) 

1748 and not c.foreign_keys 

1749 ): 

1750 return None 

1751 

1752 text = super().visit_primary_key_constraint(constraint) 

1753 

1754 on_conflict_clause = constraint.dialect_options["sqlite"][ 

1755 "on_conflict" 

1756 ] 

1757 if on_conflict_clause is None and len(constraint.columns) == 1: 

1758 on_conflict_clause = list(constraint)[0].dialect_options["sqlite"][ 

1759 "on_conflict_primary_key" 

1760 ] 

1761 

1762 if on_conflict_clause is not None: 

1763 text += " ON CONFLICT " + on_conflict_clause 

1764 

1765 return text 

1766 

1767 def visit_unique_constraint(self, constraint, **kw): 

1768 text = super().visit_unique_constraint(constraint) 

1769 

1770 on_conflict_clause = constraint.dialect_options["sqlite"][ 

1771 "on_conflict" 

1772 ] 

1773 if on_conflict_clause is None and len(constraint.columns) == 1: 

1774 col1 = list(constraint)[0] 

1775 if isinstance(col1, schema.SchemaItem): 

1776 on_conflict_clause = list(constraint)[0].dialect_options[ 

1777 "sqlite" 

1778 ]["on_conflict_unique"] 

1779 

1780 if on_conflict_clause is not None: 

1781 text += " ON CONFLICT " + on_conflict_clause 

1782 

1783 return text 

1784 

1785 def visit_check_constraint(self, constraint, **kw): 

1786 text = super().visit_check_constraint(constraint) 

1787 

1788 on_conflict_clause = constraint.dialect_options["sqlite"][ 

1789 "on_conflict" 

1790 ] 

1791 

1792 if on_conflict_clause is not None: 

1793 text += " ON CONFLICT " + on_conflict_clause 

1794 

1795 return text 

1796 

1797 def visit_column_check_constraint(self, constraint, **kw): 

1798 text = super().visit_column_check_constraint(constraint) 

1799 

1800 if constraint.dialect_options["sqlite"]["on_conflict"] is not None: 

1801 raise exc.CompileError( 

1802 "SQLite does not support on conflict clause for " 

1803 "column check constraint" 

1804 ) 

1805 

1806 return text 

1807 

1808 def visit_foreign_key_constraint(self, constraint, **kw): 

1809 local_table = constraint.elements[0].parent.table 

1810 remote_table = constraint.elements[0].column.table 

1811 

1812 if local_table.schema != remote_table.schema: 

1813 return None 

1814 else: 

1815 return super().visit_foreign_key_constraint(constraint) 

1816 

1817 def define_constraint_remote_table(self, constraint, table, preparer): 

1818 """Format the remote table clause of a CREATE CONSTRAINT clause.""" 

1819 

1820 return preparer.format_table(table, use_schema=False) 

1821 

1822 def visit_create_index( 

1823 self, create, include_schema=False, include_table_schema=True, **kw 

1824 ): 

1825 index = create.element 

1826 self._verify_index_table(index) 

1827 preparer = self.preparer 

1828 text = "CREATE " 

1829 if index.unique: 

1830 text += "UNIQUE " 

1831 

1832 text += "INDEX " 

1833 

1834 if create.if_not_exists: 

1835 text += "IF NOT EXISTS " 

1836 

1837 text += "%s ON %s (%s)" % ( 

1838 self._prepared_index_name(index, include_schema=True), 

1839 preparer.format_table(index.table, use_schema=False), 

1840 ", ".join( 

1841 self.sql_compiler.process( 

1842 expr, include_table=False, literal_binds=True 

1843 ) 

1844 for expr in index.expressions 

1845 ), 

1846 ) 

1847 

1848 whereclause = index.dialect_options["sqlite"]["where"] 

1849 if whereclause is not None: 

1850 where_compiled = self.sql_compiler.process( 

1851 whereclause, include_table=False, literal_binds=True 

1852 ) 

1853 text += " WHERE " + where_compiled 

1854 

1855 return text 

1856 

1857 def post_create_table(self, table): 

1858 table_options = [] 

1859 

1860 if not table.dialect_options["sqlite"]["with_rowid"]: 

1861 table_options.append("WITHOUT ROWID") 

1862 

1863 if table.dialect_options["sqlite"]["strict"]: 

1864 table_options.append("STRICT") 

1865 

1866 if table_options: 

1867 return "\n " + ",\n ".join(table_options) 

1868 else: 

1869 return "" 

1870 

1871 

1872class SQLiteTypeCompiler(compiler.GenericTypeCompiler): 

1873 def visit_large_binary(self, type_, **kw): 

1874 return self.visit_BLOB(type_) 

1875 

1876 def visit_DATETIME(self, type_, **kw): 

1877 if ( 

1878 not isinstance(type_, _DateTimeMixin) 

1879 or type_.format_is_text_affinity 

1880 ): 

1881 return super().visit_DATETIME(type_) 

1882 else: 

1883 return "DATETIME_CHAR" 

1884 

1885 def visit_DATE(self, type_, **kw): 

1886 if ( 

1887 not isinstance(type_, _DateTimeMixin) 

1888 or type_.format_is_text_affinity 

1889 ): 

1890 return super().visit_DATE(type_) 

1891 else: 

1892 return "DATE_CHAR" 

1893 

1894 def visit_TIME(self, type_, **kw): 

1895 if ( 

1896 not isinstance(type_, _DateTimeMixin) 

1897 or type_.format_is_text_affinity 

1898 ): 

1899 return super().visit_TIME(type_) 

1900 else: 

1901 return "TIME_CHAR" 

1902 

1903 def visit_JSON(self, type_, **kw): 

1904 # note this name provides NUMERIC affinity, not TEXT. 

1905 # should not be an issue unless the JSON value consists of a single 

1906 # numeric value. JSONTEXT can be used if this case is required. 

1907 return "JSON" 

1908 

1909 

1910class SQLiteIdentifierPreparer(compiler.IdentifierPreparer): 

1911 reserved_words = { 

1912 "add", 

1913 "after", 

1914 "all", 

1915 "alter", 

1916 "analyze", 

1917 "and", 

1918 "as", 

1919 "asc", 

1920 "attach", 

1921 "autoincrement", 

1922 "before", 

1923 "begin", 

1924 "between", 

1925 "by", 

1926 "cascade", 

1927 "case", 

1928 "cast", 

1929 "check", 

1930 "collate", 

1931 "column", 

1932 "commit", 

1933 "conflict", 

1934 "constraint", 

1935 "create", 

1936 "cross", 

1937 "current_date", 

1938 "current_time", 

1939 "current_timestamp", 

1940 "database", 

1941 "default", 

1942 "deferrable", 

1943 "deferred", 

1944 "delete", 

1945 "desc", 

1946 "detach", 

1947 "distinct", 

1948 "drop", 

1949 "each", 

1950 "else", 

1951 "end", 

1952 "escape", 

1953 "except", 

1954 "exclusive", 

1955 "exists", 

1956 "explain", 

1957 "false", 

1958 "fail", 

1959 "for", 

1960 "foreign", 

1961 "from", 

1962 "full", 

1963 "glob", 

1964 "group", 

1965 "having", 

1966 "if", 

1967 "ignore", 

1968 "immediate", 

1969 "in", 

1970 "index", 

1971 "indexed", 

1972 "initially", 

1973 "inner", 

1974 "insert", 

1975 "instead", 

1976 "intersect", 

1977 "into", 

1978 "is", 

1979 "isnull", 

1980 "join", 

1981 "key", 

1982 "left", 

1983 "like", 

1984 "limit", 

1985 "match", 

1986 "natural", 

1987 "not", 

1988 "notnull", 

1989 "null", 

1990 "of", 

1991 "offset", 

1992 "on", 

1993 "or", 

1994 "order", 

1995 "outer", 

1996 "plan", 

1997 "pragma", 

1998 "primary", 

1999 "query", 

2000 "raise", 

2001 "references", 

2002 "reindex", 

2003 "rename", 

2004 "replace", 

2005 "restrict", 

2006 "right", 

2007 "rollback", 

2008 "row", 

2009 "select", 

2010 "set", 

2011 "table", 

2012 "temp", 

2013 "temporary", 

2014 "then", 

2015 "to", 

2016 "transaction", 

2017 "trigger", 

2018 "true", 

2019 "union", 

2020 "unique", 

2021 "update", 

2022 "using", 

2023 "vacuum", 

2024 "values", 

2025 "view", 

2026 "virtual", 

2027 "when", 

2028 "where", 

2029 } 

2030 

2031 

2032class SQLiteExecutionContext(default.DefaultExecutionContext): 

2033 @util.memoized_property 

2034 def _preserve_raw_colnames(self): 

2035 return ( 

2036 not self.dialect._broken_dotted_colnames 

2037 or self.execution_options.get("sqlite_raw_colnames", False) 

2038 ) 

2039 

2040 def _translate_colname(self, colname): 

2041 # TODO: detect SQLite version 3.10.0 or greater; 

2042 # see [ticket:3633] 

2043 

2044 # adjust for dotted column names. SQLite 

2045 # in the case of UNION may store col names as 

2046 # "tablename.colname", or if using an attached database, 

2047 # "database.tablename.colname", in cursor.description 

2048 if not self._preserve_raw_colnames and "." in colname: 

2049 return colname.split(".")[-1], colname 

2050 else: 

2051 return colname, None 

2052 

2053 

2054class SQLiteDialect(default.DefaultDialect): 

2055 name = "sqlite" 

2056 supports_alter = False 

2057 

2058 # SQlite supports "DEFAULT VALUES" but *does not* support 

2059 # "VALUES (DEFAULT)" 

2060 supports_default_values = True 

2061 supports_default_metavalue = False 

2062 

2063 # sqlite issue: 

2064 # https://github.com/python/cpython/issues/93421 

2065 # note this parameter is no longer used by the ORM or default dialect 

2066 # see #9414 

2067 supports_sane_rowcount_returning = False 

2068 

2069 supports_empty_insert = False 

2070 supports_cast = True 

2071 supports_multivalues_insert = True 

2072 use_insertmanyvalues = True 

2073 tuple_in_values = True 

2074 supports_statement_cache = True 

2075 insert_null_pk_still_autoincrements = True 

2076 insert_returning = True 

2077 update_returning = True 

2078 update_returning_multifrom = True 

2079 delete_returning = True 

2080 update_returning_multifrom = True 

2081 

2082 supports_default_metavalue = True 

2083 """dialect supports INSERT... VALUES (DEFAULT) syntax""" 

2084 

2085 default_metavalue_token = "NULL" 

2086 """for INSERT... VALUES (DEFAULT) syntax, the token to put in the 

2087 parenthesis.""" 

2088 

2089 default_paramstyle = "qmark" 

2090 execution_ctx_cls = SQLiteExecutionContext 

2091 statement_compiler = SQLiteCompiler 

2092 ddl_compiler = SQLiteDDLCompiler 

2093 type_compiler_cls = SQLiteTypeCompiler 

2094 preparer = SQLiteIdentifierPreparer 

2095 ischema_names = ischema_names 

2096 colspecs = colspecs 

2097 

2098 construct_arguments = [ 

2099 ( 

2100 sa_schema.Table, 

2101 { 

2102 "autoincrement": False, 

2103 "with_rowid": True, 

2104 "strict": False, 

2105 }, 

2106 ), 

2107 (sa_schema.Index, {"where": None}), 

2108 ( 

2109 sa_schema.Column, 

2110 { 

2111 "on_conflict_primary_key": None, 

2112 "on_conflict_not_null": None, 

2113 "on_conflict_unique": None, 

2114 }, 

2115 ), 

2116 (sa_schema.Constraint, {"on_conflict": None}), 

2117 ] 

2118 

2119 _broken_fk_pragma_quotes = False 

2120 _broken_dotted_colnames = False 

2121 

2122 def __init__( 

2123 self, 

2124 native_datetime=False, 

2125 json_serializer=None, 

2126 json_deserializer=None, 

2127 **kwargs, 

2128 ): 

2129 default.DefaultDialect.__init__(self, **kwargs) 

2130 

2131 self._json_serializer = json_serializer 

2132 self._json_deserializer = json_deserializer 

2133 

2134 # this flag used by pysqlite dialect, and perhaps others in the 

2135 # future, to indicate the driver is handling date/timestamp 

2136 # conversions (and perhaps datetime/time as well on some hypothetical 

2137 # driver ?) 

2138 self.native_datetime = native_datetime 

2139 

2140 if self.dbapi is not None: 

2141 if self.dbapi.sqlite_version_info < (3, 7, 16): 

2142 util.warn( 

2143 "SQLite version %s is older than 3.7.16, and will not " 

2144 "support right nested joins, as are sometimes used in " 

2145 "more complex ORM scenarios. SQLAlchemy 1.4 and above " 

2146 "no longer tries to rewrite these joins." 

2147 % (self.dbapi.sqlite_version_info,) 

2148 ) 

2149 

2150 # NOTE: python 3.7 on fedora for me has SQLite 3.34.1. These 

2151 # version checks are getting very stale. 

2152 self._broken_dotted_colnames = self.dbapi.sqlite_version_info < ( 

2153 3, 

2154 10, 

2155 0, 

2156 ) 

2157 self.supports_default_values = self.dbapi.sqlite_version_info >= ( 

2158 3, 

2159 3, 

2160 8, 

2161 ) 

2162 self.supports_cast = self.dbapi.sqlite_version_info >= (3, 2, 3) 

2163 self.supports_multivalues_insert = ( 

2164 # https://www.sqlite.org/releaselog/3_7_11.html 

2165 self.dbapi.sqlite_version_info 

2166 >= (3, 7, 11) 

2167 ) 

2168 # see https://www.sqlalchemy.org/trac/ticket/2568 

2169 # as well as https://www.sqlite.org/src/info/600482d161 

2170 self._broken_fk_pragma_quotes = self.dbapi.sqlite_version_info < ( 

2171 3, 

2172 6, 

2173 14, 

2174 ) 

2175 

2176 if self.dbapi.sqlite_version_info < (3, 35) or util.pypy: 

2177 self.update_returning = self.delete_returning = ( 

2178 self.insert_returning 

2179 ) = False 

2180 

2181 if self.dbapi.sqlite_version_info < (3, 32, 0): 

2182 # https://www.sqlite.org/limits.html 

2183 self.insertmanyvalues_max_parameters = 999 

2184 

2185 _isolation_lookup = util.immutabledict( 

2186 {"READ UNCOMMITTED": 1, "SERIALIZABLE": 0} 

2187 ) 

2188 

2189 def get_isolation_level_values(self, dbapi_connection): 

2190 return list(self._isolation_lookup) 

2191 

2192 def set_isolation_level(self, dbapi_connection, level): 

2193 isolation_level = self._isolation_lookup[level] 

2194 

2195 cursor = dbapi_connection.cursor() 

2196 cursor.execute(f"PRAGMA read_uncommitted = {isolation_level}") 

2197 cursor.close() 

2198 

2199 def get_isolation_level(self, dbapi_connection): 

2200 cursor = dbapi_connection.cursor() 

2201 cursor.execute("PRAGMA read_uncommitted") 

2202 res = cursor.fetchone() 

2203 if res: 

2204 value = res[0] 

2205 else: 

2206 # https://www.sqlite.org/changes.html#version_3_3_3 

2207 # "Optional READ UNCOMMITTED isolation (instead of the 

2208 # default isolation level of SERIALIZABLE) and 

2209 # table level locking when database connections 

2210 # share a common cache."" 

2211 # pre-SQLite 3.3.0 default to 0 

2212 value = 0 

2213 cursor.close() 

2214 if value == 0: 

2215 return "SERIALIZABLE" 

2216 elif value == 1: 

2217 return "READ UNCOMMITTED" 

2218 else: 

2219 assert False, "Unknown isolation level %s" % value 

2220 

2221 @reflection.cache 

2222 def get_schema_names(self, connection, **kw): 

2223 s = "PRAGMA database_list" 

2224 dl = connection.exec_driver_sql(s) 

2225 

2226 return [db[1] for db in dl if db[1] != "temp"] 

2227 

2228 def _format_schema(self, schema, table_name): 

2229 if schema is not None: 

2230 qschema = self.identifier_preparer.quote_identifier(schema) 

2231 name = f"{qschema}.{table_name}" 

2232 else: 

2233 name = table_name 

2234 return name 

2235 

2236 def _sqlite_main_query( 

2237 self, 

2238 table: str, 

2239 type_: str, 

2240 schema: Optional[str], 

2241 sqlite_include_internal: bool, 

2242 ): 

2243 main = self._format_schema(schema, table) 

2244 if not sqlite_include_internal: 

2245 filter_table = " AND name NOT LIKE 'sqlite~_%' ESCAPE '~'" 

2246 else: 

2247 filter_table = "" 

2248 query = ( 

2249 f"SELECT name FROM {main} " 

2250 f"WHERE type='{type_}'{filter_table} " 

2251 "ORDER BY name" 

2252 ) 

2253 return query 

2254 

2255 @reflection.cache 

2256 def get_table_names( 

2257 self, connection, schema=None, sqlite_include_internal=False, **kw 

2258 ): 

2259 query = self._sqlite_main_query( 

2260 "sqlite_master", "table", schema, sqlite_include_internal 

2261 ) 

2262 names = connection.exec_driver_sql(query).scalars().all() 

2263 return names 

2264 

2265 @reflection.cache 

2266 def get_temp_table_names( 

2267 self, connection, sqlite_include_internal=False, **kw 

2268 ): 

2269 query = self._sqlite_main_query( 

2270 "sqlite_temp_master", "table", None, sqlite_include_internal 

2271 ) 

2272 names = connection.exec_driver_sql(query).scalars().all() 

2273 return names 

2274 

2275 @reflection.cache 

2276 def get_temp_view_names( 

2277 self, connection, sqlite_include_internal=False, **kw 

2278 ): 

2279 query = self._sqlite_main_query( 

2280 "sqlite_temp_master", "view", None, sqlite_include_internal 

2281 ) 

2282 names = connection.exec_driver_sql(query).scalars().all() 

2283 return names 

2284 

2285 @reflection.cache 

2286 def has_table(self, connection, table_name, schema=None, **kw): 

2287 self._ensure_has_table_connection(connection) 

2288 

2289 if schema is not None and schema not in self.get_schema_names( 

2290 connection, **kw 

2291 ): 

2292 return False 

2293 

2294 info = self._get_table_pragma( 

2295 connection, "table_info", table_name, schema=schema 

2296 ) 

2297 return bool(info) 

2298 

2299 def _get_default_schema_name(self, connection): 

2300 return "main" 

2301 

2302 @reflection.cache 

2303 def get_view_names( 

2304 self, connection, schema=None, sqlite_include_internal=False, **kw 

2305 ): 

2306 query = self._sqlite_main_query( 

2307 "sqlite_master", "view", schema, sqlite_include_internal 

2308 ) 

2309 names = connection.exec_driver_sql(query).scalars().all() 

2310 return names 

2311 

2312 @reflection.cache 

2313 def get_view_definition(self, connection, view_name, schema=None, **kw): 

2314 if schema is not None: 

2315 qschema = self.identifier_preparer.quote_identifier(schema) 

2316 master = f"{qschema}.sqlite_master" 

2317 s = ("SELECT sql FROM %s WHERE name = ? AND type='view'") % ( 

2318 master, 

2319 ) 

2320 rs = connection.exec_driver_sql(s, (view_name,)) 

2321 else: 

2322 try: 

2323 s = ( 

2324 "SELECT sql FROM " 

2325 " (SELECT * FROM sqlite_master UNION ALL " 

2326 " SELECT * FROM sqlite_temp_master) " 

2327 "WHERE name = ? " 

2328 "AND type='view'" 

2329 ) 

2330 rs = connection.exec_driver_sql(s, (view_name,)) 

2331 except exc.DBAPIError: 

2332 s = ( 

2333 "SELECT sql FROM sqlite_master WHERE name = ? " 

2334 "AND type='view'" 

2335 ) 

2336 rs = connection.exec_driver_sql(s, (view_name,)) 

2337 

2338 result = rs.fetchall() 

2339 if result: 

2340 return result[0].sql 

2341 else: 

2342 raise exc.NoSuchTableError( 

2343 f"{schema}.{view_name}" if schema else view_name 

2344 ) 

2345 

2346 @reflection.cache 

2347 def get_columns(self, connection, table_name, schema=None, **kw): 

2348 pragma = "table_info" 

2349 # computed columns are threaded as hidden, they require table_xinfo 

2350 if self.server_version_info >= (3, 31): 

2351 pragma = "table_xinfo" 

2352 info = self._get_table_pragma( 

2353 connection, pragma, table_name, schema=schema 

2354 ) 

2355 columns = [] 

2356 tablesql = None 

2357 for row in info: 

2358 name = row[1] 

2359 type_ = row[2].upper() 

2360 nullable = not row[3] 

2361 default = row[4] 

2362 primary_key = row[5] 

2363 hidden = row[6] if pragma == "table_xinfo" else 0 

2364 

2365 # hidden has value 0 for normal columns, 1 for hidden columns, 

2366 # 2 for computed virtual columns and 3 for computed stored columns 

2367 # https://www.sqlite.org/src/info/069351b85f9a706f60d3e98fbc8aaf40c374356b967c0464aede30ead3d9d18b 

2368 if hidden == 1: 

2369 continue 

2370 

2371 generated = bool(hidden) 

2372 persisted = hidden == 3 

2373 

2374 if tablesql is None and generated: 

2375 tablesql = self._get_table_sql( 

2376 connection, table_name, schema, **kw 

2377 ) 

2378 # remove create table 

2379 match = re.match( 

2380 r"create table .*?\((.*)\)$", 

2381 tablesql.strip(), 

2382 re.DOTALL | re.IGNORECASE, 

2383 ) 

2384 assert match, f"create table not found in {tablesql}" 

2385 tablesql = match.group(1).strip() 

2386 

2387 columns.append( 

2388 self._get_column_info( 

2389 name, 

2390 type_, 

2391 nullable, 

2392 default, 

2393 primary_key, 

2394 generated, 

2395 persisted, 

2396 tablesql, 

2397 ) 

2398 ) 

2399 if columns: 

2400 return columns 

2401 elif not self.has_table(connection, table_name, schema): 

2402 raise exc.NoSuchTableError( 

2403 f"{schema}.{table_name}" if schema else table_name 

2404 ) 

2405 else: 

2406 return ReflectionDefaults.columns() 

2407 

2408 def _get_column_info( 

2409 self, 

2410 name, 

2411 type_, 

2412 nullable, 

2413 default, 

2414 primary_key, 

2415 generated, 

2416 persisted, 

2417 tablesql, 

2418 ): 

2419 if generated: 

2420 # the type of a column "cc INTEGER GENERATED ALWAYS AS (1 + 42)" 

2421 # somehow is "INTEGER GENERATED ALWAYS" 

2422 type_ = re.sub("generated", "", type_, flags=re.IGNORECASE) 

2423 type_ = re.sub("always", "", type_, flags=re.IGNORECASE).strip() 

2424 

2425 coltype = self._resolve_type_affinity(type_) 

2426 

2427 if default is not None: 

2428 default = str(default) 

2429 

2430 colspec = { 

2431 "name": name, 

2432 "type": coltype, 

2433 "nullable": nullable, 

2434 "default": default, 

2435 "primary_key": primary_key, 

2436 } 

2437 if generated: 

2438 sqltext = "" 

2439 if tablesql: 

2440 pattern = ( 

2441 r"[^,]*\s+GENERATED\s+ALWAYS\s+AS" 

2442 r"\s+\((.*)\)\s*(?:virtual|stored)?" 

2443 ) 

2444 match = re.search( 

2445 re.escape(name) + pattern, tablesql, re.IGNORECASE 

2446 ) 

2447 if match: 

2448 sqltext = match.group(1) 

2449 colspec["computed"] = {"sqltext": sqltext, "persisted": persisted} 

2450 return colspec 

2451 

2452 def _resolve_type_affinity(self, type_): 

2453 """Return a data type from a reflected column, using affinity rules. 

2454 

2455 SQLite's goal for universal compatibility introduces some complexity 

2456 during reflection, as a column's defined type might not actually be a 

2457 type that SQLite understands - or indeed, my not be defined *at all*. 

2458 Internally, SQLite handles this with a 'data type affinity' for each 

2459 column definition, mapping to one of 'TEXT', 'NUMERIC', 'INTEGER', 

2460 'REAL', or 'NONE' (raw bits). The algorithm that determines this is 

2461 listed in https://www.sqlite.org/datatype3.html section 2.1. 

2462 

2463 This method allows SQLAlchemy to support that algorithm, while still 

2464 providing access to smarter reflection utilities by recognizing 

2465 column definitions that SQLite only supports through affinity (like 

2466 DATE and DOUBLE). 

2467 

2468 """ 

2469 match = re.match(r"([\w ]+)(\(.*?\))?", type_) 

2470 if match: 

2471 coltype = match.group(1) 

2472 args = match.group(2) 

2473 else: 

2474 coltype = "" 

2475 args = "" 

2476 

2477 if coltype in self.ischema_names: 

2478 coltype = self.ischema_names[coltype] 

2479 elif "INT" in coltype: 

2480 coltype = sqltypes.INTEGER 

2481 elif "CHAR" in coltype or "CLOB" in coltype or "TEXT" in coltype: 

2482 coltype = sqltypes.TEXT 

2483 elif "BLOB" in coltype or not coltype: 

2484 coltype = sqltypes.NullType 

2485 elif "REAL" in coltype or "FLOA" in coltype or "DOUB" in coltype: 

2486 coltype = sqltypes.REAL 

2487 else: 

2488 coltype = sqltypes.NUMERIC 

2489 

2490 if args is not None: 

2491 args = re.findall(r"(\d+)", args) 

2492 try: 

2493 coltype = coltype(*[int(a) for a in args]) 

2494 except TypeError: 

2495 util.warn( 

2496 "Could not instantiate type %s with " 

2497 "reflected arguments %s; using no arguments." 

2498 % (coltype, args) 

2499 ) 

2500 coltype = coltype() 

2501 else: 

2502 coltype = coltype() 

2503 

2504 return coltype 

2505 

2506 @reflection.cache 

2507 def get_pk_constraint(self, connection, table_name, schema=None, **kw): 

2508 constraint_name = None 

2509 table_data = self._get_table_sql(connection, table_name, schema=schema) 

2510 if table_data: 

2511 PK_PATTERN = r"CONSTRAINT (\w+) PRIMARY KEY" 

2512 result = re.search(PK_PATTERN, table_data, re.I) 

2513 constraint_name = result.group(1) if result else None 

2514 

2515 cols = self.get_columns(connection, table_name, schema, **kw) 

2516 # consider only pk columns. This also avoids sorting the cached 

2517 # value returned by get_columns 

2518 cols = [col for col in cols if col.get("primary_key", 0) > 0] 

2519 cols.sort(key=lambda col: col.get("primary_key")) 

2520 pkeys = [col["name"] for col in cols] 

2521 

2522 if pkeys: 

2523 return {"constrained_columns": pkeys, "name": constraint_name} 

2524 else: 

2525 return ReflectionDefaults.pk_constraint() 

2526 

2527 @reflection.cache 

2528 def get_foreign_keys(self, connection, table_name, schema=None, **kw): 

2529 # sqlite makes this *extremely difficult*. 

2530 # First, use the pragma to get the actual FKs. 

2531 pragma_fks = self._get_table_pragma( 

2532 connection, "foreign_key_list", table_name, schema=schema 

2533 ) 

2534 

2535 fks = {} 

2536 

2537 for row in pragma_fks: 

2538 (numerical_id, rtbl, lcol, rcol) = (row[0], row[2], row[3], row[4]) 

2539 

2540 if not rcol: 

2541 # no referred column, which means it was not named in the 

2542 # original DDL. The referred columns of the foreign key 

2543 # constraint are therefore the primary key of the referred 

2544 # table. 

2545 try: 

2546 referred_pk = self.get_pk_constraint( 

2547 connection, rtbl, schema=schema, **kw 

2548 ) 

2549 referred_columns = referred_pk["constrained_columns"] 

2550 except exc.NoSuchTableError: 

2551 # ignore not existing parents 

2552 referred_columns = [] 

2553 else: 

2554 # note we use this list only if this is the first column 

2555 # in the constraint. for subsequent columns we ignore the 

2556 # list and append "rcol" if present. 

2557 referred_columns = [] 

2558 

2559 if self._broken_fk_pragma_quotes: 

2560 rtbl = re.sub(r"^[\"\[`\']|[\"\]`\']$", "", rtbl) 

2561 

2562 if numerical_id in fks: 

2563 fk = fks[numerical_id] 

2564 else: 

2565 fk = fks[numerical_id] = { 

2566 "name": None, 

2567 "constrained_columns": [], 

2568 "referred_schema": schema, 

2569 "referred_table": rtbl, 

2570 "referred_columns": referred_columns, 

2571 "options": {}, 

2572 } 

2573 fks[numerical_id] = fk 

2574 

2575 fk["constrained_columns"].append(lcol) 

2576 

2577 if rcol: 

2578 fk["referred_columns"].append(rcol) 

2579 

2580 def fk_sig(constrained_columns, referred_table, referred_columns): 

2581 return ( 

2582 tuple(constrained_columns) 

2583 + (referred_table,) 

2584 + tuple(referred_columns) 

2585 ) 

2586 

2587 # then, parse the actual SQL and attempt to find DDL that matches 

2588 # the names as well. SQLite saves the DDL in whatever format 

2589 # it was typed in as, so need to be liberal here. 

2590 

2591 keys_by_signature = { 

2592 fk_sig( 

2593 fk["constrained_columns"], 

2594 fk["referred_table"], 

2595 fk["referred_columns"], 

2596 ): fk 

2597 for fk in fks.values() 

2598 } 

2599 

2600 table_data = self._get_table_sql(connection, table_name, schema=schema) 

2601 

2602 def parse_fks(): 

2603 if table_data is None: 

2604 # system tables, etc. 

2605 return 

2606 

2607 # note that we already have the FKs from PRAGMA above. This whole 

2608 # regexp thing is trying to locate additional detail about the 

2609 # FKs, namely the name of the constraint and other options. 

2610 # so parsing the columns is really about matching it up to what 

2611 # we already have. 

2612 FK_PATTERN = ( 

2613 r"(?:CONSTRAINT (\w+) +)?" 

2614 r"FOREIGN KEY *\( *(.+?) *\) +" 

2615 r'REFERENCES +(?:(?:"(.+?)")|([a-z0-9_]+)) *\( *((?:(?:"[^"]+"|[a-z0-9_]+) *(?:, *)?)+)\) *' # noqa: E501 

2616 r"((?:ON (?:DELETE|UPDATE) " 

2617 r"(?:SET NULL|SET DEFAULT|CASCADE|RESTRICT|NO ACTION) *)*)" 

2618 r"((?:NOT +)?DEFERRABLE)?" 

2619 r"(?: +INITIALLY +(DEFERRED|IMMEDIATE))?" 

2620 ) 

2621 for match in re.finditer(FK_PATTERN, table_data, re.I): 

2622 ( 

2623 constraint_name, 

2624 constrained_columns, 

2625 referred_quoted_name, 

2626 referred_name, 

2627 referred_columns, 

2628 onupdatedelete, 

2629 deferrable, 

2630 initially, 

2631 ) = match.group(1, 2, 3, 4, 5, 6, 7, 8) 

2632 constrained_columns = list( 

2633 self._find_cols_in_sig(constrained_columns) 

2634 ) 

2635 if not referred_columns: 

2636 referred_columns = constrained_columns 

2637 else: 

2638 referred_columns = list( 

2639 self._find_cols_in_sig(referred_columns) 

2640 ) 

2641 referred_name = referred_quoted_name or referred_name 

2642 options = {} 

2643 

2644 for token in re.split(r" *\bON\b *", onupdatedelete.upper()): 

2645 if token.startswith("DELETE"): 

2646 ondelete = token[6:].strip() 

2647 if ondelete and ondelete != "NO ACTION": 

2648 options["ondelete"] = ondelete 

2649 elif token.startswith("UPDATE"): 

2650 onupdate = token[6:].strip() 

2651 if onupdate and onupdate != "NO ACTION": 

2652 options["onupdate"] = onupdate 

2653 

2654 if deferrable: 

2655 options["deferrable"] = "NOT" not in deferrable.upper() 

2656 if initially: 

2657 options["initially"] = initially.upper() 

2658 

2659 yield ( 

2660 constraint_name, 

2661 constrained_columns, 

2662 referred_name, 

2663 referred_columns, 

2664 options, 

2665 ) 

2666 

2667 fkeys = [] 

2668 

2669 for ( 

2670 constraint_name, 

2671 constrained_columns, 

2672 referred_name, 

2673 referred_columns, 

2674 options, 

2675 ) in parse_fks(): 

2676 sig = fk_sig(constrained_columns, referred_name, referred_columns) 

2677 if sig not in keys_by_signature: 

2678 util.warn( 

2679 "WARNING: SQL-parsed foreign key constraint " 

2680 "'%s' could not be located in PRAGMA " 

2681 "foreign_keys for table %s" % (sig, table_name) 

2682 ) 

2683 continue 

2684 key = keys_by_signature.pop(sig) 

2685 key["name"] = constraint_name 

2686 key["options"] = options 

2687 fkeys.append(key) 

2688 # assume the remainders are the unnamed, inline constraints, just 

2689 # use them as is as it's extremely difficult to parse inline 

2690 # constraints 

2691 fkeys.extend(keys_by_signature.values()) 

2692 if fkeys: 

2693 return fkeys 

2694 else: 

2695 return ReflectionDefaults.foreign_keys() 

2696 

2697 def _find_cols_in_sig(self, sig): 

2698 for match in re.finditer(r'(?:"(.+?)")|([a-z0-9_]+)', sig, re.I): 

2699 yield match.group(1) or match.group(2) 

2700 

2701 @reflection.cache 

2702 def get_unique_constraints( 

2703 self, connection, table_name, schema=None, **kw 

2704 ): 

2705 auto_index_by_sig = {} 

2706 for idx in self.get_indexes( 

2707 connection, 

2708 table_name, 

2709 schema=schema, 

2710 include_auto_indexes=True, 

2711 **kw, 

2712 ): 

2713 if not idx["name"].startswith("sqlite_autoindex"): 

2714 continue 

2715 sig = tuple(idx["column_names"]) 

2716 auto_index_by_sig[sig] = idx 

2717 

2718 table_data = self._get_table_sql( 

2719 connection, table_name, schema=schema, **kw 

2720 ) 

2721 unique_constraints = [] 

2722 

2723 def parse_uqs(): 

2724 if table_data is None: 

2725 return 

2726 UNIQUE_PATTERN = r'(?:CONSTRAINT "?(.+?)"? +)?UNIQUE *\((.+?)\)' 

2727 INLINE_UNIQUE_PATTERN = ( 

2728 r'(?:(".+?")|(?:[\[`])?([a-z0-9_]+)(?:[\]`])?)[\t ]' 

2729 r"+[a-z0-9_ ]+?[\t ]+UNIQUE" 

2730 ) 

2731 

2732 for match in re.finditer(UNIQUE_PATTERN, table_data, re.I): 

2733 name, cols = match.group(1, 2) 

2734 yield name, list(self._find_cols_in_sig(cols)) 

2735 

2736 # we need to match inlines as well, as we seek to differentiate 

2737 # a UNIQUE constraint from a UNIQUE INDEX, even though these 

2738 # are kind of the same thing :) 

2739 for match in re.finditer(INLINE_UNIQUE_PATTERN, table_data, re.I): 

2740 cols = list( 

2741 self._find_cols_in_sig(match.group(1) or match.group(2)) 

2742 ) 

2743 yield None, cols 

2744 

2745 for name, cols in parse_uqs(): 

2746 sig = tuple(cols) 

2747 if sig in auto_index_by_sig: 

2748 auto_index_by_sig.pop(sig) 

2749 parsed_constraint = {"name": name, "column_names": cols} 

2750 unique_constraints.append(parsed_constraint) 

2751 # NOTE: auto_index_by_sig might not be empty here, 

2752 # the PRIMARY KEY may have an entry. 

2753 if unique_constraints: 

2754 return unique_constraints 

2755 else: 

2756 return ReflectionDefaults.unique_constraints() 

2757 

2758 @reflection.cache 

2759 def get_check_constraints(self, connection, table_name, schema=None, **kw): 

2760 table_data = self._get_table_sql( 

2761 connection, table_name, schema=schema, **kw 

2762 ) 

2763 

2764 # NOTE NOTE NOTE 

2765 # DO NOT CHANGE THIS REGULAR EXPRESSION. There is no known way 

2766 # to parse CHECK constraints that contain newlines themselves using 

2767 # regular expressions, and the approach here relies upon each 

2768 # individual 

2769 # CHECK constraint being on a single line by itself. This 

2770 # necessarily makes assumptions as to how the CREATE TABLE 

2771 # was emitted. A more comprehensive DDL parsing solution would be 

2772 # needed to improve upon the current situation. See #11840 for 

2773 # background 

2774 CHECK_PATTERN = r"(?:CONSTRAINT (.+) +)?CHECK *\( *(.+) *\),? *" 

2775 cks = [] 

2776 

2777 for match in re.finditer(CHECK_PATTERN, table_data or "", re.I): 

2778 

2779 name = match.group(1) 

2780 

2781 if name: 

2782 name = re.sub(r'^"|"$', "", name) 

2783 

2784 cks.append({"sqltext": match.group(2), "name": name}) 

2785 cks.sort(key=lambda d: d["name"] or "~") # sort None as last 

2786 if cks: 

2787 return cks 

2788 else: 

2789 return ReflectionDefaults.check_constraints() 

2790 

2791 @reflection.cache 

2792 def get_indexes(self, connection, table_name, schema=None, **kw): 

2793 pragma_indexes = self._get_table_pragma( 

2794 connection, "index_list", table_name, schema=schema 

2795 ) 

2796 indexes = [] 

2797 

2798 # regular expression to extract the filter predicate of a partial 

2799 # index. this could fail to extract the predicate correctly on 

2800 # indexes created like 

2801 # CREATE INDEX i ON t (col || ') where') WHERE col <> '' 

2802 # but as this function does not support expression-based indexes 

2803 # this case does not occur. 

2804 partial_pred_re = re.compile(r"\)\s+where\s+(.+)", re.IGNORECASE) 

2805 

2806 if schema: 

2807 schema_expr = "%s." % self.identifier_preparer.quote_identifier( 

2808 schema 

2809 ) 

2810 else: 

2811 schema_expr = "" 

2812 

2813 include_auto_indexes = kw.pop("include_auto_indexes", False) 

2814 for row in pragma_indexes: 

2815 # ignore implicit primary key index. 

2816 # https://www.mail-archive.com/sqlite-users@sqlite.org/msg30517.html 

2817 if not include_auto_indexes and row[1].startswith( 

2818 "sqlite_autoindex" 

2819 ): 

2820 continue 

2821 indexes.append( 

2822 dict( 

2823 name=row[1], 

2824 column_names=[], 

2825 unique=row[2], 

2826 dialect_options={}, 

2827 ) 

2828 ) 

2829 

2830 # check partial indexes 

2831 if len(row) >= 5 and row[4]: 

2832 s = ( 

2833 "SELECT sql FROM %(schema)ssqlite_master " 

2834 "WHERE name = ? " 

2835 "AND type = 'index'" % {"schema": schema_expr} 

2836 ) 

2837 rs = connection.exec_driver_sql(s, (row[1],)) 

2838 index_sql = rs.scalar() 

2839 predicate_match = partial_pred_re.search(index_sql) 

2840 if predicate_match is None: 

2841 # unless the regex is broken this case shouldn't happen 

2842 # because we know this is a partial index, so the 

2843 # definition sql should match the regex 

2844 util.warn( 

2845 "Failed to look up filter predicate of " 

2846 "partial index %s" % row[1] 

2847 ) 

2848 else: 

2849 predicate = predicate_match.group(1) 

2850 indexes[-1]["dialect_options"]["sqlite_where"] = text( 

2851 predicate 

2852 ) 

2853 

2854 # loop thru unique indexes to get the column names. 

2855 for idx in list(indexes): 

2856 pragma_index = self._get_table_pragma( 

2857 connection, "index_info", idx["name"], schema=schema 

2858 ) 

2859 

2860 for row in pragma_index: 

2861 if row[2] is None: 

2862 util.warn( 

2863 "Skipped unsupported reflection of " 

2864 "expression-based index %s" % idx["name"] 

2865 ) 

2866 indexes.remove(idx) 

2867 break 

2868 else: 

2869 idx["column_names"].append(row[2]) 

2870 

2871 indexes.sort(key=lambda d: d["name"] or "~") # sort None as last 

2872 if indexes: 

2873 return indexes 

2874 elif not self.has_table(connection, table_name, schema): 

2875 raise exc.NoSuchTableError( 

2876 f"{schema}.{table_name}" if schema else table_name 

2877 ) 

2878 else: 

2879 return ReflectionDefaults.indexes() 

2880 

2881 def _is_sys_table(self, table_name): 

2882 return table_name in { 

2883 "sqlite_schema", 

2884 "sqlite_master", 

2885 "sqlite_temp_schema", 

2886 "sqlite_temp_master", 

2887 } 

2888 

2889 @reflection.cache 

2890 def _get_table_sql(self, connection, table_name, schema=None, **kw): 

2891 if schema: 

2892 schema_expr = "%s." % ( 

2893 self.identifier_preparer.quote_identifier(schema) 

2894 ) 

2895 else: 

2896 schema_expr = "" 

2897 try: 

2898 s = ( 

2899 "SELECT sql FROM " 

2900 " (SELECT * FROM %(schema)ssqlite_master UNION ALL " 

2901 " SELECT * FROM %(schema)ssqlite_temp_master) " 

2902 "WHERE name = ? " 

2903 "AND type in ('table', 'view')" % {"schema": schema_expr} 

2904 ) 

2905 rs = connection.exec_driver_sql(s, (table_name,)) 

2906 except exc.DBAPIError: 

2907 s = ( 

2908 "SELECT sql FROM %(schema)ssqlite_master " 

2909 "WHERE name = ? " 

2910 "AND type in ('table', 'view')" % {"schema": schema_expr} 

2911 ) 

2912 rs = connection.exec_driver_sql(s, (table_name,)) 

2913 value = rs.scalar() 

2914 if value is None and not self._is_sys_table(table_name): 

2915 raise exc.NoSuchTableError(f"{schema_expr}{table_name}") 

2916 return value 

2917 

2918 def _get_table_pragma(self, connection, pragma, table_name, schema=None): 

2919 quote = self.identifier_preparer.quote_identifier 

2920 if schema is not None: 

2921 statements = [f"PRAGMA {quote(schema)}."] 

2922 else: 

2923 # because PRAGMA looks in all attached databases if no schema 

2924 # given, need to specify "main" schema, however since we want 

2925 # 'temp' tables in the same namespace as 'main', need to run 

2926 # the PRAGMA twice 

2927 statements = ["PRAGMA main.", "PRAGMA temp."] 

2928 

2929 qtable = quote(table_name) 

2930 for statement in statements: 

2931 statement = f"{statement}{pragma}({qtable})" 

2932 cursor = connection.exec_driver_sql(statement) 

2933 if not cursor._soft_closed: 

2934 # work around SQLite issue whereby cursor.description 

2935 # is blank when PRAGMA returns no rows: 

2936 # https://www.sqlite.org/cvstrac/tktview?tn=1884 

2937 result = cursor.fetchall() 

2938 else: 

2939 result = [] 

2940 if result: 

2941 return result 

2942 else: 

2943 return []