1# dialects/sqlite/base.py
2# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: ignore-errors
8
9
10r'''
11.. dialect:: sqlite
12 :name: SQLite
13 :normal_support: 3.12+
14 :best_effort: 3.7.16+
15
16.. _sqlite_datetime:
17
18Date and Time Types
19-------------------
20
21SQLite does not have built-in DATE, TIME, or DATETIME types, and pysqlite does
22not provide out of the box functionality for translating values between Python
23`datetime` objects and a SQLite-supported format. SQLAlchemy's own
24:class:`~sqlalchemy.types.DateTime` and related types provide date formatting
25and parsing functionality when SQLite is used. The implementation classes are
26:class:`_sqlite.DATETIME`, :class:`_sqlite.DATE` and :class:`_sqlite.TIME`.
27These types represent dates and times as ISO formatted strings, which also
28nicely support ordering. There's no reliance on typical "libc" internals for
29these functions so historical dates are fully supported.
30
31Ensuring Text affinity
32^^^^^^^^^^^^^^^^^^^^^^
33
34The DDL rendered for these types is the standard ``DATE``, ``TIME``
35and ``DATETIME`` indicators. However, custom storage formats can also be
36applied to these types. When the
37storage format is detected as containing no alpha characters, the DDL for
38these types is rendered as ``DATE_CHAR``, ``TIME_CHAR``, and ``DATETIME_CHAR``,
39so that the column continues to have textual affinity.
40
41.. seealso::
42
43 `Type Affinity <https://www.sqlite.org/datatype3.html#affinity>`_ -
44 in the SQLite documentation
45
46.. _sqlite_autoincrement:
47
48SQLite Auto Incrementing Behavior
49----------------------------------
50
51Background on SQLite's autoincrement is at: https://sqlite.org/autoinc.html
52
53Key concepts:
54
55* SQLite has an implicit "auto increment" feature that takes place for any
56 non-composite primary-key column that is specifically created using
57 "INTEGER PRIMARY KEY" for the type + primary key.
58
59* SQLite also has an explicit "AUTOINCREMENT" keyword, that is **not**
60 equivalent to the implicit autoincrement feature; this keyword is not
61 recommended for general use. SQLAlchemy does not render this keyword
62 unless a special SQLite-specific directive is used (see below). However,
63 it still requires that the column's type is named "INTEGER".
64
65Using the AUTOINCREMENT Keyword
66^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
67
68To specifically render the AUTOINCREMENT keyword on the primary key column
69when rendering DDL, add the flag ``sqlite_autoincrement=True`` to the Table
70construct::
71
72 Table(
73 "sometable",
74 metadata,
75 Column("id", Integer, primary_key=True),
76 sqlite_autoincrement=True,
77 )
78
79Allowing autoincrement behavior SQLAlchemy types other than Integer/INTEGER
80^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
81
82SQLite's typing model is based on naming conventions. Among other things, this
83means that any type name which contains the substring ``"INT"`` will be
84determined to be of "integer affinity". A type named ``"BIGINT"``,
85``"SPECIAL_INT"`` or even ``"XYZINTQPR"``, will be considered by SQLite to be
86of "integer" affinity. However, **the SQLite autoincrement feature, whether
87implicitly or explicitly enabled, requires that the name of the column's type
88is exactly the string "INTEGER"**. Therefore, if an application uses a type
89like :class:`.BigInteger` for a primary key, on SQLite this type will need to
90be rendered as the name ``"INTEGER"`` when emitting the initial ``CREATE
91TABLE`` statement in order for the autoincrement behavior to be available.
92
93One approach to achieve this is to use :class:`.Integer` on SQLite
94only using :meth:`.TypeEngine.with_variant`::
95
96 table = Table(
97 "my_table",
98 metadata,
99 Column(
100 "id",
101 BigInteger().with_variant(Integer, "sqlite"),
102 primary_key=True,
103 ),
104 )
105
106Another is to use a subclass of :class:`.BigInteger` that overrides its DDL
107name to be ``INTEGER`` when compiled against SQLite::
108
109 from sqlalchemy import BigInteger
110 from sqlalchemy.ext.compiler import compiles
111
112
113 class SLBigInteger(BigInteger):
114 pass
115
116
117 @compiles(SLBigInteger, "sqlite")
118 def bi_c(element, compiler, **kw):
119 return "INTEGER"
120
121
122 @compiles(SLBigInteger)
123 def bi_c(element, compiler, **kw):
124 return compiler.visit_BIGINT(element, **kw)
125
126
127 table = Table(
128 "my_table", metadata, Column("id", SLBigInteger(), primary_key=True)
129 )
130
131.. seealso::
132
133 :meth:`.TypeEngine.with_variant`
134
135 :ref:`sqlalchemy.ext.compiler_toplevel`
136
137 `Datatypes In SQLite Version 3 <https://sqlite.org/datatype3.html>`_
138
139.. _sqlite_transactions:
140
141Transactions with SQLite and the sqlite3 driver
142-----------------------------------------------
143
144As a file-based database, SQLite's approach to transactions differs from
145traditional databases in many ways. Additionally, the ``sqlite3`` driver
146standard with Python (as well as the async version ``aiosqlite`` which builds
147on top of it) has several quirks, workarounds, and API features in the
148area of transaction control, all of which generally need to be addressed when
149constructing a SQLAlchemy application that uses SQLite.
150
151Legacy Transaction Mode with the sqlite3 driver
152^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
153
154The most important aspect of transaction handling with the sqlite3 driver is
155that it defaults (which will continue through Python 3.15 before being
156removed in Python 3.16) to legacy transactional behavior which does
157not strictly follow :pep:`249`. The way in which the driver diverges from the
158PEP is that it does not "begin" a transaction automatically as dictated by
159:pep:`249` except in the case of DML statements, e.g. INSERT, UPDATE, and
160DELETE. Normally, :pep:`249` dictates that a BEGIN must be emitted upon
161the first SQL statement of any kind, so that all subsequent operations will
162be established within a transaction until ``connection.commit()`` has been
163called. The ``sqlite3`` driver, in an effort to be easier to use in
164highly concurrent environments, skips this step for DQL (e.g. SELECT) statements,
165and also skips it for DDL (e.g. CREATE TABLE etc.) statements for more legacy
166reasons. Statements such as SAVEPOINT are also skipped.
167
168In modern versions of the ``sqlite3`` driver as of Python 3.12, this legacy
169mode of operation is referred to as
170`"legacy transaction control" <https://docs.python.org/3/library/sqlite3.html#sqlite3-transaction-control-isolation-level>`_, and is in
171effect by default due to the ``Connection.autocommit`` parameter being set to
172the constant ``sqlite3.LEGACY_TRANSACTION_CONTROL``. Prior to Python 3.12,
173the ``Connection.autocommit`` attribute did not exist.
174
175The implications of legacy transaction mode include:
176
177* **Incorrect support for transactional DDL** - statements like CREATE TABLE, ALTER TABLE,
178 CREATE INDEX etc. will not automatically BEGIN a transaction if one were not
179 started already, leading to the changes by each statement being
180 "autocommitted" immediately unless BEGIN were otherwise emitted first. Very
181 old (pre Python 3.6) versions of SQLite would also force a COMMIT for these
182 operations even if a transaction were present, however this is no longer the
183 case.
184* **SERIALIZABLE behavior not fully functional** - SQLite's transaction isolation
185 behavior is normally consistent with SERIALIZABLE isolation, as it is a file-
186 based system that locks the database file entirely for write operations,
187 preventing COMMIT until all reader transactions (and associated file locks)
188 have completed. However, sqlite3's legacy transaction mode fails to emit BEGIN for SELECT
189 statements, which causes these SELECT statements to no longer be "repeatable",
190 failing one of the consistency guarantees of SERIALIZABLE.
191* **Incorrect behavior for SAVEPOINT** - as the SAVEPOINT statement does not
192 imply a BEGIN, a new SAVEPOINT emitted before a BEGIN will function on its
193 own but fails to participate in the enclosing transaction, meaning a ROLLBACK
194 of the transaction will not rollback elements that were part of a released
195 savepoint.
196
197Legacy transaction mode first existed in order to facilitate working around
198SQLite's file locks. Because SQLite relies upon whole-file locks, it is easy to
199get "database is locked" errors, particularly when newer features like "write
200ahead logging" are disabled. This is a key reason why ``sqlite3``'s legacy
201transaction mode is still the default mode of operation; disabling it will
202produce behavior that is more susceptible to locked database errors. However
203note that **legacy transaction mode will no longer be the default** in a future
204Python version (3.16 as of this writing).
205
206.. _sqlite_enabling_transactions:
207
208Enabling Non-Legacy SQLite Transactional Modes with the sqlite3 or aiosqlite driver
209^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
210
211Current SQLAlchemy support allows either for setting the
212``.Connection.autocommit`` attribute, most directly by using a
213:func:`._sa.create_engine` parameter, or if on an older version of Python where
214the attribute is not available, using event hooks to control the behavior of
215BEGIN.
216
217* **Enabling modern sqlite3 transaction control via the autocommit connect parameter** (Python 3.12 and above)
218
219 To use SQLite in the mode described at `Transaction control via the autocommit attribute <https://docs.python.org/3/library/sqlite3.html#transaction-control-via-the-autocommit-attribute>`_,
220 the most straightforward approach is to set the attribute to its recommended value
221 of ``False`` at the connect level using :paramref:`_sa.create_engine.connect_args``::
222
223 from sqlalchemy import create_engine
224
225 engine = create_engine(
226 "sqlite:///myfile.db", connect_args={"autocommit": False}
227 )
228
229 This parameter is also passed through when using the aiosqlite driver::
230
231 from sqlalchemy.ext.asyncio import create_async_engine
232
233 engine = create_async_engine(
234 "sqlite+aiosqlite:///myfile.db", connect_args={"autocommit": False}
235 )
236
237 The parameter can also be set at the attribute level using the :meth:`.PoolEvents.connect`
238 event hook, however this will only work for sqlite3, as aiosqlite does not yet expose this
239 attribute on its ``Connection`` object::
240
241 from sqlalchemy import create_engine, event
242
243 engine = create_engine("sqlite:///myfile.db")
244
245
246 @event.listens_for(engine, "connect")
247 def do_connect(dbapi_connection, connection_record):
248 # enable autocommit=False mode
249 dbapi_connection.autocommit = False
250
251* **Using SQLAlchemy to emit BEGIN in lieu of SQLite's transaction control** (all Python versions, sqlite3 and aiosqlite)
252
253 For older versions of ``sqlite3`` or for cross-compatibility with older and
254 newer versions, SQLAlchemy can also take over the job of transaction control.
255 This is achieved by using the :meth:`.ConnectionEvents.begin` hook
256 to emit the "BEGIN" command directly, while also disabling SQLite's control
257 of this command using the :meth:`.PoolEvents.connect` event hook to set the
258 ``Connection.isolation_level`` attribute to ``None``::
259
260
261 from sqlalchemy import create_engine, event
262
263 engine = create_engine("sqlite:///myfile.db")
264
265
266 @event.listens_for(engine, "connect")
267 def do_connect(dbapi_connection, connection_record):
268 # disable sqlite3's emitting of the BEGIN statement entirely.
269 dbapi_connection.isolation_level = None
270
271
272 @event.listens_for(engine, "begin")
273 def do_begin(conn):
274 # emit our own BEGIN. sqlite3 still emits COMMIT/ROLLBACK correctly
275 conn.exec_driver_sql("BEGIN")
276
277 When using the asyncio variant ``aiosqlite``, refer to ``engine.sync_engine``
278 as in the example below::
279
280 from sqlalchemy import create_engine, event
281 from sqlalchemy.ext.asyncio import create_async_engine
282
283 engine = create_async_engine("sqlite+aiosqlite:///myfile.db")
284
285
286 @event.listens_for(engine.sync_engine, "connect")
287 def do_connect(dbapi_connection, connection_record):
288 # disable aiosqlite's emitting of the BEGIN statement entirely.
289 dbapi_connection.isolation_level = None
290
291
292 @event.listens_for(engine.sync_engine, "begin")
293 def do_begin(conn):
294 # emit our own BEGIN. aiosqlite still emits COMMIT/ROLLBACK correctly
295 conn.exec_driver_sql("BEGIN")
296
297.. _sqlite_isolation_level:
298
299Using SQLAlchemy's Driver Level AUTOCOMMIT Feature with SQLite
300^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
301
302SQLAlchemy has a comprehensive database isolation feature with optional
303autocommit support that is introduced in the section :ref:`dbapi_autocommit`.
304
305For the ``sqlite3`` and ``aiosqlite`` drivers, SQLAlchemy only includes
306built-in support for "AUTOCOMMIT". Note that this mode is currently incompatible
307with the non-legacy isolation mode hooks documented in the previous
308section at :ref:`sqlite_enabling_transactions`.
309
310To use the ``sqlite3`` driver with SQLAlchemy driver-level autocommit,
311create an engine setting the :paramref:`_sa.create_engine.isolation_level`
312parameter to "AUTOCOMMIT"::
313
314 eng = create_engine("sqlite:///myfile.db", isolation_level="AUTOCOMMIT")
315
316When using the above mode, any event hooks that set the sqlite3 ``Connection.autocommit``
317parameter away from its default of ``sqlite3.LEGACY_TRANSACTION_CONTROL``
318as well as hooks that emit ``BEGIN`` should be disabled.
319
320Additional Reading for SQLite / sqlite3 transaction control
321^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
322
323Links with important information on SQLite, the sqlite3 driver,
324as well as long historical conversations on how things got to their current state:
325
326* `Isolation in SQLite <https://www.sqlite.org/isolation.html>`_ - on the SQLite website
327* `Transaction control <https://docs.python.org/3/library/sqlite3.html#transaction-control>`_ - describes the sqlite3 autocommit attribute as well
328 as the legacy isolation_level attribute.
329* `sqlite3 SELECT does not BEGIN a transaction, but should according to spec <https://github.com/python/cpython/issues/54133>`_ - imported Python standard library issue on github
330* `sqlite3 module breaks transactions and potentially corrupts data <https://github.com/python/cpython/issues/54949>`_ - imported Python standard library issue on github
331
332
333INSERT/UPDATE/DELETE...RETURNING
334---------------------------------
335
336The SQLite dialect supports SQLite 3.35's ``INSERT|UPDATE|DELETE..RETURNING``
337syntax. ``INSERT..RETURNING`` may be used
338automatically in some cases in order to fetch newly generated identifiers in
339place of the traditional approach of using ``cursor.lastrowid``, however
340``cursor.lastrowid`` is currently still preferred for simple single-statement
341cases for its better performance.
342
343To specify an explicit ``RETURNING`` clause, use the
344:meth:`._UpdateBase.returning` method on a per-statement basis::
345
346 # INSERT..RETURNING
347 result = connection.execute(
348 table.insert().values(name="foo").returning(table.c.col1, table.c.col2)
349 )
350 print(result.all())
351
352 # UPDATE..RETURNING
353 result = connection.execute(
354 table.update()
355 .where(table.c.name == "foo")
356 .values(name="bar")
357 .returning(table.c.col1, table.c.col2)
358 )
359 print(result.all())
360
361 # DELETE..RETURNING
362 result = connection.execute(
363 table.delete()
364 .where(table.c.name == "foo")
365 .returning(table.c.col1, table.c.col2)
366 )
367 print(result.all())
368
369.. versionadded:: 2.0 Added support for SQLite RETURNING
370
371
372.. _sqlite_foreign_keys:
373
374Foreign Key Support
375-------------------
376
377SQLite supports FOREIGN KEY syntax when emitting CREATE statements for tables,
378however by default these constraints have no effect on the operation of the
379table.
380
381Constraint checking on SQLite has three prerequisites:
382
383* At least version 3.6.19 of SQLite must be in use
384* The SQLite library must be compiled *without* the SQLITE_OMIT_FOREIGN_KEY
385 or SQLITE_OMIT_TRIGGER symbols enabled.
386* The ``PRAGMA foreign_keys = ON`` statement must be emitted on all
387 connections before use -- including the initial call to
388 :meth:`sqlalchemy.schema.MetaData.create_all`.
389
390SQLAlchemy allows for the ``PRAGMA`` statement to be emitted automatically for
391new connections through the usage of events::
392
393 from sqlalchemy.engine import Engine
394 from sqlalchemy import event
395
396
397 @event.listens_for(Engine, "connect")
398 def set_sqlite_pragma(dbapi_connection, connection_record):
399 # the sqlite3 driver will not set PRAGMA foreign_keys
400 # if autocommit=False; set to True temporarily
401 ac = dbapi_connection.autocommit
402 dbapi_connection.autocommit = True
403
404 cursor = dbapi_connection.cursor()
405 cursor.execute("PRAGMA foreign_keys=ON")
406 cursor.close()
407
408 # restore previous autocommit setting
409 dbapi_connection.autocommit = ac
410
411.. warning::
412
413 When SQLite foreign keys are enabled, it is **not possible**
414 to emit CREATE or DROP statements for tables that contain
415 mutually-dependent foreign key constraints;
416 to emit the DDL for these tables requires that ALTER TABLE be used to
417 create or drop these constraints separately, for which SQLite has
418 no support.
419
420.. seealso::
421
422 `SQLite Foreign Key Support <https://www.sqlite.org/foreignkeys.html>`_
423 - on the SQLite web site.
424
425 :ref:`event_toplevel` - SQLAlchemy event API.
426
427 :ref:`use_alter` - more information on SQLAlchemy's facilities for handling
428 mutually-dependent foreign key constraints.
429
430.. _sqlite_on_conflict_ddl:
431
432ON CONFLICT support for constraints
433-----------------------------------
434
435.. seealso:: This section describes the :term:`DDL` version of "ON CONFLICT" for
436 SQLite, which occurs within a CREATE TABLE statement. For "ON CONFLICT" as
437 applied to an INSERT statement, see :ref:`sqlite_on_conflict_insert`.
438
439SQLite supports a non-standard DDL clause known as ON CONFLICT which can be applied
440to primary key, unique, check, and not null constraints. In DDL, it is
441rendered either within the "CONSTRAINT" clause or within the column definition
442itself depending on the location of the target constraint. To render this
443clause within DDL, the extension parameter ``sqlite_on_conflict`` can be
444specified with a string conflict resolution algorithm within the
445:class:`.PrimaryKeyConstraint`, :class:`.UniqueConstraint`,
446:class:`.CheckConstraint` objects. Within the :class:`_schema.Column` object,
447there
448are individual parameters ``sqlite_on_conflict_not_null``,
449``sqlite_on_conflict_primary_key``, ``sqlite_on_conflict_unique`` which each
450correspond to the three types of relevant constraint types that can be
451indicated from a :class:`_schema.Column` object.
452
453.. seealso::
454
455 `ON CONFLICT <https://www.sqlite.org/lang_conflict.html>`_ - in the SQLite
456 documentation
457
458.. versionadded:: 1.3
459
460
461The ``sqlite_on_conflict`` parameters accept a string argument which is just
462the resolution name to be chosen, which on SQLite can be one of ROLLBACK,
463ABORT, FAIL, IGNORE, and REPLACE. For example, to add a UNIQUE constraint
464that specifies the IGNORE algorithm::
465
466 some_table = Table(
467 "some_table",
468 metadata,
469 Column("id", Integer, primary_key=True),
470 Column("data", Integer),
471 UniqueConstraint("id", "data", sqlite_on_conflict="IGNORE"),
472 )
473
474The above renders CREATE TABLE DDL as:
475
476.. sourcecode:: sql
477
478 CREATE TABLE some_table (
479 id INTEGER NOT NULL,
480 data INTEGER,
481 PRIMARY KEY (id),
482 UNIQUE (id, data) ON CONFLICT IGNORE
483 )
484
485
486When using the :paramref:`_schema.Column.unique`
487flag to add a UNIQUE constraint
488to a single column, the ``sqlite_on_conflict_unique`` parameter can
489be added to the :class:`_schema.Column` as well, which will be added to the
490UNIQUE constraint in the DDL::
491
492 some_table = Table(
493 "some_table",
494 metadata,
495 Column("id", Integer, primary_key=True),
496 Column(
497 "data", Integer, unique=True, sqlite_on_conflict_unique="IGNORE"
498 ),
499 )
500
501rendering:
502
503.. sourcecode:: sql
504
505 CREATE TABLE some_table (
506 id INTEGER NOT NULL,
507 data INTEGER,
508 PRIMARY KEY (id),
509 UNIQUE (data) ON CONFLICT IGNORE
510 )
511
512To apply the FAIL algorithm for a NOT NULL constraint,
513``sqlite_on_conflict_not_null`` is used::
514
515 some_table = Table(
516 "some_table",
517 metadata,
518 Column("id", Integer, primary_key=True),
519 Column(
520 "data", Integer, nullable=False, sqlite_on_conflict_not_null="FAIL"
521 ),
522 )
523
524this renders the column inline ON CONFLICT phrase:
525
526.. sourcecode:: sql
527
528 CREATE TABLE some_table (
529 id INTEGER NOT NULL,
530 data INTEGER NOT NULL ON CONFLICT FAIL,
531 PRIMARY KEY (id)
532 )
533
534
535Similarly, for an inline primary key, use ``sqlite_on_conflict_primary_key``::
536
537 some_table = Table(
538 "some_table",
539 metadata,
540 Column(
541 "id",
542 Integer,
543 primary_key=True,
544 sqlite_on_conflict_primary_key="FAIL",
545 ),
546 )
547
548SQLAlchemy renders the PRIMARY KEY constraint separately, so the conflict
549resolution algorithm is applied to the constraint itself:
550
551.. sourcecode:: sql
552
553 CREATE TABLE some_table (
554 id INTEGER NOT NULL,
555 PRIMARY KEY (id) ON CONFLICT FAIL
556 )
557
558.. _sqlite_on_conflict_insert:
559
560INSERT...ON CONFLICT (Upsert)
561-----------------------------
562
563.. seealso:: This section describes the :term:`DML` version of "ON CONFLICT" for
564 SQLite, which occurs within an INSERT statement. For "ON CONFLICT" as
565 applied to a CREATE TABLE statement, see :ref:`sqlite_on_conflict_ddl`.
566
567From version 3.24.0 onwards, SQLite supports "upserts" (update or insert)
568of rows into a table via the ``ON CONFLICT`` clause of the ``INSERT``
569statement. A candidate row will only be inserted if that row does not violate
570any unique or primary key constraints. In the case of a unique constraint violation, a
571secondary action can occur which can be either "DO UPDATE", indicating that
572the data in the target row should be updated, or "DO NOTHING", which indicates
573to silently skip this row.
574
575Conflicts are determined using columns that are part of existing unique
576constraints and indexes. These constraints are identified by stating the
577columns and conditions that comprise the indexes.
578
579SQLAlchemy provides ``ON CONFLICT`` support via the SQLite-specific
580:func:`_sqlite.insert()` function, which provides
581the generative methods :meth:`_sqlite.Insert.on_conflict_do_update`
582and :meth:`_sqlite.Insert.on_conflict_do_nothing`:
583
584.. sourcecode:: pycon+sql
585
586 >>> from sqlalchemy.dialects.sqlite import insert
587
588 >>> insert_stmt = insert(my_table).values(
589 ... id="some_existing_id", data="inserted value"
590 ... )
591
592 >>> do_update_stmt = insert_stmt.on_conflict_do_update(
593 ... index_elements=["id"], set_=dict(data="updated value")
594 ... )
595
596 >>> print(do_update_stmt)
597 {printsql}INSERT INTO my_table (id, data) VALUES (?, ?)
598 ON CONFLICT (id) DO UPDATE SET data = ?{stop}
599
600 >>> do_nothing_stmt = insert_stmt.on_conflict_do_nothing(index_elements=["id"])
601
602 >>> print(do_nothing_stmt)
603 {printsql}INSERT INTO my_table (id, data) VALUES (?, ?)
604 ON CONFLICT (id) DO NOTHING
605
606.. versionadded:: 1.4
607
608.. seealso::
609
610 `Upsert
611 <https://sqlite.org/lang_UPSERT.html>`_
612 - in the SQLite documentation.
613
614
615Specifying the Target
616^^^^^^^^^^^^^^^^^^^^^
617
618Both methods supply the "target" of the conflict using column inference:
619
620* The :paramref:`_sqlite.Insert.on_conflict_do_update.index_elements` argument
621 specifies a sequence containing string column names, :class:`_schema.Column`
622 objects, and/or SQL expression elements, which would identify a unique index
623 or unique constraint.
624
625* When using :paramref:`_sqlite.Insert.on_conflict_do_update.index_elements`
626 to infer an index, a partial index can be inferred by also specifying the
627 :paramref:`_sqlite.Insert.on_conflict_do_update.index_where` parameter:
628
629 .. sourcecode:: pycon+sql
630
631 >>> stmt = insert(my_table).values(user_email="a@b.com", data="inserted data")
632
633 >>> do_update_stmt = stmt.on_conflict_do_update(
634 ... index_elements=[my_table.c.user_email],
635 ... index_where=my_table.c.user_email.like("%@gmail.com"),
636 ... set_=dict(data=stmt.excluded.data),
637 ... )
638
639 >>> print(do_update_stmt)
640 {printsql}INSERT INTO my_table (data, user_email) VALUES (?, ?)
641 ON CONFLICT (user_email)
642 WHERE user_email LIKE '%@gmail.com'
643 DO UPDATE SET data = excluded.data
644
645The SET Clause
646^^^^^^^^^^^^^^^
647
648``ON CONFLICT...DO UPDATE`` is used to perform an update of the already
649existing row, using any combination of new values as well as values
650from the proposed insertion. These values are specified using the
651:paramref:`_sqlite.Insert.on_conflict_do_update.set_` parameter. This
652parameter accepts a dictionary which consists of direct values
653for UPDATE:
654
655.. sourcecode:: pycon+sql
656
657 >>> stmt = insert(my_table).values(id="some_id", data="inserted value")
658
659 >>> do_update_stmt = stmt.on_conflict_do_update(
660 ... index_elements=["id"], set_=dict(data="updated value")
661 ... )
662
663 >>> print(do_update_stmt)
664 {printsql}INSERT INTO my_table (id, data) VALUES (?, ?)
665 ON CONFLICT (id) DO UPDATE SET data = ?
666
667.. warning::
668
669 The :meth:`_sqlite.Insert.on_conflict_do_update` method does **not** take
670 into account Python-side default UPDATE values or generation functions,
671 e.g. those specified using :paramref:`_schema.Column.onupdate`. These
672 values will not be exercised for an ON CONFLICT style of UPDATE, unless
673 they are manually specified in the
674 :paramref:`_sqlite.Insert.on_conflict_do_update.set_` dictionary.
675
676Updating using the Excluded INSERT Values
677^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
678
679In order to refer to the proposed insertion row, the special alias
680:attr:`~.sqlite.Insert.excluded` is available as an attribute on
681the :class:`_sqlite.Insert` object; this object creates an "excluded." prefix
682on a column, that informs the DO UPDATE to update the row with the value that
683would have been inserted had the constraint not failed:
684
685.. sourcecode:: pycon+sql
686
687 >>> stmt = insert(my_table).values(
688 ... id="some_id", data="inserted value", author="jlh"
689 ... )
690
691 >>> do_update_stmt = stmt.on_conflict_do_update(
692 ... index_elements=["id"],
693 ... set_=dict(data="updated value", author=stmt.excluded.author),
694 ... )
695
696 >>> print(do_update_stmt)
697 {printsql}INSERT INTO my_table (id, data, author) VALUES (?, ?, ?)
698 ON CONFLICT (id) DO UPDATE SET data = ?, author = excluded.author
699
700Additional WHERE Criteria
701^^^^^^^^^^^^^^^^^^^^^^^^^
702
703The :meth:`_sqlite.Insert.on_conflict_do_update` method also accepts
704a WHERE clause using the :paramref:`_sqlite.Insert.on_conflict_do_update.where`
705parameter, which will limit those rows which receive an UPDATE:
706
707.. sourcecode:: pycon+sql
708
709 >>> stmt = insert(my_table).values(
710 ... id="some_id", data="inserted value", author="jlh"
711 ... )
712
713 >>> on_update_stmt = stmt.on_conflict_do_update(
714 ... index_elements=["id"],
715 ... set_=dict(data="updated value", author=stmt.excluded.author),
716 ... where=(my_table.c.status == 2),
717 ... )
718 >>> print(on_update_stmt)
719 {printsql}INSERT INTO my_table (id, data, author) VALUES (?, ?, ?)
720 ON CONFLICT (id) DO UPDATE SET data = ?, author = excluded.author
721 WHERE my_table.status = ?
722
723
724Skipping Rows with DO NOTHING
725^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
726
727``ON CONFLICT`` may be used to skip inserting a row entirely
728if any conflict with a unique constraint occurs; below this is illustrated
729using the :meth:`_sqlite.Insert.on_conflict_do_nothing` method:
730
731.. sourcecode:: pycon+sql
732
733 >>> stmt = insert(my_table).values(id="some_id", data="inserted value")
734 >>> stmt = stmt.on_conflict_do_nothing(index_elements=["id"])
735 >>> print(stmt)
736 {printsql}INSERT INTO my_table (id, data) VALUES (?, ?) ON CONFLICT (id) DO NOTHING
737
738
739If ``DO NOTHING`` is used without specifying any columns or constraint,
740it has the effect of skipping the INSERT for any unique violation which
741occurs:
742
743.. sourcecode:: pycon+sql
744
745 >>> stmt = insert(my_table).values(id="some_id", data="inserted value")
746 >>> stmt = stmt.on_conflict_do_nothing()
747 >>> print(stmt)
748 {printsql}INSERT INTO my_table (id, data) VALUES (?, ?) ON CONFLICT DO NOTHING
749
750.. _sqlite_type_reflection:
751
752Type Reflection
753---------------
754
755SQLite types are unlike those of most other database backends, in that
756the string name of the type usually does not correspond to a "type" in a
757one-to-one fashion. Instead, SQLite links per-column typing behavior
758to one of five so-called "type affinities" based on a string matching
759pattern for the type.
760
761SQLAlchemy's reflection process, when inspecting types, uses a simple
762lookup table to link the keywords returned to provided SQLAlchemy types.
763This lookup table is present within the SQLite dialect as it is for all
764other dialects. However, the SQLite dialect has a different "fallback"
765routine for when a particular type name is not located in the lookup map;
766it instead implements the SQLite "type affinity" scheme located at
767https://www.sqlite.org/datatype3.html section 2.1.
768
769The provided typemap will make direct associations from an exact string
770name match for the following types:
771
772:class:`_types.BIGINT`, :class:`_types.BLOB`,
773:class:`_types.BOOLEAN`, :class:`_types.BOOLEAN`,
774:class:`_types.CHAR`, :class:`_types.DATE`,
775:class:`_types.DATETIME`, :class:`_types.FLOAT`,
776:class:`_types.DECIMAL`, :class:`_types.FLOAT`,
777:class:`_types.INTEGER`, :class:`_types.INTEGER`,
778:class:`_types.NUMERIC`, :class:`_types.REAL`,
779:class:`_types.SMALLINT`, :class:`_types.TEXT`,
780:class:`_types.TIME`, :class:`_types.TIMESTAMP`,
781:class:`_types.VARCHAR`, :class:`_types.NVARCHAR`,
782:class:`_types.NCHAR`
783
784When a type name does not match one of the above types, the "type affinity"
785lookup is used instead:
786
787* :class:`_types.INTEGER` is returned if the type name includes the
788 string ``INT``
789* :class:`_types.TEXT` is returned if the type name includes the
790 string ``CHAR``, ``CLOB`` or ``TEXT``
791* :class:`_types.NullType` is returned if the type name includes the
792 string ``BLOB``
793* :class:`_types.REAL` is returned if the type name includes the string
794 ``REAL``, ``FLOA`` or ``DOUB``.
795* Otherwise, the :class:`_types.NUMERIC` type is used.
796
797.. _sqlite_partial_index:
798
799Partial Indexes
800---------------
801
802A partial index, e.g. one which uses a WHERE clause, can be specified
803with the DDL system using the argument ``sqlite_where``::
804
805 tbl = Table("testtbl", m, Column("data", Integer))
806 idx = Index(
807 "test_idx1",
808 tbl.c.data,
809 sqlite_where=and_(tbl.c.data > 5, tbl.c.data < 10),
810 )
811
812The index will be rendered at create time as:
813
814.. sourcecode:: sql
815
816 CREATE INDEX test_idx1 ON testtbl (data)
817 WHERE data > 5 AND data < 10
818
819.. _sqlite_dotted_column_names:
820
821Dotted Column Names
822-------------------
823
824Using table or column names that explicitly have periods in them is
825**not recommended**. While this is generally a bad idea for relational
826databases in general, as the dot is a syntactically significant character,
827the SQLite driver up until version **3.10.0** of SQLite has a bug which
828requires that SQLAlchemy filter out these dots in result sets.
829
830The bug, entirely outside of SQLAlchemy, can be illustrated thusly::
831
832 import sqlite3
833
834 assert sqlite3.sqlite_version_info < (
835 3,
836 10,
837 0,
838 ), "bug is fixed in this version"
839
840 conn = sqlite3.connect(":memory:")
841 cursor = conn.cursor()
842
843 cursor.execute("create table x (a integer, b integer)")
844 cursor.execute("insert into x (a, b) values (1, 1)")
845 cursor.execute("insert into x (a, b) values (2, 2)")
846
847 cursor.execute("select x.a, x.b from x")
848 assert [c[0] for c in cursor.description] == ["a", "b"]
849
850 cursor.execute("""
851 select x.a, x.b from x where a=1
852 union
853 select x.a, x.b from x where a=2
854 """)
855 assert [c[0] for c in cursor.description] == ["a", "b"], [
856 c[0] for c in cursor.description
857 ]
858
859The second assertion fails:
860
861.. sourcecode:: text
862
863 Traceback (most recent call last):
864 File "test.py", line 19, in <module>
865 [c[0] for c in cursor.description]
866 AssertionError: ['x.a', 'x.b']
867
868Where above, the driver incorrectly reports the names of the columns
869including the name of the table, which is entirely inconsistent vs.
870when the UNION is not present.
871
872SQLAlchemy relies upon column names being predictable in how they match
873to the original statement, so the SQLAlchemy dialect has no choice but
874to filter these out::
875
876
877 from sqlalchemy import create_engine
878
879 eng = create_engine("sqlite://")
880 conn = eng.connect()
881
882 conn.exec_driver_sql("create table x (a integer, b integer)")
883 conn.exec_driver_sql("insert into x (a, b) values (1, 1)")
884 conn.exec_driver_sql("insert into x (a, b) values (2, 2)")
885
886 result = conn.exec_driver_sql("select x.a, x.b from x")
887 assert result.keys() == ["a", "b"]
888
889 result = conn.exec_driver_sql("""
890 select x.a, x.b from x where a=1
891 union
892 select x.a, x.b from x where a=2
893 """)
894 assert result.keys() == ["a", "b"]
895
896Note that above, even though SQLAlchemy filters out the dots, *both
897names are still addressable*::
898
899 >>> row = result.first()
900 >>> row["a"]
901 1
902 >>> row["x.a"]
903 1
904 >>> row["b"]
905 1
906 >>> row["x.b"]
907 1
908
909Therefore, the workaround applied by SQLAlchemy only impacts
910:meth:`_engine.CursorResult.keys` and :meth:`.Row.keys()` in the public API. In
911the very specific case where an application is forced to use column names that
912contain dots, and the functionality of :meth:`_engine.CursorResult.keys` and
913:meth:`.Row.keys()` is required to return these dotted names unmodified,
914the ``sqlite_raw_colnames`` execution option may be provided, either on a
915per-:class:`_engine.Connection` basis::
916
917 result = conn.execution_options(sqlite_raw_colnames=True).exec_driver_sql(
918 """
919 select x.a, x.b from x where a=1
920 union
921 select x.a, x.b from x where a=2
922 """
923 )
924 assert result.keys() == ["x.a", "x.b"]
925
926or on a per-:class:`_engine.Engine` basis::
927
928 engine = create_engine(
929 "sqlite://", execution_options={"sqlite_raw_colnames": True}
930 )
931
932When using the per-:class:`_engine.Engine` execution option, note that
933**Core and ORM queries that use UNION may not function properly**.
934
935SQLite-specific table options
936-----------------------------
937
938One option for CREATE TABLE is supported directly by the SQLite
939dialect in conjunction with the :class:`_schema.Table` construct:
940
941* ``WITHOUT ROWID``::
942
943 Table("some_table", metadata, ..., sqlite_with_rowid=False)
944
945*
946 ``STRICT``::
947
948 Table("some_table", metadata, ..., sqlite_strict=True)
949
950 .. versionadded:: 2.0.37
951
952.. seealso::
953
954 `SQLite CREATE TABLE options
955 <https://www.sqlite.org/lang_createtable.html>`_
956
957.. _sqlite_include_internal:
958
959Reflecting internal schema tables
960----------------------------------
961
962Reflection methods that return lists of tables will omit so-called
963"SQLite internal schema object" names, which are considered by SQLite
964as any object name that is prefixed with ``sqlite_``. An example of
965such an object is the ``sqlite_sequence`` table that's generated when
966the ``AUTOINCREMENT`` column parameter is used. In order to return
967these objects, the parameter ``sqlite_include_internal=True`` may be
968passed to methods such as :meth:`_schema.MetaData.reflect` or
969:meth:`.Inspector.get_table_names`.
970
971.. versionadded:: 2.0 Added the ``sqlite_include_internal=True`` parameter.
972 Previously, these tables were not ignored by SQLAlchemy reflection
973 methods.
974
975.. note::
976
977 The ``sqlite_include_internal`` parameter does not refer to the
978 "system" tables that are present in schemas such as ``sqlite_master``.
979
980.. seealso::
981
982 `SQLite Internal Schema Objects <https://www.sqlite.org/fileformat2.html#intschema>`_ - in the SQLite
983 documentation.
984
985''' # noqa
986
987from __future__ import annotations
988
989import datetime
990import numbers
991import re
992from typing import Any
993from typing import Callable
994from typing import Optional
995from typing import TYPE_CHECKING
996
997from .json import JSON
998from .json import JSONIndexType
999from .json import JSONPathType
1000from ... import exc
1001from ... import schema as sa_schema
1002from ... import sql
1003from ... import text
1004from ... import types as sqltypes
1005from ... import util
1006from ...engine import default
1007from ...engine import processors
1008from ...engine import reflection
1009from ...engine.reflection import ReflectionDefaults
1010from ...sql import coercions
1011from ...sql import compiler
1012from ...sql import elements
1013from ...sql import roles
1014from ...sql import schema
1015from ...types import BLOB # noqa
1016from ...types import BOOLEAN # noqa
1017from ...types import CHAR # noqa
1018from ...types import DECIMAL # noqa
1019from ...types import FLOAT # noqa
1020from ...types import INTEGER # noqa
1021from ...types import NUMERIC # noqa
1022from ...types import REAL # noqa
1023from ...types import SMALLINT # noqa
1024from ...types import TEXT # noqa
1025from ...types import TIMESTAMP # noqa
1026from ...types import VARCHAR # noqa
1027
1028if TYPE_CHECKING:
1029 from ...engine.interfaces import DBAPIConnection
1030 from ...engine.interfaces import Dialect
1031 from ...engine.interfaces import IsolationLevel
1032 from ...sql.type_api import _BindProcessorType
1033 from ...sql.type_api import _ResultProcessorType
1034
1035
1036class _SQliteJson(JSON):
1037 def result_processor(self, dialect, coltype):
1038 default_processor = super().result_processor(dialect, coltype)
1039
1040 def process(value):
1041 try:
1042 return default_processor(value)
1043 except TypeError:
1044 if isinstance(value, numbers.Number):
1045 return value
1046 else:
1047 raise
1048
1049 return process
1050
1051
1052class _DateTimeMixin:
1053 _reg = None
1054 _storage_format = None
1055
1056 def __init__(self, storage_format=None, regexp=None, **kw):
1057 super().__init__(**kw)
1058 if regexp is not None:
1059 self._reg = re.compile(regexp)
1060 if storage_format is not None:
1061 self._storage_format = storage_format
1062
1063 @property
1064 def format_is_text_affinity(self):
1065 """return True if the storage format will automatically imply
1066 a TEXT affinity.
1067
1068 If the storage format contains no non-numeric characters,
1069 it will imply a NUMERIC storage format on SQLite; in this case,
1070 the type will generate its DDL as DATE_CHAR, DATETIME_CHAR,
1071 TIME_CHAR.
1072
1073 """
1074 spec = self._storage_format % {
1075 "year": 0,
1076 "month": 0,
1077 "day": 0,
1078 "hour": 0,
1079 "minute": 0,
1080 "second": 0,
1081 "microsecond": 0,
1082 }
1083 return bool(re.search(r"[^0-9]", spec))
1084
1085 def adapt(self, cls, **kw):
1086 if issubclass(cls, _DateTimeMixin):
1087 if self._storage_format:
1088 kw["storage_format"] = self._storage_format
1089 if self._reg:
1090 kw["regexp"] = self._reg
1091 return super().adapt(cls, **kw)
1092
1093 def literal_processor(self, dialect):
1094 bp = self.bind_processor(dialect)
1095
1096 def process(value):
1097 return "'%s'" % bp(value)
1098
1099 return process
1100
1101
1102class DATETIME(_DateTimeMixin, sqltypes.DateTime):
1103 r"""Represent a Python datetime object in SQLite using a string.
1104
1105 The default string storage format is::
1106
1107 "%(year)04d-%(month)02d-%(day)02d %(hour)02d:%(minute)02d:%(second)02d.%(microsecond)06d"
1108
1109 e.g.:
1110
1111 .. sourcecode:: text
1112
1113 2021-03-15 12:05:57.105542
1114
1115 The incoming storage format is by default parsed using the
1116 Python ``datetime.fromisoformat()`` function.
1117
1118 .. versionchanged:: 2.0 ``datetime.fromisoformat()`` is used for default
1119 datetime string parsing.
1120
1121 The storage format can be customized to some degree using the
1122 ``storage_format`` and ``regexp`` parameters, such as::
1123
1124 import re
1125 from sqlalchemy.dialects.sqlite import DATETIME
1126
1127 dt = DATETIME(
1128 storage_format=(
1129 "%(year)04d/%(month)02d/%(day)02d %(hour)02d:%(minute)02d:%(second)02d"
1130 ),
1131 regexp=r"(\d+)/(\d+)/(\d+) (\d+)-(\d+)-(\d+)",
1132 )
1133
1134 :param truncate_microseconds: when ``True`` microseconds will be truncated
1135 from the datetime. Can't be specified together with ``storage_format``
1136 or ``regexp``.
1137
1138 :param storage_format: format string which will be applied to the dict
1139 with keys year, month, day, hour, minute, second, and microsecond.
1140
1141 :param regexp: regular expression which will be applied to incoming result
1142 rows, replacing the use of ``datetime.fromisoformat()`` to parse incoming
1143 strings. If the regexp contains named groups, the resulting match dict is
1144 applied to the Python datetime() constructor as keyword arguments.
1145 Otherwise, if positional groups are used, the datetime() constructor
1146 is called with positional arguments via
1147 ``*map(int, match_obj.groups(0))``.
1148
1149 """ # noqa
1150
1151 _storage_format = (
1152 "%(year)04d-%(month)02d-%(day)02d "
1153 "%(hour)02d:%(minute)02d:%(second)02d.%(microsecond)06d"
1154 )
1155
1156 def __init__(self, *args, **kwargs):
1157 truncate_microseconds = kwargs.pop("truncate_microseconds", False)
1158 super().__init__(*args, **kwargs)
1159 if truncate_microseconds:
1160 assert "storage_format" not in kwargs, (
1161 "You can specify only "
1162 "one of truncate_microseconds or storage_format."
1163 )
1164 assert "regexp" not in kwargs, (
1165 "You can specify only one of "
1166 "truncate_microseconds or regexp."
1167 )
1168 self._storage_format = (
1169 "%(year)04d-%(month)02d-%(day)02d "
1170 "%(hour)02d:%(minute)02d:%(second)02d"
1171 )
1172
1173 def bind_processor(
1174 self, dialect: Dialect
1175 ) -> Optional[_BindProcessorType[Any]]:
1176 datetime_datetime = datetime.datetime
1177 datetime_date = datetime.date
1178 format_ = self._storage_format
1179
1180 def process(value):
1181 if value is None:
1182 return None
1183 elif isinstance(value, datetime_datetime):
1184 return format_ % {
1185 "year": value.year,
1186 "month": value.month,
1187 "day": value.day,
1188 "hour": value.hour,
1189 "minute": value.minute,
1190 "second": value.second,
1191 "microsecond": value.microsecond,
1192 }
1193 elif isinstance(value, datetime_date):
1194 return format_ % {
1195 "year": value.year,
1196 "month": value.month,
1197 "day": value.day,
1198 "hour": 0,
1199 "minute": 0,
1200 "second": 0,
1201 "microsecond": 0,
1202 }
1203 else:
1204 raise TypeError(
1205 "SQLite DateTime type only accepts Python "
1206 "datetime and date objects as input."
1207 )
1208
1209 return process
1210
1211 def result_processor(
1212 self, dialect: Dialect, coltype: object
1213 ) -> Optional[_ResultProcessorType[Any]]:
1214 if self._reg:
1215 return processors.str_to_datetime_processor_factory(
1216 self._reg, datetime.datetime
1217 )
1218 else:
1219 return processors.str_to_datetime
1220
1221
1222class DATE(_DateTimeMixin, sqltypes.Date):
1223 r"""Represent a Python date object in SQLite using a string.
1224
1225 The default string storage format is::
1226
1227 "%(year)04d-%(month)02d-%(day)02d"
1228
1229 e.g.:
1230
1231 .. sourcecode:: text
1232
1233 2011-03-15
1234
1235 The incoming storage format is by default parsed using the
1236 Python ``date.fromisoformat()`` function.
1237
1238 .. versionchanged:: 2.0 ``date.fromisoformat()`` is used for default
1239 date string parsing.
1240
1241
1242 The storage format can be customized to some degree using the
1243 ``storage_format`` and ``regexp`` parameters, such as::
1244
1245 import re
1246 from sqlalchemy.dialects.sqlite import DATE
1247
1248 d = DATE(
1249 storage_format="%(month)02d/%(day)02d/%(year)04d",
1250 regexp=re.compile("(?P<month>\d+)/(?P<day>\d+)/(?P<year>\d+)"),
1251 )
1252
1253 :param storage_format: format string which will be applied to the
1254 dict with keys year, month, and day.
1255
1256 :param regexp: regular expression which will be applied to
1257 incoming result rows, replacing the use of ``date.fromisoformat()`` to
1258 parse incoming strings. If the regexp contains named groups, the resulting
1259 match dict is applied to the Python date() constructor as keyword
1260 arguments. Otherwise, if positional groups are used, the date()
1261 constructor is called with positional arguments via
1262 ``*map(int, match_obj.groups(0))``.
1263
1264 """
1265
1266 _storage_format = "%(year)04d-%(month)02d-%(day)02d"
1267
1268 def bind_processor(
1269 self, dialect: Dialect
1270 ) -> Optional[_BindProcessorType[Any]]:
1271 datetime_date = datetime.date
1272 format_ = self._storage_format
1273
1274 def process(value):
1275 if value is None:
1276 return None
1277 elif isinstance(value, datetime_date):
1278 return format_ % {
1279 "year": value.year,
1280 "month": value.month,
1281 "day": value.day,
1282 }
1283 else:
1284 raise TypeError(
1285 "SQLite Date type only accepts Python "
1286 "date objects as input."
1287 )
1288
1289 return process
1290
1291 def result_processor(
1292 self, dialect: Dialect, coltype: object
1293 ) -> Optional[_ResultProcessorType[Any]]:
1294 if self._reg:
1295 return processors.str_to_datetime_processor_factory(
1296 self._reg, datetime.date
1297 )
1298 else:
1299 return processors.str_to_date
1300
1301
1302class TIME(_DateTimeMixin, sqltypes.Time):
1303 r"""Represent a Python time object in SQLite using a string.
1304
1305 The default string storage format is::
1306
1307 "%(hour)02d:%(minute)02d:%(second)02d.%(microsecond)06d"
1308
1309 e.g.:
1310
1311 .. sourcecode:: text
1312
1313 12:05:57.10558
1314
1315 The incoming storage format is by default parsed using the
1316 Python ``time.fromisoformat()`` function.
1317
1318 .. versionchanged:: 2.0 ``time.fromisoformat()`` is used for default
1319 time string parsing.
1320
1321 The storage format can be customized to some degree using the
1322 ``storage_format`` and ``regexp`` parameters, such as::
1323
1324 import re
1325 from sqlalchemy.dialects.sqlite import TIME
1326
1327 t = TIME(
1328 storage_format="%(hour)02d-%(minute)02d-%(second)02d-%(microsecond)06d",
1329 regexp=re.compile("(\d+)-(\d+)-(\d+)-(?:-(\d+))?"),
1330 )
1331
1332 :param truncate_microseconds: when ``True`` microseconds will be truncated
1333 from the time. Can't be specified together with ``storage_format``
1334 or ``regexp``.
1335
1336 :param storage_format: format string which will be applied to the dict
1337 with keys hour, minute, second, and microsecond.
1338
1339 :param regexp: regular expression which will be applied to incoming result
1340 rows, replacing the use of ``datetime.fromisoformat()`` to parse incoming
1341 strings. If the regexp contains named groups, the resulting match dict is
1342 applied to the Python time() constructor as keyword arguments. Otherwise,
1343 if positional groups are used, the time() constructor is called with
1344 positional arguments via ``*map(int, match_obj.groups(0))``.
1345
1346 """
1347
1348 _storage_format = "%(hour)02d:%(minute)02d:%(second)02d.%(microsecond)06d"
1349
1350 def __init__(self, *args, **kwargs):
1351 truncate_microseconds = kwargs.pop("truncate_microseconds", False)
1352 super().__init__(*args, **kwargs)
1353 if truncate_microseconds:
1354 assert "storage_format" not in kwargs, (
1355 "You can specify only "
1356 "one of truncate_microseconds or storage_format."
1357 )
1358 assert "regexp" not in kwargs, (
1359 "You can specify only one of "
1360 "truncate_microseconds or regexp."
1361 )
1362 self._storage_format = "%(hour)02d:%(minute)02d:%(second)02d"
1363
1364 def bind_processor(self, dialect):
1365 datetime_time = datetime.time
1366 format_ = self._storage_format
1367
1368 def process(value):
1369 if value is None:
1370 return None
1371 elif isinstance(value, datetime_time):
1372 return format_ % {
1373 "hour": value.hour,
1374 "minute": value.minute,
1375 "second": value.second,
1376 "microsecond": value.microsecond,
1377 }
1378 else:
1379 raise TypeError(
1380 "SQLite Time type only accepts Python "
1381 "time objects as input."
1382 )
1383
1384 return process
1385
1386 def result_processor(self, dialect, coltype):
1387 if self._reg:
1388 return processors.str_to_datetime_processor_factory(
1389 self._reg, datetime.time
1390 )
1391 else:
1392 return processors.str_to_time
1393
1394
1395colspecs = {
1396 sqltypes.Date: DATE,
1397 sqltypes.DateTime: DATETIME,
1398 sqltypes.JSON: _SQliteJson,
1399 sqltypes.JSON.JSONIndexType: JSONIndexType,
1400 sqltypes.JSON.JSONPathType: JSONPathType,
1401 sqltypes.Time: TIME,
1402}
1403
1404ischema_names = {
1405 "BIGINT": sqltypes.BIGINT,
1406 "BLOB": sqltypes.BLOB,
1407 "BOOL": sqltypes.BOOLEAN,
1408 "BOOLEAN": sqltypes.BOOLEAN,
1409 "CHAR": sqltypes.CHAR,
1410 "DATE": sqltypes.DATE,
1411 "DATE_CHAR": sqltypes.DATE,
1412 "DATETIME": sqltypes.DATETIME,
1413 "DATETIME_CHAR": sqltypes.DATETIME,
1414 "DOUBLE": sqltypes.DOUBLE,
1415 "DECIMAL": sqltypes.DECIMAL,
1416 "FLOAT": sqltypes.FLOAT,
1417 "INT": sqltypes.INTEGER,
1418 "INTEGER": sqltypes.INTEGER,
1419 "JSON": JSON,
1420 "NUMERIC": sqltypes.NUMERIC,
1421 "REAL": sqltypes.REAL,
1422 "SMALLINT": sqltypes.SMALLINT,
1423 "TEXT": sqltypes.TEXT,
1424 "TIME": sqltypes.TIME,
1425 "TIME_CHAR": sqltypes.TIME,
1426 "TIMESTAMP": sqltypes.TIMESTAMP,
1427 "VARCHAR": sqltypes.VARCHAR,
1428 "NVARCHAR": sqltypes.NVARCHAR,
1429 "NCHAR": sqltypes.NCHAR,
1430}
1431
1432
1433class SQLiteCompiler(compiler.SQLCompiler):
1434 extract_map = util.update_copy(
1435 compiler.SQLCompiler.extract_map,
1436 {
1437 "month": "%m",
1438 "day": "%d",
1439 "year": "%Y",
1440 "second": "%S",
1441 "hour": "%H",
1442 "doy": "%j",
1443 "minute": "%M",
1444 "epoch": "%s",
1445 "dow": "%w",
1446 "week": "%W",
1447 },
1448 )
1449
1450 def visit_truediv_binary(self, binary, operator, **kw):
1451 return (
1452 self.process(binary.left, **kw)
1453 + " / "
1454 + "(%s + 0.0)" % self.process(binary.right, **kw)
1455 )
1456
1457 def visit_now_func(self, fn, **kw):
1458 return "CURRENT_TIMESTAMP"
1459
1460 def visit_localtimestamp_func(self, func, **kw):
1461 return "DATETIME(CURRENT_TIMESTAMP, 'localtime')"
1462
1463 def visit_true(self, expr, **kw):
1464 return "1"
1465
1466 def visit_false(self, expr, **kw):
1467 return "0"
1468
1469 def visit_char_length_func(self, fn, **kw):
1470 return "length%s" % self.function_argspec(fn)
1471
1472 def visit_aggregate_strings_func(self, fn, **kw):
1473 return "group_concat%s" % self.function_argspec(fn)
1474
1475 def visit_cast(self, cast, **kwargs):
1476 if self.dialect.supports_cast:
1477 return super().visit_cast(cast, **kwargs)
1478 else:
1479 return self.process(cast.clause, **kwargs)
1480
1481 def visit_extract(self, extract, **kw):
1482 try:
1483 return "CAST(STRFTIME('%s', %s) AS INTEGER)" % (
1484 self.extract_map[extract.field],
1485 self.process(extract.expr, **kw),
1486 )
1487 except KeyError as err:
1488 raise exc.CompileError(
1489 "%s is not a valid extract argument." % extract.field
1490 ) from err
1491
1492 def returning_clause(
1493 self,
1494 stmt,
1495 returning_cols,
1496 *,
1497 populate_result_map,
1498 **kw,
1499 ):
1500 kw["include_table"] = False
1501 return super().returning_clause(
1502 stmt, returning_cols, populate_result_map=populate_result_map, **kw
1503 )
1504
1505 def limit_clause(self, select, **kw):
1506 text = ""
1507 if select._limit_clause is not None:
1508 text += "\n LIMIT " + self.process(select._limit_clause, **kw)
1509 if select._offset_clause is not None:
1510 if select._limit_clause is None:
1511 text += "\n LIMIT " + self.process(sql.literal(-1))
1512 text += " OFFSET " + self.process(select._offset_clause, **kw)
1513 else:
1514 text += " OFFSET " + self.process(sql.literal(0), **kw)
1515 return text
1516
1517 def for_update_clause(self, select, **kw):
1518 # sqlite has no "FOR UPDATE" AFAICT
1519 return ""
1520
1521 def update_from_clause(
1522 self, update_stmt, from_table, extra_froms, from_hints, **kw
1523 ):
1524 kw["asfrom"] = True
1525 return "FROM " + ", ".join(
1526 t._compiler_dispatch(self, fromhints=from_hints, **kw)
1527 for t in extra_froms
1528 )
1529
1530 def visit_is_distinct_from_binary(self, binary, operator, **kw):
1531 return "%s IS NOT %s" % (
1532 self.process(binary.left),
1533 self.process(binary.right),
1534 )
1535
1536 def visit_is_not_distinct_from_binary(self, binary, operator, **kw):
1537 return "%s IS %s" % (
1538 self.process(binary.left),
1539 self.process(binary.right),
1540 )
1541
1542 def visit_json_getitem_op_binary(self, binary, operator, **kw):
1543 if binary.type._type_affinity is sqltypes.JSON:
1544 expr = "JSON_QUOTE(JSON_EXTRACT(%s, %s))"
1545 else:
1546 expr = "JSON_EXTRACT(%s, %s)"
1547
1548 return expr % (
1549 self.process(binary.left, **kw),
1550 self.process(binary.right, **kw),
1551 )
1552
1553 def visit_json_path_getitem_op_binary(self, binary, operator, **kw):
1554 if binary.type._type_affinity is sqltypes.JSON:
1555 expr = "JSON_QUOTE(JSON_EXTRACT(%s, %s))"
1556 else:
1557 expr = "JSON_EXTRACT(%s, %s)"
1558
1559 return expr % (
1560 self.process(binary.left, **kw),
1561 self.process(binary.right, **kw),
1562 )
1563
1564 def visit_empty_set_op_expr(self, type_, expand_op, **kw):
1565 # slightly old SQLite versions don't seem to be able to handle
1566 # the empty set impl
1567 return self.visit_empty_set_expr(type_)
1568
1569 def visit_empty_set_expr(self, element_types, **kw):
1570 return "SELECT %s FROM (SELECT %s) WHERE 1!=1" % (
1571 ", ".join("1" for type_ in element_types or [INTEGER()]),
1572 ", ".join("1" for type_ in element_types or [INTEGER()]),
1573 )
1574
1575 def visit_regexp_match_op_binary(self, binary, operator, **kw):
1576 return self._generate_generic_binary(binary, " REGEXP ", **kw)
1577
1578 def visit_not_regexp_match_op_binary(self, binary, operator, **kw):
1579 return self._generate_generic_binary(binary, " NOT REGEXP ", **kw)
1580
1581 def _on_conflict_target(self, clause, **kw):
1582 if clause.inferred_target_elements is not None:
1583 target_text = "(%s)" % ", ".join(
1584 (
1585 self.preparer.quote(c)
1586 if isinstance(c, str)
1587 else self.process(c, include_table=False, use_schema=False)
1588 )
1589 for c in clause.inferred_target_elements
1590 )
1591 if clause.inferred_target_whereclause is not None:
1592 whereclause_kw = dict(kw)
1593 whereclause_kw.update(
1594 include_table=False,
1595 use_schema=False,
1596 literal_execute=True,
1597 )
1598 target_text += " WHERE %s" % self.process(
1599 clause.inferred_target_whereclause,
1600 **whereclause_kw,
1601 )
1602
1603 else:
1604 target_text = ""
1605
1606 return target_text
1607
1608 def visit_on_conflict_do_nothing(self, on_conflict, **kw):
1609 target_text = self._on_conflict_target(on_conflict, **kw)
1610
1611 if target_text:
1612 return "ON CONFLICT %s DO NOTHING" % target_text
1613 else:
1614 return "ON CONFLICT DO NOTHING"
1615
1616 def visit_on_conflict_do_update(self, on_conflict, **kw):
1617 clause = on_conflict
1618
1619 target_text = self._on_conflict_target(on_conflict, **kw)
1620
1621 action_set_ops = []
1622
1623 set_parameters = dict(clause.update_values_to_set)
1624 # create a list of column assignment clauses as tuples
1625
1626 insert_statement = self.stack[-1]["selectable"]
1627 cols = insert_statement.table.c
1628 set_kw = dict(kw)
1629 set_kw.update(use_schema=False)
1630 for c in cols:
1631 col_key = c.key
1632
1633 if col_key in set_parameters:
1634 value = set_parameters.pop(col_key)
1635 elif c in set_parameters:
1636 value = set_parameters.pop(c)
1637 else:
1638 continue
1639
1640 if coercions._is_literal(value):
1641 value = elements.BindParameter(None, value, type_=c.type)
1642
1643 else:
1644 if (
1645 isinstance(value, elements.BindParameter)
1646 and value.type._isnull
1647 ):
1648 value = value._clone()
1649 value.type = c.type
1650 value_text = self.process(
1651 value.self_group(), is_upsert_set=True, **set_kw
1652 )
1653
1654 key_text = self.preparer.quote(c.name)
1655 action_set_ops.append("%s = %s" % (key_text, value_text))
1656
1657 # check for names that don't match columns
1658 if set_parameters:
1659 util.warn(
1660 "Additional column names not matching "
1661 "any column keys in table '%s': %s"
1662 % (
1663 self.current_executable.table.name,
1664 (", ".join("'%s'" % c for c in set_parameters)),
1665 )
1666 )
1667 for k, v in set_parameters.items():
1668 key_text = (
1669 self.preparer.quote(k)
1670 if isinstance(k, str)
1671 else self.process(k, **set_kw)
1672 )
1673 value_text = self.process(
1674 coercions.expect(roles.ExpressionElementRole, v),
1675 is_upsert_set=True,
1676 **set_kw,
1677 )
1678 action_set_ops.append("%s = %s" % (key_text, value_text))
1679
1680 action_text = ", ".join(action_set_ops)
1681 if clause.update_whereclause is not None:
1682 where_kw = dict(kw)
1683 where_kw.update(include_table=True, use_schema=False)
1684 action_text += " WHERE %s" % self.process(
1685 clause.update_whereclause, **where_kw
1686 )
1687
1688 return "ON CONFLICT %s DO UPDATE SET %s" % (target_text, action_text)
1689
1690 def visit_bitwise_xor_op_binary(self, binary, operator, **kw):
1691 # sqlite has no xor. Use "a XOR b" = "(a | b) - (a & b)".
1692 kw["eager_grouping"] = True
1693 or_ = self._generate_generic_binary(binary, " | ", **kw)
1694 and_ = self._generate_generic_binary(binary, " & ", **kw)
1695 return f"({or_} - {and_})"
1696
1697
1698class SQLiteDDLCompiler(compiler.DDLCompiler):
1699 def get_column_specification(self, column, **kwargs):
1700 coltype = self.dialect.type_compiler_instance.process(
1701 column.type, type_expression=column
1702 )
1703 colspec = self.preparer.format_column(column) + " " + coltype
1704 default = self.get_column_default_string(column)
1705 if default is not None:
1706
1707 if not re.match(r"""^\s*[\'\"\(]""", default) and re.match(
1708 r".*\W.*", default
1709 ):
1710 colspec += f" DEFAULT ({default})"
1711 else:
1712 colspec += f" DEFAULT {default}"
1713
1714 if not column.nullable:
1715 colspec += " NOT NULL"
1716
1717 on_conflict_clause = column.dialect_options["sqlite"][
1718 "on_conflict_not_null"
1719 ]
1720 if on_conflict_clause is not None:
1721 colspec += " ON CONFLICT " + on_conflict_clause
1722
1723 if column.primary_key:
1724 if (
1725 column.autoincrement is True
1726 and len(column.table.primary_key.columns) != 1
1727 ):
1728 raise exc.CompileError(
1729 "SQLite does not support autoincrement for "
1730 "composite primary keys"
1731 )
1732
1733 if (
1734 column.table.dialect_options["sqlite"]["autoincrement"]
1735 and len(column.table.primary_key.columns) == 1
1736 and issubclass(column.type._type_affinity, sqltypes.Integer)
1737 and not column.foreign_keys
1738 ):
1739 colspec += " PRIMARY KEY"
1740
1741 on_conflict_clause = column.dialect_options["sqlite"][
1742 "on_conflict_primary_key"
1743 ]
1744 if on_conflict_clause is not None:
1745 colspec += " ON CONFLICT " + on_conflict_clause
1746
1747 colspec += " AUTOINCREMENT"
1748
1749 if column.computed is not None:
1750 colspec += " " + self.process(column.computed)
1751
1752 return colspec
1753
1754 def visit_primary_key_constraint(self, constraint, **kw):
1755 # for columns with sqlite_autoincrement=True,
1756 # the PRIMARY KEY constraint can only be inline
1757 # with the column itself.
1758 if len(constraint.columns) == 1:
1759 c = list(constraint)[0]
1760 if (
1761 c.primary_key
1762 and c.table.dialect_options["sqlite"]["autoincrement"]
1763 and issubclass(c.type._type_affinity, sqltypes.Integer)
1764 and not c.foreign_keys
1765 ):
1766 return None
1767
1768 text = super().visit_primary_key_constraint(constraint)
1769
1770 on_conflict_clause = constraint.dialect_options["sqlite"][
1771 "on_conflict"
1772 ]
1773 if on_conflict_clause is None and len(constraint.columns) == 1:
1774 on_conflict_clause = list(constraint)[0].dialect_options["sqlite"][
1775 "on_conflict_primary_key"
1776 ]
1777
1778 if on_conflict_clause is not None:
1779 text += " ON CONFLICT " + on_conflict_clause
1780
1781 return text
1782
1783 def visit_unique_constraint(self, constraint, **kw):
1784 text = super().visit_unique_constraint(constraint)
1785
1786 on_conflict_clause = constraint.dialect_options["sqlite"][
1787 "on_conflict"
1788 ]
1789 if on_conflict_clause is None and len(constraint.columns) == 1:
1790 col1 = list(constraint)[0]
1791 if isinstance(col1, schema.SchemaItem):
1792 on_conflict_clause = list(constraint)[0].dialect_options[
1793 "sqlite"
1794 ]["on_conflict_unique"]
1795
1796 if on_conflict_clause is not None:
1797 text += " ON CONFLICT " + on_conflict_clause
1798
1799 return text
1800
1801 def visit_check_constraint(self, constraint, **kw):
1802 text = super().visit_check_constraint(constraint)
1803
1804 on_conflict_clause = constraint.dialect_options["sqlite"][
1805 "on_conflict"
1806 ]
1807
1808 if on_conflict_clause is not None:
1809 text += " ON CONFLICT " + on_conflict_clause
1810
1811 return text
1812
1813 def visit_column_check_constraint(self, constraint, **kw):
1814 text = super().visit_column_check_constraint(constraint)
1815
1816 if constraint.dialect_options["sqlite"]["on_conflict"] is not None:
1817 raise exc.CompileError(
1818 "SQLite does not support on conflict clause for "
1819 "column check constraint"
1820 )
1821
1822 return text
1823
1824 def visit_foreign_key_constraint(self, constraint, **kw):
1825 local_table = constraint.elements[0].parent.table
1826 remote_table = constraint.elements[0].column.table
1827
1828 if local_table.schema != remote_table.schema:
1829 return None
1830 else:
1831 return super().visit_foreign_key_constraint(constraint)
1832
1833 def define_constraint_remote_table(self, constraint, table, preparer):
1834 """Format the remote table clause of a CREATE CONSTRAINT clause."""
1835
1836 return preparer.format_table(table, use_schema=False)
1837
1838 def visit_create_index(
1839 self, create, include_schema=False, include_table_schema=True, **kw
1840 ):
1841 index = create.element
1842 self._verify_index_table(index)
1843 preparer = self.preparer
1844 text = "CREATE "
1845 if index.unique:
1846 text += "UNIQUE "
1847
1848 text += "INDEX "
1849
1850 if create.if_not_exists:
1851 text += "IF NOT EXISTS "
1852
1853 text += "%s ON %s (%s)" % (
1854 self._prepared_index_name(index, include_schema=True),
1855 preparer.format_table(index.table, use_schema=False),
1856 ", ".join(
1857 self.sql_compiler.process(
1858 expr, include_table=False, literal_binds=True
1859 )
1860 for expr in index.expressions
1861 ),
1862 )
1863
1864 whereclause = index.dialect_options["sqlite"]["where"]
1865 if whereclause is not None:
1866 where_compiled = self.sql_compiler.process(
1867 whereclause, include_table=False, literal_binds=True
1868 )
1869 text += " WHERE " + where_compiled
1870
1871 return text
1872
1873 def post_create_table(self, table):
1874 table_options = []
1875
1876 if not table.dialect_options["sqlite"]["with_rowid"]:
1877 table_options.append("WITHOUT ROWID")
1878
1879 if table.dialect_options["sqlite"]["strict"]:
1880 table_options.append("STRICT")
1881
1882 if table_options:
1883 return "\n " + ",\n ".join(table_options)
1884 else:
1885 return ""
1886
1887
1888class SQLiteTypeCompiler(compiler.GenericTypeCompiler):
1889 def visit_large_binary(self, type_, **kw):
1890 return self.visit_BLOB(type_)
1891
1892 def visit_DATETIME(self, type_, **kw):
1893 if (
1894 not isinstance(type_, _DateTimeMixin)
1895 or type_.format_is_text_affinity
1896 ):
1897 return super().visit_DATETIME(type_)
1898 else:
1899 return "DATETIME_CHAR"
1900
1901 def visit_DATE(self, type_, **kw):
1902 if (
1903 not isinstance(type_, _DateTimeMixin)
1904 or type_.format_is_text_affinity
1905 ):
1906 return super().visit_DATE(type_)
1907 else:
1908 return "DATE_CHAR"
1909
1910 def visit_TIME(self, type_, **kw):
1911 if (
1912 not isinstance(type_, _DateTimeMixin)
1913 or type_.format_is_text_affinity
1914 ):
1915 return super().visit_TIME(type_)
1916 else:
1917 return "TIME_CHAR"
1918
1919 def visit_JSON(self, type_, **kw):
1920 # note this name provides NUMERIC affinity, not TEXT.
1921 # should not be an issue unless the JSON value consists of a single
1922 # numeric value. JSONTEXT can be used if this case is required.
1923 return "JSON"
1924
1925
1926class SQLiteIdentifierPreparer(compiler.IdentifierPreparer):
1927 reserved_words = {
1928 "add",
1929 "after",
1930 "all",
1931 "alter",
1932 "analyze",
1933 "and",
1934 "as",
1935 "asc",
1936 "attach",
1937 "autoincrement",
1938 "before",
1939 "begin",
1940 "between",
1941 "by",
1942 "cascade",
1943 "case",
1944 "cast",
1945 "check",
1946 "collate",
1947 "column",
1948 "commit",
1949 "conflict",
1950 "constraint",
1951 "create",
1952 "cross",
1953 "current_date",
1954 "current_time",
1955 "current_timestamp",
1956 "database",
1957 "default",
1958 "deferrable",
1959 "deferred",
1960 "delete",
1961 "desc",
1962 "detach",
1963 "distinct",
1964 "drop",
1965 "each",
1966 "else",
1967 "end",
1968 "escape",
1969 "except",
1970 "exclusive",
1971 "exists",
1972 "explain",
1973 "false",
1974 "fail",
1975 "for",
1976 "foreign",
1977 "from",
1978 "full",
1979 "glob",
1980 "group",
1981 "having",
1982 "if",
1983 "ignore",
1984 "immediate",
1985 "in",
1986 "index",
1987 "indexed",
1988 "initially",
1989 "inner",
1990 "insert",
1991 "instead",
1992 "intersect",
1993 "into",
1994 "is",
1995 "isnull",
1996 "join",
1997 "key",
1998 "left",
1999 "like",
2000 "limit",
2001 "match",
2002 "natural",
2003 "not",
2004 "notnull",
2005 "null",
2006 "of",
2007 "offset",
2008 "on",
2009 "or",
2010 "order",
2011 "outer",
2012 "plan",
2013 "pragma",
2014 "primary",
2015 "query",
2016 "raise",
2017 "references",
2018 "reindex",
2019 "rename",
2020 "replace",
2021 "restrict",
2022 "right",
2023 "rollback",
2024 "row",
2025 "select",
2026 "set",
2027 "table",
2028 "temp",
2029 "temporary",
2030 "then",
2031 "to",
2032 "transaction",
2033 "trigger",
2034 "true",
2035 "union",
2036 "unique",
2037 "update",
2038 "using",
2039 "vacuum",
2040 "values",
2041 "view",
2042 "virtual",
2043 "when",
2044 "where",
2045 }
2046
2047
2048class SQLiteExecutionContext(default.DefaultExecutionContext):
2049 @util.memoized_property
2050 def _preserve_raw_colnames(self):
2051 return (
2052 not self.dialect._broken_dotted_colnames
2053 or self.execution_options.get("sqlite_raw_colnames", False)
2054 )
2055
2056 def _translate_colname(self, colname):
2057 # TODO: detect SQLite version 3.10.0 or greater;
2058 # see [ticket:3633]
2059
2060 # adjust for dotted column names. SQLite
2061 # in the case of UNION may store col names as
2062 # "tablename.colname", or if using an attached database,
2063 # "database.tablename.colname", in cursor.description
2064 if not self._preserve_raw_colnames and "." in colname:
2065 return colname.split(".")[-1], colname
2066 else:
2067 return colname, None
2068
2069
2070class SQLiteDialect(default.DefaultDialect):
2071 name = "sqlite"
2072 supports_alter = False
2073
2074 # SQlite supports "DEFAULT VALUES" but *does not* support
2075 # "VALUES (DEFAULT)"
2076 supports_default_values = True
2077 supports_default_metavalue = False
2078
2079 # sqlite issue:
2080 # https://github.com/python/cpython/issues/93421
2081 # note this parameter is no longer used by the ORM or default dialect
2082 # see #9414
2083 supports_sane_rowcount_returning = False
2084
2085 supports_empty_insert = False
2086 supports_cast = True
2087 supports_multivalues_insert = True
2088 use_insertmanyvalues = True
2089 tuple_in_values = True
2090 supports_statement_cache = True
2091 insert_null_pk_still_autoincrements = True
2092 insert_returning = True
2093 update_returning = True
2094 update_returning_multifrom = True
2095 delete_returning = True
2096 update_returning_multifrom = True
2097
2098 supports_default_metavalue = True
2099 """dialect supports INSERT... VALUES (DEFAULT) syntax"""
2100
2101 default_metavalue_token = "NULL"
2102 """for INSERT... VALUES (DEFAULT) syntax, the token to put in the
2103 parenthesis."""
2104
2105 default_paramstyle = "qmark"
2106 execution_ctx_cls = SQLiteExecutionContext
2107 statement_compiler = SQLiteCompiler
2108 ddl_compiler = SQLiteDDLCompiler
2109 type_compiler_cls = SQLiteTypeCompiler
2110 preparer = SQLiteIdentifierPreparer
2111 ischema_names = ischema_names
2112 colspecs = colspecs
2113
2114 construct_arguments = [
2115 (
2116 sa_schema.Table,
2117 {
2118 "autoincrement": False,
2119 "with_rowid": True,
2120 "strict": False,
2121 },
2122 ),
2123 (sa_schema.Index, {"where": None}),
2124 (
2125 sa_schema.Column,
2126 {
2127 "on_conflict_primary_key": None,
2128 "on_conflict_not_null": None,
2129 "on_conflict_unique": None,
2130 },
2131 ),
2132 (sa_schema.Constraint, {"on_conflict": None}),
2133 ]
2134
2135 _broken_fk_pragma_quotes = False
2136 _broken_dotted_colnames = False
2137
2138 @util.deprecated_params(
2139 _json_serializer=(
2140 "1.3.7",
2141 "The _json_serializer argument to the SQLite dialect has "
2142 "been renamed to the correct name of json_serializer. The old "
2143 "argument name will be removed in a future release.",
2144 ),
2145 _json_deserializer=(
2146 "1.3.7",
2147 "The _json_deserializer argument to the SQLite dialect has "
2148 "been renamed to the correct name of json_deserializer. The old "
2149 "argument name will be removed in a future release.",
2150 ),
2151 )
2152 def __init__(
2153 self,
2154 native_datetime: bool = False,
2155 json_serializer: Optional[Callable[..., Any]] = None,
2156 json_deserializer: Optional[Callable[..., Any]] = None,
2157 _json_serializer: Optional[Callable[..., Any]] = None,
2158 _json_deserializer: Optional[Callable[..., Any]] = None,
2159 **kwargs: Any,
2160 ) -> None:
2161 default.DefaultDialect.__init__(self, **kwargs)
2162
2163 if _json_serializer:
2164 json_serializer = _json_serializer
2165 if _json_deserializer:
2166 json_deserializer = _json_deserializer
2167 self._json_serializer = json_serializer
2168 self._json_deserializer = json_deserializer
2169
2170 # this flag used by pysqlite dialect, and perhaps others in the
2171 # future, to indicate the driver is handling date/timestamp
2172 # conversions (and perhaps datetime/time as well on some hypothetical
2173 # driver ?)
2174 self.native_datetime = native_datetime
2175
2176 if self.dbapi is not None:
2177 if self.dbapi.sqlite_version_info < (3, 7, 16):
2178 util.warn(
2179 "SQLite version %s is older than 3.7.16, and will not "
2180 "support right nested joins, as are sometimes used in "
2181 "more complex ORM scenarios. SQLAlchemy 1.4 and above "
2182 "no longer tries to rewrite these joins."
2183 % (self.dbapi.sqlite_version_info,)
2184 )
2185
2186 # NOTE: python 3.7 on fedora for me has SQLite 3.34.1. These
2187 # version checks are getting very stale.
2188 self._broken_dotted_colnames = self.dbapi.sqlite_version_info < (
2189 3,
2190 10,
2191 0,
2192 )
2193 self.supports_default_values = self.dbapi.sqlite_version_info >= (
2194 3,
2195 3,
2196 8,
2197 )
2198 self.supports_cast = self.dbapi.sqlite_version_info >= (3, 2, 3)
2199 self.supports_multivalues_insert = (
2200 # https://www.sqlite.org/releaselog/3_7_11.html
2201 self.dbapi.sqlite_version_info
2202 >= (3, 7, 11)
2203 )
2204 # see https://www.sqlalchemy.org/trac/ticket/2568
2205 # as well as https://www.sqlite.org/src/info/600482d161
2206 self._broken_fk_pragma_quotes = self.dbapi.sqlite_version_info < (
2207 3,
2208 6,
2209 14,
2210 )
2211
2212 if self.dbapi.sqlite_version_info < (3, 35):
2213 self.update_returning = self.delete_returning = (
2214 self.insert_returning
2215 ) = False
2216
2217 if self.dbapi.sqlite_version_info < (3, 32, 0):
2218 # https://www.sqlite.org/limits.html
2219 self.insertmanyvalues_max_parameters = 999
2220
2221 _isolation_lookup = util.immutabledict(
2222 {"READ UNCOMMITTED": 1, "SERIALIZABLE": 0}
2223 )
2224
2225 def get_isolation_level_values(self, dbapi_connection):
2226 return list(self._isolation_lookup)
2227
2228 def set_isolation_level(
2229 self, dbapi_connection: DBAPIConnection, level: IsolationLevel
2230 ) -> None:
2231 isolation_level = self._isolation_lookup[level]
2232
2233 cursor = dbapi_connection.cursor()
2234 cursor.execute(f"PRAGMA read_uncommitted = {isolation_level}")
2235 cursor.close()
2236
2237 def get_isolation_level(self, dbapi_connection):
2238 cursor = dbapi_connection.cursor()
2239 cursor.execute("PRAGMA read_uncommitted")
2240 res = cursor.fetchone()
2241 if res:
2242 value = res[0]
2243 else:
2244 # https://www.sqlite.org/changes.html#version_3_3_3
2245 # "Optional READ UNCOMMITTED isolation (instead of the
2246 # default isolation level of SERIALIZABLE) and
2247 # table level locking when database connections
2248 # share a common cache.""
2249 # pre-SQLite 3.3.0 default to 0
2250 value = 0
2251 cursor.close()
2252 if value == 0:
2253 return "SERIALIZABLE"
2254 elif value == 1:
2255 return "READ UNCOMMITTED"
2256 else:
2257 assert False, "Unknown isolation level %s" % value
2258
2259 @reflection.cache
2260 def get_schema_names(self, connection, **kw):
2261 s = "PRAGMA database_list"
2262 dl = connection.exec_driver_sql(s)
2263
2264 return [db[1] for db in dl if db[1] != "temp"]
2265
2266 def _format_schema(self, schema, table_name):
2267 if schema is not None:
2268 qschema = self.identifier_preparer.quote_identifier(schema)
2269 name = f"{qschema}.{table_name}"
2270 else:
2271 name = table_name
2272 return name
2273
2274 def _sqlite_main_query(
2275 self,
2276 table: str,
2277 type_: str,
2278 schema: Optional[str],
2279 sqlite_include_internal: bool,
2280 ):
2281 main = self._format_schema(schema, table)
2282 if not sqlite_include_internal:
2283 filter_table = " AND name NOT LIKE 'sqlite~_%' ESCAPE '~'"
2284 else:
2285 filter_table = ""
2286 query = (
2287 f"SELECT name FROM {main} "
2288 f"WHERE type='{type_}'{filter_table} "
2289 "ORDER BY name"
2290 )
2291 return query
2292
2293 @reflection.cache
2294 def get_table_names(
2295 self, connection, schema=None, sqlite_include_internal=False, **kw
2296 ):
2297 query = self._sqlite_main_query(
2298 "sqlite_master", "table", schema, sqlite_include_internal
2299 )
2300 names = connection.exec_driver_sql(query).scalars().all()
2301 return names
2302
2303 @reflection.cache
2304 def get_temp_table_names(
2305 self, connection, sqlite_include_internal=False, **kw
2306 ):
2307 query = self._sqlite_main_query(
2308 "sqlite_temp_master", "table", None, sqlite_include_internal
2309 )
2310 names = connection.exec_driver_sql(query).scalars().all()
2311 return names
2312
2313 @reflection.cache
2314 def get_temp_view_names(
2315 self, connection, sqlite_include_internal=False, **kw
2316 ):
2317 query = self._sqlite_main_query(
2318 "sqlite_temp_master", "view", None, sqlite_include_internal
2319 )
2320 names = connection.exec_driver_sql(query).scalars().all()
2321 return names
2322
2323 @reflection.cache
2324 def has_table(self, connection, table_name, schema=None, **kw):
2325 self._ensure_has_table_connection(connection)
2326
2327 if schema is not None and schema not in self.get_schema_names(
2328 connection, **kw
2329 ):
2330 return False
2331
2332 info = self._get_table_pragma(
2333 connection, "table_info", table_name, schema=schema
2334 )
2335 return bool(info)
2336
2337 def _get_default_schema_name(self, connection):
2338 return "main"
2339
2340 @reflection.cache
2341 def get_view_names(
2342 self, connection, schema=None, sqlite_include_internal=False, **kw
2343 ):
2344 query = self._sqlite_main_query(
2345 "sqlite_master", "view", schema, sqlite_include_internal
2346 )
2347 names = connection.exec_driver_sql(query).scalars().all()
2348 return names
2349
2350 @reflection.cache
2351 def get_view_definition(self, connection, view_name, schema=None, **kw):
2352 if schema is not None:
2353 qschema = self.identifier_preparer.quote_identifier(schema)
2354 master = f"{qschema}.sqlite_master"
2355 s = ("SELECT sql FROM %s WHERE name = ? AND type='view'") % (
2356 master,
2357 )
2358 rs = connection.exec_driver_sql(s, (view_name,))
2359 else:
2360 try:
2361 s = (
2362 "SELECT sql FROM "
2363 " (SELECT * FROM sqlite_master UNION ALL "
2364 " SELECT * FROM sqlite_temp_master) "
2365 "WHERE name = ? "
2366 "AND type='view'"
2367 )
2368 rs = connection.exec_driver_sql(s, (view_name,))
2369 except exc.DBAPIError:
2370 s = (
2371 "SELECT sql FROM sqlite_master WHERE name = ? "
2372 "AND type='view'"
2373 )
2374 rs = connection.exec_driver_sql(s, (view_name,))
2375
2376 result = rs.fetchall()
2377 if result:
2378 return result[0].sql
2379 else:
2380 raise exc.NoSuchTableError(
2381 f"{schema}.{view_name}" if schema else view_name
2382 )
2383
2384 @reflection.cache
2385 def get_columns(self, connection, table_name, schema=None, **kw):
2386 pragma = "table_info"
2387 # computed columns are threaded as hidden, they require table_xinfo
2388 if self.server_version_info >= (3, 31):
2389 pragma = "table_xinfo"
2390 info = self._get_table_pragma(
2391 connection, pragma, table_name, schema=schema
2392 )
2393 columns = []
2394 tablesql = None
2395 for row in info:
2396 name = row[1]
2397 type_ = row[2].upper()
2398 nullable = not row[3]
2399 default = row[4]
2400 primary_key = row[5]
2401 hidden = row[6] if pragma == "table_xinfo" else 0
2402
2403 # hidden has value 0 for normal columns, 1 for hidden columns,
2404 # 2 for computed virtual columns and 3 for computed stored columns
2405 # https://www.sqlite.org/src/info/069351b85f9a706f60d3e98fbc8aaf40c374356b967c0464aede30ead3d9d18b
2406 if hidden == 1:
2407 continue
2408
2409 generated = bool(hidden)
2410 persisted = hidden == 3
2411
2412 if tablesql is None and generated:
2413 tablesql = self._get_table_sql(
2414 connection, table_name, schema, **kw
2415 )
2416 # remove create table
2417 match = re.match(
2418 (
2419 r"create table .*?\((.*)\)"
2420 r"(?:\s*,?\s*(?:WITHOUT\s+ROWID|STRICT))*$"
2421 ),
2422 tablesql.strip(),
2423 re.DOTALL | re.IGNORECASE,
2424 )
2425 assert match, f"create table not found in {tablesql}"
2426 tablesql = match.group(1).strip()
2427
2428 columns.append(
2429 self._get_column_info(
2430 name,
2431 type_,
2432 nullable,
2433 default,
2434 primary_key,
2435 generated,
2436 persisted,
2437 tablesql,
2438 )
2439 )
2440 if columns:
2441 return columns
2442 elif not self.has_table(connection, table_name, schema):
2443 raise exc.NoSuchTableError(
2444 f"{schema}.{table_name}" if schema else table_name
2445 )
2446 else:
2447 return ReflectionDefaults.columns()
2448
2449 def _get_column_info(
2450 self,
2451 name,
2452 type_,
2453 nullable,
2454 default,
2455 primary_key,
2456 generated,
2457 persisted,
2458 tablesql,
2459 ):
2460 if generated:
2461 # the type of a column "cc INTEGER GENERATED ALWAYS AS (1 + 42)"
2462 # somehow is "INTEGER GENERATED ALWAYS"
2463 type_ = re.sub("generated", "", type_, flags=re.IGNORECASE)
2464 type_ = re.sub("always", "", type_, flags=re.IGNORECASE).strip()
2465
2466 coltype = self._resolve_type_affinity(type_)
2467
2468 if default is not None:
2469 default = str(default)
2470
2471 colspec = {
2472 "name": name,
2473 "type": coltype,
2474 "nullable": nullable,
2475 "default": default,
2476 "primary_key": primary_key,
2477 }
2478 if generated:
2479 sqltext = ""
2480 if tablesql:
2481 pattern = (
2482 r"[^,]*\s+GENERATED\s+ALWAYS\s+AS"
2483 r"\s+\((.*)\)\s*(?:virtual|stored)?"
2484 )
2485 match = re.search(
2486 re.escape(name) + pattern, tablesql, re.IGNORECASE
2487 )
2488 if match:
2489 sqltext = match.group(1)
2490 colspec["computed"] = {"sqltext": sqltext, "persisted": persisted}
2491 return colspec
2492
2493 def _resolve_type_affinity(self, type_):
2494 """Return a data type from a reflected column, using affinity rules.
2495
2496 SQLite's goal for universal compatibility introduces some complexity
2497 during reflection, as a column's defined type might not actually be a
2498 type that SQLite understands - or indeed, my not be defined *at all*.
2499 Internally, SQLite handles this with a 'data type affinity' for each
2500 column definition, mapping to one of 'TEXT', 'NUMERIC', 'INTEGER',
2501 'REAL', or 'NONE' (raw bits). The algorithm that determines this is
2502 listed in https://www.sqlite.org/datatype3.html section 2.1.
2503
2504 This method allows SQLAlchemy to support that algorithm, while still
2505 providing access to smarter reflection utilities by recognizing
2506 column definitions that SQLite only supports through affinity (like
2507 DATE and DOUBLE).
2508
2509 """
2510 match = re.match(r"([\w ]+)(\(.*?\))?", type_)
2511 if match:
2512 coltype = match.group(1)
2513 args = match.group(2)
2514 else:
2515 coltype = ""
2516 args = ""
2517
2518 if coltype in self.ischema_names:
2519 coltype = self.ischema_names[coltype]
2520 elif "INT" in coltype:
2521 coltype = sqltypes.INTEGER
2522 elif "CHAR" in coltype or "CLOB" in coltype or "TEXT" in coltype:
2523 coltype = sqltypes.TEXT
2524 elif "BLOB" in coltype or not coltype:
2525 coltype = sqltypes.NullType
2526 elif "REAL" in coltype or "FLOA" in coltype or "DOUB" in coltype:
2527 coltype = sqltypes.REAL
2528 else:
2529 coltype = sqltypes.NUMERIC
2530
2531 if args is not None:
2532 args = re.findall(r"(\d+)", args)
2533 try:
2534 coltype = coltype(*[int(a) for a in args])
2535 except TypeError:
2536 util.warn(
2537 "Could not instantiate type %s with "
2538 "reflected arguments %s; using no arguments."
2539 % (coltype, args)
2540 )
2541 coltype = coltype()
2542 else:
2543 coltype = coltype()
2544
2545 return coltype
2546
2547 @reflection.cache
2548 def get_pk_constraint(self, connection, table_name, schema=None, **kw):
2549 constraint_name = None
2550 table_data = self._get_table_sql(connection, table_name, schema=schema)
2551 if table_data:
2552 PK_PATTERN = r'CONSTRAINT +(?:"(.+?)"|(\w+)) +PRIMARY KEY'
2553 result = re.search(PK_PATTERN, table_data, re.I)
2554 if result:
2555 constraint_name = result.group(1) or result.group(2)
2556 else:
2557 constraint_name = None
2558
2559 cols = self.get_columns(connection, table_name, schema, **kw)
2560 # consider only pk columns. This also avoids sorting the cached
2561 # value returned by get_columns
2562 cols = [col for col in cols if col.get("primary_key", 0) > 0]
2563 cols.sort(key=lambda col: col.get("primary_key"))
2564 pkeys = [col["name"] for col in cols]
2565
2566 if pkeys:
2567 return {"constrained_columns": pkeys, "name": constraint_name}
2568 else:
2569 return ReflectionDefaults.pk_constraint()
2570
2571 @reflection.cache
2572 def get_foreign_keys(self, connection, table_name, schema=None, **kw):
2573 # sqlite makes this *extremely difficult*.
2574 # First, use the pragma to get the actual FKs.
2575 pragma_fks = self._get_table_pragma(
2576 connection, "foreign_key_list", table_name, schema=schema
2577 )
2578
2579 fks = {}
2580
2581 for row in pragma_fks:
2582 numerical_id, rtbl, lcol, rcol = (row[0], row[2], row[3], row[4])
2583
2584 if not rcol:
2585 # no referred column, which means it was not named in the
2586 # original DDL. The referred columns of the foreign key
2587 # constraint are therefore the primary key of the referred
2588 # table.
2589 try:
2590 referred_pk = self.get_pk_constraint(
2591 connection, rtbl, schema=schema, **kw
2592 )
2593 referred_columns = referred_pk["constrained_columns"]
2594 except exc.NoSuchTableError:
2595 # ignore not existing parents
2596 referred_columns = []
2597 else:
2598 # note we use this list only if this is the first column
2599 # in the constraint. for subsequent columns we ignore the
2600 # list and append "rcol" if present.
2601 referred_columns = []
2602
2603 if self._broken_fk_pragma_quotes:
2604 rtbl = re.sub(r"^[\"\[`\']|[\"\]`\']$", "", rtbl)
2605
2606 if numerical_id in fks:
2607 fk = fks[numerical_id]
2608 else:
2609 fk = fks[numerical_id] = {
2610 "name": None,
2611 "constrained_columns": [],
2612 "referred_schema": schema,
2613 "referred_table": rtbl,
2614 "referred_columns": referred_columns,
2615 "options": {},
2616 }
2617 fks[numerical_id] = fk
2618
2619 fk["constrained_columns"].append(lcol)
2620
2621 if rcol:
2622 fk["referred_columns"].append(rcol)
2623
2624 def fk_sig(constrained_columns, referred_table, referred_columns):
2625 return (
2626 tuple(constrained_columns)
2627 + (referred_table,)
2628 + tuple(referred_columns)
2629 )
2630
2631 # then, parse the actual SQL and attempt to find DDL that matches
2632 # the names as well. SQLite saves the DDL in whatever format
2633 # it was typed in as, so need to be liberal here.
2634
2635 keys_by_signature = {
2636 fk_sig(
2637 fk["constrained_columns"],
2638 fk["referred_table"],
2639 fk["referred_columns"],
2640 ): fk
2641 for fk in fks.values()
2642 }
2643
2644 table_data = self._get_table_sql(connection, table_name, schema=schema)
2645
2646 def parse_fks():
2647 if table_data is None:
2648 # system tables, etc.
2649 return
2650
2651 # note that we already have the FKs from PRAGMA above. This whole
2652 # regexp thing is trying to locate additional detail about the
2653 # FKs, namely the name of the constraint and other options.
2654 # so parsing the columns is really about matching it up to what
2655 # we already have.
2656 FK_PATTERN = (
2657 r'(?:CONSTRAINT +(?:"(.+?)"|(\w+)) +)?'
2658 r"FOREIGN KEY *\( *(.+?) *\) +"
2659 r'REFERENCES +(?:(?:"(.+?)")|([a-z0-9_]+)) *\( *((?:(?:"[^"]+"|[a-z0-9_]+) *(?:, *)?)+)\) *' # noqa: E501
2660 r"((?:ON (?:DELETE|UPDATE) "
2661 r"(?:SET NULL|SET DEFAULT|CASCADE|RESTRICT|NO ACTION) *)*)"
2662 r"((?:NOT +)?DEFERRABLE)?"
2663 r"(?: +INITIALLY +(DEFERRED|IMMEDIATE))?"
2664 )
2665 for match in re.finditer(FK_PATTERN, table_data, re.I):
2666 (
2667 constraint_quoted_name,
2668 constraint_name,
2669 constrained_columns,
2670 referred_quoted_name,
2671 referred_name,
2672 referred_columns,
2673 onupdatedelete,
2674 deferrable,
2675 initially,
2676 ) = match.group(1, 2, 3, 4, 5, 6, 7, 8, 9)
2677 constraint_name = constraint_quoted_name or constraint_name
2678 constrained_columns = list(
2679 self._find_cols_in_sig(constrained_columns)
2680 )
2681 if not referred_columns:
2682 referred_columns = constrained_columns
2683 else:
2684 referred_columns = list(
2685 self._find_cols_in_sig(referred_columns)
2686 )
2687 referred_name = referred_quoted_name or referred_name
2688 options = {}
2689
2690 for token in re.split(r" *\bON\b *", onupdatedelete.upper()):
2691 if token.startswith("DELETE"):
2692 ondelete = token[6:].strip()
2693 if ondelete and ondelete != "NO ACTION":
2694 options["ondelete"] = ondelete
2695 elif token.startswith("UPDATE"):
2696 onupdate = token[6:].strip()
2697 if onupdate and onupdate != "NO ACTION":
2698 options["onupdate"] = onupdate
2699
2700 if deferrable:
2701 options["deferrable"] = "NOT" not in deferrable.upper()
2702 if initially:
2703 options["initially"] = initially.upper()
2704
2705 yield (
2706 constraint_name,
2707 constrained_columns,
2708 referred_name,
2709 referred_columns,
2710 options,
2711 )
2712
2713 fkeys = []
2714
2715 for (
2716 constraint_name,
2717 constrained_columns,
2718 referred_name,
2719 referred_columns,
2720 options,
2721 ) in parse_fks():
2722 sig = fk_sig(constrained_columns, referred_name, referred_columns)
2723 if sig not in keys_by_signature:
2724 util.warn(
2725 "WARNING: SQL-parsed foreign key constraint "
2726 "'%s' could not be located in PRAGMA "
2727 "foreign_keys for table %s" % (sig, table_name)
2728 )
2729 continue
2730 key = keys_by_signature.pop(sig)
2731 key["name"] = constraint_name
2732 key["options"] = options
2733 fkeys.append(key)
2734 # assume the remainders are the unnamed, inline constraints, just
2735 # use them as is as it's extremely difficult to parse inline
2736 # constraints
2737 fkeys.extend(keys_by_signature.values())
2738 if fkeys:
2739 return fkeys
2740 else:
2741 return ReflectionDefaults.foreign_keys()
2742
2743 def _find_cols_in_sig(self, sig):
2744 for match in re.finditer(r'(?:"(.+?)")|([a-z0-9_]+)', sig, re.I):
2745 yield match.group(1) or match.group(2)
2746
2747 @reflection.cache
2748 def get_unique_constraints(
2749 self, connection, table_name, schema=None, **kw
2750 ):
2751 auto_index_by_sig = {}
2752 for idx in self.get_indexes(
2753 connection,
2754 table_name,
2755 schema=schema,
2756 include_auto_indexes=True,
2757 **kw,
2758 ):
2759 if not idx["name"].startswith("sqlite_autoindex"):
2760 continue
2761 sig = tuple(idx["column_names"])
2762 auto_index_by_sig[sig] = idx
2763
2764 table_data = self._get_table_sql(
2765 connection, table_name, schema=schema, **kw
2766 )
2767 unique_constraints = []
2768
2769 def parse_uqs():
2770 if table_data is None:
2771 return
2772 UNIQUE_PATTERN = (
2773 r'(?:CONSTRAINT +(?:"(.+?)"|(\w+)) +)?UNIQUE *\((.+?)\)'
2774 )
2775 INLINE_UNIQUE_PATTERN = (
2776 r'(?:(".+?")|(?:[\[`])?([a-z0-9_]+)(?:[\]`])?)[\t ]'
2777 r"+[a-z0-9_ ]+?[\t ]+UNIQUE"
2778 )
2779
2780 for match in re.finditer(UNIQUE_PATTERN, table_data, re.I):
2781 quoted_name, unquoted_name, cols = match.group(1, 2, 3)
2782 name = quoted_name or unquoted_name
2783 yield name, list(self._find_cols_in_sig(cols))
2784
2785 # we need to match inlines as well, as we seek to differentiate
2786 # a UNIQUE constraint from a UNIQUE INDEX, even though these
2787 # are kind of the same thing :)
2788 for match in re.finditer(INLINE_UNIQUE_PATTERN, table_data, re.I):
2789 cols = list(
2790 self._find_cols_in_sig(match.group(1) or match.group(2))
2791 )
2792 yield None, cols
2793
2794 for name, cols in parse_uqs():
2795 sig = tuple(cols)
2796 if sig in auto_index_by_sig:
2797 auto_index_by_sig.pop(sig)
2798 parsed_constraint = {"name": name, "column_names": cols}
2799 unique_constraints.append(parsed_constraint)
2800 # NOTE: auto_index_by_sig might not be empty here,
2801 # the PRIMARY KEY may have an entry.
2802 if unique_constraints:
2803 return unique_constraints
2804 else:
2805 return ReflectionDefaults.unique_constraints()
2806
2807 @reflection.cache
2808 def get_check_constraints(self, connection, table_name, schema=None, **kw):
2809 table_data = self._get_table_sql(
2810 connection, table_name, schema=schema, **kw
2811 )
2812
2813 # Extract CHECK constraints by properly handling balanced parentheses
2814 # and avoiding false matches when CHECK/CONSTRAINT appear in table
2815 # names. See #12924 for context.
2816 #
2817 # SQLite supports 4 identifier quote styles (see
2818 # sqlite.org/lang_keywords.html):
2819 # - Double quotes "..." (standard SQL)
2820 # - Brackets [...] (MS Access/SQL Server compatibility)
2821 # - Backticks `...` (MySQL compatibility)
2822 # - Single quotes '...' (SQLite extension)
2823 #
2824 # NOTE: there is not currently a way to parse CHECK constraints that
2825 # contain newlines as the approach here relies upon each individual
2826 # CHECK constraint being on a single line by itself. This necessarily
2827 # makes assumptions as to how the CREATE TABLE was emitted.
2828 CHECK_PATTERN = re.compile(
2829 r"""
2830 (?<![A-Za-z0-9_]) # Negative lookbehind: ensure CHECK is not
2831 # part of an identifier (e.g., table name
2832 # like "tableCHECK")
2833
2834 (?: # Optional CONSTRAINT clause
2835 CONSTRAINT\s+
2836 ( # Group 1: Constraint name (quoted or unquoted)
2837 "(?:[^"]|"")+" # Double-quoted: "name" or "na""me"
2838 |'(?:[^']|'')+' # Single-quoted: 'name' or 'na''me'
2839 |\[(?:[^\]]|\]\])+\] # Bracket-quoted: [name] or [na]]me]
2840 |`(?:[^`]|``)+` # Backtick-quoted: `name` or `na``me`
2841 |\S+ # Unquoted: simple_name
2842 )
2843 \s+
2844 )?
2845
2846 CHECK\s*\( # CHECK keyword followed by opening paren
2847 """,
2848 re.VERBOSE | re.IGNORECASE,
2849 )
2850 cks = []
2851
2852 for match in re.finditer(CHECK_PATTERN, table_data or ""):
2853 constraint_name = match.group(1)
2854
2855 if constraint_name:
2856 # Remove surrounding quotes if present
2857 # Double quotes: "name" -> name
2858 # Single quotes: 'name' -> name
2859 # Brackets: [name] -> name
2860 # Backticks: `name` -> name
2861 constraint_name = re.sub(
2862 r'^(["\'`])(.+)\1$|^\[(.+)\]$',
2863 lambda m: m.group(2) or m.group(3),
2864 constraint_name,
2865 flags=re.DOTALL,
2866 )
2867
2868 # Find the matching closing parenthesis by counting balanced parens
2869 # Must track string context to ignore parens inside string literals
2870 start = match.end() # Position after 'CHECK ('
2871 paren_count = 1
2872 in_single_quote = False
2873 in_double_quote = False
2874
2875 for pos, char in enumerate(table_data[start:], start):
2876 # Track string literal context
2877 if char == "'" and not in_double_quote:
2878 in_single_quote = not in_single_quote
2879 elif char == '"' and not in_single_quote:
2880 in_double_quote = not in_double_quote
2881 # Only count parens when not inside a string literal
2882 elif not in_single_quote and not in_double_quote:
2883 if char == "(":
2884 paren_count += 1
2885 elif char == ")":
2886 paren_count -= 1
2887 if paren_count == 0:
2888 # Successfully found matching closing parenthesis
2889 sqltext = table_data[start:pos].strip()
2890 cks.append(
2891 {"sqltext": sqltext, "name": constraint_name}
2892 )
2893 break
2894
2895 cks.sort(key=lambda d: d["name"] or "~") # sort None as last
2896 if cks:
2897 return cks
2898 else:
2899 return ReflectionDefaults.check_constraints()
2900
2901 @reflection.cache
2902 def get_indexes(self, connection, table_name, schema=None, **kw):
2903 pragma_indexes = self._get_table_pragma(
2904 connection, "index_list", table_name, schema=schema
2905 )
2906 indexes = []
2907
2908 # regular expression to extract the filter predicate of a partial
2909 # index. this could fail to extract the predicate correctly on
2910 # indexes created like
2911 # CREATE INDEX i ON t (col || ') where') WHERE col <> ''
2912 # but as this function does not support expression-based indexes
2913 # this case does not occur.
2914 partial_pred_re = re.compile(r"\)\s+where\s+(.+)", re.IGNORECASE)
2915
2916 if schema:
2917 schema_expr = "%s." % self.identifier_preparer.quote_identifier(
2918 schema
2919 )
2920 else:
2921 schema_expr = ""
2922
2923 include_auto_indexes = kw.pop("include_auto_indexes", False)
2924 for row in pragma_indexes:
2925 # ignore implicit primary key index.
2926 # https://www.mail-archive.com/sqlite-users@sqlite.org/msg30517.html
2927 if not include_auto_indexes and row[1].startswith(
2928 "sqlite_autoindex"
2929 ):
2930 continue
2931 indexes.append(
2932 dict(
2933 name=row[1],
2934 column_names=[],
2935 unique=row[2],
2936 dialect_options={},
2937 )
2938 )
2939
2940 # check partial indexes
2941 if len(row) >= 5 and row[4]:
2942 s = (
2943 "SELECT sql FROM %(schema)ssqlite_master "
2944 "WHERE name = ? "
2945 "AND type = 'index'" % {"schema": schema_expr}
2946 )
2947 rs = connection.exec_driver_sql(s, (row[1],))
2948 index_sql = rs.scalar()
2949 predicate_match = partial_pred_re.search(index_sql)
2950 if predicate_match is None:
2951 # unless the regex is broken this case shouldn't happen
2952 # because we know this is a partial index, so the
2953 # definition sql should match the regex
2954 util.warn(
2955 "Failed to look up filter predicate of "
2956 "partial index %s" % row[1]
2957 )
2958 else:
2959 predicate = predicate_match.group(1)
2960 indexes[-1]["dialect_options"]["sqlite_where"] = text(
2961 predicate
2962 )
2963
2964 # loop thru unique indexes to get the column names.
2965 for idx in list(indexes):
2966 pragma_index = self._get_table_pragma(
2967 connection, "index_info", idx["name"], schema=schema
2968 )
2969
2970 for row in pragma_index:
2971 if row[2] is None:
2972 util.warn(
2973 "Skipped unsupported reflection of "
2974 "expression-based index %s" % idx["name"]
2975 )
2976 indexes.remove(idx)
2977 break
2978 else:
2979 idx["column_names"].append(row[2])
2980
2981 indexes.sort(key=lambda d: d["name"] or "~") # sort None as last
2982 if indexes:
2983 return indexes
2984 elif not self.has_table(connection, table_name, schema):
2985 raise exc.NoSuchTableError(
2986 f"{schema}.{table_name}" if schema else table_name
2987 )
2988 else:
2989 return ReflectionDefaults.indexes()
2990
2991 def _is_sys_table(self, table_name):
2992 return table_name in {
2993 "sqlite_schema",
2994 "sqlite_master",
2995 "sqlite_temp_schema",
2996 "sqlite_temp_master",
2997 }
2998
2999 @reflection.cache
3000 def _get_table_sql(self, connection, table_name, schema=None, **kw):
3001 if schema:
3002 schema_expr = "%s." % (
3003 self.identifier_preparer.quote_identifier(schema)
3004 )
3005 else:
3006 schema_expr = ""
3007 try:
3008 s = (
3009 "SELECT sql FROM "
3010 " (SELECT * FROM %(schema)ssqlite_master UNION ALL "
3011 " SELECT * FROM %(schema)ssqlite_temp_master) "
3012 "WHERE name = ? "
3013 "AND type in ('table', 'view')" % {"schema": schema_expr}
3014 )
3015 rs = connection.exec_driver_sql(s, (table_name,))
3016 except exc.DBAPIError:
3017 s = (
3018 "SELECT sql FROM %(schema)ssqlite_master "
3019 "WHERE name = ? "
3020 "AND type in ('table', 'view')" % {"schema": schema_expr}
3021 )
3022 rs = connection.exec_driver_sql(s, (table_name,))
3023 value = rs.scalar()
3024 if value is None and not self._is_sys_table(table_name):
3025 raise exc.NoSuchTableError(f"{schema_expr}{table_name}")
3026 return value
3027
3028 def _get_table_pragma(self, connection, pragma, table_name, schema=None):
3029 quote = self.identifier_preparer.quote_identifier
3030 if schema is not None:
3031 statements = [f"PRAGMA {quote(schema)}."]
3032 else:
3033 # because PRAGMA looks in all attached databases if no schema
3034 # given, need to specify "main" schema, however since we want
3035 # 'temp' tables in the same namespace as 'main', need to run
3036 # the PRAGMA twice
3037 statements = ["PRAGMA main.", "PRAGMA temp."]
3038
3039 qtable = quote(table_name)
3040 for statement in statements:
3041 statement = f"{statement}{pragma}({qtable})"
3042 cursor = connection.exec_driver_sql(statement)
3043 if not cursor._soft_closed:
3044 # work around SQLite issue whereby cursor.description
3045 # is blank when PRAGMA returns no rows:
3046 # https://www.sqlite.org/cvstrac/tktview?tn=1884
3047 result = cursor.fetchall()
3048 else:
3049 result = []
3050 if result:
3051 return result
3052 else:
3053 return []