Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/base.py: 35%


783 statements  

1# dialects/sqlite/base.py 

2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7# mypy: ignore-errors 

8 

9 

10r''' 

11.. dialect:: sqlite 

12 :name: SQLite 

13 :normal_support: 3.12+ 

14 :best_effort: 3.7.16+ 

15 

16.. _sqlite_datetime: 

17 

18Date and Time Types 

19------------------- 

20 

21SQLite does not have built-in DATE, TIME, or DATETIME types, and pysqlite does 

22not provide out of the box functionality for translating values between Python 

23`datetime` objects and a SQLite-supported format. SQLAlchemy's own 

24:class:`~sqlalchemy.types.DateTime` and related types provide date formatting 

25and parsing functionality when SQLite is used. The implementation classes are 

26:class:`_sqlite.DATETIME`, :class:`_sqlite.DATE` and :class:`_sqlite.TIME`. 

27These types represent dates and times as ISO formatted strings, which also 

28nicely support ordering. There's no reliance on typical "libc" internals for 

29these functions so historical dates are fully supported. 
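
As a brief illustration of why ISO formatted strings order correctly, the sketch below uses only the standard library ``sqlite3`` driver rather than SQLAlchemy's own types. The ``event`` table and its ``happened`` column are hypothetical names chosen for this example:

```python
import sqlite3
from datetime import datetime

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE event (id INTEGER PRIMARY KEY, happened DATETIME)")

# Deliberately unsorted, including a date far before the Unix epoch.
stamps = [
    datetime(2024, 5, 1, 12, 0),
    datetime(1066, 10, 14, 9, 0),
    datetime(1999, 12, 31, 23, 59, 59),
]
conn.executemany(
    "INSERT INTO event (happened) VALUES (?)",
    [(s.isoformat(sep=" "),) for s in stamps],
)

# The values are stored as text, so ORDER BY compares them as strings.
rows = [r[0] for r in conn.execute("SELECT happened FROM event ORDER BY happened")]
parsed = [datetime.fromisoformat(r) for r in rows]
```

Because the strings sort lexicographically in the same order as the dates they encode, ``ORDER BY`` returns the rows chronologically, including the year 1066 value.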

30 

31Ensuring Text affinity 

32^^^^^^^^^^^^^^^^^^^^^^ 

33 

34The DDL rendered for these types is the standard ``DATE``, ``TIME`` 

35and ``DATETIME`` indicators. However, custom storage formats can also be 

36applied to these types. When the 

37storage format is detected as containing no alpha characters, the DDL for 

38these types is rendered as ``DATE_CHAR``, ``TIME_CHAR``, and ``DATETIME_CHAR``, 

39so that the column continues to have textual affinity. 
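
The reason for the ``_CHAR`` suffix can be seen directly in SQLite. The following is a minimal sketch using the standard library ``sqlite3`` driver; the table ``t``, its column names, and the digits-only value ``"20240501"`` are assumptions made for illustration:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# DATE has NUMERIC affinity; DATE_CHAR contains "CHAR" and so has TEXT affinity.
conn.execute("CREATE TABLE t (plain DATE, tagged DATE_CHAR)")
conn.execute(
    "INSERT INTO t (plain, tagged) VALUES (?, ?)", ("20240501", "20240501")
)

plain_type, tagged_type = conn.execute(
    "SELECT typeof(plain), typeof(tagged) FROM t"
).fetchone()
```

Here ``plain_type`` is ``"integer"`` because SQLite converted the digits-only string to a number, while ``tagged_type`` remains ``"text"``.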

40 

41.. seealso:: 

42 

43 `Type Affinity <https://www.sqlite.org/datatype3.html#affinity>`_ - 

44 in the SQLite documentation 

45 

46.. _sqlite_autoincrement: 

47 

48SQLite Auto Incrementing Behavior 

49---------------------------------- 

50 

51Background on SQLite's autoincrement is at: https://sqlite.org/autoinc.html 

52 

53Key concepts: 

54 

55* SQLite has an implicit "auto increment" feature that takes place for any 

56 non-composite primary-key column that is specifically created using 

57 "INTEGER PRIMARY KEY" for the type + primary key. 

58 

59* SQLite also has an explicit "AUTOINCREMENT" keyword, that is **not** 

60 equivalent to the implicit autoincrement feature; this keyword is not 

61 recommended for general use. SQLAlchemy does not render this keyword 

62 unless a special SQLite-specific directive is used (see below). However, 

63 it still requires that the column's type is named "INTEGER". 
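
The difference between the two behaviors can be observed at the SQLite level. The sketch below uses the standard library ``sqlite3`` driver directly; the table names ``t_implicit`` and ``t_keyword`` and the helper function are hypothetical:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t_implicit (id INTEGER PRIMARY KEY, v TEXT)")
conn.execute("CREATE TABLE t_keyword (id INTEGER PRIMARY KEY AUTOINCREMENT, v TEXT)")


def insert_delete_insert(table):
    # Insert two rows, delete the newest one, then insert again and
    # report the identifier assigned to the final row.
    conn.execute(f"INSERT INTO {table} (v) VALUES ('a')")
    conn.execute(f"INSERT INTO {table} (v) VALUES ('b')")
    conn.execute(f"DELETE FROM {table} WHERE v = 'b'")
    return conn.execute(f"INSERT INTO {table} (v) VALUES ('c')").lastrowid


implicit_id = insert_delete_insert("t_implicit")  # reuses the freed id
keyword_id = insert_delete_insert("t_keyword")  # never reuses an id
```

The implicit form hands out the largest existing id plus one, so the deleted value ``2`` is reused, whereas the ``AUTOINCREMENT`` form continues on to ``3``.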

64 

65Using the AUTOINCREMENT Keyword 

66^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 

67 

68To specifically render the AUTOINCREMENT keyword on the primary key column 

69when rendering DDL, add the flag ``sqlite_autoincrement=True`` to the Table 

70construct:: 

71 

72 Table( 

73 "sometable", 

74 metadata, 

75 Column("id", Integer, primary_key=True), 

76 sqlite_autoincrement=True, 

77 ) 

78 

79Allowing autoincrement behavior for SQLAlchemy types other than Integer/INTEGER

80^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

81 

82SQLite's typing model is based on naming conventions. Among other things, this 

83means that any type name which contains the substring ``"INT"`` will be 

84determined to be of "integer affinity". A type named ``"BIGINT"``, 

85``"SPECIAL_INT"`` or even ``"XYZINTQPR"``, will be considered by SQLite to be 

86of "integer" affinity. However, **the SQLite autoincrement feature, whether 

87implicitly or explicitly enabled, requires that the name of the column's type 

88is exactly the string "INTEGER"**. Therefore, if an application uses a type 

89like :class:`.BigInteger` for a primary key, on SQLite this type will need to 

90be rendered as the name ``"INTEGER"`` when emitting the initial ``CREATE 

91TABLE`` statement in order for the autoincrement behavior to be available. 
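
The requirement for the exact name ``INTEGER`` can be demonstrated with the standard library ``sqlite3`` driver. In this sketch the tables ``ok`` and ``nope`` are hypothetical and differ only in the type name of the primary key:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE ok (id INTEGER PRIMARY KEY, v TEXT)")
conn.execute("CREATE TABLE nope (id BIGINT PRIMARY KEY, v TEXT)")

# Neither INSERT supplies a value for "id".
conn.execute("INSERT INTO ok (v) VALUES ('a')")
conn.execute("INSERT INTO nope (v) VALUES ('a')")

ok_id = conn.execute("SELECT id FROM ok").fetchone()[0]
nope_id = conn.execute("SELECT id FROM nope").fetchone()[0]
```

Only the ``INTEGER PRIMARY KEY`` column receives a generated value (``1``); the ``BIGINT`` column has integer affinity but is left as ``NULL``.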

92 

93One approach to achieve this is to use :class:`.Integer` on SQLite 

94only using :meth:`.TypeEngine.with_variant`:: 

95 

96 table = Table( 

97 "my_table", 

98 metadata, 

99 Column( 

100 "id", 

101 BigInteger().with_variant(Integer, "sqlite"), 

102 primary_key=True, 

103 ), 

104 ) 

105 

106Another is to use a subclass of :class:`.BigInteger` that overrides its DDL 

107name to be ``INTEGER`` when compiled against SQLite:: 

108 

109    from sqlalchemy import BigInteger

110    from sqlalchemy.ext.compiler import compiles

111

112

113    class SLBigInteger(BigInteger):

114        pass

115

116

117    @compiles(SLBigInteger, "sqlite")

118    def bi_c(element, compiler, **kw):

119        return "INTEGER"

120

121

122    @compiles(SLBigInteger)

123    def bi_c(element, compiler, **kw):

124        return compiler.visit_BIGINT(element, **kw)

125

126

127    table = Table(

128        "my_table", metadata, Column("id", SLBigInteger(), primary_key=True)

129    )

130 

131.. seealso:: 

132 

133 :meth:`.TypeEngine.with_variant` 

134 

135 :ref:`sqlalchemy.ext.compiler_toplevel` 

136 

137 `Datatypes In SQLite Version 3 <https://sqlite.org/datatype3.html>`_ 

138 

139.. _sqlite_transactions: 

140 

141Transactions with SQLite and the sqlite3 driver 

142----------------------------------------------- 

143 

144As a file-based database, SQLite's approach to transactions differs from 

145traditional databases in many ways. Additionally, the ``sqlite3`` driver 

146standard with Python (as well as the async version ``aiosqlite`` which builds 

147on top of it) has several quirks, workarounds, and API features in the 

148area of transaction control, all of which generally need to be addressed when 

149constructing a SQLAlchemy application that uses SQLite. 

150 

151Legacy Transaction Mode with the sqlite3 driver 

152^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 

153 

154The most important aspect of transaction handling with the sqlite3 driver is 

155that it defaults (which will continue through Python 3.15 before being 

156removed in Python 3.16) to legacy transactional behavior which does 

157not strictly follow :pep:`249`. The way in which the driver diverges from the 

158PEP is that it does not "begin" a transaction automatically as dictated by 

159:pep:`249` except in the case of DML statements, e.g. INSERT, UPDATE, and 

160DELETE. Normally, :pep:`249` dictates that a BEGIN must be emitted upon 

161the first SQL statement of any kind, so that all subsequent operations will 

162be established within a transaction until ``connection.commit()`` has been 

163called. The ``sqlite3`` driver, in an effort to be easier to use in 

164highly concurrent environments, skips this step for DQL (e.g. SELECT) statements, 

165and also skips it for DDL (e.g. CREATE TABLE etc.) statements for more legacy 

166reasons. Statements such as SAVEPOINT are also skipped. 
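
The behavior described above can be observed through the ``in_transaction`` attribute of a ``sqlite3`` connection. This is a minimal sketch that assumes a Python version on which legacy transaction control is still the default; the table ``t`` and the ``states`` dictionary are names invented for the example:

```python
import sqlite3

# No autocommit / isolation_level arguments: legacy transaction control.
conn = sqlite3.connect(":memory:")
states = {}

conn.execute("CREATE TABLE t (x INTEGER)")
states["ddl"] = conn.in_transaction  # DDL does not trigger BEGIN

conn.execute("SELECT x FROM t").fetchall()
states["select"] = conn.in_transaction  # SELECT does not trigger BEGIN

conn.execute("INSERT INTO t (x) VALUES (1)")
states["insert"] = conn.in_transaction  # DML triggers an implicit BEGIN

conn.rollback()
```

Only the INSERT leaves the connection inside a transaction; the CREATE TABLE and SELECT statements run outside of one.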

167 

168In modern versions of the ``sqlite3`` driver as of Python 3.12, this legacy 

169mode of operation is referred to as 

170`"legacy transaction control" <https://docs.python.org/3/library/sqlite3.html#sqlite3-transaction-control-isolation-level>`_, and is in 

171effect by default due to the ``Connection.autocommit`` parameter being set to 

172the constant ``sqlite3.LEGACY_TRANSACTION_CONTROL``. Prior to Python 3.12, 

173the ``Connection.autocommit`` attribute did not exist. 

174 

175The implications of legacy transaction mode include: 

176 

177* **Incorrect support for transactional DDL** - statements like CREATE TABLE, ALTER TABLE, 

178 CREATE INDEX etc. will not automatically BEGIN a transaction if one were not 

179 started already, leading to the changes by each statement being 

180 "autocommitted" immediately unless BEGIN were otherwise emitted first. Very 

181 old (pre Python 3.6) versions of the ``sqlite3`` driver would also force a COMMIT for these

182 operations even if a transaction were present, however this is no longer the 

183 case. 

184* **SERIALIZABLE behavior not fully functional** - SQLite's transaction isolation 

185 behavior is normally consistent with SERIALIZABLE isolation, as it is a file- 

186 based system that locks the database file entirely for write operations, 

187 preventing COMMIT until all reader transactions (and associated file locks) 

188 have completed. However, sqlite3's legacy transaction mode fails to emit BEGIN for SELECT 

189 statements, which causes these SELECT statements to no longer be "repeatable", 

190 failing one of the consistency guarantees of SERIALIZABLE. 

191* **Incorrect behavior for SAVEPOINT** - as the SAVEPOINT statement does not 

192 imply a BEGIN, a new SAVEPOINT emitted before a BEGIN will function on its 

193 own but fails to participate in the enclosing transaction, meaning a ROLLBACK 

194 of the transaction will not roll back elements that were part of a released

195 savepoint. 
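
The first implication, non-transactional DDL, can be sketched with the standard library driver alone. The example assumes legacy transaction control is the default for the first connection; the second connection sets ``isolation_level=None`` and emits BEGIN by hand. The helper ``table_names`` is hypothetical:

```python
import sqlite3


def table_names(conn):
    # List the user tables currently present in the database.
    query = "SELECT name FROM sqlite_master WHERE type = 'table'"
    return [row[0] for row in conn.execute(query)]


# Legacy mode: CREATE TABLE runs outside any transaction, so the
# rollback has nothing to undo.
legacy = sqlite3.connect(":memory:")
legacy.execute("CREATE TABLE t (x INTEGER)")
legacy.rollback()
legacy_tables = table_names(legacy)

# Explicit BEGIN: the same DDL now participates in the transaction.
manual = sqlite3.connect(":memory:", isolation_level=None)
manual.execute("BEGIN")
manual.execute("CREATE TABLE t (x INTEGER)")
manual.execute("ROLLBACK")
manual_tables = table_names(manual)
```

The table survives the rollback on the legacy connection, while on the connection where BEGIN was emitted first it is rolled back along with the transaction.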

196 

197Legacy transaction mode first existed in order to facilitate working around

198SQLite's file locks. Because SQLite relies upon whole-file locks, it is easy to

199get "database is locked" errors, particularly when newer features like "write

200ahead logging" are disabled. This is a key reason why ``sqlite3``'s legacy

201transaction mode is still the default mode of operation; disabling it will

202produce behavior that is more susceptible to locked database errors. However,

203note that **legacy transaction mode will no longer be the default** in a future

204Python version (3.16 as of this writing).

205 

206.. _sqlite_enabling_transactions: 

207 

208Enabling Non-Legacy SQLite Transactional Modes with the sqlite3 or aiosqlite driver 

209^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 

210 

211Current SQLAlchemy support allows either for setting the 

212``.Connection.autocommit`` attribute, most directly by using a 

213:func:`._sa.create_engine` parameter, or if on an older version of Python where 

214the attribute is not available, using event hooks to control the behavior of 

215BEGIN. 

216 

217* **Enabling modern sqlite3 transaction control via the autocommit connect parameter** (Python 3.12 and above) 

218 

219 To use SQLite in the mode described at `Transaction control via the autocommit attribute <https://docs.python.org/3/library/sqlite3.html#transaction-control-via-the-autocommit-attribute>`_, 

220 the most straightforward approach is to set the attribute to its recommended value 

221 of ``False`` at the connect level using :paramref:`_sa.create_engine.connect_args`::

222 

223 from sqlalchemy import create_engine 

224 

225 engine = create_engine( 

226 "sqlite:///myfile.db", connect_args={"autocommit": False} 

227 ) 

228 

229 This parameter is also passed through when using the aiosqlite driver:: 

230 

231 from sqlalchemy.ext.asyncio import create_async_engine 

232 

233 engine = create_async_engine( 

234 "sqlite+aiosqlite:///myfile.db", connect_args={"autocommit": False} 

235 ) 

236 

237 The parameter can also be set at the attribute level using the :meth:`.PoolEvents.connect` 

238 event hook, however this will only work for sqlite3, as aiosqlite does not yet expose this 

239 attribute on its ``Connection`` object:: 

240 

241      from sqlalchemy import create_engine, event

242

243      engine = create_engine("sqlite:///myfile.db")

244

245

246      @event.listens_for(engine, "connect")

247      def do_connect(dbapi_connection, connection_record):

248          # enable autocommit=False mode

249          dbapi_connection.autocommit = False

250 

251* **Using SQLAlchemy to emit BEGIN in lieu of SQLite's transaction control** (all Python versions, sqlite3 and aiosqlite) 

252 

253 For older versions of ``sqlite3`` or for cross-compatibility with older and

254 newer versions, SQLAlchemy can also take over the job of transaction control. 

255 This is achieved by using the :meth:`.ConnectionEvents.begin` hook 

256 to emit the "BEGIN" command directly, while also disabling SQLite's control 

257 of this command using the :meth:`.PoolEvents.connect` event hook to set the 

258 ``Connection.isolation_level`` attribute to ``None``:: 

259 

260 

261      from sqlalchemy import create_engine, event

262

263      engine = create_engine("sqlite:///myfile.db")

264

265

266      @event.listens_for(engine, "connect")

267      def do_connect(dbapi_connection, connection_record):

268          # disable sqlite3's emitting of the BEGIN statement entirely.

269          dbapi_connection.isolation_level = None

270

271

272      @event.listens_for(engine, "begin")

273      def do_begin(conn):

274          # emit our own BEGIN. sqlite3 still emits COMMIT/ROLLBACK correctly

275          conn.exec_driver_sql("BEGIN")

276 

277 When using the asyncio variant ``aiosqlite``, refer to ``engine.sync_engine`` 

278 as in the example below:: 

279 

280      from sqlalchemy import create_engine, event

281      from sqlalchemy.ext.asyncio import create_async_engine

282

283      engine = create_async_engine("sqlite+aiosqlite:///myfile.db")

284

285

286      @event.listens_for(engine.sync_engine, "connect")

287      def do_connect(dbapi_connection, connection_record):

288          # disable aiosqlite's emitting of the BEGIN statement entirely.

289          dbapi_connection.isolation_level = None

290

291

292      @event.listens_for(engine.sync_engine, "begin")

293      def do_begin(conn):

294          # emit our own BEGIN. aiosqlite still emits COMMIT/ROLLBACK correctly

295          conn.exec_driver_sql("BEGIN")

296 

297.. _sqlite_isolation_level: 

298 

299Using SQLAlchemy's Driver Level AUTOCOMMIT Feature with SQLite 

300^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 

301 

302SQLAlchemy has a comprehensive database isolation feature with optional 

303autocommit support that is introduced in the section :ref:`dbapi_autocommit`. 

304 

305For the ``sqlite3`` and ``aiosqlite`` drivers, SQLAlchemy only includes 

306built-in support for "AUTOCOMMIT". Note that this mode is currently incompatible 

307with the non-legacy isolation mode hooks documented in the previous 

308section at :ref:`sqlite_enabling_transactions`. 

309 

310To use the ``sqlite3`` driver with SQLAlchemy driver-level autocommit, 

311create an engine setting the :paramref:`_sa.create_engine.isolation_level` 

312parameter to "AUTOCOMMIT":: 

313 

314 eng = create_engine("sqlite:///myfile.db", isolation_level="AUTOCOMMIT") 

315 

316When using the above mode, any event hooks that set the sqlite3 ``Connection.autocommit`` 

317parameter away from its default of ``sqlite3.LEGACY_TRANSACTION_CONTROL`` 

318as well as hooks that emit ``BEGIN`` should be disabled. 

319 

320Additional Reading for SQLite / sqlite3 transaction control 

321^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 

322 

323Links with important information on SQLite, the sqlite3 driver, 

324as well as long historical conversations on how things got to their current state: 

325 

326* `Isolation in SQLite <https://www.sqlite.org/isolation.html>`_ - on the SQLite website 

327* `Transaction control <https://docs.python.org/3/library/sqlite3.html#transaction-control>`_ - describes the sqlite3 autocommit attribute as well 

328 as the legacy isolation_level attribute. 

329* `sqlite3 SELECT does not BEGIN a transaction, but should according to spec <https://github.com/python/cpython/issues/54133>`_ - imported Python standard library issue on github 

330* `sqlite3 module breaks transactions and potentially corrupts data <https://github.com/python/cpython/issues/54949>`_ - imported Python standard library issue on github 

331 

332 

333INSERT/UPDATE/DELETE...RETURNING 

334--------------------------------- 

335 

336The SQLite dialect supports SQLite 3.35's ``INSERT|UPDATE|DELETE..RETURNING`` 

337syntax. ``INSERT..RETURNING`` may be used 

338automatically in some cases in order to fetch newly generated identifiers in 

339place of the traditional approach of using ``cursor.lastrowid``, however 

340``cursor.lastrowid`` is currently still preferred for simple single-statement 

341cases for its better performance. 

342 

343To specify an explicit ``RETURNING`` clause, use the 

344:meth:`._UpdateBase.returning` method on a per-statement basis:: 

345 

346 # INSERT..RETURNING 

347 result = connection.execute( 

348 table.insert().values(name="foo").returning(table.c.col1, table.c.col2) 

349 ) 

350 print(result.all()) 

351 

352 # UPDATE..RETURNING 

353 result = connection.execute( 

354 table.update() 

355 .where(table.c.name == "foo") 

356 .values(name="bar") 

357 .returning(table.c.col1, table.c.col2) 

358 ) 

359 print(result.all()) 

360 

361 # DELETE..RETURNING 

362 result = connection.execute( 

363 table.delete() 

364 .where(table.c.name == "foo") 

365 .returning(table.c.col1, table.c.col2) 

366 ) 

367 print(result.all()) 

368 

369.. versionadded:: 2.0 Added support for SQLite RETURNING 

370 

371 

372.. _sqlite_foreign_keys: 

373 

374Foreign Key Support 

375------------------- 

376 

377SQLite supports FOREIGN KEY syntax when emitting CREATE statements for tables, 

378however by default these constraints have no effect on the operation of the 

379table. 

380 

381Constraint checking on SQLite has three prerequisites: 

382 

383* At least version 3.6.19 of SQLite must be in use 

384* The SQLite library must be compiled *without* the SQLITE_OMIT_FOREIGN_KEY 

385 or SQLITE_OMIT_TRIGGER symbols enabled. 

386* The ``PRAGMA foreign_keys = ON`` statement must be emitted on all 

387 connections before use -- including the initial call to 

388 :meth:`sqlalchemy.schema.MetaData.create_all`. 

389 

390SQLAlchemy allows for the ``PRAGMA`` statement to be emitted automatically for 

391new connections through the usage of events:: 

392 

393    from sqlalchemy.engine import Engine

394    from sqlalchemy import event

395

396

397    @event.listens_for(Engine, "connect")

398    def set_sqlite_pragma(dbapi_connection, connection_record):

399        cursor = dbapi_connection.cursor()

400        cursor.execute("PRAGMA foreign_keys=ON")

401        cursor.close()

402 
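
The effect of the ``PRAGMA`` can be checked without SQLAlchemy. The following sketch uses the standard library ``sqlite3`` driver and assumes a SQLite build that includes foreign key support; the ``parent`` and ``child`` tables and the ``insert_orphan`` helper are hypothetical:

```python
import sqlite3

DDL = (
    "CREATE TABLE parent (id INTEGER PRIMARY KEY)",
    "CREATE TABLE child ("
    "id INTEGER PRIMARY KEY, parent_id INTEGER REFERENCES parent (id))",
)


def insert_orphan(enforce):
    # Each call uses a fresh connection, since the PRAGMA is per-connection.
    conn = sqlite3.connect(":memory:")
    conn.execute("PRAGMA foreign_keys=" + ("ON" if enforce else "OFF"))
    for statement in DDL:
        conn.execute(statement)
    try:
        # parent_id 999 refers to a row that does not exist.
        conn.execute("INSERT INTO child (id, parent_id) VALUES (1, 999)")
    except sqlite3.IntegrityError:
        return "rejected"
    return "accepted"


without_pragma = insert_orphan(False)
with_pragma = insert_orphan(True)
```

With enforcement off the orphan row is accepted silently; with the ``PRAGMA`` emitted first, the same INSERT raises ``sqlite3.IntegrityError``.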

403.. warning:: 

404 

405 When SQLite foreign keys are enabled, it is **not possible** 

406 to emit CREATE or DROP statements for tables that contain 

407 mutually-dependent foreign key constraints; 

408 to emit the DDL for these tables requires that ALTER TABLE be used to 

409 create or drop these constraints separately, for which SQLite has 

410 no support. 

411 

412.. seealso:: 

413 

414 `SQLite Foreign Key Support <https://www.sqlite.org/foreignkeys.html>`_ 

415 - on the SQLite web site. 

416 

417 :ref:`event_toplevel` - SQLAlchemy event API. 

418 

419 :ref:`use_alter` - more information on SQLAlchemy's facilities for handling 

420 mutually-dependent foreign key constraints. 

421 

422.. _sqlite_on_conflict_ddl: 

423 

424ON CONFLICT support for constraints 

425----------------------------------- 

426 

427.. seealso:: This section describes the :term:`DDL` version of "ON CONFLICT" for 

428 SQLite, which occurs within a CREATE TABLE statement. For "ON CONFLICT" as 

429 applied to an INSERT statement, see :ref:`sqlite_on_conflict_insert`. 

430 

431SQLite supports a non-standard DDL clause known as ON CONFLICT which can be applied 

432to primary key, unique, check, and not null constraints. In DDL, it is 

433rendered either within the "CONSTRAINT" clause or within the column definition 

434itself depending on the location of the target constraint. To render this 

435clause within DDL, the extension parameter ``sqlite_on_conflict`` can be 

436specified with a string conflict resolution algorithm within the 

437:class:`.PrimaryKeyConstraint`, :class:`.UniqueConstraint`, and

438:class:`.CheckConstraint` objects. The :class:`_schema.Column` object

439accepts

440three individual parameters, ``sqlite_on_conflict_not_null``,

441``sqlite_on_conflict_primary_key``, and ``sqlite_on_conflict_unique``, one

442for each of the three constraint types that can be

443indicated from a :class:`_schema.Column` object.

444 

445.. seealso:: 

446 

447 `ON CONFLICT <https://www.sqlite.org/lang_conflict.html>`_ - in the SQLite 

448 documentation 

449 

450.. versionadded:: 1.3 

451 

452 

453The ``sqlite_on_conflict`` parameters accept a string argument which is just 

454the resolution name to be chosen, which on SQLite can be one of ROLLBACK, 

455ABORT, FAIL, IGNORE, and REPLACE. For example, to add a UNIQUE constraint 

456that specifies the IGNORE algorithm:: 

457 

458 some_table = Table( 

459 "some_table", 

460 metadata, 

461 Column("id", Integer, primary_key=True), 

462 Column("data", Integer), 

463 UniqueConstraint("id", "data", sqlite_on_conflict="IGNORE"), 

464 ) 

465 

466The above renders CREATE TABLE DDL as: 

467 

468.. sourcecode:: sql 

469 

470 CREATE TABLE some_table ( 

471 id INTEGER NOT NULL, 

472 data INTEGER, 

473 PRIMARY KEY (id), 

474 UNIQUE (id, data) ON CONFLICT IGNORE 

475 ) 

476 

477 

478When using the :paramref:`_schema.Column.unique` 

479flag to add a UNIQUE constraint 

480to a single column, the ``sqlite_on_conflict_unique`` parameter can 

481be added to the :class:`_schema.Column` as well, which will be added to the 

482UNIQUE constraint in the DDL:: 

483 

484 some_table = Table( 

485 "some_table", 

486 metadata, 

487 Column("id", Integer, primary_key=True), 

488 Column( 

489 "data", Integer, unique=True, sqlite_on_conflict_unique="IGNORE" 

490 ), 

491 ) 

492 

493rendering: 

494 

495.. sourcecode:: sql 

496 

497 CREATE TABLE some_table ( 

498 id INTEGER NOT NULL, 

499 data INTEGER, 

500 PRIMARY KEY (id), 

501 UNIQUE (data) ON CONFLICT IGNORE 

502 ) 
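
To show what the ``IGNORE`` algorithm does at runtime, the sketch below runs the DDL above through the standard library ``sqlite3`` driver and then attempts to insert a duplicate ``data`` value; the inserted values are arbitrary:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE some_table ("
    " id INTEGER NOT NULL,"
    " data INTEGER,"
    " PRIMARY KEY (id),"
    " UNIQUE (data) ON CONFLICT IGNORE"
    ")"
)

conn.execute("INSERT INTO some_table (id, data) VALUES (1, 10)")
# Violates UNIQUE (data); the row is skipped and no error is raised.
conn.execute("INSERT INTO some_table (id, data) VALUES (2, 10)")

rows = conn.execute("SELECT id, data FROM some_table").fetchall()
```

Only the first row is present afterwards, and the second INSERT completes without raising an exception.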

503 

504To apply the FAIL algorithm for a NOT NULL constraint, 

505``sqlite_on_conflict_not_null`` is used:: 

506 

507 some_table = Table( 

508 "some_table", 

509 metadata, 

510 Column("id", Integer, primary_key=True), 

511 Column( 

512 "data", Integer, nullable=False, sqlite_on_conflict_not_null="FAIL" 

513 ), 

514 ) 

515 

516this renders the column inline ON CONFLICT phrase: 

517 

518.. sourcecode:: sql 

519 

520 CREATE TABLE some_table ( 

521 id INTEGER NOT NULL, 

522 data INTEGER NOT NULL ON CONFLICT FAIL, 

523 PRIMARY KEY (id) 

524 ) 

525 

526 

527Similarly, for an inline primary key, use ``sqlite_on_conflict_primary_key``:: 

528 

529 some_table = Table( 

530 "some_table", 

531 metadata, 

532 Column( 

533 "id", 

534 Integer, 

535 primary_key=True, 

536 sqlite_on_conflict_primary_key="FAIL", 

537 ), 

538 ) 

539 

540SQLAlchemy renders the PRIMARY KEY constraint separately, so the conflict 

541resolution algorithm is applied to the constraint itself: 

542 

543.. sourcecode:: sql 

544 

545 CREATE TABLE some_table ( 

546 id INTEGER NOT NULL, 

547 PRIMARY KEY (id) ON CONFLICT FAIL 

548 ) 

549 

550.. _sqlite_on_conflict_insert: 

551 

552INSERT...ON CONFLICT (Upsert) 

553----------------------------- 

554 

555.. seealso:: This section describes the :term:`DML` version of "ON CONFLICT" for 

556 SQLite, which occurs within an INSERT statement. For "ON CONFLICT" as 

557 applied to a CREATE TABLE statement, see :ref:`sqlite_on_conflict_ddl`. 

558 

559From version 3.24.0 onwards, SQLite supports "upserts" (update or insert) 

560of rows into a table via the ``ON CONFLICT`` clause of the ``INSERT`` 

561statement. A candidate row will only be inserted if that row does not violate 

562any unique or primary key constraints. In the case of a unique constraint violation, a 

563secondary action can occur which can be either "DO UPDATE", indicating that 

564the data in the target row should be updated, or "DO NOTHING", which indicates 

565to silently skip this row. 

566 

567Conflicts are determined using columns that are part of existing unique 

568constraints and indexes. These constraints are identified by stating the 

569columns and conditions that comprise the indexes. 

570 

571SQLAlchemy provides ``ON CONFLICT`` support via the SQLite-specific 

572:func:`_sqlite.insert()` function, which provides 

573the generative methods :meth:`_sqlite.Insert.on_conflict_do_update` 

574and :meth:`_sqlite.Insert.on_conflict_do_nothing`: 

575 

576.. sourcecode:: pycon+sql 

577 

578 >>> from sqlalchemy.dialects.sqlite import insert 

579 

580 >>> insert_stmt = insert(my_table).values( 

581 ... id="some_existing_id", data="inserted value" 

582 ... ) 

583 

584 >>> do_update_stmt = insert_stmt.on_conflict_do_update( 

585 ... index_elements=["id"], set_=dict(data="updated value") 

586 ... ) 

587 

588 >>> print(do_update_stmt) 

589 {printsql}INSERT INTO my_table (id, data) VALUES (?, ?) 

590 ON CONFLICT (id) DO UPDATE SET data = ?{stop} 

591 

592 >>> do_nothing_stmt = insert_stmt.on_conflict_do_nothing(index_elements=["id"]) 

593 

594 >>> print(do_nothing_stmt) 

595 {printsql}INSERT INTO my_table (id, data) VALUES (?, ?) 

596 ON CONFLICT (id) DO NOTHING 

597 

598.. versionadded:: 1.4 

599 

600.. seealso:: 

601 

602 `Upsert 

603 <https://sqlite.org/lang_UPSERT.html>`_ 

604 - in the SQLite documentation. 

605 

606 

607Specifying the Target 

608^^^^^^^^^^^^^^^^^^^^^ 

609 

610Both methods supply the "target" of the conflict using column inference: 

611 

612* The :paramref:`_sqlite.Insert.on_conflict_do_update.index_elements` argument 

613 specifies a sequence containing string column names, :class:`_schema.Column` 

614 objects, and/or SQL expression elements, which would identify a unique index 

615 or unique constraint. 

616 

617* When using :paramref:`_sqlite.Insert.on_conflict_do_update.index_elements` 

618 to infer an index, a partial index can be inferred by also specifying the 

619 :paramref:`_sqlite.Insert.on_conflict_do_update.index_where` parameter: 

620 

621 .. sourcecode:: pycon+sql 

622 

623 >>> stmt = insert(my_table).values(user_email="a@b.com", data="inserted data") 

624 

625 >>> do_update_stmt = stmt.on_conflict_do_update( 

626 ... index_elements=[my_table.c.user_email], 

627 ... index_where=my_table.c.user_email.like("%@gmail.com"), 

628 ... set_=dict(data=stmt.excluded.data), 

629 ... ) 

630 

631 >>> print(do_update_stmt) 

632 {printsql}INSERT INTO my_table (data, user_email) VALUES (?, ?) 

633 ON CONFLICT (user_email) 

634 WHERE user_email LIKE '%@gmail.com' 

635 DO UPDATE SET data = excluded.data 

636 

637The SET Clause 

638^^^^^^^^^^^^^^^ 

639 

640``ON CONFLICT...DO UPDATE`` is used to perform an update of the already 

641existing row, using any combination of new values as well as values 

642from the proposed insertion. These values are specified using the 

643:paramref:`_sqlite.Insert.on_conflict_do_update.set_` parameter. This 

644parameter accepts a dictionary which consists of direct values 

645for UPDATE: 

646 

647.. sourcecode:: pycon+sql 

648 

649 >>> stmt = insert(my_table).values(id="some_id", data="inserted value") 

650 

651 >>> do_update_stmt = stmt.on_conflict_do_update( 

652 ... index_elements=["id"], set_=dict(data="updated value") 

653 ... ) 

654 

655 >>> print(do_update_stmt) 

656 {printsql}INSERT INTO my_table (id, data) VALUES (?, ?) 

657 ON CONFLICT (id) DO UPDATE SET data = ? 

658 

659.. warning:: 

660 

661 The :meth:`_sqlite.Insert.on_conflict_do_update` method does **not** take 

662 into account Python-side default UPDATE values or generation functions, 

663 e.g. those specified using :paramref:`_schema.Column.onupdate`. These 

664 values will not be exercised for an ON CONFLICT style of UPDATE, unless 

665 they are manually specified in the 

666 :paramref:`_sqlite.Insert.on_conflict_do_update.set_` dictionary. 

667 

668Updating using the Excluded INSERT Values 

669^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 

670 

671In order to refer to the proposed insertion row, the special alias 

672:attr:`~.sqlite.Insert.excluded` is available as an attribute on 

673the :class:`_sqlite.Insert` object; this object creates an "excluded." prefix 

674on a column, that informs the DO UPDATE to update the row with the value that 

675would have been inserted had the constraint not failed: 

676 

677.. sourcecode:: pycon+sql 

678 

679 >>> stmt = insert(my_table).values( 

680 ... id="some_id", data="inserted value", author="jlh" 

681 ... ) 

682 

683 >>> do_update_stmt = stmt.on_conflict_do_update( 

684 ... index_elements=["id"], 

685 ... set_=dict(data="updated value", author=stmt.excluded.author), 

686 ... ) 

687 

688 >>> print(do_update_stmt) 

689 {printsql}INSERT INTO my_table (id, data, author) VALUES (?, ?, ?) 

690 ON CONFLICT (id) DO UPDATE SET data = ?, author = excluded.author 
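
The SQL rendered above can be executed directly to see which values end up in the row. This sketch uses the standard library ``sqlite3`` driver and assumes SQLite 3.24.0 or later; the table definition and the pre-existing row are invented for the example:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE my_table (id TEXT PRIMARY KEY, data TEXT, author TEXT)")
conn.execute("INSERT INTO my_table VALUES ('some_id', 'original', 'abc')")

upsert = (
    "INSERT INTO my_table (id, data, author) VALUES (?, ?, ?) "
    "ON CONFLICT (id) DO UPDATE SET data = ?, author = excluded.author"
)
# The fourth parameter is the literal bound into "SET data = ?".
conn.execute(upsert, ("some_id", "inserted value", "jlh", "updated value"))

row = conn.execute("SELECT id, data, author FROM my_table").fetchone()
```

The ``data`` column takes the literal ``"updated value"``, while ``author`` takes ``"jlh"``, the value from the row that was proposed for insertion.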

691 

692Additional WHERE Criteria 

693^^^^^^^^^^^^^^^^^^^^^^^^^ 

694 

695The :meth:`_sqlite.Insert.on_conflict_do_update` method also accepts 

696a WHERE clause using the :paramref:`_sqlite.Insert.on_conflict_do_update.where` 

697parameter, which will limit those rows which receive an UPDATE: 

698 

699.. sourcecode:: pycon+sql 

700 

701 >>> stmt = insert(my_table).values( 

702 ... id="some_id", data="inserted value", author="jlh" 

703 ... ) 

704 

705 >>> on_update_stmt = stmt.on_conflict_do_update( 

706 ... index_elements=["id"], 

707 ... set_=dict(data="updated value", author=stmt.excluded.author), 

708 ... where=(my_table.c.status == 2), 

709 ... ) 

710 >>> print(on_update_stmt) 

711 {printsql}INSERT INTO my_table (id, data, author) VALUES (?, ?, ?) 

712 ON CONFLICT (id) DO UPDATE SET data = ?, author = excluded.author 

713 WHERE my_table.status = ? 
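
As a rough illustration of how the WHERE clause filters updates, the sketch below executes comparable SQL with the standard library ``sqlite3`` driver, assuming SQLite 3.24.0 or later. The table layout and the two pre-existing rows are hypothetical:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE my_table (id TEXT PRIMARY KEY, data TEXT, status INTEGER)")
conn.executemany(
    "INSERT INTO my_table VALUES (?, ?, ?)",
    [("a", "old", 2), ("b", "old", 1)],
)

upsert = (
    "INSERT INTO my_table (id, data, status) VALUES (?, ?, ?) "
    "ON CONFLICT (id) DO UPDATE SET data = excluded.data "
    "WHERE my_table.status = ?"
)
# Both keys conflict, but only the row whose status is 2 is updated.
for key in ("a", "b"):
    conn.execute(upsert, (key, "new", 0, 2))

rows = dict(conn.execute("SELECT id, data FROM my_table"))
```

Row ``a`` is updated because its existing ``status`` is 2; row ``b`` conflicts as well but is left untouched, and no new row is inserted for it.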

714 

715 

716Skipping Rows with DO NOTHING 

717^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 

718 

719``ON CONFLICT`` may be used to skip inserting a row entirely 

720if any conflict with a unique constraint occurs; below this is illustrated 

721using the :meth:`_sqlite.Insert.on_conflict_do_nothing` method: 

722 

723.. sourcecode:: pycon+sql 

724 

725 >>> stmt = insert(my_table).values(id="some_id", data="inserted value") 

726 >>> stmt = stmt.on_conflict_do_nothing(index_elements=["id"]) 

727 >>> print(stmt) 

728 {printsql}INSERT INTO my_table (id, data) VALUES (?, ?) ON CONFLICT (id) DO NOTHING 

729 

730 

731If ``DO NOTHING`` is used without specifying any columns or constraint, 

732it has the effect of skipping the INSERT for any unique violation which 

733occurs: 

734 

735.. sourcecode:: pycon+sql 

736 

737 >>> stmt = insert(my_table).values(id="some_id", data="inserted value") 

738 >>> stmt = stmt.on_conflict_do_nothing() 

739 >>> print(stmt) 

740 {printsql}INSERT INTO my_table (id, data) VALUES (?, ?) ON CONFLICT DO NOTHING 

741 

742.. _sqlite_type_reflection: 

743 

744Type Reflection 

745--------------- 

746 

747SQLite types are unlike those of most other database backends, in that 

748the string name of the type usually does not correspond to a "type" in a 

749one-to-one fashion. Instead, SQLite links per-column typing behavior 

750to one of five so-called "type affinities" based on a string matching 

751pattern for the type. 

752 

753SQLAlchemy's reflection process, when inspecting types, uses a simple 

754lookup table to link the keywords returned to provided SQLAlchemy types. 

755This lookup table is present within the SQLite dialect as it is for all 

756other dialects. However, the SQLite dialect has a different "fallback" 

757routine for when a particular type name is not located in the lookup map; 

758it instead implements the SQLite "type affinity" scheme described in the

759"Determination Of Column Affinity" section of https://www.sqlite.org/datatype3.html.

760 

761The provided typemap will make direct associations from an exact string 

762name match for the following types: 

763 

764:class:`_types.BIGINT`, :class:`_types.BLOB`, 

765:class:`_types.BOOLEAN`, :class:`_types.BOOLEAN`, 

766:class:`_types.CHAR`, :class:`_types.DATE`, 

767:class:`_types.DATETIME`, :class:`_types.FLOAT`, 

768:class:`_types.DECIMAL`, :class:`_types.FLOAT`, 

769:class:`_types.INTEGER`, :class:`_types.INTEGER`, 

770:class:`_types.NUMERIC`, :class:`_types.REAL`, 

771:class:`_types.SMALLINT`, :class:`_types.TEXT`, 

772:class:`_types.TIME`, :class:`_types.TIMESTAMP`, 

773:class:`_types.VARCHAR`, :class:`_types.NVARCHAR`, 

774:class:`_types.NCHAR` 

775 

776When a type name does not match one of the above types, the "type affinity" 

777lookup is used instead: 

778 

779* :class:`_types.INTEGER` is returned if the type name includes the 

780 string ``INT`` 

781* :class:`_types.TEXT` is returned if the type name includes the 

782 string ``CHAR``, ``CLOB`` or ``TEXT`` 

783* :class:`_types.NullType` is returned if the type name includes the 

784 string ``BLOB`` 

785* :class:`_types.REAL` is returned if the type name includes the string 

786 ``REAL``, ``FLOA`` or ``DOUB``. 

787* Otherwise, the :class:`_types.NUMERIC` type is used. 

788 
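
The fallback rules above can be sketched as a small standalone function.
This is an illustration only, not the dialect's actual implementation: the
function name ``affinity_fallback`` is hypothetical, it returns type names
as plain strings rather than SQLAlchemy type objects, and it assumes the
rules are tested in the order listed, as in the SQLite documentation:

```python
def affinity_fallback(type_name):
    """Map an unrecognized type name to a type by substring matching."""
    name = type_name.upper()
    if "INT" in name:
        return "INTEGER"
    elif any(token in name for token in ("CHAR", "CLOB", "TEXT")):
        return "TEXT"
    elif "BLOB" in name:
        return "NullType"
    elif any(token in name for token in ("REAL", "FLOA", "DOUB")):
        return "REAL"
    else:
        return "NUMERIC"


for candidate in ("FLOATING POINT", "NATIVE CHARACTER(70)", "DOUBLE PRECISION"):
    print(candidate, "->", affinity_fallback(candidate))
```

Because ``INT`` is tested first, a name such as ``FLOATING POINT`` resolves
to INTEGER rather than REAL, since ``POINT`` contains ``INT``.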

.. _sqlite_partial_index:

Partial Indexes
---------------

A partial index, i.e. one which uses a WHERE clause, can be specified
with the DDL system using the argument ``sqlite_where``::

    tbl = Table("testtbl", m, Column("data", Integer))
    idx = Index(
        "test_idx1",
        tbl.c.data,
        sqlite_where=and_(tbl.c.data > 5, tbl.c.data < 10),
    )

The index will be rendered at create time as:

.. sourcecode:: sql

    CREATE INDEX test_idx1 ON testtbl (data)
    WHERE data > 5 AND data < 10
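
To confirm that SQLite accepts and records such an index, the rendered DDL
can be executed directly with the standard library ``sqlite3`` module. The
snippet below is a minimal sketch; the ``CREATE TABLE`` statement is an
assumed equivalent of the ``testtbl`` definition above:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE testtbl (data INTEGER)")
conn.execute(
    "CREATE INDEX test_idx1 ON testtbl (data) WHERE data > 5 AND data < 10"
)

# sqlite_master keeps the original DDL text of each schema object
index_sql = conn.execute(
    "SELECT sql FROM sqlite_master WHERE type = 'index' AND name = 'test_idx1'"
).fetchone()[0]
print(index_sql)
```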

.. _sqlite_dotted_column_names:

Dotted Column Names
-------------------

Using table or column names that explicitly have periods in them is
**not recommended**.  While this is a bad idea for relational
databases in general, as the dot is a syntactically significant character,
the SQLite driver up until version **3.10.0** of SQLite has a bug which
requires that SQLAlchemy filter out these dots in result sets.

The bug, entirely outside of SQLAlchemy, can be illustrated as follows::

    import sqlite3

    assert sqlite3.sqlite_version_info < (
        3,
        10,
        0,
    ), "bug is fixed in this version"

    conn = sqlite3.connect(":memory:")
    cursor = conn.cursor()

    cursor.execute("create table x (a integer, b integer)")
    cursor.execute("insert into x (a, b) values (1, 1)")
    cursor.execute("insert into x (a, b) values (2, 2)")

    cursor.execute("select x.a, x.b from x")
    assert [c[0] for c in cursor.description] == ["a", "b"]

    cursor.execute(
        """
        select x.a, x.b from x where a=1
        union
        select x.a, x.b from x where a=2
        """
    )
    assert [c[0] for c in cursor.description] == ["a", "b"], [
        c[0] for c in cursor.description
    ]

The second assertion fails:

.. sourcecode:: text

    Traceback (most recent call last):
      File "test.py", line 19, in <module>
        [c[0] for c in cursor.description]
    AssertionError: ['x.a', 'x.b']

Above, the driver incorrectly reports the column names with the table name
included, which is inconsistent with the names it reports when the UNION
is not present.

SQLAlchemy relies upon column names being predictable in how they match
to the original statement, so the SQLAlchemy dialect has no choice but
to filter these out::

    from sqlalchemy import create_engine

    eng = create_engine("sqlite://")
    conn = eng.connect()

    conn.exec_driver_sql("create table x (a integer, b integer)")
    conn.exec_driver_sql("insert into x (a, b) values (1, 1)")
    conn.exec_driver_sql("insert into x (a, b) values (2, 2)")

    result = conn.exec_driver_sql("select x.a, x.b from x")
    assert result.keys() == ["a", "b"]

    result = conn.exec_driver_sql(
        """
        select x.a, x.b from x where a=1
        union
        select x.a, x.b from x where a=2
        """
    )
    assert result.keys() == ["a", "b"]

Note that above, even though SQLAlchemy filters out the dots, *both
names are still addressable*::

    >>> row = result.first()
    >>> row._mapping["a"]
    1
    >>> row._mapping["x.a"]
    1
    >>> row._mapping["b"]
    1
    >>> row._mapping["x.b"]
    1

Therefore, the workaround applied by SQLAlchemy only impacts
:meth:`_engine.CursorResult.keys` and :meth:`.Row.keys()` in the public API. In
the very specific case where an application is forced to use column names that
contain dots, and the functionality of :meth:`_engine.CursorResult.keys` and
:meth:`.Row.keys()` is required to return these dotted names unmodified,
the ``sqlite_raw_colnames`` execution option may be provided, either on a
per-:class:`_engine.Connection` basis::

    result = conn.execution_options(sqlite_raw_colnames=True).exec_driver_sql(
        """
        select x.a, x.b from x where a=1
        union
        select x.a, x.b from x where a=2
        """
    )
    assert result.keys() == ["x.a", "x.b"]

or on a per-:class:`_engine.Engine` basis::

    engine = create_engine(
        "sqlite://", execution_options={"sqlite_raw_colnames": True}
    )

When using the per-:class:`_engine.Engine` execution option, note that
**Core and ORM queries that use UNION may not function properly**.
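
Conceptually, the filtering described in this section keeps only the text
after the last dot of each reported column name unless raw names are
requested. The helper below is a hypothetical sketch of that idea; its name
and signature are assumptions made for illustration and are not part of the
dialect's public API:

```python
def filter_dotted_name(colname, raw_colnames=False):
    """Return the column name as it would appear in ``keys()``."""
    if not raw_colnames and "." in colname:
        # drop any "tablename." (or "schema.tablename.") prefix
        return colname.split(".")[-1]
    return colname


reported = ["x.a", "x.b"]  # names as reported by the affected driver
filtered = [filter_dotted_name(name) for name in reported]
raw = [filter_dotted_name(name, raw_colnames=True) for name in reported]
print(filtered, raw)
```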

SQLite-specific table options
-----------------------------

Two options for CREATE TABLE are supported directly by the SQLite
dialect in conjunction with the :class:`_schema.Table` construct:

* ``WITHOUT ROWID``::

    Table("some_table", metadata, ..., sqlite_with_rowid=False)

* ``STRICT``::

    Table("some_table", metadata, ..., sqlite_strict=True)

  .. versionadded:: 2.0.37

.. seealso::

    `SQLite CREATE TABLE options
    <https://www.sqlite.org/lang_createtable.html>`_
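
The practical effect of ``WITHOUT ROWID`` can be seen with plain SQL through
the standard library ``sqlite3`` module. This is a minimal sketch using a
hypothetical table definition; ``STRICT`` is left out because it requires a
considerably newer SQLite library than ``WITHOUT ROWID`` does:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# WITHOUT ROWID tables must declare an explicit PRIMARY KEY
conn.execute(
    "CREATE TABLE some_table (id INTEGER PRIMARY KEY, data TEXT) WITHOUT ROWID"
)
conn.execute("INSERT INTO some_table (id, data) VALUES (1, 'x')")

try:
    conn.execute("SELECT rowid FROM some_table")
    has_rowid = True
except sqlite3.OperationalError:
    # the implicit "rowid" column does not exist on such tables
    has_rowid = False

print(has_rowid)
```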

.. _sqlite_include_internal:

Reflecting internal schema tables
----------------------------------

Reflection methods that return lists of tables will omit so-called
"SQLite internal schema object" names, which are considered by SQLite
as any object name that is prefixed with ``sqlite_``.  An example of
such an object is the ``sqlite_sequence`` table that's generated when
the ``AUTOINCREMENT`` column parameter is used.   In order to return
these objects, the parameter ``sqlite_include_internal=True`` may be
passed to methods such as :meth:`_schema.MetaData.reflect` or
:meth:`.Inspector.get_table_names`.

.. versionadded:: 2.0  Added the ``sqlite_include_internal=True`` parameter.
   Previously, these tables were not ignored by SQLAlchemy reflection
   methods.

.. note::

    The ``sqlite_include_internal`` parameter does not refer to the
    "system" tables that are present in schemas such as ``sqlite_master``.

.. seealso::

    `SQLite Internal Schema Objects <https://www.sqlite.org/fileformat2.html#intschema>`_ - in the SQLite
    documentation.
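
The ``sqlite_sequence`` example can be reproduced with the standard library
``sqlite3`` module. The sketch below uses a hypothetical table and applies
the ``sqlite_`` prefix test by hand; it illustrates the naming rule only and
is not how the reflection methods are implemented:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# AUTOINCREMENT causes SQLite to create the internal sqlite_sequence table
conn.execute(
    "CREATE TABLE some_table (id INTEGER PRIMARY KEY AUTOINCREMENT, data TEXT)"
)

all_names = [
    row[0]
    for row in conn.execute("SELECT name FROM sqlite_master WHERE type = 'table'")
]
# omit internal schema objects, i.e. names prefixed with "sqlite_"
user_names = [name for name in all_names if not name.startswith("sqlite_")]
print(sorted(all_names), user_names)
```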

'''  # noqa
from __future__ import annotations

import datetime
import numbers
import re
from typing import Optional

from .json import JSON
from .json import JSONIndexType
from .json import JSONPathType
from ... import exc
from ... import schema as sa_schema
from ... import sql
from ... import text
from ... import types as sqltypes
from ... import util
from ...engine import default
from ...engine import processors
from ...engine import reflection
from ...engine.reflection import ReflectionDefaults
from ...sql import coercions
from ...sql import compiler
from ...sql import elements
from ...sql import roles
from ...sql import schema
from ...types import BLOB  # noqa
from ...types import BOOLEAN  # noqa
from ...types import CHAR  # noqa
from ...types import DECIMAL  # noqa
from ...types import FLOAT  # noqa
from ...types import INTEGER  # noqa
from ...types import NUMERIC  # noqa
from ...types import REAL  # noqa
from ...types import SMALLINT  # noqa
from ...types import TEXT  # noqa
from ...types import TIMESTAMP  # noqa
from ...types import VARCHAR  # noqa


class _SQliteJson(JSON):
    def result_processor(self, dialect, coltype):
        default_processor = super().result_processor(dialect, coltype)

        def process(value):
            try:
                return default_processor(value)
            except TypeError:
                if isinstance(value, numbers.Number):
                    return value
                else:
                    raise

        return process


class _DateTimeMixin:
    _reg = None
    _storage_format = None

    def __init__(self, storage_format=None, regexp=None, **kw):
        super().__init__(**kw)
        if regexp is not None:
            self._reg = re.compile(regexp)
        if storage_format is not None:
            self._storage_format = storage_format

    @property
    def format_is_text_affinity(self):
        """Return True if the storage format will automatically imply
        a TEXT affinity.

        If the storage format contains no non-numeric characters, SQLite
        would store the resulting all-digit strings as numbers under the
        NUMERIC affinity of a DATE, DATETIME or TIME column; in this case,
        the type generates its DDL as DATE_CHAR, DATETIME_CHAR or
        TIME_CHAR so that the column keeps a textual affinity.

        """
        spec = self._storage_format % {
            "year": 0,
            "month": 0,
            "day": 0,
            "hour": 0,
            "minute": 0,
            "second": 0,
            "microsecond": 0,
        }
        return bool(re.search(r"[^0-9]", spec))

    def adapt(self, cls, **kw):
        if issubclass(cls, _DateTimeMixin):
            if self._storage_format:
                kw["storage_format"] = self._storage_format
            if self._reg:
                kw["regexp"] = self._reg
        return super().adapt(cls, **kw)

    def literal_processor(self, dialect):
        bp = self.bind_processor(dialect)

        def process(value):
            return "'%s'" % bp(value)

        return process


class DATETIME(_DateTimeMixin, sqltypes.DateTime):
    r"""Represent a Python datetime object in SQLite using a string.

    The default string storage format is::

        "%(year)04d-%(month)02d-%(day)02d %(hour)02d:%(minute)02d:%(second)02d.%(microsecond)06d"

    e.g.:

    .. sourcecode:: text

        2021-03-15 12:05:57.105542

    Incoming strings are by default parsed using the
    Python ``datetime.fromisoformat()`` function.

    .. versionchanged:: 2.0  ``datetime.fromisoformat()`` is used for default
       datetime string parsing.

    The storage format can be customized to some degree using the
    ``storage_format`` and ``regexp`` parameters, such as::

        import re
        from sqlalchemy.dialects.sqlite import DATETIME

        dt = DATETIME(
            storage_format=(
                "%(year)04d/%(month)02d/%(day)02d %(hour)02d:%(minute)02d:%(second)02d"
            ),
            regexp=r"(\d+)/(\d+)/(\d+) (\d+):(\d+):(\d+)",
        )

    :param truncate_microseconds: when ``True`` microseconds will be truncated
     from the datetime. Can't be specified together with ``storage_format``
     or ``regexp``.

    :param storage_format: format string which will be applied to the dict
     with keys year, month, day, hour, minute, second, and microsecond.

    :param regexp: regular expression which will be applied to incoming result
     rows, replacing the use of ``datetime.fromisoformat()`` to parse incoming
     strings. If the regexp contains named groups, the resulting match dict is
     applied to the Python datetime() constructor as keyword arguments.
     Otherwise, if positional groups are used, the datetime() constructor
     is called with positional arguments via
     ``*map(int, match_obj.groups(0))``.

    """  # noqa

    _storage_format = (
        "%(year)04d-%(month)02d-%(day)02d "
        "%(hour)02d:%(minute)02d:%(second)02d.%(microsecond)06d"
    )

    def __init__(self, *args, **kwargs):
        truncate_microseconds = kwargs.pop("truncate_microseconds", False)
        super().__init__(*args, **kwargs)
        if truncate_microseconds:
            assert "storage_format" not in kwargs, (
                "You can specify only "
                "one of truncate_microseconds or storage_format."
            )
            assert "regexp" not in kwargs, (
                "You can specify only one of "
                "truncate_microseconds or regexp."
            )
            self._storage_format = (
                "%(year)04d-%(month)02d-%(day)02d "
                "%(hour)02d:%(minute)02d:%(second)02d"
            )

    def bind_processor(self, dialect):
        datetime_datetime = datetime.datetime
        datetime_date = datetime.date
        format_ = self._storage_format

        def process(value):
            if value is None:
                return None
            elif isinstance(value, datetime_datetime):
                return format_ % {
                    "year": value.year,
                    "month": value.month,
                    "day": value.day,
                    "hour": value.hour,
                    "minute": value.minute,
                    "second": value.second,
                    "microsecond": value.microsecond,
                }
            elif isinstance(value, datetime_date):
                return format_ % {
                    "year": value.year,
                    "month": value.month,
                    "day": value.day,
                    "hour": 0,
                    "minute": 0,
                    "second": 0,
                    "microsecond": 0,
                }
            else:
                raise TypeError(
                    "SQLite DateTime type only accepts Python "
                    "datetime and date objects as input."
                )

        return process

    def result_processor(self, dialect, coltype):
        if self._reg:
            return processors.str_to_datetime_processor_factory(
                self._reg, datetime.datetime
            )
        else:
            return processors.str_to_datetime


class DATE(_DateTimeMixin, sqltypes.Date):
    r"""Represent a Python date object in SQLite using a string.

    The default string storage format is::

        "%(year)04d-%(month)02d-%(day)02d"

    e.g.:

    .. sourcecode:: text

        2011-03-15

    Incoming strings are by default parsed using the
    Python ``date.fromisoformat()`` function.

    .. versionchanged:: 2.0  ``date.fromisoformat()`` is used for default
       date string parsing.

    The storage format can be customized to some degree using the
    ``storage_format`` and ``regexp`` parameters, such as::

        import re
        from sqlalchemy.dialects.sqlite import DATE

        d = DATE(
            storage_format="%(month)02d/%(day)02d/%(year)04d",
            regexp=re.compile(r"(?P<month>\d+)/(?P<day>\d+)/(?P<year>\d+)"),
        )

    :param storage_format: format string which will be applied to the
     dict with keys year, month, and day.

    :param regexp: regular expression which will be applied to
     incoming result rows, replacing the use of ``date.fromisoformat()`` to
     parse incoming strings. If the regexp contains named groups, the resulting
     match dict is applied to the Python date() constructor as keyword
     arguments. Otherwise, if positional groups are used, the date()
     constructor is called with positional arguments via
     ``*map(int, match_obj.groups(0))``.

    """

    _storage_format = "%(year)04d-%(month)02d-%(day)02d"

    def bind_processor(self, dialect):
        datetime_date = datetime.date
        format_ = self._storage_format

        def process(value):
            if value is None:
                return None
            elif isinstance(value, datetime_date):
                return format_ % {
                    "year": value.year,
                    "month": value.month,
                    "day": value.day,
                }
            else:
                raise TypeError(
                    "SQLite Date type only accepts Python "
                    "date objects as input."
                )

        return process

    def result_processor(self, dialect, coltype):
        if self._reg:
            return processors.str_to_datetime_processor_factory(
                self._reg, datetime.date
            )
        else:
            return processors.str_to_date


class TIME(_DateTimeMixin, sqltypes.Time):
    r"""Represent a Python time object in SQLite using a string.

    The default string storage format is::

        "%(hour)02d:%(minute)02d:%(second)02d.%(microsecond)06d"

    e.g.:

    .. sourcecode:: text

        12:05:57.105542

    Incoming strings are by default parsed using the
    Python ``time.fromisoformat()`` function.

    .. versionchanged:: 2.0  ``time.fromisoformat()`` is used for default
       time string parsing.

    The storage format can be customized to some degree using the
    ``storage_format`` and ``regexp`` parameters, such as::

        import re
        from sqlalchemy.dialects.sqlite import TIME

        t = TIME(
            storage_format="%(hour)02d-%(minute)02d-%(second)02d-%(microsecond)06d",
            regexp=re.compile(r"(\d+)-(\d+)-(\d+)(?:-(\d+))?"),
        )

    :param truncate_microseconds: when ``True`` microseconds will be truncated
     from the time. Can't be specified together with ``storage_format``
     or ``regexp``.

    :param storage_format: format string which will be applied to the dict
     with keys hour, minute, second, and microsecond.

    :param regexp: regular expression which will be applied to incoming result
     rows, replacing the use of ``time.fromisoformat()`` to parse incoming
     strings. If the regexp contains named groups, the resulting match dict is
     applied to the Python time() constructor as keyword arguments. Otherwise,
     if positional groups are used, the time() constructor is called with
     positional arguments via ``*map(int, match_obj.groups(0))``.

    """

    _storage_format = "%(hour)02d:%(minute)02d:%(second)02d.%(microsecond)06d"

    def __init__(self, *args, **kwargs):
        truncate_microseconds = kwargs.pop("truncate_microseconds", False)
        super().__init__(*args, **kwargs)
        if truncate_microseconds:
            assert "storage_format" not in kwargs, (
                "You can specify only "
                "one of truncate_microseconds or storage_format."
            )
            assert "regexp" not in kwargs, (
                "You can specify only one of "
                "truncate_microseconds or regexp."
            )
            self._storage_format = "%(hour)02d:%(minute)02d:%(second)02d"

    def bind_processor(self, dialect):
        datetime_time = datetime.time
        format_ = self._storage_format

        def process(value):
            if value is None:
                return None
            elif isinstance(value, datetime_time):
                return format_ % {
                    "hour": value.hour,
                    "minute": value.minute,
                    "second": value.second,
                    "microsecond": value.microsecond,
                }
            else:
                raise TypeError(
                    "SQLite Time type only accepts Python "
                    "time objects as input."
                )

        return process

    def result_processor(self, dialect, coltype):
        if self._reg:
            return processors.str_to_datetime_processor_factory(
                self._reg, datetime.time
            )
        else:
            return processors.str_to_time


colspecs = {
    sqltypes.Date: DATE,
    sqltypes.DateTime: DATETIME,
    sqltypes.JSON: _SQliteJson,
    sqltypes.JSON.JSONIndexType: JSONIndexType,
    sqltypes.JSON.JSONPathType: JSONPathType,
    sqltypes.Time: TIME,
}

ischema_names = {
    "BIGINT": sqltypes.BIGINT,
    "BLOB": sqltypes.BLOB,
    "BOOL": sqltypes.BOOLEAN,
    "BOOLEAN": sqltypes.BOOLEAN,
    "CHAR": sqltypes.CHAR,
    "DATE": sqltypes.DATE,
    "DATE_CHAR": sqltypes.DATE,
    "DATETIME": sqltypes.DATETIME,
    "DATETIME_CHAR": sqltypes.DATETIME,
    "DOUBLE": sqltypes.DOUBLE,
    "DECIMAL": sqltypes.DECIMAL,
    "FLOAT": sqltypes.FLOAT,
    "INT": sqltypes.INTEGER,
    "INTEGER": sqltypes.INTEGER,
    "JSON": JSON,
    "NUMERIC": sqltypes.NUMERIC,
    "REAL": sqltypes.REAL,
    "SMALLINT": sqltypes.SMALLINT,
    "TEXT": sqltypes.TEXT,
    "TIME": sqltypes.TIME,
    "TIME_CHAR": sqltypes.TIME,
    "TIMESTAMP": sqltypes.TIMESTAMP,
    "VARCHAR": sqltypes.VARCHAR,
    "NVARCHAR": sqltypes.NVARCHAR,
    "NCHAR": sqltypes.NCHAR,
}


class SQLiteCompiler(compiler.SQLCompiler):
    extract_map = util.update_copy(
        compiler.SQLCompiler.extract_map,
        {
            "month": "%m",
            "day": "%d",
            "year": "%Y",
            "second": "%S",
            "hour": "%H",
            "doy": "%j",
            "minute": "%M",
            "epoch": "%s",
            "dow": "%w",
            "week": "%W",
        },
    )

    def visit_truediv_binary(self, binary, operator, **kw):
        return (
            self.process(binary.left, **kw)
            + " / "
            + "(%s + 0.0)" % self.process(binary.right, **kw)
        )

    def visit_now_func(self, fn, **kw):
        return "CURRENT_TIMESTAMP"

    def visit_localtimestamp_func(self, func, **kw):
        return "DATETIME(CURRENT_TIMESTAMP, 'localtime')"

    def visit_true(self, expr, **kw):
        return "1"

    def visit_false(self, expr, **kw):
        return "0"

    def visit_char_length_func(self, fn, **kw):
        return "length%s" % self.function_argspec(fn)

    def visit_aggregate_strings_func(self, fn, **kw):
        return "group_concat%s" % self.function_argspec(fn)

    def visit_cast(self, cast, **kwargs):
        if self.dialect.supports_cast:
            return super().visit_cast(cast, **kwargs)
        else:
            return self.process(cast.clause, **kwargs)

    def visit_extract(self, extract, **kw):
        try:
            return "CAST(STRFTIME('%s', %s) AS INTEGER)" % (
                self.extract_map[extract.field],
                self.process(extract.expr, **kw),
            )
        except KeyError as err:
            raise exc.CompileError(
                "%s is not a valid extract argument." % extract.field
            ) from err

    def returning_clause(
        self,
        stmt,
        returning_cols,
        *,
        populate_result_map,
        **kw,
    ):
        kw["include_table"] = False
        return super().returning_clause(
            stmt, returning_cols, populate_result_map=populate_result_map, **kw
        )

    def limit_clause(self, select, **kw):
        text = ""
        if select._limit_clause is not None:
            text += "\n LIMIT " + self.process(select._limit_clause, **kw)
        if select._offset_clause is not None:
            if select._limit_clause is None:
                text += "\n LIMIT " + self.process(sql.literal(-1))
            text += " OFFSET " + self.process(select._offset_clause, **kw)
        else:
            text += " OFFSET " + self.process(sql.literal(0), **kw)
        return text

    def for_update_clause(self, select, **kw):
        # sqlite has no "FOR UPDATE" AFAICT
        return ""

    def update_from_clause(
        self, update_stmt, from_table, extra_froms, from_hints, **kw
    ):
        kw["asfrom"] = True
        return "FROM " + ", ".join(
            t._compiler_dispatch(self, fromhints=from_hints, **kw)
            for t in extra_froms
        )

    def visit_is_distinct_from_binary(self, binary, operator, **kw):
        return "%s IS NOT %s" % (
            self.process(binary.left),
            self.process(binary.right),
        )

    def visit_is_not_distinct_from_binary(self, binary, operator, **kw):
        return "%s IS %s" % (
            self.process(binary.left),
            self.process(binary.right),
        )

    def visit_json_getitem_op_binary(self, binary, operator, **kw):
        if binary.type._type_affinity is sqltypes.JSON:
            expr = "JSON_QUOTE(JSON_EXTRACT(%s, %s))"
        else:
            expr = "JSON_EXTRACT(%s, %s)"

        return expr % (
            self.process(binary.left, **kw),
            self.process(binary.right, **kw),
        )

    def visit_json_path_getitem_op_binary(self, binary, operator, **kw):
        if binary.type._type_affinity is sqltypes.JSON:
            expr = "JSON_QUOTE(JSON_EXTRACT(%s, %s))"
        else:
            expr = "JSON_EXTRACT(%s, %s)"

        return expr % (
            self.process(binary.left, **kw),
            self.process(binary.right, **kw),
        )

    def visit_empty_set_op_expr(self, type_, expand_op, **kw):
        # slightly old SQLite versions don't seem to be able to handle
        # the empty set impl
        return self.visit_empty_set_expr(type_)

    def visit_empty_set_expr(self, element_types, **kw):
        return "SELECT %s FROM (SELECT %s) WHERE 1!=1" % (
            ", ".join("1" for type_ in element_types or [INTEGER()]),
            ", ".join("1" for type_ in element_types or [INTEGER()]),
        )

    def visit_regexp_match_op_binary(self, binary, operator, **kw):
        return self._generate_generic_binary(binary, " REGEXP ", **kw)

    def visit_not_regexp_match_op_binary(self, binary, operator, **kw):
        return self._generate_generic_binary(binary, " NOT REGEXP ", **kw)

    def _on_conflict_target(self, clause, **kw):
        if clause.inferred_target_elements is not None:
            target_text = "(%s)" % ", ".join(
                (
                    self.preparer.quote(c)
                    if isinstance(c, str)
                    else self.process(c, include_table=False, use_schema=False)
                )
                for c in clause.inferred_target_elements
            )
            if clause.inferred_target_whereclause is not None:
                target_text += " WHERE %s" % self.process(
                    clause.inferred_target_whereclause,
                    include_table=False,
                    use_schema=False,
                    literal_execute=True,
                )

        else:
            target_text = ""

        return target_text

    def visit_on_conflict_do_nothing(self, on_conflict, **kw):
        target_text = self._on_conflict_target(on_conflict, **kw)

        if target_text:
            return "ON CONFLICT %s DO NOTHING" % target_text
        else:
            return "ON CONFLICT DO NOTHING"

    def visit_on_conflict_do_update(self, on_conflict, **kw):
        clause = on_conflict

        target_text = self._on_conflict_target(on_conflict, **kw)

        action_set_ops = []

        set_parameters = dict(clause.update_values_to_set)
        # create a list of column assignment clauses as tuples

        insert_statement = self.stack[-1]["selectable"]
        cols = insert_statement.table.c
        for c in cols:
            col_key = c.key

            if col_key in set_parameters:
                value = set_parameters.pop(col_key)
            elif c in set_parameters:
                value = set_parameters.pop(c)
            else:
                continue

            if coercions._is_literal(value):
                value = elements.BindParameter(None, value, type_=c.type)

            else:
                if (
                    isinstance(value, elements.BindParameter)
                    and value.type._isnull
                ):
                    value = value._clone()
                    value.type = c.type
            value_text = self.process(value.self_group(), use_schema=False)

            key_text = self.preparer.quote(c.name)
            action_set_ops.append("%s = %s" % (key_text, value_text))

        # check for names that don't match columns
        if set_parameters:
            util.warn(
                "Additional column names not matching "
                "any column keys in table '%s': %s"
                % (
                    self.current_executable.table.name,
                    (", ".join("'%s'" % c for c in set_parameters)),
                )
            )
            for k, v in set_parameters.items():
                key_text = (
                    self.preparer.quote(k)
                    if isinstance(k, str)
                    else self.process(k, use_schema=False)
                )
                value_text = self.process(
                    coercions.expect(roles.ExpressionElementRole, v),
                    use_schema=False,
                )
                action_set_ops.append("%s = %s" % (key_text, value_text))

        action_text = ", ".join(action_set_ops)
        if clause.update_whereclause is not None:
            action_text += " WHERE %s" % self.process(
                clause.update_whereclause, include_table=True, use_schema=False
            )

        return "ON CONFLICT %s DO UPDATE SET %s" % (target_text, action_text)

    def visit_bitwise_xor_op_binary(self, binary, operator, **kw):
        # sqlite has no xor. Use "a XOR b" = "(a | b) - (a & b)".
        kw["eager_grouping"] = True
        or_ = self._generate_generic_binary(binary, " | ", **kw)
        and_ = self._generate_generic_binary(binary, " & ", **kw)
        return f"({or_} - {and_})"


class SQLiteDDLCompiler(compiler.DDLCompiler):
    def get_column_specification(self, column, **kwargs):
        coltype = self.dialect.type_compiler_instance.process(
            column.type, type_expression=column
        )
        colspec = self.preparer.format_column(column) + " " + coltype
        default = self.get_column_default_string(column)
        if default is not None:

            if not re.match(r"""^\s*[\'\"\(]""", default) and re.match(
                r".*\W.*", default
            ):
                colspec += f" DEFAULT ({default})"
            else:
                colspec += f" DEFAULT {default}"

        if not column.nullable:
            colspec += " NOT NULL"

            on_conflict_clause = column.dialect_options["sqlite"][
                "on_conflict_not_null"
            ]
            if on_conflict_clause is not None:
                colspec += " ON CONFLICT " + on_conflict_clause

        if column.primary_key:
            if (
                column.autoincrement is True
                and len(column.table.primary_key.columns) != 1
            ):
                raise exc.CompileError(
                    "SQLite does not support autoincrement for "
                    "composite primary keys"
                )

            if (
                column.table.dialect_options["sqlite"]["autoincrement"]
                and len(column.table.primary_key.columns) == 1
                and issubclass(column.type._type_affinity, sqltypes.Integer)
                and not column.foreign_keys
            ):
                colspec += " PRIMARY KEY"

                on_conflict_clause = column.dialect_options["sqlite"][
                    "on_conflict_primary_key"
                ]
                if on_conflict_clause is not None:
                    colspec += " ON CONFLICT " + on_conflict_clause

                colspec += " AUTOINCREMENT"

        if column.computed is not None:
            colspec += " " + self.process(column.computed)

        return colspec

    def visit_primary_key_constraint(self, constraint, **kw):
        # for columns with sqlite_autoincrement=True,
        # the PRIMARY KEY constraint can only be inline
        # with the column itself.
        if len(constraint.columns) == 1:
            c = list(constraint)[0]
            if (
                c.primary_key
                and c.table.dialect_options["sqlite"]["autoincrement"]
                and issubclass(c.type._type_affinity, sqltypes.Integer)
                and not c.foreign_keys
            ):
                return None

        text = super().visit_primary_key_constraint(constraint)

        on_conflict_clause = constraint.dialect_options["sqlite"][
            "on_conflict"
        ]
        if on_conflict_clause is None and len(constraint.columns) == 1:
            on_conflict_clause = list(constraint)[0].dialect_options["sqlite"][
                "on_conflict_primary_key"
            ]

        if on_conflict_clause is not None:
            text += " ON CONFLICT " + on_conflict_clause

        return text

    def visit_unique_constraint(self, constraint, **kw):
        text = super().visit_unique_constraint(constraint)

        on_conflict_clause = constraint.dialect_options["sqlite"][
            "on_conflict"
        ]
        if on_conflict_clause is None and len(constraint.columns) == 1:
            col1 = list(constraint)[0]
            if isinstance(col1, schema.SchemaItem):
                on_conflict_clause = list(constraint)[0].dialect_options[
                    "sqlite"
                ]["on_conflict_unique"]

        if on_conflict_clause is not None:
            text += " ON CONFLICT " + on_conflict_clause

        return text

    def visit_check_constraint(self, constraint, **kw):
        text = super().visit_check_constraint(constraint)

        on_conflict_clause = constraint.dialect_options["sqlite"][
            "on_conflict"
        ]

        if on_conflict_clause is not None:
            text += " ON CONFLICT " + on_conflict_clause

        return text

    def visit_column_check_constraint(self, constraint, **kw):
        text = super().visit_column_check_constraint(constraint)

        if constraint.dialect_options["sqlite"]["on_conflict"] is not None:
            raise exc.CompileError(
                "SQLite does not support on conflict clause for "
                "column check constraint"
            )

        return text

    def visit_foreign_key_constraint(self, constraint, **kw):
        local_table = constraint.elements[0].parent.table
        remote_table = constraint.elements[0].column.table

        if local_table.schema != remote_table.schema:
            return None
        else:
            return super().visit_foreign_key_constraint(constraint)

    def define_constraint_remote_table(self, constraint, table, preparer):
        """Format the remote table clause of a CREATE CONSTRAINT clause."""

1801 

1802 return preparer.format_table(table, use_schema=False) 

1803 

1804 def visit_create_index( 

1805 self, create, include_schema=False, include_table_schema=True, **kw 

1806 ): 

1807 index = create.element 

1808 self._verify_index_table(index) 

1809 preparer = self.preparer 

1810 text = "CREATE " 

1811 if index.unique: 

1812 text += "UNIQUE " 

1813 

1814 text += "INDEX " 

1815 

1816 if create.if_not_exists: 

1817 text += "IF NOT EXISTS " 

1818 

1819 text += "%s ON %s (%s)" % ( 

1820 self._prepared_index_name(index, include_schema=True), 

1821 preparer.format_table(index.table, use_schema=False), 

1822 ", ".join( 

1823 self.sql_compiler.process( 

1824 expr, include_table=False, literal_binds=True 

1825 ) 

1826 for expr in index.expressions 

1827 ), 

1828 ) 

1829 

1830 whereclause = index.dialect_options["sqlite"]["where"] 

1831 if whereclause is not None: 

1832 where_compiled = self.sql_compiler.process( 

1833 whereclause, include_table=False, literal_binds=True 

1834 ) 

1835 text += " WHERE " + where_compiled 

1836 

1837 return text 

1838 

1839 def post_create_table(self, table): 

1840 table_options = [] 

1841 

1842 if not table.dialect_options["sqlite"]["with_rowid"]: 

1843 table_options.append("WITHOUT ROWID") 

1844 

1845 if table.dialect_options["sqlite"]["strict"]: 

1846 table_options.append("STRICT") 

1847 

1848 if table_options: 

1849 return "\n " + ",\n ".join(table_options) 

1850 else: 

1851 return "" 

1852 
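The branching in `post_create_table` above can be sketched in isolation. This is a minimal illustration, not SQLAlchemy code: the function name `table_options_suffix` and its keyword arguments are hypothetical stand-ins for the `with_rowid` and `strict` entries of `table.dialect_options["sqlite"]`.

```python
def table_options_suffix(with_rowid=True, strict=False):
    """Hypothetical helper mirroring the branching in post_create_table."""
    table_options = []
    if not with_rowid:
        table_options.append("WITHOUT ROWID")
    if strict:
        table_options.append("STRICT")
    # Options are comma-separated, each on its own line after the
    # closing parenthesis of the CREATE TABLE column list.
    if table_options:
        return "\n " + ",\n ".join(table_options)
    return ""


# Show the suffix appended when both options are requested.
print(repr(table_options_suffix(with_rowid=False, strict=True)))
```

With the defaults (rowid kept, not strict) the suffix is empty, so the rendered `CREATE TABLE` is unchanged.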

1853 

1854class SQLiteTypeCompiler(compiler.GenericTypeCompiler): 

1855 def visit_large_binary(self, type_, **kw): 

1856 return self.visit_BLOB(type_) 

1857 

1858 def visit_DATETIME(self, type_, **kw): 

1859 if ( 

1860 not isinstance(type_, _DateTimeMixin) 

1861 or type_.format_is_text_affinity 

1862 ): 

1863 return super().visit_DATETIME(type_) 

1864 else: 

1865 return "DATETIME_CHAR" 

1866 

1867 def visit_DATE(self, type_, **kw): 

1868 if ( 

1869 not isinstance(type_, _DateTimeMixin) 

1870 or type_.format_is_text_affinity 

1871 ): 

1872 return super().visit_DATE(type_) 

1873 else: 

1874 return "DATE_CHAR" 

1875 

1876 def visit_TIME(self, type_, **kw): 

1877 if ( 

1878 not isinstance(type_, _DateTimeMixin) 

1879 or type_.format_is_text_affinity 

1880 ): 

1881 return super().visit_TIME(type_) 

1882 else: 

1883 return "TIME_CHAR" 

1884 

1885 def visit_JSON(self, type_, **kw): 

1886 # note this name provides NUMERIC affinity, not TEXT. 

1887 # should not be an issue unless the JSON value consists of a single 

1888 # numeric value. JSONTEXT can be used if this case is required. 

1889 return "JSON" 

1890 

1891 

1892class SQLiteIdentifierPreparer(compiler.IdentifierPreparer): 

1893 reserved_words = { 

1894 "add", 

1895 "after", 

1896 "all", 

1897 "alter", 

1898 "analyze", 

1899 "and", 

1900 "as", 

1901 "asc", 

1902 "attach", 

1903 "autoincrement", 

1904 "before", 

1905 "begin", 

1906 "between", 

1907 "by", 

1908 "cascade", 

1909 "case", 

1910 "cast", 

1911 "check", 

1912 "collate", 

1913 "column", 

1914 "commit", 

1915 "conflict", 

1916 "constraint", 

1917 "create", 

1918 "cross", 

1919 "current_date", 

1920 "current_time", 

1921 "current_timestamp", 

1922 "database", 

1923 "default", 

1924 "deferrable", 

1925 "deferred", 

1926 "delete", 

1927 "desc", 

1928 "detach", 

1929 "distinct", 

1930 "drop", 

1931 "each", 

1932 "else", 

1933 "end", 

1934 "escape", 

1935 "except", 

1936 "exclusive", 

1937 "exists", 

1938 "explain", 

1939 "false", 

1940 "fail", 

1941 "for", 

1942 "foreign", 

1943 "from", 

1944 "full", 

1945 "glob", 

1946 "group", 

1947 "having", 

1948 "if", 

1949 "ignore", 

1950 "immediate", 

1951 "in", 

1952 "index", 

1953 "indexed", 

1954 "initially", 

1955 "inner", 

1956 "insert", 

1957 "instead", 

1958 "intersect", 

1959 "into", 

1960 "is", 

1961 "isnull", 

1962 "join", 

1963 "key", 

1964 "left", 

1965 "like", 

1966 "limit", 

1967 "match", 

1968 "natural", 

1969 "not", 

1970 "notnull", 

1971 "null", 

1972 "of", 

1973 "offset", 

1974 "on", 

1975 "or", 

1976 "order", 

1977 "outer", 

1978 "plan", 

1979 "pragma", 

1980 "primary", 

1981 "query", 

1982 "raise", 

1983 "references", 

1984 "reindex", 

1985 "rename", 

1986 "replace", 

1987 "restrict", 

1988 "right", 

1989 "rollback", 

1990 "row", 

1991 "select", 

1992 "set", 

1993 "table", 

1994 "temp", 

1995 "temporary", 

1996 "then", 

1997 "to", 

1998 "transaction", 

1999 "trigger", 

2000 "true", 

2001 "union", 

2002 "unique", 

2003 "update", 

2004 "using", 

2005 "vacuum", 

2006 "values", 

2007 "view", 

2008 "virtual", 

2009 "when", 

2010 "where", 

2011 } 

2012 

2013 

2014class SQLiteExecutionContext(default.DefaultExecutionContext): 

2015 @util.memoized_property 

2016 def _preserve_raw_colnames(self): 

2017 return ( 

2018 not self.dialect._broken_dotted_colnames 

2019 or self.execution_options.get("sqlite_raw_colnames", False) 

2020 ) 

2021 

2022 def _translate_colname(self, colname): 

2023 # TODO: detect SQLite version 3.10.0 or greater; 

2024 # see [ticket:3633] 

2025 

2026 # adjust for dotted column names. SQLite 

2027 # in the case of UNION may store col names as 

2028 # "tablename.colname", or if using an attached database, 

2029 # "database.tablename.colname", in cursor.description 

2030 if not self._preserve_raw_colnames and "." in colname: 

2031 return colname.split(".")[-1], colname 

2032 else: 

2033 return colname, None 

2034 
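The dotted-name handling in `_translate_colname` above can be tried on its own. The sketch below is a standalone restatement under the assumption that the `_preserve_raw_colnames` flag is passed in as a plain argument; the function name `translate_colname` is hypothetical.

```python
def translate_colname(colname, preserve_raw_colnames=False):
    """Return (translated_name, original_name_or_None).

    Mirrors the listing: when raw names are not preserved and the name
    contains a dot, only the last dotted segment is kept.
    """
    if not preserve_raw_colnames and "." in colname:
        return colname.split(".")[-1], colname
    return colname, None


# A name as an old SQLite might report it for a UNION on an attached database.
print(translate_colname("attached.users.id"))
# The same name with raw column names requested.
print(translate_colname("attached.users.id", preserve_raw_colnames=True))
```

Returning the original name alongside the translated one lets the caller still look the column up by the string that appeared in `cursor.description`.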

2035 

2036class SQLiteDialect(default.DefaultDialect): 

2037 name = "sqlite" 

2038 supports_alter = False 

2039 

2040 # SQLite supports "DEFAULT VALUES" but *does not* support 

2041 # "VALUES (DEFAULT)" 

2042 supports_default_values = True 

2043 supports_default_metavalue = False 

2044 

2045 # sqlite issue: 

2046 # https://github.com/python/cpython/issues/93421 

2047 # note this parameter is no longer used by the ORM or default dialect 

2048 # see #9414 

2049 supports_sane_rowcount_returning = False 

2050 

2051 supports_empty_insert = False 

2052 supports_cast = True 

2053 supports_multivalues_insert = True 

2054 use_insertmanyvalues = True 

2055 tuple_in_values = True 

2056 supports_statement_cache = True 

2057 insert_null_pk_still_autoincrements = True 

2058 insert_returning = True 

2059 update_returning = True 

2060 update_returning_multifrom = True 

2061 delete_returning = True 

2062 update_returning_multifrom = True 

2063 

2064 supports_default_metavalue = True 

2065 """dialect supports INSERT... VALUES (DEFAULT) syntax""" 

2066 

2067 default_metavalue_token = "NULL" 

2068 """for INSERT... VALUES (DEFAULT) syntax, the token to put in the 

2069 parenthesis.""" 

2070 

2071 default_paramstyle = "qmark" 

2072 execution_ctx_cls = SQLiteExecutionContext 

2073 statement_compiler = SQLiteCompiler 

2074 ddl_compiler = SQLiteDDLCompiler 

2075 type_compiler_cls = SQLiteTypeCompiler 

2076 preparer = SQLiteIdentifierPreparer 

2077 ischema_names = ischema_names 

2078 colspecs = colspecs 

2079 

2080 construct_arguments = [ 

2081 ( 

2082 sa_schema.Table, 

2083 { 

2084 "autoincrement": False, 

2085 "with_rowid": True, 

2086 "strict": False, 

2087 }, 

2088 ), 

2089 (sa_schema.Index, {"where": None}), 

2090 ( 

2091 sa_schema.Column, 

2092 { 

2093 "on_conflict_primary_key": None, 

2094 "on_conflict_not_null": None, 

2095 "on_conflict_unique": None, 

2096 }, 

2097 ), 

2098 (sa_schema.Constraint, {"on_conflict": None}), 

2099 ] 

2100 

2101 _broken_fk_pragma_quotes = False 

2102 _broken_dotted_colnames = False 

2103 

2104 @util.deprecated_params( 

2105 _json_serializer=( 

2106 "1.3.7", 

2107 "The _json_serializer argument to the SQLite dialect has " 

2108 "been renamed to the correct name of json_serializer. The old " 

2109 "argument name will be removed in a future release.", 

2110 ), 

2111 _json_deserializer=( 

2112 "1.3.7", 

2113 "The _json_deserializer argument to the SQLite dialect has " 

2114 "been renamed to the correct name of json_deserializer. The old " 

2115 "argument name will be removed in a future release.", 

2116 ), 

2117 ) 

2118 def __init__( 

2119 self, 

2120 native_datetime=False, 

2121 json_serializer=None, 

2122 json_deserializer=None, 

2123 _json_serializer=None, 

2124 _json_deserializer=None, 

2125 **kwargs, 

2126 ): 

2127 default.DefaultDialect.__init__(self, **kwargs) 

2128 

2129 if _json_serializer: 

2130 json_serializer = _json_serializer 

2131 if _json_deserializer: 

2132 json_deserializer = _json_deserializer 

2133 self._json_serializer = json_serializer 

2134 self._json_deserializer = json_deserializer 

2135 

2136 # this flag used by pysqlite dialect, and perhaps others in the 

2137 # future, to indicate the driver is handling date/timestamp 

2138 # conversions (and perhaps datetime/time as well on some hypothetical 

2139 # driver ?) 

2140 self.native_datetime = native_datetime 

2141 

2142 if self.dbapi is not None: 

2143 if self.dbapi.sqlite_version_info < (3, 7, 16): 

2144 util.warn( 

2145 "SQLite version %s is older than 3.7.16, and will not " 

2146 "support right nested joins, as are sometimes used in " 

2147 "more complex ORM scenarios. SQLAlchemy 1.4 and above " 

2148 "no longer tries to rewrite these joins." 

2149 % (self.dbapi.sqlite_version_info,) 

2150 ) 

2151 

2152 # NOTE: python 3.7 on fedora for me has SQLite 3.34.1. These 

2153 # version checks are getting very stale. 

2154 self._broken_dotted_colnames = self.dbapi.sqlite_version_info < ( 

2155 3, 

2156 10, 

2157 0, 

2158 ) 

2159 self.supports_default_values = self.dbapi.sqlite_version_info >= ( 

2160 3, 

2161 3, 

2162 8, 

2163 ) 

2164 self.supports_cast = self.dbapi.sqlite_version_info >= (3, 2, 3) 

2165 self.supports_multivalues_insert = ( 

2166 # https://www.sqlite.org/releaselog/3_7_11.html 

2167 self.dbapi.sqlite_version_info 

2168 >= (3, 7, 11) 

2169 ) 

2170 # see https://www.sqlalchemy.org/trac/ticket/2568 

2171 # as well as https://www.sqlite.org/src/info/600482d161 

2172 self._broken_fk_pragma_quotes = self.dbapi.sqlite_version_info < ( 

2173 3, 

2174 6, 

2175 14, 

2176 ) 

2177 

2178 if self.dbapi.sqlite_version_info < (3, 35) or util.pypy: 

2179 self.update_returning = self.delete_returning = ( 

2180 self.insert_returning 

2181 ) = False 

2182 

2183 if self.dbapi.sqlite_version_info < (3, 32, 0): 

2184 # https://www.sqlite.org/limits.html 

2185 self.insertmanyvalues_max_parameters = 999 

2186 

2187 _isolation_lookup = util.immutabledict( 

2188 {"READ UNCOMMITTED": 1, "SERIALIZABLE": 0} 

2189 ) 

2190 

2191 def get_isolation_level_values(self, dbapi_connection): 

2192 return list(self._isolation_lookup) 

2193 

2194 def set_isolation_level(self, dbapi_connection, level): 

2195 isolation_level = self._isolation_lookup[level] 

2196 

2197 cursor = dbapi_connection.cursor() 

2198 cursor.execute(f"PRAGMA read_uncommitted = {isolation_level}") 

2199 cursor.close() 

2200 

2201 def get_isolation_level(self, dbapi_connection): 

2202 cursor = dbapi_connection.cursor() 

2203 cursor.execute("PRAGMA read_uncommitted") 

2204 res = cursor.fetchone() 

2205 if res: 

2206 value = res[0] 

2207 else: 

2208 # https://www.sqlite.org/changes.html#version_3_3_3 

2209 # "Optional READ UNCOMMITTED isolation (instead of the 

2210 # default isolation level of SERIALIZABLE) and 

2211 # table level locking when database connections 

2212 # share a common cache." 

2213 # pre-SQLite 3.3.0 defaults to 0 

2214 value = 0 

2215 cursor.close() 

2216 if value == 0: 

2217 return "SERIALIZABLE" 

2218 elif value == 1: 

2219 return "READ UNCOMMITTED" 

2220 else: 

2221 assert False, "Unknown isolation level %s" % value 

2222 
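The isolation-level methods above map two level names onto `PRAGMA read_uncommitted`. A minimal sketch of the same round trip using only the standard-library `sqlite3` module follows; the helper names `set_level` and `get_level` are hypothetical, and the in-memory database is an assumption made so the example is self-contained.

```python
import sqlite3

# Same mapping as _isolation_lookup in the listing.
ISOLATION_LOOKUP = {"READ UNCOMMITTED": 1, "SERIALIZABLE": 0}


def set_level(conn, level):
    """Set the PRAGMA from a level name (hypothetical helper)."""
    cursor = conn.cursor()
    cursor.execute(f"PRAGMA read_uncommitted = {ISOLATION_LOOKUP[level]}")
    cursor.close()


def get_level(conn):
    """Read the PRAGMA back and translate it to a level name."""
    cursor = conn.cursor()
    cursor.execute("PRAGMA read_uncommitted")
    row = cursor.fetchone()
    cursor.close()
    # No row at all is treated as 0, as in the listing.
    value = row[0] if row else 0
    return "READ UNCOMMITTED" if value == 1 else "SERIALIZABLE"


conn = sqlite3.connect(":memory:")
default_level = get_level(conn)
set_level(conn, "READ UNCOMMITTED")
changed_level = get_level(conn)
conn.close()
print(default_level, "->", changed_level)
```

The PRAGMA is a per-connection flag, so each new connection starts at the default again.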

2223 @reflection.cache 

2224 def get_schema_names(self, connection, **kw): 

2225 s = "PRAGMA database_list" 

2226 dl = connection.exec_driver_sql(s) 

2227 

2228 return [db[1] for db in dl if db[1] != "temp"] 

2229 

2230 def _format_schema(self, schema, table_name): 

2231 if schema is not None: 

2232 qschema = self.identifier_preparer.quote_identifier(schema) 

2233 name = f"{qschema}.{table_name}" 

2234 else: 

2235 name = table_name 

2236 return name 

2237 

2238 def _sqlite_main_query( 

2239 self, 

2240 table: str, 

2241 type_: str, 

2242 schema: Optional[str], 

2243 sqlite_include_internal: bool, 

2244 ): 

2245 main = self._format_schema(schema, table) 

2246 if not sqlite_include_internal: 

2247 filter_table = " AND name NOT LIKE 'sqlite~_%' ESCAPE '~'" 

2248 else: 

2249 filter_table = "" 

2250 query = ( 

2251 f"SELECT name FROM {main} " 

2252 f"WHERE type='{type_}'{filter_table} " 

2253 "ORDER BY name" 

2254 ) 

2255 return query 

2256 
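The `ESCAPE '~'` clause in `_sqlite_main_query` matters because `_` is a single-character wildcard in `LIKE`. The sketch below runs an equivalent query through the standard-library `sqlite3` module; the function name `main_query` and the table names are hypothetical, and schema qualification is left out for brevity.

```python
import sqlite3


def main_query(table, type_, include_internal=False):
    """Build the name-listing query (simplified: no schema prefix)."""
    if not include_internal:
        # '~_' matches a literal underscore, so only names that really
        # start with "sqlite_" are filtered out.
        filter_table = " AND name NOT LIKE 'sqlite~_%' ESCAPE '~'"
    else:
        filter_table = ""
    return (
        f"SELECT name FROM {table} "
        f"WHERE type='{type_}'{filter_table} "
        "ORDER BY name"
    )


conn = sqlite3.connect(":memory:")
# AUTOINCREMENT makes SQLite create the internal sqlite_sequence table.
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY AUTOINCREMENT)")
# A user table whose name merely starts with "sqlite" must stay visible.
conn.execute("CREATE TABLE sqlitedata (x INTEGER)")

visible = [row[0] for row in conn.execute(main_query("sqlite_master", "table"))]
everything = [
    row[0]
    for row in conn.execute(main_query("sqlite_master", "table", True))
]
conn.close()
print(visible)
print(everything)
```

Without the escape, the pattern `sqlite_%` would also hide `sqlitedata`, since the unescaped underscore would match the `d`.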

2257 @reflection.cache 

2258 def get_table_names( 

2259 self, connection, schema=None, sqlite_include_internal=False, **kw 

2260 ): 

2261 query = self._sqlite_main_query( 

2262 "sqlite_master", "table", schema, sqlite_include_internal 

2263 ) 

2264 names = connection.exec_driver_sql(query).scalars().all() 

2265 return names 

2266 

2267 @reflection.cache 

2268 def get_temp_table_names( 

2269 self, connection, sqlite_include_internal=False, **kw 

2270 ): 

2271 query = self._sqlite_main_query( 

2272 "sqlite_temp_master", "table", None, sqlite_include_internal 

2273 ) 

2274 names = connection.exec_driver_sql(query).scalars().all() 

2275 return names 

2276 

2277 @reflection.cache 

2278 def get_temp_view_names( 

2279 self, connection, sqlite_include_internal=False, **kw 

2280 ): 

2281 query = self._sqlite_main_query( 

2282 "sqlite_temp_master", "view", None, sqlite_include_internal 

2283 ) 

2284 names = connection.exec_driver_sql(query).scalars().all() 

2285 return names 

2286 

2287 @reflection.cache 

2288 def has_table(self, connection, table_name, schema=None, **kw): 

2289 self._ensure_has_table_connection(connection) 

2290 

2291 if schema is not None and schema not in self.get_schema_names( 

2292 connection, **kw 

2293 ): 

2294 return False 

2295 

2296 info = self._get_table_pragma( 

2297 connection, "table_info", table_name, schema=schema 

2298 ) 

2299 return bool(info) 

2300 

2301 def _get_default_schema_name(self, connection): 

2302 return "main" 

2303 

2304 @reflection.cache 

2305 def get_view_names( 

2306 self, connection, schema=None, sqlite_include_internal=False, **kw 

2307 ): 

2308 query = self._sqlite_main_query( 

2309 "sqlite_master", "view", schema, sqlite_include_internal 

2310 ) 

2311 names = connection.exec_driver_sql(query).scalars().all() 

2312 return names 

2313 

2314 @reflection.cache 

2315 def get_view_definition(self, connection, view_name, schema=None, **kw): 

2316 if schema is not None: 

2317 qschema = self.identifier_preparer.quote_identifier(schema) 

2318 master = f"{qschema}.sqlite_master" 

2319 s = ("SELECT sql FROM %s WHERE name = ? AND type='view'") % ( 

2320 master, 

2321 ) 

2322 rs = connection.exec_driver_sql(s, (view_name,)) 

2323 else: 

2324 try: 

2325 s = ( 

2326 "SELECT sql FROM " 

2327 " (SELECT * FROM sqlite_master UNION ALL " 

2328 " SELECT * FROM sqlite_temp_master) " 

2329 "WHERE name = ? " 

2330 "AND type='view'" 

2331 ) 

2332 rs = connection.exec_driver_sql(s, (view_name,)) 

2333 except exc.DBAPIError: 

2334 s = ( 

2335 "SELECT sql FROM sqlite_master WHERE name = ? " 

2336 "AND type='view'" 

2337 ) 

2338 rs = connection.exec_driver_sql(s, (view_name,)) 

2339 

2340 result = rs.fetchall() 

2341 if result: 

2342 return result[0].sql 

2343 else: 

2344 raise exc.NoSuchTableError( 

2345 f"{schema}.{view_name}" if schema else view_name 

2346 ) 

2347 

2348 @reflection.cache 

2349 def get_columns(self, connection, table_name, schema=None, **kw): 

2350 pragma = "table_info" 

2351 # computed columns are treated as hidden, they require table_xinfo 

2352 if self.server_version_info >= (3, 31): 

2353 pragma = "table_xinfo" 

2354 info = self._get_table_pragma( 

2355 connection, pragma, table_name, schema=schema 

2356 ) 

2357 columns = [] 

2358 tablesql = None 

2359 for row in info: 

2360 name = row[1] 

2361 type_ = row[2].upper() 

2362 nullable = not row[3] 

2363 default = row[4] 

2364 primary_key = row[5] 

2365 hidden = row[6] if pragma == "table_xinfo" else 0 

2366 

2367 # hidden has value 0 for normal columns, 1 for hidden columns, 

2368 # 2 for computed virtual columns and 3 for computed stored columns 

2369 # https://www.sqlite.org/src/info/069351b85f9a706f60d3e98fbc8aaf40c374356b967c0464aede30ead3d9d18b 

2370 if hidden == 1: 

2371 continue 

2372 

2373 generated = bool(hidden) 

2374 persisted = hidden == 3 

2375 

2376 if tablesql is None and generated: 

2377 tablesql = self._get_table_sql( 

2378 connection, table_name, schema, **kw 

2379 ) 

2380 # remove create table 

2381 match = re.match( 

2382 r"create table .*?\((.*)\)$", 

2383 tablesql.strip(), 

2384 re.DOTALL | re.IGNORECASE, 

2385 ) 

2386 assert match, f"create table not found in {tablesql}" 

2387 tablesql = match.group(1).strip() 

2388 

2389 columns.append( 

2390 self._get_column_info( 

2391 name, 

2392 type_, 

2393 nullable, 

2394 default, 

2395 primary_key, 

2396 generated, 

2397 persisted, 

2398 tablesql, 

2399 ) 

2400 ) 

2401 if columns: 

2402 return columns 

2403 elif not self.has_table(connection, table_name, schema): 

2404 raise exc.NoSuchTableError( 

2405 f"{schema}.{table_name}" if schema else table_name 

2406 ) 

2407 else: 

2408 return ReflectionDefaults.columns() 

2409 

2410 def _get_column_info( 

2411 self, 

2412 name, 

2413 type_, 

2414 nullable, 

2415 default, 

2416 primary_key, 

2417 generated, 

2418 persisted, 

2419 tablesql, 

2420 ): 

2421 if generated: 

2422 # the type of a column "cc INTEGER GENERATED ALWAYS AS (1 + 42)" 

2423 # somehow is "INTEGER GENERATED ALWAYS" 

2424 type_ = re.sub("generated", "", type_, flags=re.IGNORECASE) 

2425 type_ = re.sub("always", "", type_, flags=re.IGNORECASE).strip() 

2426 

2427 coltype = self._resolve_type_affinity(type_) 

2428 

2429 if default is not None: 

2430 default = str(default) 

2431 

2432 colspec = { 

2433 "name": name, 

2434 "type": coltype, 

2435 "nullable": nullable, 

2436 "default": default, 

2437 "primary_key": primary_key, 

2438 } 

2439 if generated: 

2440 sqltext = "" 

2441 if tablesql: 

2442 pattern = ( 

2443 r"[^,]*\s+GENERATED\s+ALWAYS\s+AS" 

2444 r"\s+\((.*)\)\s*(?:virtual|stored)?" 

2445 ) 

2446 match = re.search( 

2447 re.escape(name) + pattern, tablesql, re.IGNORECASE 

2448 ) 

2449 if match: 

2450 sqltext = match.group(1) 

2451 colspec["computed"] = {"sqltext": sqltext, "persisted": persisted} 

2452 return colspec 

2453 
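The two regular-expression steps in `_get_column_info` above (cleaning the reflected type string and pulling the generation expression out of the table body) can be exercised without a database. This is a sketch under stated assumptions: the helper names `strip_generated` and `computed_sqltext` are hypothetical, and the sample table body is invented for illustration.

```python
import re


def strip_generated(type_):
    """Drop the GENERATED / ALWAYS keywords that leak into the type string."""
    type_ = re.sub("generated", "", type_, flags=re.IGNORECASE)
    return re.sub("always", "", type_, flags=re.IGNORECASE).strip()


def computed_sqltext(name, tablesql):
    """Extract the expression of a generated column, or '' if not found."""
    pattern = (
        r"[^,]*\s+GENERATED\s+ALWAYS\s+AS"
        r"\s+\((.*)\)\s*(?:virtual|stored)?"
    )
    match = re.search(re.escape(name) + pattern, tablesql, re.IGNORECASE)
    return match.group(1) if match else ""


# Invented column list, i.e. the CREATE TABLE text with the outer
# "CREATE TABLE name ( ... )" wrapper already removed.
body = "x INTEGER, cc INTEGER GENERATED ALWAYS AS (x + 42) STORED"

print(strip_generated("INTEGER GENERATED ALWAYS"))
print(computed_sqltext("cc", body))
```

Because `(.*)` is greedy, the sketch (like the pattern it copies) is only shown here on a generated column that comes last in the column list.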

2454 def _resolve_type_affinity(self, type_): 

2455 """Return a data type from a reflected column, using affinity rules. 

2456 

2457 SQLite's goal for universal compatibility introduces some complexity 

2458 during reflection, as a column's defined type might not actually be a 

2459 type that SQLite understands - or indeed, may not be defined *at all*. 

2460 Internally, SQLite handles this with a 'data type affinity' for each 

2461 column definition, mapping to one of 'TEXT', 'NUMERIC', 'INTEGER', 

2462 'REAL', or 'NONE' (raw bits). The algorithm that determines this is 

2463 listed in https://www.sqlite.org/datatype3.html section 2.1. 

2464 

2465 This method allows SQLAlchemy to support that algorithm, while still 

2466 providing access to smarter reflection utilities by recognizing 

2467 column definitions that SQLite only supports through affinity (like 

2468 DATE and DOUBLE). 

2469 

2470 """ 

2471 match = re.match(r"([\w ]+)(\(.*?\))?", type_) 

2472 if match: 

2473 coltype = match.group(1) 

2474 args = match.group(2) 

2475 else: 

2476 coltype = "" 

2477 args = "" 

2478 

2479 if coltype in self.ischema_names: 

2480 coltype = self.ischema_names[coltype] 

2481 elif "INT" in coltype: 

2482 coltype = sqltypes.INTEGER 

2483 elif "CHAR" in coltype or "CLOB" in coltype or "TEXT" in coltype: 

2484 coltype = sqltypes.TEXT 

2485 elif "BLOB" in coltype or not coltype: 

2486 coltype = sqltypes.NullType 

2487 elif "REAL" in coltype or "FLOA" in coltype or "DOUB" in coltype: 

2488 coltype = sqltypes.REAL 

2489 else: 

2490 coltype = sqltypes.NUMERIC 

2491 

2492 if args is not None: 

2493 args = re.findall(r"(\d+)", args) 

2494 try: 

2495 coltype = coltype(*[int(a) for a in args]) 

2496 except TypeError: 

2497 util.warn( 

2498 "Could not instantiate type %s with " 

2499 "reflected arguments %s; using no arguments." 

2500 % (coltype, args) 

2501 ) 

2502 coltype = coltype() 

2503 else: 

2504 coltype = coltype() 

2505 

2506 return coltype 

2507 
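The substring rules in `_resolve_type_affinity` above can be illustrated with plain strings. The sketch below is a simplified assumption: it skips the `ischema_names` lookup and returns an affinity label plus the parsed integer arguments instead of instantiating SQLAlchemy types. The name `resolve_affinity` is hypothetical.

```python
import re


def resolve_affinity(type_):
    """Return (affinity_label, int_args) for a declared column type."""
    match = re.match(r"([\w ]+)(\(.*?\))?", type_)
    if match:
        coltype, args = match.group(1), match.group(2)
    else:
        coltype, args = "", ""

    # Same order of substring checks as the listing.
    if "INT" in coltype:
        affinity = "INTEGER"
    elif "CHAR" in coltype or "CLOB" in coltype or "TEXT" in coltype:
        affinity = "TEXT"
    elif "BLOB" in coltype or not coltype:
        affinity = "NONE"
    elif "REAL" in coltype or "FLOA" in coltype or "DOUB" in coltype:
        affinity = "REAL"
    else:
        affinity = "NUMERIC"

    # Only digits inside the parentheses are kept as arguments.
    int_args = [int(a) for a in re.findall(r"(\d+)", args or "")]
    return affinity, int_args


for declared in ("VARCHAR(30)", "BIGINT", "DECIMAL(10, 2)", "DOUBLE PRECISION", ""):
    print(repr(declared), resolve_affinity(declared))
```

The order of the checks matters: any declared type containing `INT` resolves to integer affinity before the other substrings are considered.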

2508 @reflection.cache 

2509 def get_pk_constraint(self, connection, table_name, schema=None, **kw): 

2510 constraint_name = None 

2511 table_data = self._get_table_sql(connection, table_name, schema=schema) 

2512 if table_data: 

2513 PK_PATTERN = r"CONSTRAINT (\w+) PRIMARY KEY" 

2514 result = re.search(PK_PATTERN, table_data, re.I) 

2515 constraint_name = result.group(1) if result else None 

2516 

2517 cols = self.get_columns(connection, table_name, schema, **kw) 

2518 # consider only pk columns. This also avoids sorting the cached 

2519 # value returned by get_columns 

2520 cols = [col for col in cols if col.get("primary_key", 0) > 0] 

2521 cols.sort(key=lambda col: col.get("primary_key")) 

2522 pkeys = [col["name"] for col in cols] 

2523 

2524 if pkeys: 

2525 return {"constrained_columns": pkeys, "name": constraint_name} 

2526 else: 

2527 return ReflectionDefaults.pk_constraint() 

2528 

2529 @reflection.cache 

2530 def get_foreign_keys(self, connection, table_name, schema=None, **kw): 

2531 # sqlite makes this *extremely difficult*. 

2532 # First, use the pragma to get the actual FKs. 

2533 pragma_fks = self._get_table_pragma( 

2534 connection, "foreign_key_list", table_name, schema=schema 

2535 ) 

2536 

2537 fks = {} 

2538 

2539 for row in pragma_fks: 

2540 (numerical_id, rtbl, lcol, rcol) = (row[0], row[2], row[3], row[4]) 

2541 

2542 if not rcol: 

2543 # no referred column, which means it was not named in the 

2544 # original DDL. The referred columns of the foreign key 

2545 # constraint are therefore the primary key of the referred 

2546 # table. 

2547 try: 

2548 referred_pk = self.get_pk_constraint( 

2549 connection, rtbl, schema=schema, **kw 

2550 ) 

2551 referred_columns = referred_pk["constrained_columns"] 

2552 except exc.NoSuchTableError: 

2553 # ignore nonexistent parent tables 

2554 referred_columns = [] 

2555 else: 

2556 # note we use this list only if this is the first column 

2557 # in the constraint. for subsequent columns we ignore the 

2558 # list and append "rcol" if present. 

2559 referred_columns = [] 

2560 

2561 if self._broken_fk_pragma_quotes: 

2562 rtbl = re.sub(r"^[\"\[`\']|[\"\]`\']$", "", rtbl) 

2563 

2564 if numerical_id in fks: 

2565 fk = fks[numerical_id] 

2566 else: 

2567 fk = fks[numerical_id] = { 

2568 "name": None, 

2569 "constrained_columns": [], 

2570 "referred_schema": schema, 

2571 "referred_table": rtbl, 

2572 "referred_columns": referred_columns, 

2573 "options": {}, 

2574 } 

2575 fks[numerical_id] = fk 

2576 

2577 fk["constrained_columns"].append(lcol) 

2578 

2579 if rcol: 

2580 fk["referred_columns"].append(rcol) 

2581 

2582 def fk_sig(constrained_columns, referred_table, referred_columns): 

2583 return ( 

2584 tuple(constrained_columns) 

2585 + (referred_table,) 

2586 + tuple(referred_columns) 

2587 ) 

2588 

2589 # then, parse the actual SQL and attempt to find DDL that matches 

2590 # the names as well. SQLite saves the DDL in whatever format 

2591 # it was typed in as, so need to be liberal here. 

2592 

2593 keys_by_signature = { 

2594 fk_sig( 

2595 fk["constrained_columns"], 

2596 fk["referred_table"], 

2597 fk["referred_columns"], 

2598 ): fk 

2599 for fk in fks.values() 

2600 } 

2601 

2602 table_data = self._get_table_sql(connection, table_name, schema=schema) 

2603 

2604 def parse_fks(): 

2605 if table_data is None: 

2606 # system tables, etc. 

2607 return 

2608 

2609 # note that we already have the FKs from PRAGMA above. This whole 

2610 # regexp thing is trying to locate additional detail about the 

2611 # FKs, namely the name of the constraint and other options. 

2612 # so parsing the columns is really about matching it up to what 

2613 # we already have. 

2614 FK_PATTERN = ( 

2615 r"(?:CONSTRAINT (\w+) +)?" 

2616 r"FOREIGN KEY *\( *(.+?) *\) +" 

2617 r'REFERENCES +(?:(?:"(.+?)")|([a-z0-9_]+)) *\( *((?:(?:"[^"]+"|[a-z0-9_]+) *(?:, *)?)+)\) *' # noqa: E501 

2618 r"((?:ON (?:DELETE|UPDATE) " 

2619 r"(?:SET NULL|SET DEFAULT|CASCADE|RESTRICT|NO ACTION) *)*)" 

2620 r"((?:NOT +)?DEFERRABLE)?" 

2621 r"(?: +INITIALLY +(DEFERRED|IMMEDIATE))?" 

2622 ) 

2623 for match in re.finditer(FK_PATTERN, table_data, re.I): 

2624 ( 

2625 constraint_name, 

2626 constrained_columns, 

2627 referred_quoted_name, 

2628 referred_name, 

2629 referred_columns, 

2630 onupdatedelete, 

2631 deferrable, 

2632 initially, 

2633 ) = match.group(1, 2, 3, 4, 5, 6, 7, 8) 

2634 constrained_columns = list( 

2635 self._find_cols_in_sig(constrained_columns) 

2636 ) 

2637 if not referred_columns: 

2638 referred_columns = constrained_columns 

2639 else: 

2640 referred_columns = list( 

2641 self._find_cols_in_sig(referred_columns) 

2642 ) 

2643 referred_name = referred_quoted_name or referred_name 

2644 options = {} 

2645 

2646 for token in re.split(r" *\bON\b *", onupdatedelete.upper()): 

2647 if token.startswith("DELETE"): 

2648 ondelete = token[6:].strip() 

2649 if ondelete and ondelete != "NO ACTION": 

2650 options["ondelete"] = ondelete 

2651 elif token.startswith("UPDATE"): 

2652 onupdate = token[6:].strip() 

2653 if onupdate and onupdate != "NO ACTION": 

2654 options["onupdate"] = onupdate 

2655 

2656 if deferrable: 

2657 options["deferrable"] = "NOT" not in deferrable.upper() 

2658 if initially: 

2659 options["initially"] = initially.upper() 

2660 

2661 yield ( 

2662 constraint_name, 

2663 constrained_columns, 

2664 referred_name, 

2665 referred_columns, 

2666 options, 

2667 ) 

2668 

2669 fkeys = [] 

2670 

2671 for ( 

2672 constraint_name, 

2673 constrained_columns, 

2674 referred_name, 

2675 referred_columns, 

2676 options, 

2677 ) in parse_fks(): 

2678 sig = fk_sig(constrained_columns, referred_name, referred_columns) 

2679 if sig not in keys_by_signature: 

2680 util.warn( 

2681 "WARNING: SQL-parsed foreign key constraint " 

2682 "'%s' could not be located in PRAGMA " 

2683 "foreign_keys for table %s" % (sig, table_name) 

2684 ) 

2685 continue 

2686 key = keys_by_signature.pop(sig) 

2687 key["name"] = constraint_name 

2688 key["options"] = options 

2689 fkeys.append(key) 

2690 # assume the remainders are the unnamed, inline constraints, just 

2691 # use them as is as it's extremely difficult to parse inline 

2692 # constraints 

2693 fkeys.extend(keys_by_signature.values()) 

2694 if fkeys: 

2695 return fkeys 

2696 else: 

2697 return ReflectionDefaults.foreign_keys() 

2698 

2699 def _find_cols_in_sig(self, sig): 

2700 for match in re.finditer(r'(?:"(.+?)")|([a-z0-9_]+)', sig, re.I): 

2701 yield match.group(1) or match.group(2) 

2702 
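Foreign keys found by the PRAGMA and foreign keys parsed from the DDL are matched through a tuple signature built from column and table names. The sketch below reproduces `_find_cols_in_sig` and the nested `fk_sig` helper as free functions; the column and table names in the example are invented.

```python
import re


def find_cols_in_sig(sig):
    """Yield column names from a comma-separated list, quoted or bare."""
    for match in re.finditer(r'(?:"(.+?)")|([a-z0-9_]+)', sig, re.I):
        yield match.group(1) or match.group(2)


def fk_sig(constrained_columns, referred_table, referred_columns):
    """Flatten a foreign key into a hashable tuple usable as a dict key."""
    return (
        tuple(constrained_columns)
        + (referred_table,)
        + tuple(referred_columns)
    )


# Invented column list as it might appear inside FOREIGN KEY (...).
cols = list(find_cols_in_sig('"user id", account_id'))
sig = fk_sig(cols, "accounts", ["uid", "id"])
print(cols)
print(sig)
```

Two descriptions of the same constraint produce equal tuples, so a dictionary keyed by signature is enough to pair them up.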

2703 @reflection.cache 

2704 def get_unique_constraints( 

2705 self, connection, table_name, schema=None, **kw 

2706 ): 

2707 auto_index_by_sig = {} 

2708 for idx in self.get_indexes( 

2709 connection, 

2710 table_name, 

2711 schema=schema, 

2712 include_auto_indexes=True, 

2713 **kw, 

2714 ): 

2715 if not idx["name"].startswith("sqlite_autoindex"): 

2716 continue 

2717 sig = tuple(idx["column_names"]) 

2718 auto_index_by_sig[sig] = idx 

2719 

2720 table_data = self._get_table_sql( 

2721 connection, table_name, schema=schema, **kw 

2722 ) 

2723 unique_constraints = [] 

2724 

2725 def parse_uqs(): 

2726 if table_data is None: 

2727 return 

2728 UNIQUE_PATTERN = r'(?:CONSTRAINT "?(.+?)"? +)?UNIQUE *\((.+?)\)' 

2729 INLINE_UNIQUE_PATTERN = ( 

2730 r'(?:(".+?")|(?:[\[`])?([a-z0-9_]+)(?:[\]`])?)[\t ]' 

2731 r"+[a-z0-9_ ]+?[\t ]+UNIQUE" 

2732 ) 

2733 

2734 for match in re.finditer(UNIQUE_PATTERN, table_data, re.I): 

2735 name, cols = match.group(1, 2) 

2736 yield name, list(self._find_cols_in_sig(cols)) 

2737 

2738 # we need to match inlines as well, as we seek to differentiate 

2739 # a UNIQUE constraint from a UNIQUE INDEX, even though these 

2740 # are kind of the same thing :) 

2741 for match in re.finditer(INLINE_UNIQUE_PATTERN, table_data, re.I): 

2742 cols = list( 

2743 self._find_cols_in_sig(match.group(1) or match.group(2)) 

2744 ) 

2745 yield None, cols 

2746 

2747 for name, cols in parse_uqs(): 

2748 sig = tuple(cols) 

2749 if sig in auto_index_by_sig: 

2750 auto_index_by_sig.pop(sig) 

2751 parsed_constraint = {"name": name, "column_names": cols} 

2752 unique_constraints.append(parsed_constraint) 

2753 # NOTE: auto_index_by_sig might not be empty here, 

2754 # the PRIMARY KEY may have an entry. 

2755 if unique_constraints: 

2756 return unique_constraints 

2757 else: 

2758 return ReflectionDefaults.unique_constraints() 

2759 

2760 @reflection.cache 

2761 def get_check_constraints(self, connection, table_name, schema=None, **kw): 

2762 table_data = self._get_table_sql( 

2763 connection, table_name, schema=schema, **kw 

2764 ) 

2765 

2766 # NOTE NOTE NOTE 

2767 # DO NOT CHANGE THIS REGULAR EXPRESSION. There is no known way 

2768 # to parse CHECK constraints that contain newlines themselves using 

2769 # regular expressions, and the approach here relies upon each 

2770 # individual 

2771 # CHECK constraint being on a single line by itself. This 

2772 # necessarily makes assumptions as to how the CREATE TABLE 

2773 # was emitted. A more comprehensive DDL parsing solution would be 

2774 # needed to improve upon the current situation. See #11840 for 

2775 # background.

2776 CHECK_PATTERN = r"(?:CONSTRAINT (.+) +)?CHECK *\( *(.+) *\),? *" 

2777 cks = [] 

2778 

2779 for match in re.finditer(CHECK_PATTERN, table_data or "", re.I): 

2780 

2781 name = match.group(1) 

2782 

2783 if name: 

2784 name = re.sub(r'^"|"$', "", name) 

2785 

2786 cks.append({"sqltext": match.group(2), "name": name}) 

2787 cks.sort(key=lambda d: d["name"] or "~") # sort None as last 

2788 if cks: 

2789 return cks 

2790 else: 

2791 return ReflectionDefaults.check_constraints() 
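The warning above `CHECK_PATTERN` can be made concrete with a small experiment. The sketch below reuses the pattern and the name handling from `get_check_constraints()` in a standalone function, `parse_checks()`, which is a hypothetical helper and not part of the dialect, and applies it to three made-up `CREATE TABLE` strings: one with each CHECK on its own line, one with a CHECK spanning several lines, and one with two CHECKs on the same line.

```python
import re

# Copied from get_check_constraints() above.
CHECK_PATTERN = r"(?:CONSTRAINT (.+) +)?CHECK *\( *(.+) *\),? *"


def parse_checks(table_sql):
    # Hypothetical standalone version of the loop in get_check_constraints().
    cks = []
    for m in re.finditer(CHECK_PATTERN, table_sql or "", re.I):
        name = m.group(1)
        if name:
            # Strip one leading and one trailing double quote from the name.
            name = re.sub(r'^"|"$', "", name)
        cks.append({"sqltext": m.group(2), "name": name})
    cks.sort(key=lambda d: d["name"] or "~")  # sort None as last
    return cks


ONE_PER_LINE = (
    "CREATE TABLE t (\n"
    "    id INTEGER PRIMARY KEY,\n"
    "    qty INTEGER,\n"
    '    CONSTRAINT "ck_qty" CHECK (qty > 0),\n'
    "    CHECK (id < 100)\n"
    ")"
)

MULTI_LINE = (
    "CREATE TABLE t (\n"
    "    qty INTEGER,\n"
    "    CHECK (\n"
    "        qty > 0\n"
    "    )\n"
    ")"
)

SAME_LINE = (
    "CREATE TABLE t (\n"
    "    a INTEGER, b INTEGER,\n"
    "    CHECK (a > 0), CHECK (b > 0)\n"
    ")"
)

print(parse_checks(ONE_PER_LINE))  # both constraints found
print(parse_checks(MULTI_LINE))    # nothing found: "." does not match "\n"
print(parse_checks(SAME_LINE))     # greedy ".+" merges the two constraints
```

Only the first layout reflects cleanly. The multi-line CHECK is not found at all, and the two same-line CHECKs collapse into a single entry whose `sqltext` is `a > 0), CHECK (b > 0`.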

2792 

2793 @reflection.cache 

2794 def get_indexes(self, connection, table_name, schema=None, **kw): 

2795 pragma_indexes = self._get_table_pragma( 

2796 connection, "index_list", table_name, schema=schema 

2797 ) 

2798 indexes = [] 

2799 

2800 # regular expression to extract the filter predicate of a partial 

2801 # index. this could fail to extract the predicate correctly on 

2802 # indexes created like 

2803 # CREATE INDEX i ON t (col || ') where') WHERE col <> '' 

2804 # but as this function does not support expression-based indexes 

2805 # this case does not occur. 

2806 partial_pred_re = re.compile(r"\)\s+where\s+(.+)", re.IGNORECASE) 

2807 

2808 if schema: 

2809 schema_expr = "%s." % self.identifier_preparer.quote_identifier( 

2810 schema 

2811 ) 

2812 else: 

2813 schema_expr = "" 

2814 

2815 include_auto_indexes = kw.pop("include_auto_indexes", False) 

2816 for row in pragma_indexes: 

2817 # ignore implicit primary key index. 

2818 # https://www.mail-archive.com/sqlite-users@sqlite.org/msg30517.html 

2819 if not include_auto_indexes and row[1].startswith( 

2820 "sqlite_autoindex" 

2821 ): 

2822 continue 

2823 indexes.append( 

2824 dict( 

2825 name=row[1], 

2826 column_names=[], 

2827 unique=row[2], 

2828 dialect_options={}, 

2829 ) 

2830 ) 

2831 

2832 # check partial indexes 

2833 if len(row) >= 5 and row[4]: 

2834 s = ( 

2835 "SELECT sql FROM %(schema)ssqlite_master " 

2836 "WHERE name = ? " 

2837 "AND type = 'index'" % {"schema": schema_expr} 

2838 ) 

2839 rs = connection.exec_driver_sql(s, (row[1],)) 

2840 index_sql = rs.scalar() 

2841 predicate_match = partial_pred_re.search(index_sql) 

2842 if predicate_match is None: 

2843 # unless the regex is broken this case shouldn't happen 

2844 # because we know this is a partial index, so the 

2845 # definition sql should match the regex 

2846 util.warn( 

2847 "Failed to look up filter predicate of " 

2848 "partial index %s" % row[1] 

2849 ) 

2850 else: 

2851 predicate = predicate_match.group(1) 

2852 indexes[-1]["dialect_options"]["sqlite_where"] = text( 

2853 predicate 

2854 ) 

2855 

2856 # loop through the indexes to get the column names.

2857 for idx in list(indexes): 

2858 pragma_index = self._get_table_pragma( 

2859 connection, "index_info", idx["name"], schema=schema 

2860 ) 

2861 

2862 for row in pragma_index: 

2863 if row[2] is None: 

2864 util.warn( 

2865 "Skipped unsupported reflection of " 

2866 "expression-based index %s" % idx["name"] 

2867 ) 

2868 indexes.remove(idx) 

2869 break 

2870 else: 

2871 idx["column_names"].append(row[2]) 

2872 

2873 indexes.sort(key=lambda d: d["name"] or "~") # sort None as last 

2874 if indexes: 

2875 return indexes 

2876 elif not self.has_table(connection, table_name, schema): 

2877 raise exc.NoSuchTableError( 

2878 f"{schema}.{table_name}" if schema else table_name 

2879 ) 

2880 else: 

2881 return ReflectionDefaults.indexes() 
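The partial-index handling in `get_indexes()` can be reproduced with the standard-library `sqlite3` module alone. The sketch below is a minimal illustration, not the dialect's code path: it creates a made-up table and partial index, reads `PRAGMA index_list`, and for rows whose fifth column (the partial flag) is set, pulls the `CREATE INDEX` text from `sqlite_master` and applies the same regular expression. The table and index names are invented, and the sketch assumes a SQLite build recent enough to report the partial column.

```python
import re
import sqlite3

# Copied from get_indexes() above.
partial_pred_re = re.compile(r"\)\s+where\s+(.+)", re.IGNORECASE)

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, a INTEGER)")
conn.execute("CREATE INDEX ix_t_a ON t (a) WHERE a > 0")

predicates = {}
# Each index_list row is (seq, name, unique, origin, partial) on SQLite
# versions that report the last two columns.
index_rows = conn.execute('PRAGMA main.index_list("t")').fetchall()
for row in index_rows:
    if len(row) >= 5 and row[4]:
        (index_sql,) = conn.execute(
            "SELECT sql FROM sqlite_master WHERE name = ? AND type = 'index'",
            (row[1],),
        ).fetchone()
        match = partial_pred_re.search(index_sql)
        predicates[row[1]] = match.group(1) if match else None

conn.close()
print(predicates)
```

In the dialect the extracted predicate is wrapped in `text()` and stored under `dialect_options["sqlite_where"]`; the sketch keeps it as a plain string.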

2882 

2883 def _is_sys_table(self, table_name): 

2884 return table_name in { 

2885 "sqlite_schema", 

2886 "sqlite_master", 

2887 "sqlite_temp_schema", 

2888 "sqlite_temp_master", 

2889 } 

2890 

2891 @reflection.cache 

2892 def _get_table_sql(self, connection, table_name, schema=None, **kw): 

2893 if schema: 

2894 schema_expr = "%s." % ( 

2895 self.identifier_preparer.quote_identifier(schema) 

2896 ) 

2897 else: 

2898 schema_expr = "" 

2899 try: 

2900 s = ( 

2901 "SELECT sql FROM " 

2902 " (SELECT * FROM %(schema)ssqlite_master UNION ALL " 

2903 " SELECT * FROM %(schema)ssqlite_temp_master) " 

2904 "WHERE name = ? " 

2905 "AND type in ('table', 'view')" % {"schema": schema_expr} 

2906 ) 

2907 rs = connection.exec_driver_sql(s, (table_name,)) 

2908 except exc.DBAPIError: 

2909 s = ( 

2910 "SELECT sql FROM %(schema)ssqlite_master " 

2911 "WHERE name = ? " 

2912 "AND type in ('table', 'view')" % {"schema": schema_expr} 

2913 ) 

2914 rs = connection.exec_driver_sql(s, (table_name,)) 

2915 value = rs.scalar() 

2916 if value is None and not self._is_sys_table(table_name): 

2917 raise exc.NoSuchTableError(f"{schema_expr}{table_name}") 

2918 return value 

2919 

2920 def _get_table_pragma(self, connection, pragma, table_name, schema=None): 

2921 quote = self.identifier_preparer.quote_identifier 

2922 if schema is not None: 

2923 statements = [f"PRAGMA {quote(schema)}."] 

2924 else: 

2925 # because PRAGMA looks in all attached databases if no schema 

2926 # given, need to specify "main" schema, however since we want 

2927 # 'temp' tables in the same namespace as 'main', need to run 

2928 # the PRAGMA twice 

2929 statements = ["PRAGMA main.", "PRAGMA temp."] 

2930 

2931 qtable = quote(table_name) 

2932 for statement in statements: 

2933 statement = f"{statement}{pragma}({qtable})" 

2934 cursor = connection.exec_driver_sql(statement) 

2935 if not cursor._soft_closed: 

2936 # work around SQLite issue whereby cursor.description 

2937 # is blank when PRAGMA returns no rows: 

2938 # https://www.sqlite.org/cvstrac/tktview?tn=1884 

2939 result = cursor.fetchall() 

2940 else: 

2941 result = [] 

2942 if result: 

2943 return result 

2944 else: 

2945 return []
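The main-then-temp lookup in `_get_table_pragma()` can be sketched against the standard-library `sqlite3` module. The function below, `table_pragma()`, is a hypothetical simplification: it uses a naive `quote()` helper in place of the dialect's identifier preparer and calls `fetchall()` directly instead of checking `_soft_closed`. It shows how a temporary table is found through the second PRAGMA when the first returns no rows.

```python
import sqlite3


def quote(ident):
    # Naive identifier quoting, assumed sufficient for this illustration.
    return '"' + ident.replace('"', '""') + '"'


def table_pragma(conn, pragma, table_name, schema=None):
    if schema is not None:
        prefixes = [f"PRAGMA {quote(schema)}."]
    else:
        # Try "main" first, then "temp", so temporary tables share the
        # default namespace.
        prefixes = ["PRAGMA main.", "PRAGMA temp."]

    qtable = quote(table_name)
    for prefix in prefixes:
        rows = conn.execute(f"{prefix}{pragma}({qtable})").fetchall()
        if rows:
            return rows
    return []


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE m (id INTEGER)")
conn.execute("CREATE TEMP TABLE tmp (x TEXT, y TEXT)")

# table_info rows are (cid, name, type, notnull, dflt_value, pk).
main_cols = [row[1] for row in table_pragma(conn, "table_info", "m")]
temp_cols = [row[1] for row in table_pragma(conn, "table_info", "tmp")]
missing = table_pragma(conn, "table_info", "no_such_table")
conn.close()

print(main_cols, temp_cols, missing)
```

An empty list is returned both for a table that does not exist and for a pragma that legitimately has no rows, which is why callers such as `get_indexes()` follow up with `has_table()`.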