Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/base.py: 35%


# dialects/sqlite/base.py
# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
# <see AUTHORS file>
#
# This module is part of SQLAlchemy and is released under
# the MIT License: https://www.opensource.org/licenses/mit-license.php
# mypy: ignore-errors


r'''
.. dialect:: sqlite
    :name: SQLite
    :normal_support: 3.12+
    :best_effort: 3.7.16+

.. _sqlite_datetime:

Date and Time Types
-------------------

SQLite does not have built-in DATE, TIME, or DATETIME types, and pysqlite does
not provide out of the box functionality for translating values between Python
`datetime` objects and a SQLite-supported format. SQLAlchemy's own
:class:`~sqlalchemy.types.DateTime` and related types provide date formatting
and parsing functionality when SQLite is used. The implementation classes are
:class:`_sqlite.DATETIME`, :class:`_sqlite.DATE` and :class:`_sqlite.TIME`.
These types represent dates and times as ISO formatted strings, which also
nicely support ordering. There's no reliance on typical "libc" internals for
these functions so historical dates are fully supported.

Ensuring Text affinity
^^^^^^^^^^^^^^^^^^^^^^

The DDL rendered for these types is the standard ``DATE``, ``TIME``
and ``DATETIME`` indicators. However, custom storage formats can also be
applied to these types. When the
storage format is detected as containing no alpha characters, the DDL for
these types is rendered as ``DATE_CHAR``, ``TIME_CHAR``, and ``DATETIME_CHAR``,
so that the column continues to have textual affinity.

.. seealso::

    `Type Affinity <https://www.sqlite.org/datatype3.html#affinity>`_ -
    in the SQLite documentation

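Because the values are stored as ISO-8601 text, lexicographic comparison in
SQLite matches chronological order. A minimal sketch using only the
standard-library ``sqlite3`` module (outside SQLAlchemy) illustrates why:

```python
import sqlite3
from datetime import datetime

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (stamp TEXT)")

# ISO-8601 strings sort the same way the underlying datetimes do,
# so ORDER BY on the text column yields chronological order.
rows = [
    (datetime(2021, 6, 15, 12, 0, 0).isoformat(sep=" "),),
    (datetime(1776, 7, 4, 9, 30, 0).isoformat(sep=" "),),  # historical dates work too
    (datetime(1999, 12, 31, 23, 59, 59).isoformat(sep=" "),),
]
conn.executemany("INSERT INTO events VALUES (?)", rows)

ordered = [r[0] for r in conn.execute("SELECT stamp FROM events ORDER BY stamp")]
print(ordered[0])  # the 1776 date sorts first
```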

.. _sqlite_autoincrement:

SQLite Auto Incrementing Behavior
----------------------------------

Background on SQLite's autoincrement is at: https://sqlite.org/autoinc.html

Key concepts:

* SQLite has an implicit "auto increment" feature that takes place for any
  non-composite primary-key column that is specifically created using
  "INTEGER PRIMARY KEY" for the type + primary key.

* SQLite also has an explicit "AUTOINCREMENT" keyword, that is **not**
  equivalent to the implicit autoincrement feature; this keyword is not
  recommended for general use. SQLAlchemy does not render this keyword
  unless a special SQLite-specific directive is used (see below). However,
  it still requires that the column's type is named "INTEGER".

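The implicit behavior can be seen with the standard-library ``sqlite3``
driver alone; this sketch (independent of SQLAlchemy) shows an
``INTEGER PRIMARY KEY`` column generating rowid-based identifiers without
any AUTOINCREMENT keyword:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# "INTEGER PRIMARY KEY" makes the column an alias for the rowid,
# which SQLite assigns automatically on INSERT.
conn.execute("CREATE TABLE sometable (id INTEGER PRIMARY KEY, data TEXT)")

cur = conn.execute("INSERT INTO sometable (data) VALUES ('a')")
first_id = cur.lastrowid
cur = conn.execute("INSERT INTO sometable (data) VALUES ('b')")
second_id = cur.lastrowid

print(first_id, second_id)  # ids are assigned monotonically: 1 2
```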

Using the AUTOINCREMENT Keyword
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

To specifically render the AUTOINCREMENT keyword on the primary key column
when rendering DDL, add the flag ``sqlite_autoincrement=True`` to the Table
construct::

    Table(
        "sometable",
        metadata,
        Column("id", Integer, primary_key=True),
        sqlite_autoincrement=True,
    )

Allowing autoincrement behavior for SQLAlchemy types other than Integer/INTEGER
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

SQLite's typing model is based on naming conventions. Among other things, this
means that any type name which contains the substring ``"INT"`` will be
determined to be of "integer affinity". A type named ``"BIGINT"``,
``"SPECIAL_INT"`` or even ``"XYZINTQPR"`` will be considered by SQLite to be
of "integer" affinity. However, **the SQLite autoincrement feature, whether
implicitly or explicitly enabled, requires that the name of the column's type
is exactly the string "INTEGER"**. Therefore, if an application uses a type
like :class:`.BigInteger` for a primary key, on SQLite this type will need to
be rendered as the name ``"INTEGER"`` when emitting the initial ``CREATE
TABLE`` statement in order for the autoincrement behavior to be available.

One approach to achieve this is to use :class:`.Integer` on SQLite
only using :meth:`.TypeEngine.with_variant`::

    table = Table(
        "my_table",
        metadata,
        Column(
            "id",
            BigInteger().with_variant(Integer, "sqlite"),
            primary_key=True,
        ),
    )

Another is to use a subclass of :class:`.BigInteger` that overrides its DDL
name to be ``INTEGER`` when compiled against SQLite::

    from sqlalchemy import BigInteger
    from sqlalchemy.ext.compiler import compiles


    class SLBigInteger(BigInteger):
        pass


    @compiles(SLBigInteger, "sqlite")
    def bi_c_sqlite(element, compiler, **kw):
        return "INTEGER"


    @compiles(SLBigInteger)
    def bi_c(element, compiler, **kw):
        return compiler.visit_BIGINT(element, **kw)


    table = Table(
        "my_table", metadata, Column("id", SLBigInteger(), primary_key=True)
    )

.. seealso::

    :meth:`.TypeEngine.with_variant`

    :ref:`sqlalchemy.ext.compiler_toplevel`

    `Datatypes In SQLite Version 3 <https://sqlite.org/datatype3.html>`_


.. _sqlite_transactions:

Transactions with SQLite and the sqlite3 driver
-----------------------------------------------

As a file-based database, SQLite's approach to transactions differs from
traditional databases in many ways. Additionally, the ``sqlite3`` driver
that ships with Python (as well as the async version ``aiosqlite``, which
builds on top of it) has several quirks, workarounds, and API features in the
area of transaction control, all of which generally need to be addressed when
constructing a SQLAlchemy application that uses SQLite.

Legacy Transaction Mode with the sqlite3 driver
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

The most important aspect of transaction handling with the sqlite3 driver is
that it defaults (which will continue through Python 3.15 before being
removed in Python 3.16) to legacy transactional behavior which does
not strictly follow :pep:`249`. The way in which the driver diverges from the
PEP is that it does not "begin" a transaction automatically as dictated by
:pep:`249` except in the case of DML statements, e.g. INSERT, UPDATE, and
DELETE. Normally, :pep:`249` dictates that a BEGIN must be emitted upon
the first SQL statement of any kind, so that all subsequent operations will
be established within a transaction until ``connection.commit()`` has been
called. The ``sqlite3`` driver, in an effort to be easier to use in
highly concurrent environments, skips this step for DQL (e.g. SELECT)
statements, and also skips it for DDL (e.g. CREATE TABLE etc.) statements for
more legacy reasons. Statements such as SAVEPOINT are also skipped.

In modern versions of the ``sqlite3`` driver as of Python 3.12, this legacy
mode of operation is referred to as
`"legacy transaction control" <https://docs.python.org/3/library/sqlite3.html#sqlite3-transaction-control-isolation-level>`_, and is in
effect by default due to the ``Connection.autocommit`` parameter being set to
the constant ``sqlite3.LEGACY_TRANSACTION_CONTROL``. Prior to Python 3.12,
the ``Connection.autocommit`` attribute did not exist.

The implications of legacy transaction mode include:

* **Incorrect support for transactional DDL** - statements like CREATE TABLE,
  ALTER TABLE, CREATE INDEX etc. will not automatically BEGIN a transaction if
  one were not started already, leading to the changes by each statement being
  "autocommitted" immediately unless BEGIN were otherwise emitted first. Very
  old (pre Python 3.6) versions of SQLite would also force a COMMIT for these
  operations even if a transaction were present, however this is no longer the
  case.
* **SERIALIZABLE behavior not fully functional** - SQLite's transaction
  isolation behavior is normally consistent with SERIALIZABLE isolation, as it
  is a file-based system that locks the database file entirely for write
  operations, preventing COMMIT until all reader transactions (and associated
  file locks) have completed. However, sqlite3's legacy transaction mode fails
  to emit BEGIN for SELECT statements, which causes these SELECT statements to
  no longer be "repeatable", failing one of the consistency guarantees of
  SERIALIZABLE.
* **Incorrect behavior for SAVEPOINT** - as the SAVEPOINT statement does not
  imply a BEGIN, a new SAVEPOINT emitted before a BEGIN will function on its
  own but fails to participate in the enclosing transaction, meaning a ROLLBACK
  of the transaction will not roll back elements that were part of a released
  savepoint.

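These rules are observable directly on a standard-library ``sqlite3``
connection; the following sketch shows legacy mode leaving the connection
outside a transaction after DDL and SELECT, while DML implicitly begins one:

```python
import sqlite3

# the default isolation_level ("") selects legacy transaction control
conn = sqlite3.connect(":memory:")
states = []

conn.execute("CREATE TABLE t (x INTEGER)")   # DDL: no implicit BEGIN
states.append(conn.in_transaction)           # False

conn.execute("SELECT * FROM t").fetchall()   # DQL: no implicit BEGIN
states.append(conn.in_transaction)           # False

conn.execute("INSERT INTO t VALUES (1)")     # DML: BEGIN is emitted
states.append(conn.in_transaction)           # True
conn.commit()
```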

Legacy transaction mode first existed in order to facilitate working around
SQLite's file locks. Because SQLite relies upon whole-file locks, it is easy to
get "database is locked" errors, particularly when newer features like "write
ahead logging" are disabled. This is a key reason why ``sqlite3``'s legacy
transaction mode is still the default mode of operation; disabling it will
produce behavior that is more susceptible to locked database errors. However
note that **legacy transaction mode will no longer be the default** in a future
Python version (3.16 as of this writing).


.. _sqlite_enabling_transactions:

Enabling Non-Legacy SQLite Transactional Modes with the sqlite3 or aiosqlite driver
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Current SQLAlchemy support allows either for setting the
``Connection.autocommit`` attribute, most directly by using a
:func:`_sa.create_engine` parameter, or if on an older version of Python where
the attribute is not available, using event hooks to control the behavior of
BEGIN.

* **Enabling modern sqlite3 transaction control via the autocommit connect parameter** (Python 3.12 and above)

  To use SQLite in the mode described at `Transaction control via the autocommit attribute <https://docs.python.org/3/library/sqlite3.html#transaction-control-via-the-autocommit-attribute>`_,
  the most straightforward approach is to set the attribute to its recommended
  value of ``False`` at the connect level using
  :paramref:`_sa.create_engine.connect_args`::

      from sqlalchemy import create_engine

      engine = create_engine(
          "sqlite:///myfile.db", connect_args={"autocommit": False}
      )

  This parameter is also passed through when using the aiosqlite driver::

      from sqlalchemy.ext.asyncio import create_async_engine

      engine = create_async_engine(
          "sqlite+aiosqlite:///myfile.db", connect_args={"autocommit": False}
      )

  The parameter can also be set at the attribute level using the
  :meth:`.PoolEvents.connect` event hook, however this will only work for
  sqlite3, as aiosqlite does not yet expose this attribute on its
  ``Connection`` object::

      from sqlalchemy import create_engine, event

      engine = create_engine("sqlite:///myfile.db")


      @event.listens_for(engine, "connect")
      def do_connect(dbapi_connection, connection_record):
          # enable autocommit=False mode
          dbapi_connection.autocommit = False


* **Using SQLAlchemy to emit BEGIN in lieu of SQLite's transaction control** (all Python versions, sqlite3 and aiosqlite)

  For older versions of ``sqlite3`` or for cross-compatibility with older and
  newer versions, SQLAlchemy can also take over the job of transaction control.
  This is achieved by using the :meth:`.ConnectionEvents.begin` hook
  to emit the "BEGIN" command directly, while also disabling SQLite's control
  of this command using the :meth:`.PoolEvents.connect` event hook to set the
  ``Connection.isolation_level`` attribute to ``None``::

      from sqlalchemy import create_engine, event

      engine = create_engine("sqlite:///myfile.db")


      @event.listens_for(engine, "connect")
      def do_connect(dbapi_connection, connection_record):
          # disable sqlite3's emitting of the BEGIN statement entirely.
          dbapi_connection.isolation_level = None


      @event.listens_for(engine, "begin")
      def do_begin(conn):
          # emit our own BEGIN. sqlite3 still emits COMMIT/ROLLBACK correctly
          conn.exec_driver_sql("BEGIN")

  When using the asyncio variant ``aiosqlite``, refer to ``engine.sync_engine``
  as in the example below::

      from sqlalchemy import event
      from sqlalchemy.ext.asyncio import create_async_engine

      engine = create_async_engine("sqlite+aiosqlite:///myfile.db")


      @event.listens_for(engine.sync_engine, "connect")
      def do_connect(dbapi_connection, connection_record):
          # disable aiosqlite's emitting of the BEGIN statement entirely.
          dbapi_connection.isolation_level = None


      @event.listens_for(engine.sync_engine, "begin")
      def do_begin(conn):
          # emit our own BEGIN. aiosqlite still emits COMMIT/ROLLBACK correctly
          conn.exec_driver_sql("BEGIN")


.. _sqlite_isolation_level:

Using SQLAlchemy's Driver Level AUTOCOMMIT Feature with SQLite
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

SQLAlchemy has a comprehensive database isolation feature with optional
autocommit support that is introduced in the section :ref:`dbapi_autocommit`.

For the ``sqlite3`` and ``aiosqlite`` drivers, SQLAlchemy only includes
built-in support for "AUTOCOMMIT". Note that this mode is currently
incompatible with the non-legacy isolation mode hooks documented in the
previous section at :ref:`sqlite_enabling_transactions`.

To use the ``sqlite3`` driver with SQLAlchemy driver-level autocommit,
create an engine setting the :paramref:`_sa.create_engine.isolation_level`
parameter to ``"AUTOCOMMIT"``::

    eng = create_engine("sqlite:///myfile.db", isolation_level="AUTOCOMMIT")

When using the above mode, any event hooks that set the sqlite3
``Connection.autocommit`` parameter away from its default of
``sqlite3.LEGACY_TRANSACTION_CONTROL`` as well as hooks that emit ``BEGIN``
should be disabled.

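A driver-level autocommit mode can be sketched with the plain ``sqlite3``
module by setting ``isolation_level=None``, which is the kind of
no-implicit-BEGIN behavior that an AUTOCOMMIT-configured engine relies on
(shown here outside SQLAlchemy as an illustration, not as the dialect's exact
internals):

```python
import sqlite3

# isolation_level=None puts sqlite3 in driver-level autocommit:
# no implicit BEGIN is ever emitted; each statement commits on its own
conn = sqlite3.connect(":memory:", isolation_level=None)

conn.execute("CREATE TABLE t (x INTEGER)")
conn.execute("INSERT INTO t VALUES (1)")   # committed immediately
in_txn = conn.in_transaction               # stays False even after DML

count = conn.execute("SELECT count(*) FROM t").fetchone()[0]
```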

Additional Reading for SQLite / sqlite3 transaction control
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Links with important information on SQLite, the sqlite3 driver, as well as
long historical conversations on how things got to their current state:

* `Isolation in SQLite <https://www.sqlite.org/isolation.html>`_ - on the SQLite website
* `Transaction control <https://docs.python.org/3/library/sqlite3.html#transaction-control>`_ - describes the sqlite3 autocommit attribute as well
  as the legacy isolation_level attribute.
* `sqlite3 SELECT does not BEGIN a transaction, but should according to spec <https://github.com/python/cpython/issues/54133>`_ - imported Python standard library issue on GitHub
* `sqlite3 module breaks transactions and potentially corrupts data <https://github.com/python/cpython/issues/54949>`_ - imported Python standard library issue on GitHub


INSERT/UPDATE/DELETE...RETURNING
---------------------------------

The SQLite dialect supports SQLite 3.35's ``INSERT|UPDATE|DELETE..RETURNING``
syntax. ``INSERT..RETURNING`` may be used
automatically in some cases in order to fetch newly generated identifiers in
place of the traditional approach of using ``cursor.lastrowid``, however
``cursor.lastrowid`` is currently still preferred for simple single-statement
cases for its better performance.

To specify an explicit ``RETURNING`` clause, use the
:meth:`._UpdateBase.returning` method on a per-statement basis::

    # INSERT..RETURNING
    result = connection.execute(
        table.insert().values(name="foo").returning(table.c.col1, table.c.col2)
    )
    print(result.all())

    # UPDATE..RETURNING
    result = connection.execute(
        table.update()
        .where(table.c.name == "foo")
        .values(name="bar")
        .returning(table.c.col1, table.c.col2)
    )
    print(result.all())

    # DELETE..RETURNING
    result = connection.execute(
        table.delete()
        .where(table.c.name == "foo")
        .returning(table.c.col1, table.c.col2)
    )
    print(result.all())

.. versionadded:: 2.0 Added support for SQLite RETURNING

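Availability of RETURNING depends on the SQLite library version rather than
on SQLAlchemy; a quick check with the standard-library ``sqlite3`` module,
guarded for SQLite 3.35+, can confirm support at runtime:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, name TEXT)")

if sqlite3.sqlite_version_info >= (3, 35, 0):
    # RETURNING hands back the generated primary key without lastrowid
    row = conn.execute(
        "INSERT INTO t (name) VALUES ('foo') RETURNING id"
    ).fetchone()
    new_id = row[0]
else:
    # older SQLite: fall back to the traditional lastrowid approach
    new_id = conn.execute("INSERT INTO t (name) VALUES ('foo')").lastrowid
```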

.. _sqlite_foreign_keys:

Foreign Key Support
-------------------

SQLite supports FOREIGN KEY syntax when emitting CREATE statements for tables,
however by default these constraints have no effect on the operation of the
table.

Constraint checking on SQLite has three prerequisites:

* At least version 3.6.19 of SQLite must be in use
* The SQLite library must be compiled *without* the SQLITE_OMIT_FOREIGN_KEY
  or SQLITE_OMIT_TRIGGER symbols enabled.
* The ``PRAGMA foreign_keys = ON`` statement must be emitted on all
  connections before use -- including the initial call to
  :meth:`sqlalchemy.schema.MetaData.create_all`.

SQLAlchemy allows for the ``PRAGMA`` statement to be emitted automatically for
new connections through the usage of events::

    from sqlalchemy.engine import Engine
    from sqlalchemy import event


    @event.listens_for(Engine, "connect")
    def set_sqlite_pragma(dbapi_connection, connection_record):
        # the sqlite3 driver will not set PRAGMA foreign_keys
        # if autocommit=False; set to True temporarily
        ac = dbapi_connection.autocommit
        dbapi_connection.autocommit = True

        cursor = dbapi_connection.cursor()
        cursor.execute("PRAGMA foreign_keys=ON")
        cursor.close()

        # restore previous autocommit setting
        dbapi_connection.autocommit = ac

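The effect of the pragma is easy to verify with the standard-library
``sqlite3`` module alone: the same orphan row that is silently accepted by
default is rejected once ``PRAGMA foreign_keys=ON`` has been emitted on the
connection (a sketch outside SQLAlchemy; note the pragma is a no-op while a
transaction is open, hence the intermediate commit):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE parent (id INTEGER PRIMARY KEY)")
conn.execute(
    "CREATE TABLE child (id INTEGER PRIMARY KEY, "
    "parent_id INTEGER REFERENCES parent(id))"
)

# enforcement is OFF by default: an orphan row is accepted silently
conn.execute("INSERT INTO child (parent_id) VALUES (99)")
conn.execute("DELETE FROM child")
conn.commit()  # PRAGMA foreign_keys is ignored inside a transaction

conn.execute("PRAGMA foreign_keys=ON")
try:
    conn.execute("INSERT INTO child (parent_id) VALUES (99)")
    enforced = False
except sqlite3.IntegrityError:
    enforced = True
```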

.. warning::

    When SQLite foreign keys are enabled, it is **not possible**
    to emit CREATE or DROP statements for tables that contain
    mutually-dependent foreign key constraints;
    to emit the DDL for these tables requires that ALTER TABLE be used to
    create or drop these constraints separately, for which SQLite has
    no support.

.. seealso::

    `SQLite Foreign Key Support <https://www.sqlite.org/foreignkeys.html>`_
    - on the SQLite web site.

    :ref:`event_toplevel` - SQLAlchemy event API.

    :ref:`use_alter` - more information on SQLAlchemy's facilities for handling
    mutually-dependent foreign key constraints.

.. _sqlite_on_conflict_ddl:

ON CONFLICT support for constraints
-----------------------------------

.. seealso:: This section describes the :term:`DDL` version of "ON CONFLICT"
    for SQLite, which occurs within a CREATE TABLE statement. For "ON CONFLICT"
    as applied to an INSERT statement, see :ref:`sqlite_on_conflict_insert`.

SQLite supports a non-standard DDL clause known as ON CONFLICT which can be
applied to primary key, unique, check, and not null constraints. In DDL, it is
rendered either within the "CONSTRAINT" clause or within the column definition
itself depending on the location of the target constraint. To render this
clause within DDL, the extension parameter ``sqlite_on_conflict`` can be
specified with a string conflict resolution algorithm within the
:class:`.PrimaryKeyConstraint`, :class:`.UniqueConstraint`, and
:class:`.CheckConstraint` objects. Within the :class:`_schema.Column` object,
there are individual parameters ``sqlite_on_conflict_not_null``,
``sqlite_on_conflict_primary_key``, and ``sqlite_on_conflict_unique``, which
each correspond to the three relevant constraint types that can be indicated
from a :class:`_schema.Column` object.

.. seealso::

    `ON CONFLICT <https://www.sqlite.org/lang_conflict.html>`_ - in the SQLite
    documentation

.. versionadded:: 1.3


The ``sqlite_on_conflict`` parameters accept a string argument which is just
the resolution name to be chosen, which on SQLite can be one of ROLLBACK,
ABORT, FAIL, IGNORE, and REPLACE. For example, to add a UNIQUE constraint
that specifies the IGNORE algorithm::

    some_table = Table(
        "some_table",
        metadata,
        Column("id", Integer, primary_key=True),
        Column("data", Integer),
        UniqueConstraint("id", "data", sqlite_on_conflict="IGNORE"),
    )

The above renders CREATE TABLE DDL as:

.. sourcecode:: sql

    CREATE TABLE some_table (
        id INTEGER NOT NULL,
        data INTEGER,
        PRIMARY KEY (id),
        UNIQUE (id, data) ON CONFLICT IGNORE
    )

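The runtime effect of the rendered clause can be sketched with the
standard-library ``sqlite3`` module (a simplified table, not the exact DDL
above, so that only the UNIQUE constraint conflicts): with
``ON CONFLICT IGNORE`` on the constraint, a duplicate insert is silently
dropped instead of raising:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE some_table ("
    " id INTEGER PRIMARY KEY,"
    " data INTEGER,"
    " UNIQUE (data) ON CONFLICT IGNORE"
    ")"
)

conn.execute("INSERT INTO some_table (data) VALUES (10)")
# second row violates UNIQUE (data): silently skipped rather than an error
conn.execute("INSERT INTO some_table (data) VALUES (10)")

count = conn.execute("SELECT count(*) FROM some_table").fetchone()[0]
```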

When using the :paramref:`_schema.Column.unique`
flag to add a UNIQUE constraint
to a single column, the ``sqlite_on_conflict_unique`` parameter can
be added to the :class:`_schema.Column` as well, which will be added to the
UNIQUE constraint in the DDL::

    some_table = Table(
        "some_table",
        metadata,
        Column("id", Integer, primary_key=True),
        Column(
            "data", Integer, unique=True, sqlite_on_conflict_unique="IGNORE"
        ),
    )

rendering:

.. sourcecode:: sql

    CREATE TABLE some_table (
        id INTEGER NOT NULL,
        data INTEGER,
        PRIMARY KEY (id),
        UNIQUE (data) ON CONFLICT IGNORE
    )

To apply the FAIL algorithm for a NOT NULL constraint,
``sqlite_on_conflict_not_null`` is used::

    some_table = Table(
        "some_table",
        metadata,
        Column("id", Integer, primary_key=True),
        Column(
            "data", Integer, nullable=False, sqlite_on_conflict_not_null="FAIL"
        ),
    )

this renders the column inline ON CONFLICT phrase:

.. sourcecode:: sql

    CREATE TABLE some_table (
        id INTEGER NOT NULL,
        data INTEGER NOT NULL ON CONFLICT FAIL,
        PRIMARY KEY (id)
    )


Similarly, for an inline primary key, use ``sqlite_on_conflict_primary_key``::

    some_table = Table(
        "some_table",
        metadata,
        Column(
            "id",
            Integer,
            primary_key=True,
            sqlite_on_conflict_primary_key="FAIL",
        ),
    )

SQLAlchemy renders the PRIMARY KEY constraint separately, so the conflict
resolution algorithm is applied to the constraint itself:

.. sourcecode:: sql

    CREATE TABLE some_table (
        id INTEGER NOT NULL,
        PRIMARY KEY (id) ON CONFLICT FAIL
    )


.. _sqlite_on_conflict_insert:

INSERT...ON CONFLICT (Upsert)
-----------------------------

.. seealso:: This section describes the :term:`DML` version of "ON CONFLICT"
    for SQLite, which occurs within an INSERT statement. For "ON CONFLICT" as
    applied to a CREATE TABLE statement, see :ref:`sqlite_on_conflict_ddl`.

From version 3.24.0 onwards, SQLite supports "upserts" (update or insert)
of rows into a table via the ``ON CONFLICT`` clause of the ``INSERT``
statement. A candidate row will only be inserted if that row does not violate
any unique or primary key constraints. In the case of a unique constraint
violation, a secondary action can occur which can be either "DO UPDATE",
indicating that the data in the target row should be updated, or "DO NOTHING",
which indicates to silently skip this row.

Conflicts are determined using columns that are part of existing unique
constraints and indexes. These constraints are identified by stating the
columns and conditions that comprise the indexes.

SQLAlchemy provides ``ON CONFLICT`` support via the SQLite-specific
:func:`_sqlite.insert()` function, which provides
the generative methods :meth:`_sqlite.Insert.on_conflict_do_update`
and :meth:`_sqlite.Insert.on_conflict_do_nothing`:

.. sourcecode:: pycon+sql

    >>> from sqlalchemy.dialects.sqlite import insert

    >>> insert_stmt = insert(my_table).values(
    ...     id="some_existing_id", data="inserted value"
    ... )

    >>> do_update_stmt = insert_stmt.on_conflict_do_update(
    ...     index_elements=["id"], set_=dict(data="updated value")
    ... )

    >>> print(do_update_stmt)
    {printsql}INSERT INTO my_table (id, data) VALUES (?, ?)
    ON CONFLICT (id) DO UPDATE SET data = ?{stop}

    >>> do_nothing_stmt = insert_stmt.on_conflict_do_nothing(index_elements=["id"])

    >>> print(do_nothing_stmt)
    {printsql}INSERT INTO my_table (id, data) VALUES (?, ?)
    ON CONFLICT (id) DO NOTHING

.. versionadded:: 1.4

.. seealso::

    `Upsert
    <https://sqlite.org/lang_UPSERT.html>`_
    - in the SQLite documentation.


Specifying the Target
^^^^^^^^^^^^^^^^^^^^^

Both methods supply the "target" of the conflict using column inference:

* The :paramref:`_sqlite.Insert.on_conflict_do_update.index_elements` argument
  specifies a sequence containing string column names, :class:`_schema.Column`
  objects, and/or SQL expression elements, which would identify a unique index
  or unique constraint.

* When using :paramref:`_sqlite.Insert.on_conflict_do_update.index_elements`
  to infer an index, a partial index can be inferred by also specifying the
  :paramref:`_sqlite.Insert.on_conflict_do_update.index_where` parameter:

  .. sourcecode:: pycon+sql

      >>> stmt = insert(my_table).values(user_email="a@b.com", data="inserted data")

      >>> do_update_stmt = stmt.on_conflict_do_update(
      ...     index_elements=[my_table.c.user_email],
      ...     index_where=my_table.c.user_email.like("%@gmail.com"),
      ...     set_=dict(data=stmt.excluded.data),
      ... )

      >>> print(do_update_stmt)
      {printsql}INSERT INTO my_table (data, user_email) VALUES (?, ?)
      ON CONFLICT (user_email)
      WHERE user_email LIKE '%@gmail.com'
      DO UPDATE SET data = excluded.data


The SET Clause
^^^^^^^^^^^^^^

``ON CONFLICT...DO UPDATE`` is used to perform an update of the already
existing row, using any combination of new values as well as values
from the proposed insertion. These values are specified using the
:paramref:`_sqlite.Insert.on_conflict_do_update.set_` parameter. This
parameter accepts a dictionary which consists of direct values
for UPDATE:

.. sourcecode:: pycon+sql

    >>> stmt = insert(my_table).values(id="some_id", data="inserted value")

    >>> do_update_stmt = stmt.on_conflict_do_update(
    ...     index_elements=["id"], set_=dict(data="updated value")
    ... )

    >>> print(do_update_stmt)
    {printsql}INSERT INTO my_table (id, data) VALUES (?, ?)
    ON CONFLICT (id) DO UPDATE SET data = ?

.. warning::

    The :meth:`_sqlite.Insert.on_conflict_do_update` method does **not** take
    into account Python-side default UPDATE values or generation functions,
    e.g. those specified using :paramref:`_schema.Column.onupdate`. These
    values will not be exercised for an ON CONFLICT style of UPDATE, unless
    they are manually specified in the
    :paramref:`_sqlite.Insert.on_conflict_do_update.set_` dictionary.


Updating using the Excluded INSERT Values
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

In order to refer to the proposed insertion row, the special alias
:attr:`~.sqlite.Insert.excluded` is available as an attribute on
the :class:`_sqlite.Insert` object; this object creates an "excluded." prefix
on a column, that informs the DO UPDATE to update the row with the value that
would have been inserted had the constraint not failed:

.. sourcecode:: pycon+sql

    >>> stmt = insert(my_table).values(
    ...     id="some_id", data="inserted value", author="jlh"
    ... )

    >>> do_update_stmt = stmt.on_conflict_do_update(
    ...     index_elements=["id"],
    ...     set_=dict(data="updated value", author=stmt.excluded.author),
    ... )

    >>> print(do_update_stmt)
    {printsql}INSERT INTO my_table (id, data, author) VALUES (?, ?, ?)
    ON CONFLICT (id) DO UPDATE SET data = ?, author = excluded.author


Additional WHERE Criteria
^^^^^^^^^^^^^^^^^^^^^^^^^

The :meth:`_sqlite.Insert.on_conflict_do_update` method also accepts
a WHERE clause using the :paramref:`_sqlite.Insert.on_conflict_do_update.where`
parameter, which will limit those rows which receive an UPDATE:

.. sourcecode:: pycon+sql

    >>> stmt = insert(my_table).values(
    ...     id="some_id", data="inserted value", author="jlh"
    ... )

    >>> on_update_stmt = stmt.on_conflict_do_update(
    ...     index_elements=["id"],
    ...     set_=dict(data="updated value", author=stmt.excluded.author),
    ...     where=(my_table.c.status == 2),
    ... )
    >>> print(on_update_stmt)
    {printsql}INSERT INTO my_table (id, data, author) VALUES (?, ?, ?)
    ON CONFLICT (id) DO UPDATE SET data = ?, author = excluded.author
    WHERE my_table.status = ?


Skipping Rows with DO NOTHING
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

``ON CONFLICT`` may be used to skip inserting a row entirely
if any conflict with a unique constraint occurs; below this is illustrated
using the :meth:`_sqlite.Insert.on_conflict_do_nothing` method:

.. sourcecode:: pycon+sql

    >>> stmt = insert(my_table).values(id="some_id", data="inserted value")
    >>> stmt = stmt.on_conflict_do_nothing(index_elements=["id"])
    >>> print(stmt)
    {printsql}INSERT INTO my_table (id, data) VALUES (?, ?) ON CONFLICT (id) DO NOTHING

If ``DO NOTHING`` is used without specifying any columns or constraint,
it has the effect of skipping the INSERT for any unique violation which
occurs:

.. sourcecode:: pycon+sql

    >>> stmt = insert(my_table).values(id="some_id", data="inserted value")
    >>> stmt = stmt.on_conflict_do_nothing()
    >>> print(stmt)
    {printsql}INSERT INTO my_table (id, data) VALUES (?, ?) ON CONFLICT DO NOTHING

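The SQL these constructs emit can be exercised end to end with the
standard-library ``sqlite3`` module; this sketch (guarded for SQLite 3.24+,
and using a ``my_table`` layout that echoes the examples above) performs a
DO UPDATE upsert directly:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE my_table (id TEXT PRIMARY KEY, data TEXT)")
conn.execute("INSERT INTO my_table VALUES ('some_id', 'original value')")

if sqlite3.sqlite_version_info >= (3, 24, 0):
    # DO UPDATE: the conflicting row is updated in place
    conn.execute(
        "INSERT INTO my_table (id, data) VALUES ('some_id', 'inserted value') "
        "ON CONFLICT (id) DO UPDATE SET data = excluded.data"
    )
    data = conn.execute(
        "SELECT data FROM my_table WHERE id = 'some_id'"
    ).fetchone()[0]
else:
    data = "upsert unavailable on this SQLite build"
```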

.. _sqlite_type_reflection:

Type Reflection
---------------

SQLite types are unlike those of most other database backends, in that
the string name of the type usually does not correspond to a "type" in a
one-to-one fashion. Instead, SQLite links per-column typing behavior
to one of five so-called "type affinities" based on a string matching
pattern for the type.

SQLAlchemy's reflection process, when inspecting types, uses a simple
lookup table to link the keywords returned to provided SQLAlchemy types.
This lookup table is present within the SQLite dialect as it is for all
other dialects. However, the SQLite dialect has a different "fallback"
routine for when a particular type name is not located in the lookup map;
it instead implements the SQLite "type affinity" scheme located at
https://www.sqlite.org/datatype3.html section 2.1.

The provided typemap will make direct associations from an exact string
name match for the following types:

:class:`_types.BIGINT`, :class:`_types.BLOB`,
:class:`_types.BOOLEAN`, :class:`_types.CHAR`,
:class:`_types.DATE`, :class:`_types.DATETIME`,
:class:`_types.DECIMAL`, :class:`_types.FLOAT`,
:class:`_types.INTEGER`, :class:`_types.NUMERIC`,
:class:`_types.REAL`, :class:`_types.SMALLINT`,
:class:`_types.TEXT`, :class:`_types.TIME`,
:class:`_types.TIMESTAMP`, :class:`_types.VARCHAR`,
:class:`_types.NVARCHAR`, :class:`_types.NCHAR`


When a type name does not match one of the above types, the "type affinity"
lookup is used instead:

* :class:`_types.INTEGER` is returned if the type name includes the
  string ``INT``
* :class:`_types.TEXT` is returned if the type name includes the
  string ``CHAR``, ``CLOB`` or ``TEXT``
* :class:`_types.NullType` is returned if the type name includes the
  string ``BLOB``
* :class:`_types.REAL` is returned if the type name includes the string
  ``REAL``, ``FLOA`` or ``DOUB``.
* Otherwise, the :class:`_types.NUMERIC` type is used.
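The affinity rules above can be sketched as a small standalone function; this is
a simplified illustration of SQLite's documented matching order, not the
dialect's actual implementation:

```python
def type_affinity(type_name: str) -> str:
    """Approximate SQLite's column type affinity rules (datatype3.html 2.1).

    The checks must run in this order: "INT" wins over "CHAR", etc.
    """
    name = type_name.upper()
    if "INT" in name:
        return "INTEGER"
    if "CHAR" in name or "CLOB" in name or "TEXT" in name:
        return "TEXT"
    if "BLOB" in name:
        return "NULL"  # reflected as NullType by the dialect
    if "REAL" in name or "FLOA" in name or "DOUB" in name:
        return "REAL"
    return "NUMERIC"


print(type_affinity("MEDIUMINT"))  # INTEGER
print(type_affinity("VARYING CHARACTER"))  # TEXT
print(type_affinity("STRING"))  # NUMERIC -- a classic gotcha
```

Note that a declared type of ``STRING`` falls through to NUMERIC, since none of
the substrings match; this mirrors the behavior described in the SQLite
documentation.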


.. _sqlite_partial_index:

Partial Indexes
---------------

A partial index, e.g. one which uses a WHERE clause, can be specified
with the DDL system using the argument ``sqlite_where``::

    tbl = Table("testtbl", m, Column("data", Integer))
    idx = Index(
        "test_idx1",
        tbl.c.data,
        sqlite_where=and_(tbl.c.data > 5, tbl.c.data < 10),
    )

The index will be rendered at create time as:

.. sourcecode:: sql

    CREATE INDEX test_idx1 ON testtbl (data)
    WHERE data > 5 AND data < 10
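The rendered DDL above is ordinary SQLite syntax; a stdlib-only sketch (plain
``sqlite3``, no SQLAlchemy involved) shows the partial index's WHERE clause
being preserved in ``sqlite_master``:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE testtbl (data INTEGER)")
conn.execute(
    "CREATE INDEX test_idx1 ON testtbl (data) "
    "WHERE data > 5 AND data < 10"
)

# sqlite_master stores the original CREATE INDEX text, WHERE clause included
(ddl,) = conn.execute(
    "SELECT sql FROM sqlite_master WHERE name = 'test_idx1'"
).fetchone()
print(ddl)
conn.close()
```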


.. _sqlite_dotted_column_names:

Dotted Column Names
-------------------

Using table or column names that explicitly have periods in them is
**not recommended**. While this is generally a bad idea for relational
databases in general, as the dot is a syntactically significant character,
the SQLite driver up until version **3.10.0** of SQLite has a bug which
requires that SQLAlchemy filter out these dots in result sets.

The bug, entirely outside of SQLAlchemy, can be illustrated thusly::

    import sqlite3

    assert sqlite3.sqlite_version_info < (
        3,
        10,
        0,
    ), "bug is fixed in this version"

    conn = sqlite3.connect(":memory:")
    cursor = conn.cursor()

    cursor.execute("create table x (a integer, b integer)")
    cursor.execute("insert into x (a, b) values (1, 1)")
    cursor.execute("insert into x (a, b) values (2, 2)")

    cursor.execute("select x.a, x.b from x")
    assert [c[0] for c in cursor.description] == ["a", "b"]

    cursor.execute(
        """
        select x.a, x.b from x where a=1
        union
        select x.a, x.b from x where a=2
        """
    )
    assert [c[0] for c in cursor.description] == ["a", "b"], [
        c[0] for c in cursor.description
    ]

The second assertion fails:

.. sourcecode:: text

    Traceback (most recent call last):
      File "test.py", line 19, in <module>
        [c[0] for c in cursor.description]
    AssertionError: ['x.a', 'x.b']

Where above, the driver incorrectly reports the names of the columns
including the name of the table, which is entirely inconsistent vs.
when the UNION is not present.

SQLAlchemy relies upon column names being predictable in how they match
to the original statement, so the SQLAlchemy dialect has no choice but
to filter these out::

    from sqlalchemy import create_engine

    eng = create_engine("sqlite://")
    conn = eng.connect()

    conn.exec_driver_sql("create table x (a integer, b integer)")
    conn.exec_driver_sql("insert into x (a, b) values (1, 1)")
    conn.exec_driver_sql("insert into x (a, b) values (2, 2)")

    result = conn.exec_driver_sql("select x.a, x.b from x")
    assert result.keys() == ["a", "b"]

    result = conn.exec_driver_sql(
        """
        select x.a, x.b from x where a=1
        union
        select x.a, x.b from x where a=2
        """
    )
    assert result.keys() == ["a", "b"]

Note that above, even though SQLAlchemy filters out the dots, *both
names are still addressable*::

    >>> row = result.first()
    >>> row["a"]
    1
    >>> row["x.a"]
    1
    >>> row["b"]
    1
    >>> row["x.b"]
    1

Therefore, the workaround applied by SQLAlchemy only impacts
:meth:`_engine.CursorResult.keys` and :meth:`.Row.keys()` in the public API. In
the very specific case where an application is forced to use column names that
contain dots, and the functionality of :meth:`_engine.CursorResult.keys` and
:meth:`.Row.keys()` is required to return these dotted names unmodified,
the ``sqlite_raw_colnames`` execution option may be provided, either on a
per-:class:`_engine.Connection` basis::

    result = conn.execution_options(sqlite_raw_colnames=True).exec_driver_sql(
        """
        select x.a, x.b from x where a=1
        union
        select x.a, x.b from x where a=2
        """
    )
    assert result.keys() == ["x.a", "x.b"]

or on a per-:class:`_engine.Engine` basis::

    engine = create_engine(
        "sqlite://", execution_options={"sqlite_raw_colnames": True}
    )

When using the per-:class:`_engine.Engine` execution option, note that
**Core and ORM queries that use UNION may not function properly**.

SQLite-specific table options
-----------------------------

Two options for CREATE TABLE are supported directly by the SQLite
dialect in conjunction with the :class:`_schema.Table` construct:

* ``WITHOUT ROWID``::

    Table("some_table", metadata, ..., sqlite_with_rowid=False)

* ``STRICT``::

    Table("some_table", metadata, ..., sqlite_strict=True)

  .. versionadded:: 2.0.37

.. seealso::

    `SQLite CREATE TABLE options
    <https://www.sqlite.org/lang_createtable.html>`_
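The clauses these options render can be exercised against the driver directly;
a stdlib-only sketch (plain ``sqlite3``, not the dialect's code path), guarding
``STRICT`` since it requires an SQLite build of 3.37 or newer:

```python
import sqlite3

conn = sqlite3.connect(":memory:")

# WITHOUT ROWID: the table is stored as a clustered index on its
# primary key rather than keyed by an implicit rowid
conn.execute(
    "CREATE TABLE some_table (id TEXT PRIMARY KEY, data TEXT) WITHOUT ROWID"
)
(ddl,) = conn.execute(
    "SELECT sql FROM sqlite_master WHERE name = 'some_table'"
).fetchone()
print(ddl)

# STRICT enforces declared column types; only available on SQLite 3.37+
if sqlite3.sqlite_version_info >= (3, 37, 0):
    conn.execute("CREATE TABLE strict_table (id INTEGER PRIMARY KEY) STRICT")
conn.close()
```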


.. _sqlite_include_internal:

Reflecting internal schema tables
----------------------------------

Reflection methods that return lists of tables will omit so-called
"SQLite internal schema object" names, which are considered by SQLite
as any object name that is prefixed with ``sqlite_``. An example of
such an object is the ``sqlite_sequence`` table that's generated when
the ``AUTOINCREMENT`` column parameter is used. In order to return
these objects, the parameter ``sqlite_include_internal=True`` may be
passed to methods such as :meth:`_schema.MetaData.reflect` or
:meth:`.Inspector.get_table_names`.

.. versionadded:: 2.0 Added the ``sqlite_include_internal=True`` parameter.
   Previously, these tables were not ignored by SQLAlchemy reflection
   methods.
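The ``sqlite_sequence`` table mentioned above can be observed with the stdlib
driver alone; a short sketch showing the kind of ``sqlite_``-prefixed name that
reflection filters out by default:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE t (id INTEGER PRIMARY KEY AUTOINCREMENT, data TEXT)"
)
# the first insert causes SQLite to materialize the internal
# sqlite_sequence bookkeeping table
conn.execute("INSERT INTO t (data) VALUES ('x')")

names = [
    row[0]
    for row in conn.execute(
        "SELECT name FROM sqlite_master WHERE type = 'table'"
    )
]
print(names)  # includes both 't' and 'sqlite_sequence'
conn.close()
```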


.. note::

    The ``sqlite_include_internal`` parameter does not refer to the
    "system" tables that are present in schemas such as ``sqlite_master``.

.. seealso::

    `SQLite Internal Schema Objects <https://www.sqlite.org/fileformat2.html#intschema>`_ - in the SQLite
    documentation.

''' # noqa

from __future__ import annotations

import datetime
import numbers
import re
from typing import Optional

from .json import JSON
from .json import JSONIndexType
from .json import JSONPathType
from ... import exc
from ... import schema as sa_schema
from ... import sql
from ... import text
from ... import types as sqltypes
from ... import util
from ...engine import default
from ...engine import processors
from ...engine import reflection
from ...engine.reflection import ReflectionDefaults
from ...sql import coercions
from ...sql import compiler
from ...sql import elements
from ...sql import roles
from ...sql import schema
from ...types import BLOB  # noqa
from ...types import BOOLEAN  # noqa
from ...types import CHAR  # noqa
from ...types import DECIMAL  # noqa
from ...types import FLOAT  # noqa
from ...types import INTEGER  # noqa
from ...types import NUMERIC  # noqa
from ...types import REAL  # noqa
from ...types import SMALLINT  # noqa
from ...types import TEXT  # noqa
from ...types import TIMESTAMP  # noqa
from ...types import VARCHAR  # noqa


class _SQliteJson(JSON):
    def result_processor(self, dialect, coltype):
        default_processor = super().result_processor(dialect, coltype)

        def process(value):
            try:
                return default_processor(value)
            except TypeError:
                if isinstance(value, numbers.Number):
                    return value
                else:
                    raise

        return process


class _DateTimeMixin:
    _reg = None
    _storage_format = None

    def __init__(self, storage_format=None, regexp=None, **kw):
        super().__init__(**kw)
        if regexp is not None:
            self._reg = re.compile(regexp)
        if storage_format is not None:
            self._storage_format = storage_format

    @property
    def format_is_text_affinity(self):
        """return True if the storage format will automatically imply
        a TEXT affinity.

        If the storage format contains no non-numeric characters,
        it will imply a NUMERIC storage format on SQLite; in this case,
        the type will generate its DDL as DATE_CHAR, DATETIME_CHAR,
        TIME_CHAR.

        """
        spec = self._storage_format % {
            "year": 0,
            "month": 0,
            "day": 0,
            "hour": 0,
            "minute": 0,
            "second": 0,
            "microsecond": 0,
        }
        return bool(re.search(r"[^0-9]", spec))

    def adapt(self, cls, **kw):
        if issubclass(cls, _DateTimeMixin):
            if self._storage_format:
                kw["storage_format"] = self._storage_format
            if self._reg:
                kw["regexp"] = self._reg
        return super().adapt(cls, **kw)

    def literal_processor(self, dialect):
        bp = self.bind_processor(dialect)

        def process(value):
            return "'%s'" % bp(value)

        return process


class DATETIME(_DateTimeMixin, sqltypes.DateTime):
    r"""Represent a Python datetime object in SQLite using a string.

    The default string storage format is::

        "%(year)04d-%(month)02d-%(day)02d %(hour)02d:%(minute)02d:%(second)02d.%(microsecond)06d"

    e.g.:

    .. sourcecode:: text

        2021-03-15 12:05:57.105542

    The incoming storage format is by default parsed using the
    Python ``datetime.fromisoformat()`` function.

    .. versionchanged:: 2.0 ``datetime.fromisoformat()`` is used for default
       datetime string parsing.

    The storage format can be customized to some degree using the
    ``storage_format`` and ``regexp`` parameters, such as::

        import re
        from sqlalchemy.dialects.sqlite import DATETIME

        dt = DATETIME(
            storage_format=(
                "%(year)04d/%(month)02d/%(day)02d %(hour)02d:%(minute)02d:%(second)02d"
            ),
            regexp=r"(\d+)/(\d+)/(\d+) (\d+):(\d+):(\d+)",
        )
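The ``storage_format`` value is an ordinary %-style template applied to a dict
of datetime components; a quick stdlib-only sketch of how the custom format
above renders a value (illustrative only, not the dialect's internal code
path):

```python
import datetime

# the same template passed as storage_format above
fmt = "%(year)04d/%(month)02d/%(day)02d %(hour)02d:%(minute)02d:%(second)02d"

value = datetime.datetime(2021, 3, 15, 12, 5, 57)
rendered = fmt % {
    "year": value.year,
    "month": value.month,
    "day": value.day,
    "hour": value.hour,
    "minute": value.minute,
    "second": value.second,
}
print(rendered)  # 2021/03/15 12:05:57
```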


    :param truncate_microseconds: when ``True`` microseconds will be truncated
     from the datetime. Can't be specified together with ``storage_format``
     or ``regexp``.

    :param storage_format: format string which will be applied to the dict
     with keys year, month, day, hour, minute, second, and microsecond.

    :param regexp: regular expression which will be applied to incoming result
     rows, replacing the use of ``datetime.fromisoformat()`` to parse incoming
     strings. If the regexp contains named groups, the resulting match dict is
     applied to the Python datetime() constructor as keyword arguments.
     Otherwise, if positional groups are used, the datetime() constructor
     is called with positional arguments via
     ``*map(int, match_obj.groups(0))``.

    """  # noqa

    _storage_format = (
        "%(year)04d-%(month)02d-%(day)02d "
        "%(hour)02d:%(minute)02d:%(second)02d.%(microsecond)06d"
    )

    def __init__(self, *args, **kwargs):
        truncate_microseconds = kwargs.pop("truncate_microseconds", False)
        super().__init__(*args, **kwargs)
        if truncate_microseconds:
            assert "storage_format" not in kwargs, (
                "You can specify only "
                "one of truncate_microseconds or storage_format."
            )
            assert "regexp" not in kwargs, (
                "You can specify only one of "
                "truncate_microseconds or regexp."
            )
            self._storage_format = (
                "%(year)04d-%(month)02d-%(day)02d "
                "%(hour)02d:%(minute)02d:%(second)02d"
            )

    def bind_processor(self, dialect):
        datetime_datetime = datetime.datetime
        datetime_date = datetime.date
        format_ = self._storage_format

        def process(value):
            if value is None:
                return None
            elif isinstance(value, datetime_datetime):
                return format_ % {
                    "year": value.year,
                    "month": value.month,
                    "day": value.day,
                    "hour": value.hour,
                    "minute": value.minute,
                    "second": value.second,
                    "microsecond": value.microsecond,
                }
            elif isinstance(value, datetime_date):
                return format_ % {
                    "year": value.year,
                    "month": value.month,
                    "day": value.day,
                    "hour": 0,
                    "minute": 0,
                    "second": 0,
                    "microsecond": 0,
                }
            else:
                raise TypeError(
                    "SQLite DateTime type only accepts Python "
                    "datetime and date objects as input."
                )

        return process

    def result_processor(self, dialect, coltype):
        if self._reg:
            return processors.str_to_datetime_processor_factory(
                self._reg, datetime.datetime
            )
        else:
            return processors.str_to_datetime


class DATE(_DateTimeMixin, sqltypes.Date):
    r"""Represent a Python date object in SQLite using a string.

    The default string storage format is::

        "%(year)04d-%(month)02d-%(day)02d"

    e.g.:

    .. sourcecode:: text

        2011-03-15

    The incoming storage format is by default parsed using the
    Python ``date.fromisoformat()`` function.

    .. versionchanged:: 2.0 ``date.fromisoformat()`` is used for default
       date string parsing.

    The storage format can be customized to some degree using the
    ``storage_format`` and ``regexp`` parameters, such as::

        import re
        from sqlalchemy.dialects.sqlite import DATE

        d = DATE(
            storage_format="%(month)02d/%(day)02d/%(year)04d",
            regexp=re.compile("(?P<month>\d+)/(?P<day>\d+)/(?P<year>\d+)"),
        )

    :param storage_format: format string which will be applied to the
     dict with keys year, month, and day.

    :param regexp: regular expression which will be applied to
     incoming result rows, replacing the use of ``date.fromisoformat()`` to
     parse incoming strings. If the regexp contains named groups, the resulting
     match dict is applied to the Python date() constructor as keyword
     arguments. Otherwise, if positional groups are used, the date()
     constructor is called with positional arguments via
     ``*map(int, match_obj.groups(0))``.

    """

    _storage_format = "%(year)04d-%(month)02d-%(day)02d"

    def bind_processor(self, dialect):
        datetime_date = datetime.date
        format_ = self._storage_format

        def process(value):
            if value is None:
                return None
            elif isinstance(value, datetime_date):
                return format_ % {
                    "year": value.year,
                    "month": value.month,
                    "day": value.day,
                }
            else:
                raise TypeError(
                    "SQLite Date type only accepts Python "
                    "date objects as input."
                )

        return process

    def result_processor(self, dialect, coltype):
        if self._reg:
            return processors.str_to_datetime_processor_factory(
                self._reg, datetime.date
            )
        else:
            return processors.str_to_date


class TIME(_DateTimeMixin, sqltypes.Time):
    r"""Represent a Python time object in SQLite using a string.

    The default string storage format is::

        "%(hour)02d:%(minute)02d:%(second)02d.%(microsecond)06d"

    e.g.:

    .. sourcecode:: text

        12:05:57.105580

    The incoming storage format is by default parsed using the
    Python ``time.fromisoformat()`` function.

    .. versionchanged:: 2.0 ``time.fromisoformat()`` is used for default
       time string parsing.

    The storage format can be customized to some degree using the
    ``storage_format`` and ``regexp`` parameters, such as::

        import re
        from sqlalchemy.dialects.sqlite import TIME

        t = TIME(
            storage_format="%(hour)02d-%(minute)02d-%(second)02d-%(microsecond)06d",
            regexp=re.compile("(\d+)-(\d+)-(\d+)(?:-(\d+))?"),
        )

    :param truncate_microseconds: when ``True`` microseconds will be truncated
     from the time. Can't be specified together with ``storage_format``
     or ``regexp``.

    :param storage_format: format string which will be applied to the dict
     with keys hour, minute, second, and microsecond.

    :param regexp: regular expression which will be applied to incoming result
     rows, replacing the use of ``time.fromisoformat()`` to parse incoming
     strings. If the regexp contains named groups, the resulting match dict is
     applied to the Python time() constructor as keyword arguments. Otherwise,
     if positional groups are used, the time() constructor is called with
     positional arguments via ``*map(int, match_obj.groups(0))``.

    """

    _storage_format = "%(hour)02d:%(minute)02d:%(second)02d.%(microsecond)06d"

    def __init__(self, *args, **kwargs):
        truncate_microseconds = kwargs.pop("truncate_microseconds", False)
        super().__init__(*args, **kwargs)
        if truncate_microseconds:
            assert "storage_format" not in kwargs, (
                "You can specify only "
                "one of truncate_microseconds or storage_format."
            )
            assert "regexp" not in kwargs, (
                "You can specify only one of "
                "truncate_microseconds or regexp."
            )
            self._storage_format = "%(hour)02d:%(minute)02d:%(second)02d"

    def bind_processor(self, dialect):
        datetime_time = datetime.time
        format_ = self._storage_format

        def process(value):
            if value is None:
                return None
            elif isinstance(value, datetime_time):
                return format_ % {
                    "hour": value.hour,
                    "minute": value.minute,
                    "second": value.second,
                    "microsecond": value.microsecond,
                }
            else:
                raise TypeError(
                    "SQLite Time type only accepts Python "
                    "time objects as input."
                )

        return process

    def result_processor(self, dialect, coltype):
        if self._reg:
            return processors.str_to_datetime_processor_factory(
                self._reg, datetime.time
            )
        else:
            return processors.str_to_time


colspecs = {
    sqltypes.Date: DATE,
    sqltypes.DateTime: DATETIME,
    sqltypes.JSON: _SQliteJson,
    sqltypes.JSON.JSONIndexType: JSONIndexType,
    sqltypes.JSON.JSONPathType: JSONPathType,
    sqltypes.Time: TIME,
}

ischema_names = {
    "BIGINT": sqltypes.BIGINT,
    "BLOB": sqltypes.BLOB,
    "BOOL": sqltypes.BOOLEAN,
    "BOOLEAN": sqltypes.BOOLEAN,
    "CHAR": sqltypes.CHAR,
    "DATE": sqltypes.DATE,
    "DATE_CHAR": sqltypes.DATE,
    "DATETIME": sqltypes.DATETIME,
    "DATETIME_CHAR": sqltypes.DATETIME,
    "DOUBLE": sqltypes.DOUBLE,
    "DECIMAL": sqltypes.DECIMAL,
    "FLOAT": sqltypes.FLOAT,
    "INT": sqltypes.INTEGER,
    "INTEGER": sqltypes.INTEGER,
    "JSON": JSON,
    "NUMERIC": sqltypes.NUMERIC,
    "REAL": sqltypes.REAL,
    "SMALLINT": sqltypes.SMALLINT,
    "TEXT": sqltypes.TEXT,
    "TIME": sqltypes.TIME,
    "TIME_CHAR": sqltypes.TIME,
    "TIMESTAMP": sqltypes.TIMESTAMP,
    "VARCHAR": sqltypes.VARCHAR,
    "NVARCHAR": sqltypes.NVARCHAR,
    "NCHAR": sqltypes.NCHAR,
}


class SQLiteCompiler(compiler.SQLCompiler):
    extract_map = util.update_copy(
        compiler.SQLCompiler.extract_map,
        {
            "month": "%m",
            "day": "%d",
            "year": "%Y",
            "second": "%S",
            "hour": "%H",
            "doy": "%j",
            "minute": "%M",
            "epoch": "%s",
            "dow": "%w",
            "week": "%W",
        },
    )

    def visit_truediv_binary(self, binary, operator, **kw):
        return (
            self.process(binary.left, **kw)
            + " / "
            + "(%s + 0.0)" % self.process(binary.right, **kw)
        )

    def visit_now_func(self, fn, **kw):
        return "CURRENT_TIMESTAMP"

    def visit_localtimestamp_func(self, func, **kw):
        return "DATETIME(CURRENT_TIMESTAMP, 'localtime')"

    def visit_true(self, expr, **kw):
        return "1"

    def visit_false(self, expr, **kw):
        return "0"

    def visit_char_length_func(self, fn, **kw):
        return "length%s" % self.function_argspec(fn)

    def visit_aggregate_strings_func(self, fn, **kw):
        return "group_concat%s" % self.function_argspec(fn)

    def visit_cast(self, cast, **kwargs):
        if self.dialect.supports_cast:
            return super().visit_cast(cast, **kwargs)
        else:
            return self.process(cast.clause, **kwargs)

    def visit_extract(self, extract, **kw):
        try:
            return "CAST(STRFTIME('%s', %s) AS INTEGER)" % (
                self.extract_map[extract.field],
                self.process(extract.expr, **kw),
            )
        except KeyError as err:
            raise exc.CompileError(
                "%s is not a valid extract argument." % extract.field
            ) from err

    def returning_clause(
        self,
        stmt,
        returning_cols,
        *,
        populate_result_map,
        **kw,
    ):
        kw["include_table"] = False
        return super().returning_clause(
            stmt, returning_cols, populate_result_map=populate_result_map, **kw
        )

    def limit_clause(self, select, **kw):
        text = ""
        if select._limit_clause is not None:
            text += "\n LIMIT " + self.process(select._limit_clause, **kw)
        if select._offset_clause is not None:
            if select._limit_clause is None:
                text += "\n LIMIT " + self.process(sql.literal(-1))
            text += " OFFSET " + self.process(select._offset_clause, **kw)
        else:
            text += " OFFSET " + self.process(sql.literal(0), **kw)
        return text

    def for_update_clause(self, select, **kw):
        # sqlite has no "FOR UPDATE" AFAICT
        return ""

    def update_from_clause(
        self, update_stmt, from_table, extra_froms, from_hints, **kw
    ):
        kw["asfrom"] = True
        return "FROM " + ", ".join(
            t._compiler_dispatch(self, fromhints=from_hints, **kw)
            for t in extra_froms
        )

    def visit_is_distinct_from_binary(self, binary, operator, **kw):
        return "%s IS NOT %s" % (
            self.process(binary.left),
            self.process(binary.right),
        )

    def visit_is_not_distinct_from_binary(self, binary, operator, **kw):
        return "%s IS %s" % (
            self.process(binary.left),
            self.process(binary.right),
        )

    def visit_json_getitem_op_binary(self, binary, operator, **kw):
        if binary.type._type_affinity is sqltypes.JSON:
            expr = "JSON_QUOTE(JSON_EXTRACT(%s, %s))"
        else:
            expr = "JSON_EXTRACT(%s, %s)"

        return expr % (
            self.process(binary.left, **kw),
            self.process(binary.right, **kw),
        )

    def visit_json_path_getitem_op_binary(self, binary, operator, **kw):
        if binary.type._type_affinity is sqltypes.JSON:
            expr = "JSON_QUOTE(JSON_EXTRACT(%s, %s))"
        else:
            expr = "JSON_EXTRACT(%s, %s)"

        return expr % (
            self.process(binary.left, **kw),
            self.process(binary.right, **kw),
        )

    def visit_empty_set_op_expr(self, type_, expand_op, **kw):
        # slightly old SQLite versions don't seem to be able to handle
        # the empty set impl
        return self.visit_empty_set_expr(type_)

    def visit_empty_set_expr(self, element_types, **kw):
        return "SELECT %s FROM (SELECT %s) WHERE 1!=1" % (
            ", ".join("1" for type_ in element_types or [INTEGER()]),
            ", ".join("1" for type_ in element_types or [INTEGER()]),
        )

    def visit_regexp_match_op_binary(self, binary, operator, **kw):
        return self._generate_generic_binary(binary, " REGEXP ", **kw)

    def visit_not_regexp_match_op_binary(self, binary, operator, **kw):
        return self._generate_generic_binary(binary, " NOT REGEXP ", **kw)

    def _on_conflict_target(self, clause, **kw):
        if clause.inferred_target_elements is not None:
            target_text = "(%s)" % ", ".join(
                (
                    self.preparer.quote(c)
                    if isinstance(c, str)
                    else self.process(c, include_table=False, use_schema=False)
                )
                for c in clause.inferred_target_elements
            )
            if clause.inferred_target_whereclause is not None:
                target_text += " WHERE %s" % self.process(
                    clause.inferred_target_whereclause,
                    include_table=False,
                    use_schema=False,
                    literal_execute=True,
                )

        else:
            target_text = ""

        return target_text

    def visit_on_conflict_do_nothing(self, on_conflict, **kw):
        target_text = self._on_conflict_target(on_conflict, **kw)

        if target_text:
            return "ON CONFLICT %s DO NOTHING" % target_text
        else:
            return "ON CONFLICT DO NOTHING"

    def visit_on_conflict_do_update(self, on_conflict, **kw):
        clause = on_conflict

        target_text = self._on_conflict_target(on_conflict, **kw)

        action_set_ops = []

        set_parameters = dict(clause.update_values_to_set)
        # create a list of column assignment clauses as tuples

        insert_statement = self.stack[-1]["selectable"]
        cols = insert_statement.table.c
        for c in cols:
            col_key = c.key

            if col_key in set_parameters:
                value = set_parameters.pop(col_key)
            elif c in set_parameters:
                value = set_parameters.pop(c)
            else:
                continue

            if coercions._is_literal(value):
                value = elements.BindParameter(None, value, type_=c.type)

            else:
                if (
                    isinstance(value, elements.BindParameter)
                    and value.type._isnull
                ):
                    value = value._clone()
                    value.type = c.type
            value_text = self.process(value.self_group(), use_schema=False)

            key_text = self.preparer.quote(c.name)
            action_set_ops.append("%s = %s" % (key_text, value_text))

        # check for names that don't match columns
        if set_parameters:
            util.warn(
                "Additional column names not matching "
                "any column keys in table '%s': %s"
                % (
                    self.current_executable.table.name,
                    (", ".join("'%s'" % c for c in set_parameters)),
                )
            )
            for k, v in set_parameters.items():
                key_text = (
                    self.preparer.quote(k)
                    if isinstance(k, str)
                    else self.process(k, use_schema=False)
                )
                value_text = self.process(
                    coercions.expect(roles.ExpressionElementRole, v),
                    use_schema=False,
                )
                action_set_ops.append("%s = %s" % (key_text, value_text))

        action_text = ", ".join(action_set_ops)
        if clause.update_whereclause is not None:
            action_text += " WHERE %s" % self.process(
                clause.update_whereclause, include_table=True, use_schema=False
            )

        return "ON CONFLICT %s DO UPDATE SET %s" % (target_text, action_text)

    def visit_bitwise_xor_op_binary(self, binary, operator, **kw):
        # sqlite has no xor. Use "a XOR b" = "(a | b) - (a & b)".
        kw["eager_grouping"] = True
        or_ = self._generate_generic_binary(binary, " | ", **kw)
        and_ = self._generate_generic_binary(binary, " & ", **kw)
        return f"({or_} - {and_})"


1672class SQLiteDDLCompiler(compiler.DDLCompiler): 

1673 def get_column_specification(self, column, **kwargs): 

1674 coltype = self.dialect.type_compiler_instance.process( 

1675 column.type, type_expression=column 

1676 ) 

1677 colspec = self.preparer.format_column(column) + " " + coltype 

1678 default = self.get_column_default_string(column) 

1679 if default is not None: 

1680 

1681 if not re.match(r"""^\s*[\'\"\(]""", default) and re.match( 

1682 r".*\W.*", default 

1683 ): 

1684 colspec += f" DEFAULT ({default})" 

1685 else: 

1686 colspec += f" DEFAULT {default}" 

1687 

1688 if not column.nullable: 

1689 colspec += " NOT NULL" 

1690 

1691 on_conflict_clause = column.dialect_options["sqlite"][ 

1692 "on_conflict_not_null" 

1693 ] 

1694 if on_conflict_clause is not None: 

1695 colspec += " ON CONFLICT " + on_conflict_clause 

1696 

1697 if column.primary_key: 

1698 if ( 

1699 column.autoincrement is True 

1700 and len(column.table.primary_key.columns) != 1 

1701 ): 

1702 raise exc.CompileError( 

1703 "SQLite does not support autoincrement for " 

1704 "composite primary keys" 

1705 ) 

1706 

1707 if ( 

1708 column.table.dialect_options["sqlite"]["autoincrement"] 

1709 and len(column.table.primary_key.columns) == 1 

1710 and issubclass(column.type._type_affinity, sqltypes.Integer) 

1711 and not column.foreign_keys 

1712 ): 

1713 colspec += " PRIMARY KEY" 

1714 

1715 on_conflict_clause = column.dialect_options["sqlite"][ 

1716 "on_conflict_primary_key" 

1717 ] 

1718 if on_conflict_clause is not None: 

1719 colspec += " ON CONFLICT " + on_conflict_clause 

1720 

1721 colspec += " AUTOINCREMENT" 

1722 

1723 if column.computed is not None: 

1724 colspec += " " + self.process(column.computed) 

1725 

1726 return colspec 

1727 

1728 def visit_primary_key_constraint(self, constraint, **kw): 

1729 # for columns with sqlite_autoincrement=True, 

1730 # the PRIMARY KEY constraint can only be inline 

1731 # with the column itself. 

1732 if len(constraint.columns) == 1: 

1733 c = list(constraint)[0] 

1734 if ( 

1735 c.primary_key 

1736 and c.table.dialect_options["sqlite"]["autoincrement"] 

1737 and issubclass(c.type._type_affinity, sqltypes.Integer) 

1738 and not c.foreign_keys 

1739 ): 

1740 return None 

1741 

1742 text = super().visit_primary_key_constraint(constraint) 

1743 

1744 on_conflict_clause = constraint.dialect_options["sqlite"][ 

1745 "on_conflict" 

1746 ] 

1747 if on_conflict_clause is None and len(constraint.columns) == 1: 

1748 on_conflict_clause = list(constraint)[0].dialect_options["sqlite"][ 

1749 "on_conflict_primary_key" 

1750 ] 

1751 

1752 if on_conflict_clause is not None: 

1753 text += " ON CONFLICT " + on_conflict_clause 

1754 

1755 return text 

1756 

1757 def visit_unique_constraint(self, constraint, **kw): 

1758 text = super().visit_unique_constraint(constraint) 

1759 

1760 on_conflict_clause = constraint.dialect_options["sqlite"][ 

1761 "on_conflict" 

1762 ] 

1763 if on_conflict_clause is None and len(constraint.columns) == 1: 

1764 col1 = list(constraint)[0] 

1765 if isinstance(col1, schema.SchemaItem): 

1766 on_conflict_clause = list(constraint)[0].dialect_options[ 

1767 "sqlite" 

1768 ]["on_conflict_unique"] 

1769 

1770 if on_conflict_clause is not None: 

1771 text += " ON CONFLICT " + on_conflict_clause 

1772 

1773 return text 

1774 
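The ON CONFLICT clauses rendered by the constraint visitors above are SQLite's non-standard conflict-resolution syntax. A minimal stdlib `sqlite3` sketch (table and column names are illustrative) of what such rendered DDL does at runtime:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# UNIQUE ON CONFLICT IGNORE: a conflicting INSERT is silently skipped
# instead of raising an IntegrityError.
conn.execute("CREATE TABLE t (a TEXT UNIQUE ON CONFLICT IGNORE)")
conn.execute("INSERT INTO t (a) VALUES ('x')")
conn.execute("INSERT INTO t (a) VALUES ('x')")  # no error, row ignored
count = conn.execute("SELECT COUNT(*) FROM t").fetchone()[0]
print(count)
```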

1775 def visit_check_constraint(self, constraint, **kw): 

1776 text = super().visit_check_constraint(constraint) 

1777 

1778 on_conflict_clause = constraint.dialect_options["sqlite"][ 

1779 "on_conflict" 

1780 ] 

1781 

1782 if on_conflict_clause is not None: 

1783 text += " ON CONFLICT " + on_conflict_clause 

1784 

1785 return text 

1786 

1787 def visit_column_check_constraint(self, constraint, **kw): 

1788 text = super().visit_column_check_constraint(constraint) 

1789 

1790 if constraint.dialect_options["sqlite"]["on_conflict"] is not None: 

1791 raise exc.CompileError( 

1792 "SQLite does not support on conflict clause for " 

1793 "column check constraint" 

1794 ) 

1795 

1796 return text 

1797 

1798 def visit_foreign_key_constraint(self, constraint, **kw): 

1799 local_table = constraint.elements[0].parent.table 

1800 remote_table = constraint.elements[0].column.table 

1801 

1802 if local_table.schema != remote_table.schema: 

1803 return None 

1804 else: 

1805 return super().visit_foreign_key_constraint(constraint) 

1806 

1807 def define_constraint_remote_table(self, constraint, table, preparer): 

1808 """Format the remote table clause of a CREATE CONSTRAINT clause.""" 

1809 

1810 return preparer.format_table(table, use_schema=False) 

1811 

1812 def visit_create_index( 

1813 self, create, include_schema=False, include_table_schema=True, **kw 

1814 ): 

1815 index = create.element 

1816 self._verify_index_table(index) 

1817 preparer = self.preparer 

1818 text = "CREATE " 

1819 if index.unique: 

1820 text += "UNIQUE " 

1821 

1822 text += "INDEX " 

1823 

1824 if create.if_not_exists: 

1825 text += "IF NOT EXISTS " 

1826 

1827 text += "%s ON %s (%s)" % ( 

1828 self._prepared_index_name(index, include_schema=True), 

1829 preparer.format_table(index.table, use_schema=False), 

1830 ", ".join( 

1831 self.sql_compiler.process( 

1832 expr, include_table=False, literal_binds=True 

1833 ) 

1834 for expr in index.expressions 

1835 ), 

1836 ) 

1837 

1838 whereclause = index.dialect_options["sqlite"]["where"] 

1839 if whereclause is not None: 

1840 where_compiled = self.sql_compiler.process( 

1841 whereclause, include_table=False, literal_binds=True 

1842 ) 

1843 text += " WHERE " + where_compiled 

1844 

1845 return text 

1846 
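The `sqlite_where` dialect option handled above compiles to a partial-index WHERE clause. A small stdlib `sqlite3` sketch (names are illustrative) showing the resulting DDL, which SQLite stores verbatim in `sqlite_master`:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (a INTEGER, b INTEGER)")
# partial index: only rows satisfying the WHERE predicate are indexed
conn.execute("CREATE INDEX ix_a ON t (a) WHERE b > 0")
sql = conn.execute(
    "SELECT sql FROM sqlite_master WHERE name = 'ix_a'"
).fetchone()[0]
print(sql)
```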

1847 def post_create_table(self, table): 

1848 table_options = [] 

1849 

1850 if not table.dialect_options["sqlite"]["with_rowid"]: 

1851 table_options.append("WITHOUT ROWID") 

1852 

1853 if table.dialect_options["sqlite"]["strict"]: 

1854 table_options.append("STRICT") 

1855 

1856 if table_options: 

1857 return "\n " + ",\n ".join(table_options) 

1858 else: 

1859 return "" 

1860 
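The table options appended by `post_create_table` change table storage; in a WITHOUT ROWID table the implicit `rowid` column does not exist. A minimal stdlib `sqlite3` sketch (names are illustrative):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# WITHOUT ROWID requires an explicit PRIMARY KEY
conn.execute("CREATE TABLE kv (k TEXT PRIMARY KEY, v TEXT) WITHOUT ROWID")
try:
    conn.execute("SELECT rowid FROM kv")
    error = None
except sqlite3.OperationalError as exc:
    # the implicit rowid column is absent in WITHOUT ROWID tables
    error = exc
print(error)
```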

1861 

1862class SQLiteTypeCompiler(compiler.GenericTypeCompiler): 

1863 def visit_large_binary(self, type_, **kw): 

1864 return self.visit_BLOB(type_) 

1865 

1866 def visit_DATETIME(self, type_, **kw): 

1867 if ( 

1868 not isinstance(type_, _DateTimeMixin) 

1869 or type_.format_is_text_affinity 

1870 ): 

1871 return super().visit_DATETIME(type_) 

1872 else: 

1873 return "DATETIME_CHAR" 

1874 

1875 def visit_DATE(self, type_, **kw): 

1876 if ( 

1877 not isinstance(type_, _DateTimeMixin) 

1878 or type_.format_is_text_affinity 

1879 ): 

1880 return super().visit_DATE(type_) 

1881 else: 

1882 return "DATE_CHAR" 

1883 

1884 def visit_TIME(self, type_, **kw): 

1885 if ( 

1886 not isinstance(type_, _DateTimeMixin) 

1887 or type_.format_is_text_affinity 

1888 ): 

1889 return super().visit_TIME(type_) 

1890 else: 

1891 return "TIME_CHAR" 

1892 
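The `*_CHAR` names exist because SQLite's type-affinity rules give TEXT affinity to any declared type containing "CHAR", while a plain "DATETIME" (no recognized substring) falls through to NUMERIC affinity. A small stdlib `sqlite3` illustration (names are illustrative):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# "DATETIME" gets NUMERIC affinity; "DATETIME_CHAR" gets TEXT affinity
conn.execute("CREATE TABLE t (a DATETIME, b DATETIME_CHAR)")
conn.execute("INSERT INTO t VALUES ('1234', '1234')")
# NUMERIC affinity coerces the well-formed integer string to an integer;
# TEXT affinity stores it as text
row = conn.execute("SELECT typeof(a), typeof(b) FROM t").fetchone()
print(row)  # ('integer', 'text')
```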

1893 def visit_JSON(self, type_, **kw): 

1894 # note this name provides NUMERIC affinity, not TEXT. 

1895 # should not be an issue unless the JSON value consists of a single 

1896 # numeric value. JSONTEXT can be used if this case is required. 

1897 return "JSON" 

1898 

1899 

1900class SQLiteIdentifierPreparer(compiler.IdentifierPreparer): 

1901 reserved_words = { 

1902 "add", 

1903 "after", 

1904 "all", 

1905 "alter", 

1906 "analyze", 

1907 "and", 

1908 "as", 

1909 "asc", 

1910 "attach", 

1911 "autoincrement", 

1912 "before", 

1913 "begin", 

1914 "between", 

1915 "by", 

1916 "cascade", 

1917 "case", 

1918 "cast", 

1919 "check", 

1920 "collate", 

1921 "column", 

1922 "commit", 

1923 "conflict", 

1924 "constraint", 

1925 "create", 

1926 "cross", 

1927 "current_date", 

1928 "current_time", 

1929 "current_timestamp", 

1930 "database", 

1931 "default", 

1932 "deferrable", 

1933 "deferred", 

1934 "delete", 

1935 "desc", 

1936 "detach", 

1937 "distinct", 

1938 "drop", 

1939 "each", 

1940 "else", 

1941 "end", 

1942 "escape", 

1943 "except", 

1944 "exclusive", 

1945 "exists", 

1946 "explain", 

1947 "false", 

1948 "fail", 

1949 "for", 

1950 "foreign", 

1951 "from", 

1952 "full", 

1953 "glob", 

1954 "group", 

1955 "having", 

1956 "if", 

1957 "ignore", 

1958 "immediate", 

1959 "in", 

1960 "index", 

1961 "indexed", 

1962 "initially", 

1963 "inner", 

1964 "insert", 

1965 "instead", 

1966 "intersect", 

1967 "into", 

1968 "is", 

1969 "isnull", 

1970 "join", 

1971 "key", 

1972 "left", 

1973 "like", 

1974 "limit", 

1975 "match", 

1976 "natural", 

1977 "not", 

1978 "notnull", 

1979 "null", 

1980 "of", 

1981 "offset", 

1982 "on", 

1983 "or", 

1984 "order", 

1985 "outer", 

1986 "plan", 

1987 "pragma", 

1988 "primary", 

1989 "query", 

1990 "raise", 

1991 "references", 

1992 "reindex", 

1993 "rename", 

1994 "replace", 

1995 "restrict", 

1996 "right", 

1997 "rollback", 

1998 "row", 

1999 "select", 

2000 "set", 

2001 "table", 

2002 "temp", 

2003 "temporary", 

2004 "then", 

2005 "to", 

2006 "transaction", 

2007 "trigger", 

2008 "true", 

2009 "union", 

2010 "unique", 

2011 "update", 

2012 "using", 

2013 "vacuum", 

2014 "values", 

2015 "view", 

2016 "virtual", 

2017 "when", 

2018 "where", 

2019 } 

2020 

2021 

2022class SQLiteExecutionContext(default.DefaultExecutionContext): 

2023 @util.memoized_property 

2024 def _preserve_raw_colnames(self): 

2025 return ( 

2026 not self.dialect._broken_dotted_colnames 

2027 or self.execution_options.get("sqlite_raw_colnames", False) 

2028 ) 

2029 

2030 def _translate_colname(self, colname): 

2031 # TODO: detect SQLite version 3.10.0 or greater; 

2032 # see [ticket:3633] 

2033 

2034 # adjust for dotted column names. SQLite 

2035 # in the case of UNION may store col names as 

2036 # "tablename.colname", or if using an attached database, 

2037 # "database.tablename.colname", in cursor.description 

2038 if not self._preserve_raw_colnames and "." in colname: 

2039 return colname.split(".")[-1], colname 

2040 else: 

2041 return colname, None 

2042 

2043 

2044class SQLiteDialect(default.DefaultDialect): 

2045 name = "sqlite" 

2046 supports_alter = False 

2047 

2048 # SQLite supports "DEFAULT VALUES"; "VALUES (DEFAULT)" is not 

2049 # supported directly, but is emulated by rendering the NULL token 

2050 supports_default_values = True 

2052 

2053 # sqlite issue: 

2054 # https://github.com/python/cpython/issues/93421 

2055 # note this parameter is no longer used by the ORM or default dialect 

2056 # see #9414 

2057 supports_sane_rowcount_returning = False 

2058 

2059 supports_empty_insert = False 

2060 supports_cast = True 

2061 supports_multivalues_insert = True 

2062 use_insertmanyvalues = True 

2063 tuple_in_values = True 

2064 supports_statement_cache = True 

2065 insert_null_pk_still_autoincrements = True 

2066 insert_returning = True 

2067 update_returning = True 

2068 update_returning_multifrom = True 

2069 delete_returning = True 

2071 

2072 supports_default_metavalue = True 

2073 """dialect supports INSERT... VALUES (DEFAULT) syntax""" 

2074 

2075 default_metavalue_token = "NULL" 

2076 """for INSERT... VALUES (DEFAULT) syntax, the token to put in the 

2077 parenthesis.""" 

2078 

2079 default_paramstyle = "qmark" 

2080 execution_ctx_cls = SQLiteExecutionContext 

2081 statement_compiler = SQLiteCompiler 

2082 ddl_compiler = SQLiteDDLCompiler 

2083 type_compiler_cls = SQLiteTypeCompiler 

2084 preparer = SQLiteIdentifierPreparer 

2085 ischema_names = ischema_names 

2086 colspecs = colspecs 

2087 

2088 construct_arguments = [ 

2089 ( 

2090 sa_schema.Table, 

2091 { 

2092 "autoincrement": False, 

2093 "with_rowid": True, 

2094 "strict": False, 

2095 }, 

2096 ), 

2097 (sa_schema.Index, {"where": None}), 

2098 ( 

2099 sa_schema.Column, 

2100 { 

2101 "on_conflict_primary_key": None, 

2102 "on_conflict_not_null": None, 

2103 "on_conflict_unique": None, 

2104 }, 

2105 ), 

2106 (sa_schema.Constraint, {"on_conflict": None}), 

2107 ] 

2108 

2109 _broken_fk_pragma_quotes = False 

2110 _broken_dotted_colnames = False 

2111 

2112 @util.deprecated_params( 

2113 _json_serializer=( 

2114 "1.3.7", 

2115 "The _json_serializer argument to the SQLite dialect has " 

2116 "been renamed to the correct name of json_serializer. The old " 

2117 "argument name will be removed in a future release.", 

2118 ), 

2119 _json_deserializer=( 

2120 "1.3.7", 

2121 "The _json_deserializer argument to the SQLite dialect has " 

2122 "been renamed to the correct name of json_deserializer. The old " 

2123 "argument name will be removed in a future release.", 

2124 ), 

2125 ) 

2126 def __init__( 

2127 self, 

2128 native_datetime=False, 

2129 json_serializer=None, 

2130 json_deserializer=None, 

2131 _json_serializer=None, 

2132 _json_deserializer=None, 

2133 **kwargs, 

2134 ): 

2135 default.DefaultDialect.__init__(self, **kwargs) 

2136 

2137 if _json_serializer: 

2138 json_serializer = _json_serializer 

2139 if _json_deserializer: 

2140 json_deserializer = _json_deserializer 

2141 self._json_serializer = json_serializer 

2142 self._json_deserializer = json_deserializer 

2143 

2144 # this flag is used by the pysqlite dialect, and perhaps others in 

2145 # the future, to indicate that the driver is handling date/timestamp 

2146 # conversions itself (and perhaps datetime/time as well, on some 

2147 # hypothetical driver) 

2148 self.native_datetime = native_datetime 

2149 

2150 if self.dbapi is not None: 

2151 if self.dbapi.sqlite_version_info < (3, 7, 16): 

2152 util.warn( 

2153 "SQLite version %s is older than 3.7.16, and will not " 

2154 "support right nested joins, as are sometimes used in " 

2155 "more complex ORM scenarios. SQLAlchemy 1.4 and above " 

2156 "no longer tries to rewrite these joins." 

2157 % (self.dbapi.sqlite_version_info,) 

2158 ) 

2159 

2160 # NOTE: python 3.7 on fedora for me has SQLite 3.34.1. These 

2161 # version checks are getting very stale. 

2162 self._broken_dotted_colnames = self.dbapi.sqlite_version_info < ( 

2163 3, 

2164 10, 

2165 0, 

2166 ) 

2167 self.supports_default_values = self.dbapi.sqlite_version_info >= ( 

2168 3, 

2169 3, 

2170 8, 

2171 ) 

2172 self.supports_cast = self.dbapi.sqlite_version_info >= (3, 2, 3) 

2173 self.supports_multivalues_insert = ( 

2174 # https://www.sqlite.org/releaselog/3_7_11.html 

2175 self.dbapi.sqlite_version_info 

2176 >= (3, 7, 11) 

2177 ) 

2178 # see https://www.sqlalchemy.org/trac/ticket/2568 

2179 # as well as https://www.sqlite.org/src/info/600482d161 

2180 self._broken_fk_pragma_quotes = self.dbapi.sqlite_version_info < ( 

2181 3, 

2182 6, 

2183 14, 

2184 ) 

2185 

2186 if self.dbapi.sqlite_version_info < (3, 35) or util.pypy: 

2187 self.update_returning = self.delete_returning = ( 

2188 self.insert_returning 

2189 ) = False 

2190 

2191 if self.dbapi.sqlite_version_info < (3, 32, 0): 

2192 # https://www.sqlite.org/limits.html 

2193 self.insertmanyvalues_max_parameters = 999 

2194 

2195 _isolation_lookup = util.immutabledict( 

2196 {"READ UNCOMMITTED": 1, "SERIALIZABLE": 0} 

2197 ) 

2198 

2199 def get_isolation_level_values(self, dbapi_connection): 

2200 return list(self._isolation_lookup) 

2201 

2202 def set_isolation_level(self, dbapi_connection, level): 

2203 isolation_level = self._isolation_lookup[level] 

2204 

2205 cursor = dbapi_connection.cursor() 

2206 cursor.execute(f"PRAGMA read_uncommitted = {isolation_level}") 

2207 cursor.close() 

2208 

2209 def get_isolation_level(self, dbapi_connection): 

2210 cursor = dbapi_connection.cursor() 

2211 cursor.execute("PRAGMA read_uncommitted") 

2212 res = cursor.fetchone() 

2213 if res: 

2214 value = res[0] 

2215 else: 

2216 # https://www.sqlite.org/changes.html#version_3_3_3 

2217 # "Optional READ UNCOMMITTED isolation (instead of the 

2218 # default isolation level of SERIALIZABLE) and 

2219 # table level locking when database connections 

2220 # share a common cache." 

2221 # pre-SQLite 3.3.0 default to 0 

2222 value = 0 

2223 cursor.close() 

2224 if value == 0: 

2225 return "SERIALIZABLE" 

2226 elif value == 1: 

2227 return "READ UNCOMMITTED" 

2228 else: 

2229 assert False, "Unknown isolation level %s" % value 

2230 
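The isolation-level methods above map directly onto `PRAGMA read_uncommitted`, where 0 is SERIALIZABLE (the default) and 1 is READ UNCOMMITTED. A minimal stdlib `sqlite3` sketch:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
cur = conn.cursor()
# 0 = SERIALIZABLE is the default
before = cur.execute("PRAGMA read_uncommitted").fetchone()[0]
# the statement set_isolation_level() emits for READ UNCOMMITTED
cur.execute("PRAGMA read_uncommitted = 1")
after = cur.execute("PRAGMA read_uncommitted").fetchone()[0]
print(before, after)
```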

2231 @reflection.cache 

2232 def get_schema_names(self, connection, **kw): 

2233 s = "PRAGMA database_list" 

2234 dl = connection.exec_driver_sql(s) 

2235 

2236 return [db[1] for db in dl if db[1] != "temp"] 

2237 
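`PRAGMA database_list`, used above, yields `(seq, name, file)` rows per attached database; the primary database is always named "main". A quick stdlib `sqlite3` check:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# row[1] is the schema name, the same field get_schema_names() collects
rows = conn.execute("PRAGMA database_list").fetchall()
names = [row[1] for row in rows]
print(names)
```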

2238 def _format_schema(self, schema, table_name): 

2239 if schema is not None: 

2240 qschema = self.identifier_preparer.quote_identifier(schema) 

2241 name = f"{qschema}.{table_name}" 

2242 else: 

2243 name = table_name 

2244 return name 

2245 

2246 def _sqlite_main_query( 

2247 self, 

2248 table: str, 

2249 type_: str, 

2250 schema: Optional[str], 

2251 sqlite_include_internal: bool, 

2252 ): 

2253 main = self._format_schema(schema, table) 

2254 if not sqlite_include_internal: 

2255 filter_table = " AND name NOT LIKE 'sqlite~_%' ESCAPE '~'" 

2256 else: 

2257 filter_table = "" 

2258 query = ( 

2259 f"SELECT name FROM {main} " 

2260 f"WHERE type='{type_}'{filter_table} " 

2261 "ORDER BY name" 

2262 ) 

2263 return query 

2264 

2265 @reflection.cache 

2266 def get_table_names( 

2267 self, connection, schema=None, sqlite_include_internal=False, **kw 

2268 ): 

2269 query = self._sqlite_main_query( 

2270 "sqlite_master", "table", schema, sqlite_include_internal 

2271 ) 

2272 names = connection.exec_driver_sql(query).scalars().all() 

2273 return names 

2274 

2275 @reflection.cache 

2276 def get_temp_table_names( 

2277 self, connection, sqlite_include_internal=False, **kw 

2278 ): 

2279 query = self._sqlite_main_query( 

2280 "sqlite_temp_master", "table", None, sqlite_include_internal 

2281 ) 

2282 names = connection.exec_driver_sql(query).scalars().all() 

2283 return names 

2284 

2285 @reflection.cache 

2286 def get_temp_view_names( 

2287 self, connection, sqlite_include_internal=False, **kw 

2288 ): 

2289 query = self._sqlite_main_query( 

2290 "sqlite_temp_master", "view", None, sqlite_include_internal 

2291 ) 

2292 names = connection.exec_driver_sql(query).scalars().all() 

2293 return names 

2294 

2295 @reflection.cache 

2296 def has_table(self, connection, table_name, schema=None, **kw): 

2297 self._ensure_has_table_connection(connection) 

2298 

2299 if schema is not None and schema not in self.get_schema_names( 

2300 connection, **kw 

2301 ): 

2302 return False 

2303 

2304 info = self._get_table_pragma( 

2305 connection, "table_info", table_name, schema=schema 

2306 ) 

2307 return bool(info) 

2308 

2309 def _get_default_schema_name(self, connection): 

2310 return "main" 

2311 

2312 @reflection.cache 

2313 def get_view_names( 

2314 self, connection, schema=None, sqlite_include_internal=False, **kw 

2315 ): 

2316 query = self._sqlite_main_query( 

2317 "sqlite_master", "view", schema, sqlite_include_internal 

2318 ) 

2319 names = connection.exec_driver_sql(query).scalars().all() 

2320 return names 

2321 

2322 @reflection.cache 

2323 def get_view_definition(self, connection, view_name, schema=None, **kw): 

2324 if schema is not None: 

2325 qschema = self.identifier_preparer.quote_identifier(schema) 

2326 master = f"{qschema}.sqlite_master" 

2327 s = ("SELECT sql FROM %s WHERE name = ? AND type='view'") % ( 

2328 master, 

2329 ) 

2330 rs = connection.exec_driver_sql(s, (view_name,)) 

2331 else: 

2332 try: 

2333 s = ( 

2334 "SELECT sql FROM " 

2335 " (SELECT * FROM sqlite_master UNION ALL " 

2336 " SELECT * FROM sqlite_temp_master) " 

2337 "WHERE name = ? " 

2338 "AND type='view'" 

2339 ) 

2340 rs = connection.exec_driver_sql(s, (view_name,)) 

2341 except exc.DBAPIError: 

2342 s = ( 

2343 "SELECT sql FROM sqlite_master WHERE name = ? " 

2344 "AND type='view'" 

2345 ) 

2346 rs = connection.exec_driver_sql(s, (view_name,)) 

2347 

2348 result = rs.fetchall() 

2349 if result: 

2350 return result[0].sql 

2351 else: 

2352 raise exc.NoSuchTableError( 

2353 f"{schema}.{view_name}" if schema else view_name 

2354 ) 

2355 
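`get_view_definition` reads the `sql` column of `sqlite_master`, where SQLite stores the original DDL verbatim. A minimal stdlib `sqlite3` sketch (names are illustrative):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (x INTEGER)")
conn.execute("CREATE VIEW v AS SELECT x FROM t")
# the stored sql is the CREATE VIEW statement exactly as typed
sql = conn.execute(
    "SELECT sql FROM sqlite_master WHERE name = ? AND type='view'", ("v",)
).fetchone()[0]
print(sql)
```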

2356 @reflection.cache 

2357 def get_columns(self, connection, table_name, schema=None, **kw): 

2358 pragma = "table_info" 

2359 # computed columns are treated as hidden; they require table_xinfo 

2360 if self.server_version_info >= (3, 31): 

2361 pragma = "table_xinfo" 

2362 info = self._get_table_pragma( 

2363 connection, pragma, table_name, schema=schema 

2364 ) 

2365 columns = [] 

2366 tablesql = None 

2367 for row in info: 

2368 name = row[1] 

2369 type_ = row[2].upper() 

2370 nullable = not row[3] 

2371 default = row[4] 

2372 primary_key = row[5] 

2373 hidden = row[6] if pragma == "table_xinfo" else 0 

2374 

2375 # hidden has value 0 for normal columns, 1 for hidden columns, 

2376 # 2 for computed virtual columns and 3 for computed stored columns 

2377 # https://www.sqlite.org/src/info/069351b85f9a706f60d3e98fbc8aaf40c374356b967c0464aede30ead3d9d18b 

2378 if hidden == 1: 

2379 continue 

2380 

2381 generated = bool(hidden) 

2382 persisted = hidden == 3 

2383 

2384 if tablesql is None and generated: 

2385 tablesql = self._get_table_sql( 

2386 connection, table_name, schema, **kw 

2387 ) 

2388 # remove create table 

2389 match = re.match( 

2390 r"create table .*?\((.*)\)$", 

2391 tablesql.strip(), 

2392 re.DOTALL | re.IGNORECASE, 

2393 ) 

2394 assert match, f"create table not found in {tablesql}" 

2395 tablesql = match.group(1).strip() 

2396 

2397 columns.append( 

2398 self._get_column_info( 

2399 name, 

2400 type_, 

2401 nullable, 

2402 default, 

2403 primary_key, 

2404 generated, 

2405 persisted, 

2406 tablesql, 

2407 ) 

2408 ) 

2409 if columns: 

2410 return columns 

2411 elif not self.has_table(connection, table_name, schema): 

2412 raise exc.NoSuchTableError( 

2413 f"{schema}.{table_name}" if schema else table_name 

2414 ) 

2415 else: 

2416 return ReflectionDefaults.columns() 

2417 
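`get_columns` indexes directly into `PRAGMA table_info` rows, whose layout is `(cid, name, type, notnull, dflt_value, pk)` — i.e. `row[1]` through `row[5]` above. A stdlib `sqlite3` sketch (names are illustrative); note that `dflt_value` is the default expression as written in the DDL, quotes included:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE t (id INTEGER PRIMARY KEY, name TEXT NOT NULL DEFAULT 'x')"
)
rows = conn.execute("PRAGMA table_info(t)").fetchall()
for cid, name, type_, notnull, dflt_value, pk in rows:
    print(name, type_, notnull, dflt_value, pk)
```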

2418 def _get_column_info( 

2419 self, 

2420 name, 

2421 type_, 

2422 nullable, 

2423 default, 

2424 primary_key, 

2425 generated, 

2426 persisted, 

2427 tablesql, 

2428 ): 

2429 if generated: 

2430 # the type of a column "cc INTEGER GENERATED ALWAYS AS (1 + 42)" 

2431 # somehow is "INTEGER GENERATED ALWAYS" 

2432 type_ = re.sub("generated", "", type_, flags=re.IGNORECASE) 

2433 type_ = re.sub("always", "", type_, flags=re.IGNORECASE).strip() 

2434 

2435 coltype = self._resolve_type_affinity(type_) 

2436 

2437 if default is not None: 

2438 default = str(default) 

2439 

2440 colspec = { 

2441 "name": name, 

2442 "type": coltype, 

2443 "nullable": nullable, 

2444 "default": default, 

2445 "primary_key": primary_key, 

2446 } 

2447 if generated: 

2448 sqltext = "" 

2449 if tablesql: 

2450 pattern = ( 

2451 r"[^,]*\s+GENERATED\s+ALWAYS\s+AS" 

2452 r"\s+\((.*)\)\s*(?:virtual|stored)?" 

2453 ) 

2454 match = re.search( 

2455 re.escape(name) + pattern, tablesql, re.IGNORECASE 

2456 ) 

2457 if match: 

2458 sqltext = match.group(1) 

2459 colspec["computed"] = {"sqltext": sqltext, "persisted": persisted} 

2460 return colspec 

2461 

2462 def _resolve_type_affinity(self, type_): 

2463 """Return a data type from a reflected column, using affinity rules. 

2464 

2465 SQLite's goal for universal compatibility introduces some complexity 

2466 during reflection, as a column's defined type might not actually be a 

2467 type that SQLite understands - or indeed, may not be defined *at all*. 

2468 Internally, SQLite handles this with a 'data type affinity' for each 

2469 column definition, mapping to one of 'TEXT', 'NUMERIC', 'INTEGER', 

2470 'REAL', or 'NONE' (raw bits). The algorithm that determines this is 

2471 listed in https://www.sqlite.org/datatype3.html section 2.1. 

2472 

2473 This method allows SQLAlchemy to support that algorithm, while still 

2474 providing access to smarter reflection utilities by recognizing 

2475 column definitions that SQLite only supports through affinity (like 

2476 DATE and DOUBLE). 

2477 

2478 """ 

2479 match = re.match(r"([\w ]+)(\(.*?\))?", type_) 

2480 if match: 

2481 coltype = match.group(1) 

2482 args = match.group(2) 

2483 else: 

2484 coltype = "" 

2485 args = "" 

2486 

2487 if coltype in self.ischema_names: 

2488 coltype = self.ischema_names[coltype] 

2489 elif "INT" in coltype: 

2490 coltype = sqltypes.INTEGER 

2491 elif "CHAR" in coltype or "CLOB" in coltype or "TEXT" in coltype: 

2492 coltype = sqltypes.TEXT 

2493 elif "BLOB" in coltype or not coltype: 

2494 coltype = sqltypes.NullType 

2495 elif "REAL" in coltype or "FLOA" in coltype or "DOUB" in coltype: 

2496 coltype = sqltypes.REAL 

2497 else: 

2498 coltype = sqltypes.NUMERIC 

2499 

2500 if args is not None: 

2501 args = re.findall(r"(\d+)", args) 

2502 try: 

2503 coltype = coltype(*[int(a) for a in args]) 

2504 except TypeError: 

2505 util.warn( 

2506 "Could not instantiate type %s with " 

2507 "reflected arguments %s; using no arguments." 

2508 % (coltype, args) 

2509 ) 

2510 coltype = coltype() 

2511 else: 

2512 coltype = coltype() 

2513 

2514 return coltype 

2515 
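The substring checks above mirror SQLite's affinity algorithm (section 3.1 of the datatype documentation). A standalone sketch of just the fallback chain, with `affinity_bucket` a hypothetical helper name:

```python
import re

def affinity_bucket(decl: str) -> str:
    # hypothetical helper restating the elif chain used by
    # _resolve_type_affinity (declared type assumed upper-cased)
    match = re.match(r"([\w ]+)(\(.*?\))?", decl)
    coltype = match.group(1) if match else ""
    if "INT" in coltype:
        return "INTEGER"
    elif "CHAR" in coltype or "CLOB" in coltype or "TEXT" in coltype:
        return "TEXT"
    elif "BLOB" in coltype or not coltype:
        return "NONE"
    elif "REAL" in coltype or "FLOA" in coltype or "DOUB" in coltype:
        return "REAL"
    else:
        return "NUMERIC"

print(affinity_bucket("BIGINT"))         # INTEGER
print(affinity_bucket("VARCHAR(30)"))    # TEXT
print(affinity_bucket("BLOB"))           # NONE
print(affinity_bucket("DOUBLE"))         # REAL
print(affinity_bucket("DECIMAL(10,2)"))  # NUMERIC
```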

2516 @reflection.cache 

2517 def get_pk_constraint(self, connection, table_name, schema=None, **kw): 

2518 constraint_name = None 

2519 table_data = self._get_table_sql(connection, table_name, schema=schema) 

2520 if table_data: 

2521 PK_PATTERN = r"CONSTRAINT (\w+) PRIMARY KEY" 

2522 result = re.search(PK_PATTERN, table_data, re.I) 

2523 constraint_name = result.group(1) if result else None 

2524 

2525 cols = self.get_columns(connection, table_name, schema, **kw) 

2526 # consider only pk columns. This also avoids sorting the cached 

2527 # value returned by get_columns 

2528 cols = [col for col in cols if col.get("primary_key", 0) > 0] 

2529 cols.sort(key=lambda col: col.get("primary_key")) 

2530 pkeys = [col["name"] for col in cols] 

2531 

2532 if pkeys: 

2533 return {"constrained_columns": pkeys, "name": constraint_name} 

2534 else: 

2535 return ReflectionDefaults.pk_constraint() 

2536 

2537 @reflection.cache 

2538 def get_foreign_keys(self, connection, table_name, schema=None, **kw): 

2539 # sqlite makes this *extremely difficult*. 

2540 # First, use the pragma to get the actual FKs. 

2541 pragma_fks = self._get_table_pragma( 

2542 connection, "foreign_key_list", table_name, schema=schema 

2543 ) 

2544 

2545 fks = {} 

2546 

2547 for row in pragma_fks: 

2548 (numerical_id, rtbl, lcol, rcol) = (row[0], row[2], row[3], row[4]) 

2549 

2550 if not rcol: 

2551 # no referred column, which means it was not named in the 

2552 # original DDL. The referred columns of the foreign key 

2553 # constraint are therefore the primary key of the referred 

2554 # table. 

2555 try: 

2556 referred_pk = self.get_pk_constraint( 

2557 connection, rtbl, schema=schema, **kw 

2558 ) 

2559 referred_columns = referred_pk["constrained_columns"] 

2560 except exc.NoSuchTableError: 

2561 # ignore not existing parents 

2562 referred_columns = [] 

2563 else: 

2564 # note we use this list only if this is the first column 

2565 # in the constraint. for subsequent columns we ignore the 

2566 # list and append "rcol" if present. 

2567 referred_columns = [] 

2568 

2569 if self._broken_fk_pragma_quotes: 

2570 rtbl = re.sub(r"^[\"\[`\']|[\"\]`\']$", "", rtbl) 

2571 

2572 if numerical_id in fks: 

2573 fk = fks[numerical_id] 

2574 else: 

2575 fk = fks[numerical_id] = { 

2576 "name": None, 

2577 "constrained_columns": [], 

2578 "referred_schema": schema, 

2579 "referred_table": rtbl, 

2580 "referred_columns": referred_columns, 

2581 "options": {}, 

2582 } 

2583 fks[numerical_id] = fk 

2584 

2585 fk["constrained_columns"].append(lcol) 

2586 

2587 if rcol: 

2588 fk["referred_columns"].append(rcol) 

2589 

2590 def fk_sig(constrained_columns, referred_table, referred_columns): 

2591 return ( 

2592 tuple(constrained_columns) 

2593 + (referred_table,) 

2594 + tuple(referred_columns) 

2595 ) 

2596 

2597 # then, parse the actual SQL and attempt to find DDL that matches 

2598 # the names as well. SQLite saves the DDL in whatever format 

2599 # it was typed in, so we need to be liberal here. 

2600 

2601 keys_by_signature = { 

2602 fk_sig( 

2603 fk["constrained_columns"], 

2604 fk["referred_table"], 

2605 fk["referred_columns"], 

2606 ): fk 

2607 for fk in fks.values() 

2608 } 

2609 

2610 table_data = self._get_table_sql(connection, table_name, schema=schema) 

2611 

2612 def parse_fks(): 

2613 if table_data is None: 

2614 # system tables, etc. 

2615 return 

2616 

2617 # note that we already have the FKs from PRAGMA above. This whole 

2618 # regexp thing is trying to locate additional detail about the 

2619 # FKs, namely the name of the constraint and other options. 

2620 # so parsing the columns is really about matching it up to what 

2621 # we already have. 

2622 FK_PATTERN = ( 

2623 r"(?:CONSTRAINT (\w+) +)?" 

2624 r"FOREIGN KEY *\( *(.+?) *\) +" 

2625 r'REFERENCES +(?:(?:"(.+?)")|([a-z0-9_]+)) *\( *((?:(?:"[^"]+"|[a-z0-9_]+) *(?:, *)?)+)\) *' # noqa: E501 

2626 r"((?:ON (?:DELETE|UPDATE) " 

2627 r"(?:SET NULL|SET DEFAULT|CASCADE|RESTRICT|NO ACTION) *)*)" 

2628 r"((?:NOT +)?DEFERRABLE)?" 

2629 r"(?: +INITIALLY +(DEFERRED|IMMEDIATE))?" 

2630 ) 

2631 for match in re.finditer(FK_PATTERN, table_data, re.I): 

2632 ( 

2633 constraint_name, 

2634 constrained_columns, 

2635 referred_quoted_name, 

2636 referred_name, 

2637 referred_columns, 

2638 onupdatedelete, 

2639 deferrable, 

2640 initially, 

2641 ) = match.group(1, 2, 3, 4, 5, 6, 7, 8) 

2642 constrained_columns = list( 

2643 self._find_cols_in_sig(constrained_columns) 

2644 ) 

2645 if not referred_columns: 

2646 referred_columns = constrained_columns 

2647 else: 

2648 referred_columns = list( 

2649 self._find_cols_in_sig(referred_columns) 

2650 ) 

2651 referred_name = referred_quoted_name or referred_name 

2652 options = {} 

2653 

2654 for token in re.split(r" *\bON\b *", onupdatedelete.upper()): 

2655 if token.startswith("DELETE"): 

2656 ondelete = token[6:].strip() 

2657 if ondelete and ondelete != "NO ACTION": 

2658 options["ondelete"] = ondelete 

2659 elif token.startswith("UPDATE"): 

2660 onupdate = token[6:].strip() 

2661 if onupdate and onupdate != "NO ACTION": 

2662 options["onupdate"] = onupdate 

2663 

2664 if deferrable: 

2665 options["deferrable"] = "NOT" not in deferrable.upper() 

2666 if initially: 

2667 options["initially"] = initially.upper() 

2668 

2669 yield ( 

2670 constraint_name, 

2671 constrained_columns, 

2672 referred_name, 

2673 referred_columns, 

2674 options, 

2675 ) 

2676 

2677 fkeys = [] 

2678 

2679 for ( 

2680 constraint_name, 

2681 constrained_columns, 

2682 referred_name, 

2683 referred_columns, 

2684 options, 

2685 ) in parse_fks(): 

2686 sig = fk_sig(constrained_columns, referred_name, referred_columns) 

2687 if sig not in keys_by_signature: 

2688 util.warn( 

2689 "WARNING: SQL-parsed foreign key constraint " 

2690 "'%s' could not be located in PRAGMA " 

2691 "foreign_keys for table %s" % (sig, table_name) 

2692 ) 

2693 continue 

2694 key = keys_by_signature.pop(sig) 

2695 key["name"] = constraint_name 

2696 key["options"] = options 

2697 fkeys.append(key) 

2698 # assume the remainders are the unnamed, inline constraints, just 

2699 # use them as is as it's extremely difficult to parse inline 

2700 # constraints 

2701 fkeys.extend(keys_by_signature.values()) 

2702 if fkeys: 

2703 return fkeys 

2704 else: 

2705 return ReflectionDefaults.foreign_keys() 

2706 
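`PRAGMA foreign_key_list`, used above, yields rows laid out as `(id, seq, table, from, to, on_update, on_delete, match)` — the `row[0]`, `row[2]`, `row[3]`, `row[4]` positions read by `get_foreign_keys`. A stdlib `sqlite3` sketch (names are illustrative) of the unnamed-column case:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE parent (id INTEGER PRIMARY KEY)")
# referencing the table without naming a column: the "to" field comes
# back as None, and the referred columns must be resolved from the
# parent's primary key, as get_foreign_keys() does
conn.execute("CREATE TABLE child (pid REFERENCES parent)")
row = conn.execute("PRAGMA foreign_key_list(child)").fetchone()
numerical_id, rtbl, lcol, rcol = row[0], row[2], row[3], row[4]
print(numerical_id, rtbl, lcol, rcol)
```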

2707 def _find_cols_in_sig(self, sig): 

2708 for match in re.finditer(r'(?:"(.+?)")|([a-z0-9_]+)', sig, re.I): 

2709 yield match.group(1) or match.group(2) 

2710 

2711 @reflection.cache 

2712 def get_unique_constraints( 

2713 self, connection, table_name, schema=None, **kw 

2714 ): 

2715 auto_index_by_sig = {} 

2716 for idx in self.get_indexes( 

2717 connection, 

2718 table_name, 

2719 schema=schema, 

2720 include_auto_indexes=True, 

2721 **kw, 

2722 ): 

2723 if not idx["name"].startswith("sqlite_autoindex"): 

2724 continue 

2725 sig = tuple(idx["column_names"]) 

2726 auto_index_by_sig[sig] = idx 

2727 

2728 table_data = self._get_table_sql( 

2729 connection, table_name, schema=schema, **kw 

2730 ) 

2731 unique_constraints = [] 

2732 

2733 def parse_uqs(): 

2734 if table_data is None: 

2735 return 

2736 UNIQUE_PATTERN = r'(?:CONSTRAINT "?(.+?)"? +)?UNIQUE *\((.+?)\)' 

2737 INLINE_UNIQUE_PATTERN = ( 

2738 r'(?:(".+?")|(?:[\[`])?([a-z0-9_]+)(?:[\]`])?)[\t ]' 

2739 r"+[a-z0-9_ ]+?[\t ]+UNIQUE" 

2740 ) 

2741 

2742 for match in re.finditer(UNIQUE_PATTERN, table_data, re.I): 

2743 name, cols = match.group(1, 2) 

2744 yield name, list(self._find_cols_in_sig(cols)) 

2745 

2746 # we need to match inlines as well, as we seek to differentiate 

2747 # a UNIQUE constraint from a UNIQUE INDEX, even though these 

2748 # are kind of the same thing :) 

2749 for match in re.finditer(INLINE_UNIQUE_PATTERN, table_data, re.I): 

2750 cols = list( 

2751 self._find_cols_in_sig(match.group(1) or match.group(2)) 

2752 ) 

2753 yield None, cols 

2754 

2755 for name, cols in parse_uqs(): 

2756 sig = tuple(cols) 

2757 if sig in auto_index_by_sig: 

2758 auto_index_by_sig.pop(sig) 

2759 parsed_constraint = {"name": name, "column_names": cols} 

2760 unique_constraints.append(parsed_constraint) 

2761 # NOTE: auto_index_by_sig might not be empty here, 

2762 # the PRIMARY KEY may have an entry. 

2763 if unique_constraints: 

2764 return unique_constraints 

2765 else: 

2766 return ReflectionDefaults.unique_constraints() 

2767 
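The `sqlite_autoindex` names matched above come from the automatic index SQLite creates for each UNIQUE (or PRIMARY KEY) constraint. A stdlib `sqlite3` sketch (names are illustrative):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (a TEXT UNIQUE)")
# PRAGMA index_list rows are (seq, name, unique, origin, partial);
# the UNIQUE constraint shows up as an sqlite_autoindex_* entry
names = [row[1] for row in conn.execute("PRAGMA index_list(t)")]
print(names)
```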

    @reflection.cache
    def get_check_constraints(self, connection, table_name, schema=None, **kw):
        table_data = self._get_table_sql(
            connection, table_name, schema=schema, **kw
        )

        # NOTE NOTE NOTE
        # DO NOT CHANGE THIS REGULAR EXPRESSION.  There is no known way to
        # parse CHECK constraints that contain newlines themselves using
        # regular expressions, and the approach here relies upon each
        # individual CHECK constraint being on a single line by itself.
        # This necessarily makes assumptions as to how the CREATE TABLE
        # was emitted.  A more comprehensive DDL parsing solution would be
        # needed to improve upon the current situation.  See #11840 for
        # background.
        CHECK_PATTERN = r"(?:CONSTRAINT (.+) +)?CHECK *\( *(.+) *\),? *"
        cks = []

        for match in re.finditer(CHECK_PATTERN, table_data or "", re.I):
            name = match.group(1)

            if name:
                name = re.sub(r'^"|"$', "", name)

            cks.append({"sqltext": match.group(2), "name": name})
        cks.sort(key=lambda d: d["name"] or "~")  # sort None as last
        if cks:
            return cks
        else:
            return ReflectionDefaults.check_constraints()
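A hedged sketch of this CHECK reflection in use (the ``prices`` table and ``positive_amount`` constraint are illustrative). Note the DDL keeps each CHECK on its own line, matching the layout the regular expression above assumes:

```python
from sqlalchemy import create_engine, inspect

# illustrative table; one CHECK constraint per DDL line
engine = create_engine("sqlite://")
with engine.begin() as conn:
    conn.exec_driver_sql(
        "CREATE TABLE prices (\n"
        "\tamount INTEGER, \n"
        "\tCONSTRAINT positive_amount CHECK (amount > 0)\n"
        ")"
    )

# each entry is a {"sqltext": ..., "name": ...} dict, as built above
cks = inspect(engine).get_check_constraints("prices")
```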

    @reflection.cache
    def get_indexes(self, connection, table_name, schema=None, **kw):
        pragma_indexes = self._get_table_pragma(
            connection, "index_list", table_name, schema=schema
        )
        indexes = []

        # regular expression to extract the filter predicate of a partial
        # index. this could fail to extract the predicate correctly on
        # indexes created like
        #   CREATE INDEX i ON t (col || ') where') WHERE col <> ''
        # but as this function does not support expression-based indexes
        # this case does not occur.
        partial_pred_re = re.compile(r"\)\s+where\s+(.+)", re.IGNORECASE)

        if schema:
            schema_expr = "%s." % self.identifier_preparer.quote_identifier(
                schema
            )
        else:
            schema_expr = ""

        include_auto_indexes = kw.pop("include_auto_indexes", False)
        for row in pragma_indexes:
            # ignore implicit primary key index.
            # https://www.mail-archive.com/sqlite-users@sqlite.org/msg30517.html
            if not include_auto_indexes and row[1].startswith(
                "sqlite_autoindex"
            ):
                continue
            indexes.append(
                dict(
                    name=row[1],
                    column_names=[],
                    unique=row[2],
                    dialect_options={},
                )
            )

            # check partial indexes
            if len(row) >= 5 and row[4]:
                s = (
                    "SELECT sql FROM %(schema)ssqlite_master "
                    "WHERE name = ? "
                    "AND type = 'index'" % {"schema": schema_expr}
                )
                rs = connection.exec_driver_sql(s, (row[1],))
                index_sql = rs.scalar()
                predicate_match = partial_pred_re.search(index_sql)
                if predicate_match is None:
                    # unless the regex is broken this case shouldn't happen
                    # because we know this is a partial index, so the
                    # definition sql should match the regex
                    util.warn(
                        "Failed to look up filter predicate of "
                        "partial index %s" % row[1]
                    )
                else:
                    predicate = predicate_match.group(1)
                    indexes[-1]["dialect_options"]["sqlite_where"] = text(
                        predicate
                    )

        # loop thru unique indexes to get the column names.
        for idx in list(indexes):
            pragma_index = self._get_table_pragma(
                connection, "index_info", idx["name"], schema=schema
            )

            for row in pragma_index:
                if row[2] is None:
                    util.warn(
                        "Skipped unsupported reflection of "
                        "expression-based index %s" % idx["name"]
                    )
                    indexes.remove(idx)
                    break
                else:
                    idx["column_names"].append(row[2])

        indexes.sort(key=lambda d: d["name"] or "~")  # sort None as last
        if indexes:
            return indexes
        elif not self.has_table(connection, table_name, schema):
            raise exc.NoSuchTableError(
                f"{schema}.{table_name}" if schema else table_name
            )
        else:
            return ReflectionDefaults.indexes()

    def _is_sys_table(self, table_name):
        return table_name in {
            "sqlite_schema",
            "sqlite_master",
            "sqlite_temp_schema",
            "sqlite_temp_master",
        }

    @reflection.cache
    def _get_table_sql(self, connection, table_name, schema=None, **kw):
        if schema:
            schema_expr = "%s." % (
                self.identifier_preparer.quote_identifier(schema)
            )
        else:
            schema_expr = ""
        try:
            s = (
                "SELECT sql FROM "
                " (SELECT * FROM %(schema)ssqlite_master UNION ALL "
                " SELECT * FROM %(schema)ssqlite_temp_master) "
                "WHERE name = ? "
                "AND type in ('table', 'view')" % {"schema": schema_expr}
            )
            rs = connection.exec_driver_sql(s, (table_name,))
        except exc.DBAPIError:
            s = (
                "SELECT sql FROM %(schema)ssqlite_master "
                "WHERE name = ? "
                "AND type in ('table', 'view')" % {"schema": schema_expr}
            )
            rs = connection.exec_driver_sql(s, (table_name,))
        value = rs.scalar()
        if value is None and not self._is_sys_table(table_name):
            raise exc.NoSuchTableError(f"{schema_expr}{table_name}")
        return value
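A standalone illustration of the ``sqlite_master`` lookup performed by ``_get_table_sql`` above, using the stdlib ``sqlite3`` driver directly rather than the dialect internals (table ``t`` is illustrative). SQLite stores the original ``CREATE`` statement text verbatim in the ``sql`` column:

```python
import sqlite3

# in-memory database with one illustrative table
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (a INTEGER)")

# the same filter the dialect uses: match by name, tables and views only
row = conn.execute(
    "SELECT sql FROM sqlite_master "
    "WHERE name = ? AND type in ('table', 'view')",
    ("t",),
).fetchone()
```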

    def _get_table_pragma(self, connection, pragma, table_name, schema=None):
        quote = self.identifier_preparer.quote_identifier
        if schema is not None:
            statements = [f"PRAGMA {quote(schema)}."]
        else:
            # because PRAGMA looks in all attached databases if no schema
            # given, need to specify "main" schema, however since we want
            # 'temp' tables in the same namespace as 'main', need to run
            # the PRAGMA twice
            statements = ["PRAGMA main.", "PRAGMA temp."]

        qtable = quote(table_name)
        for statement in statements:
            statement = f"{statement}{pragma}({qtable})"
            cursor = connection.exec_driver_sql(statement)
            if not cursor._soft_closed:
                # work around SQLite issue whereby cursor.description
                # is blank when PRAGMA returns no rows:
                # https://www.sqlite.org/cvstrac/tktview?tn=1884
                result = cursor.fetchall()
            else:
                result = []
            if result:
                return result
        else:
            return []
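A sketch of why ``_get_table_pragma`` qualifies the PRAGMA with a schema (the ``scratch`` table is illustrative): a temporary table is visible under the ``temp`` schema but not under ``main``, which is why the method tries ``PRAGMA main.`` first and falls back to ``PRAGMA temp.`` when no rows come back.

```python
import sqlite3

# in-memory database with one illustrative temporary table
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TEMPORARY TABLE scratch (x INTEGER)")

# the main-schema PRAGMA returns no rows; the temp-schema PRAGMA finds it
main_rows = conn.execute('PRAGMA main.table_info("scratch")').fetchall()
temp_rows = conn.execute('PRAGMA temp.table_info("scratch")').fetchall()
```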