1# dialects/sqlite/base.py
2# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: ignore-errors
8
9
10r'''
11.. dialect:: sqlite
12 :name: SQLite
13 :normal_support: 3.12+
14 :best_effort: 3.7.16+
15
16.. _sqlite_datetime:
17
18Date and Time Types
19-------------------
20
21SQLite does not have built-in DATE, TIME, or DATETIME types, and pysqlite does
22not provide out of the box functionality for translating values between Python
23`datetime` objects and a SQLite-supported format. SQLAlchemy's own
24:class:`~sqlalchemy.types.DateTime` and related types provide date formatting
25and parsing functionality when SQLite is used. The implementation classes are
26:class:`_sqlite.DATETIME`, :class:`_sqlite.DATE` and :class:`_sqlite.TIME`.
27These types represent dates and times as ISO formatted strings, which also
28nicely support ordering. There's no reliance on typical "libc" internals for
29these functions so historical dates are fully supported.
30
31Ensuring Text affinity
32^^^^^^^^^^^^^^^^^^^^^^
33
34The DDL rendered for these types is the standard ``DATE``, ``TIME``
35and ``DATETIME`` indicators. However, custom storage formats can also be
36applied to these types. When the
37storage format is detected as containing no alpha characters, the DDL for
38these types is rendered as ``DATE_CHAR``, ``TIME_CHAR``, and ``DATETIME_CHAR``,
39so that the column continues to have textual affinity.
40
41.. seealso::
42
43 `Type Affinity <https://www.sqlite.org/datatype3.html#affinity>`_ -
44 in the SQLite documentation
45
46.. _sqlite_autoincrement:
47
48SQLite Auto Incrementing Behavior
49----------------------------------
50
51Background on SQLite's autoincrement is at: https://sqlite.org/autoinc.html
52
53Key concepts:
54
55* SQLite has an implicit "auto increment" feature that takes place for any
56 non-composite primary-key column that is specifically created using
57 "INTEGER PRIMARY KEY" for the type + primary key.
58
59* SQLite also has an explicit "AUTOINCREMENT" keyword, that is **not**
60 equivalent to the implicit autoincrement feature; this keyword is not
61 recommended for general use. SQLAlchemy does not render this keyword
62 unless a special SQLite-specific directive is used (see below). However,
63 the keyword still requires that the column's type is named "INTEGER".
64
65Using the AUTOINCREMENT Keyword
66^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
67
68To specifically render the AUTOINCREMENT keyword on the primary key column
69when rendering DDL, add the flag ``sqlite_autoincrement=True`` to the Table
70construct::
71
72 Table(
73 "sometable",
74 metadata,
75 Column("id", Integer, primary_key=True),
76 sqlite_autoincrement=True,
77 )
78
79Allowing autoincrement behavior SQLAlchemy types other than Integer/INTEGER
80^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
81
82SQLite's typing model is based on naming conventions. Among other things, this
83means that any type name which contains the substring ``"INT"`` will be
84determined to be of "integer affinity". A type named ``"BIGINT"``,
85``"SPECIAL_INT"`` or even ``"XYZINTQPR"``, will be considered by SQLite to be
86of "integer" affinity. However, **the SQLite autoincrement feature, whether
87implicitly or explicitly enabled, requires that the name of the column's type
88is exactly the string "INTEGER"**. Therefore, if an application uses a type
89like :class:`.BigInteger` for a primary key, on SQLite this type will need to
90be rendered as the name ``"INTEGER"`` when emitting the initial ``CREATE
91TABLE`` statement in order for the autoincrement behavior to be available.
92
93One approach to achieve this is to use :class:`.Integer` on SQLite
94only, by way of :meth:`.TypeEngine.with_variant`::
95
96 table = Table(
97 "my_table",
98 metadata,
99 Column(
100 "id",
101 BigInteger().with_variant(Integer, "sqlite"),
102 primary_key=True,
103 ),
104 )
105
106Another is to use a subclass of :class:`.BigInteger` that overrides its DDL
107name to be ``INTEGER`` when compiled against SQLite::
108
109 from sqlalchemy import BigInteger
110 from sqlalchemy.ext.compiler import compiles
111
112
113 class SLBigInteger(BigInteger):
114 pass
115
116
117 @compiles(SLBigInteger, "sqlite")
118 def bi_c(element, compiler, **kw):
119 return "INTEGER"
120
121
122 @compiles(SLBigInteger)
123 def bi_c(element, compiler, **kw):
124 return compiler.visit_BIGINT(element, **kw)
125
126
127 table = Table(
128 "my_table", metadata, Column("id", SLBigInteger(), primary_key=True)
129 )
130
131.. seealso::
132
133 :meth:`.TypeEngine.with_variant`
134
135 :ref:`sqlalchemy.ext.compiler_toplevel`
136
137 `Datatypes In SQLite Version 3 <https://sqlite.org/datatype3.html>`_
138
139.. _sqlite_transactions:
140
141Transactions with SQLite and the sqlite3 driver
142-----------------------------------------------
143
144As a file-based database, SQLite's approach to transactions differs from
145traditional databases in many ways. Additionally, the ``sqlite3`` driver
146standard with Python (as well as the async version ``aiosqlite`` which builds
147on top of it) has several quirks, workarounds, and API features in the
148area of transaction control, all of which generally need to be addressed when
149constructing a SQLAlchemy application that uses SQLite.
150
151Legacy Transaction Mode with the sqlite3 driver
152^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
153
154The most important aspect of transaction handling with the sqlite3 driver is
155that it defaults (which will continue through Python 3.15 before being
156removed in Python 3.16) to legacy transactional behavior which does
157not strictly follow :pep:`249`. The way in which the driver diverges from the
158PEP is that it does not "begin" a transaction automatically as dictated by
159:pep:`249` except in the case of DML statements, e.g. INSERT, UPDATE, and
160DELETE. Normally, :pep:`249` dictates that a BEGIN must be emitted upon
161the first SQL statement of any kind, so that all subsequent operations will
162be established within a transaction until ``connection.commit()`` has been
163called. The ``sqlite3`` driver, in an effort to be easier to use in
164highly concurrent environments, skips this step for DQL (e.g. SELECT) statements,
165and also skips it for DDL (e.g. CREATE TABLE etc.) statements for more legacy
166reasons. Statements such as SAVEPOINT are also skipped.
167
168In modern versions of the ``sqlite3`` driver as of Python 3.12, this legacy
169mode of operation is referred to as
170`"legacy transaction control" <https://docs.python.org/3/library/sqlite3.html#sqlite3-transaction-control-isolation-level>`_, and is in
171effect by default due to the ``Connection.autocommit`` parameter being set to
172the constant ``sqlite3.LEGACY_TRANSACTION_CONTROL``. Prior to Python 3.12,
173the ``Connection.autocommit`` attribute did not exist.
174
175The implications of legacy transaction mode include:
176
177* **Incorrect support for transactional DDL** - statements like CREATE TABLE, ALTER TABLE,
178 CREATE INDEX etc. will not automatically BEGIN a transaction if one was not
179 started already, leading to the changes made by each statement being
180 "autocommitted" immediately unless BEGIN was otherwise emitted first. Very
181 old (pre Python 3.6) versions of the ``sqlite3`` driver would also force a
182 COMMIT for these operations even if a transaction was present; however, this
183 is no longer the case.
184* **SERIALIZABLE behavior not fully functional** - SQLite's transaction isolation
185 behavior is normally consistent with SERIALIZABLE isolation, as it is a file-
186 based system that locks the database file entirely for write operations,
187 preventing COMMIT until all reader transactions (and associated file locks)
188 have completed. However, sqlite3's legacy transaction mode fails to emit BEGIN for SELECT
189 statements, which causes these SELECT statements to no longer be "repeatable",
190 failing one of the consistency guarantees of SERIALIZABLE.
191* **Incorrect behavior for SAVEPOINT** - as the SAVEPOINT statement does not
192 imply a BEGIN, a new SAVEPOINT emitted before a BEGIN will function on its
193 own but fails to participate in the enclosing transaction, meaning a ROLLBACK
194 of the transaction will not roll back elements that were part of a released
195 savepoint.
196
197Legacy transaction mode first existed in order to facilitate working around
198SQLite's file locks. Because SQLite relies upon whole-file locks, it is easy to
199get "database is locked" errors, particularly when newer features like "write
200ahead logging" are disabled. This is a key reason why ``sqlite3``'s legacy
201transaction mode is still the default mode of operation; disabling it will
202produce behavior that is more susceptible to locked database errors. However
203note that **legacy transaction mode will no longer be the default** in a future
204Python version (3.16 as of this writing).
205
206.. _sqlite_enabling_transactions:
207
208Enabling Non-Legacy SQLite Transactional Modes with the sqlite3 or aiosqlite driver
209^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
210
211Current SQLAlchemy support allows either for setting the sqlite3
212``Connection.autocommit`` attribute, most directly by using a
213:func:`_sa.create_engine` parameter, or, if on an older version of Python where
214the attribute is not available, using event hooks to control the behavior of
215BEGIN.
216
217* **Enabling modern sqlite3 transaction control via the autocommit connect parameter** (Python 3.12 and above)
218
219 To use SQLite in the mode described at `Transaction control via the autocommit attribute <https://docs.python.org/3/library/sqlite3.html#transaction-control-via-the-autocommit-attribute>`_,
220 the most straightforward approach is to set the attribute to its recommended value
221 of ``False`` at the connect level using :paramref:`_sa.create_engine.connect_args`::
222
223 from sqlalchemy import create_engine
224
225 engine = create_engine(
226 "sqlite:///myfile.db", connect_args={"autocommit": False}
227 )
228
229 This parameter is also passed through when using the aiosqlite driver::
230
231 from sqlalchemy.ext.asyncio import create_async_engine
232
233 engine = create_async_engine(
234 "sqlite+aiosqlite:///myfile.db", connect_args={"autocommit": False}
235 )
236
237 The parameter can also be set at the attribute level using the :meth:`.PoolEvents.connect`
238 event hook, however this will only work for sqlite3, as aiosqlite does not yet expose this
239 attribute on its ``Connection`` object::
240
241 from sqlalchemy import create_engine, event
242
243 engine = create_engine("sqlite:///myfile.db")
244
245
246 @event.listens_for(engine, "connect")
247 def do_connect(dbapi_connection, connection_record):
248 # enable autocommit=False mode
249 dbapi_connection.autocommit = False
250
251* **Using SQLAlchemy to emit BEGIN in lieu of SQLite's transaction control** (all Python versions, sqlite3 and aiosqlite)
252
253 For older versions of ``sqlite3`` or for cross-compatibility with older and
254 newer versions, SQLAlchemy can also take over the job of transaction control.
255 This is achieved by using the :meth:`.ConnectionEvents.begin` hook
256 to emit the "BEGIN" command directly, while also disabling SQLite's control
257 of this command using the :meth:`.PoolEvents.connect` event hook to set the
258 ``Connection.isolation_level`` attribute to ``None``::
259
260
261 from sqlalchemy import create_engine, event
262
263 engine = create_engine("sqlite:///myfile.db")
264
265
266 @event.listens_for(engine, "connect")
267 def do_connect(dbapi_connection, connection_record):
268 # disable sqlite3's emitting of the BEGIN statement entirely.
269 dbapi_connection.isolation_level = None
270
271
272 @event.listens_for(engine, "begin")
273 def do_begin(conn):
274 # emit our own BEGIN. sqlite3 still emits COMMIT/ROLLBACK correctly
275 conn.exec_driver_sql("BEGIN")
276
277 When using the asyncio variant ``aiosqlite``, refer to ``engine.sync_engine``
278 as in the example below::
279
280 from sqlalchemy import create_engine, event
281 from sqlalchemy.ext.asyncio import create_async_engine
282
283 engine = create_async_engine("sqlite+aiosqlite:///myfile.db")
284
285
286 @event.listens_for(engine.sync_engine, "connect")
287 def do_connect(dbapi_connection, connection_record):
288 # disable aiosqlite's emitting of the BEGIN statement entirely.
289 dbapi_connection.isolation_level = None
290
291
292 @event.listens_for(engine.sync_engine, "begin")
293 def do_begin(conn):
294 # emit our own BEGIN. aiosqlite still emits COMMIT/ROLLBACK correctly
295 conn.exec_driver_sql("BEGIN")
296
297.. _sqlite_isolation_level:
298
299Using SQLAlchemy's Driver Level AUTOCOMMIT Feature with SQLite
300^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
301
302SQLAlchemy has a comprehensive database isolation feature with optional
303autocommit support that is introduced in the section :ref:`dbapi_autocommit`.
304
305For the ``sqlite3`` and ``aiosqlite`` drivers, SQLAlchemy only includes
306built-in support for "AUTOCOMMIT". Note that this mode is currently incompatible
307with the non-legacy isolation mode hooks documented in the previous
308section at :ref:`sqlite_enabling_transactions`.
309
310To use the ``sqlite3`` driver with SQLAlchemy driver-level autocommit,
311create an engine setting the :paramref:`_sa.create_engine.isolation_level`
312parameter to "AUTOCOMMIT"::
313
314 eng = create_engine("sqlite:///myfile.db", isolation_level="AUTOCOMMIT")
315
316When using the above mode, any event hooks that set the sqlite3 ``Connection.autocommit``
317parameter away from its default of ``sqlite3.LEGACY_TRANSACTION_CONTROL``
318as well as hooks that emit ``BEGIN`` should be disabled.
319
320Additional Reading for SQLite / sqlite3 transaction control
321^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
322
323Links with important information on SQLite, the sqlite3 driver,
324as well as long historical conversations on how things got to their current state:
325
326* `Isolation in SQLite <https://www.sqlite.org/isolation.html>`_ - on the SQLite website
327* `Transaction control <https://docs.python.org/3/library/sqlite3.html#transaction-control>`_ - describes the sqlite3 autocommit attribute as well
328 as the legacy isolation_level attribute.
329* `sqlite3 SELECT does not BEGIN a transaction, but should according to spec <https://github.com/python/cpython/issues/54133>`_ - imported Python standard library issue on github
330* `sqlite3 module breaks transactions and potentially corrupts data <https://github.com/python/cpython/issues/54949>`_ - imported Python standard library issue on github
331
332
333INSERT/UPDATE/DELETE...RETURNING
334---------------------------------
335
336The SQLite dialect supports SQLite 3.35's ``INSERT|UPDATE|DELETE..RETURNING``
337syntax. ``INSERT..RETURNING`` may be used
338automatically in some cases in order to fetch newly generated identifiers in
339place of the traditional approach of using ``cursor.lastrowid``, however
340``cursor.lastrowid`` is currently still preferred for simple single-statement
341cases for its better performance.
342
343To specify an explicit ``RETURNING`` clause, use the
344:meth:`._UpdateBase.returning` method on a per-statement basis::
345
346 # INSERT..RETURNING
347 result = connection.execute(
348 table.insert().values(name="foo").returning(table.c.col1, table.c.col2)
349 )
350 print(result.all())
351
352 # UPDATE..RETURNING
353 result = connection.execute(
354 table.update()
355 .where(table.c.name == "foo")
356 .values(name="bar")
357 .returning(table.c.col1, table.c.col2)
358 )
359 print(result.all())
360
361 # DELETE..RETURNING
362 result = connection.execute(
363 table.delete()
364 .where(table.c.name == "foo")
365 .returning(table.c.col1, table.c.col2)
366 )
367 print(result.all())
368
369.. versionadded:: 2.0 Added support for SQLite RETURNING
370
371
372.. _sqlite_foreign_keys:
373
374Foreign Key Support
375-------------------
376
377SQLite supports FOREIGN KEY syntax when emitting CREATE statements for tables,
378however by default these constraints have no effect on the operation of the
379table.
380
381Constraint checking on SQLite has three prerequisites:
382
383* At least version 3.6.19 of SQLite must be in use
384* The SQLite library must be compiled *without* the SQLITE_OMIT_FOREIGN_KEY
385 or SQLITE_OMIT_TRIGGER symbols enabled.
386* The ``PRAGMA foreign_keys = ON`` statement must be emitted on all
387 connections before use -- including the initial call to
388 :meth:`sqlalchemy.schema.MetaData.create_all`.
389
390SQLAlchemy allows for the ``PRAGMA`` statement to be emitted automatically for
391new connections through the usage of events::
392
393 from sqlalchemy.engine import Engine
394 from sqlalchemy import event
395
396
397 @event.listens_for(Engine, "connect")
398 def set_sqlite_pragma(dbapi_connection, connection_record):
399 # the sqlite3 driver will not set PRAGMA foreign_keys
400 # if autocommit=False; set to True temporarily
401 ac = dbapi_connection.autocommit
402 dbapi_connection.autocommit = True
403
404 cursor = dbapi_connection.cursor()
405 cursor.execute("PRAGMA foreign_keys=ON")
406 cursor.close()
407
408 # restore previous autocommit setting
409 dbapi_connection.autocommit = ac
410
411.. warning::
412
413 When SQLite foreign keys are enabled, it is **not possible**
414 to emit CREATE or DROP statements for tables that contain
415 mutually-dependent foreign key constraints;
416 to emit the DDL for these tables requires that ALTER TABLE be used to
417 create or drop these constraints separately, for which SQLite has
418 no support.
419
420.. seealso::
421
422 `SQLite Foreign Key Support <https://www.sqlite.org/foreignkeys.html>`_
423 - on the SQLite web site.
424
425 :ref:`event_toplevel` - SQLAlchemy event API.
426
427 :ref:`use_alter` - more information on SQLAlchemy's facilities for handling
428 mutually-dependent foreign key constraints.
429
430.. _sqlite_on_conflict_ddl:
431
432ON CONFLICT support for constraints
433-----------------------------------
434
435.. seealso:: This section describes the :term:`DDL` version of "ON CONFLICT" for
436 SQLite, which occurs within a CREATE TABLE statement. For "ON CONFLICT" as
437 applied to an INSERT statement, see :ref:`sqlite_on_conflict_insert`.
438
439SQLite supports a non-standard DDL clause known as ON CONFLICT which can be applied
440to primary key, unique, check, and not null constraints. In DDL, it is
441rendered either within the "CONSTRAINT" clause or within the column definition
442itself depending on the location of the target constraint. To render this
443clause within DDL, the extension parameter ``sqlite_on_conflict`` can be
444specified with a string conflict resolution algorithm within the
445:class:`.PrimaryKeyConstraint`, :class:`.UniqueConstraint`, and
446:class:`.CheckConstraint` objects. Within the :class:`_schema.Column`
447object, there are individual parameters
448``sqlite_on_conflict_not_null``, ``sqlite_on_conflict_primary_key``, and
449``sqlite_on_conflict_unique``, each of which corresponds to one of the
450three relevant constraint types that can be indicated from a
451:class:`_schema.Column` object.
452
453.. seealso::
454
455 `ON CONFLICT <https://www.sqlite.org/lang_conflict.html>`_ - in the SQLite
456 documentation
457
458The ``sqlite_on_conflict`` parameters accept a string argument which is just
459the resolution name to be chosen, which on SQLite can be one of ROLLBACK,
460ABORT, FAIL, IGNORE, and REPLACE. For example, to add a UNIQUE constraint
461that specifies the IGNORE algorithm::
462
463 some_table = Table(
464 "some_table",
465 metadata,
466 Column("id", Integer, primary_key=True),
467 Column("data", Integer),
468 UniqueConstraint("id", "data", sqlite_on_conflict="IGNORE"),
469 )
470
471The above renders CREATE TABLE DDL as:
472
473.. sourcecode:: sql
474
475 CREATE TABLE some_table (
476 id INTEGER NOT NULL,
477 data INTEGER,
478 PRIMARY KEY (id),
479 UNIQUE (id, data) ON CONFLICT IGNORE
480 )
481
482
483When using the :paramref:`_schema.Column.unique`
484flag to add a UNIQUE constraint
485to a single column, the ``sqlite_on_conflict_unique`` parameter can
486be added to the :class:`_schema.Column` as well, which will be added to the
487UNIQUE constraint in the DDL::
488
489 some_table = Table(
490 "some_table",
491 metadata,
492 Column("id", Integer, primary_key=True),
493 Column(
494 "data", Integer, unique=True, sqlite_on_conflict_unique="IGNORE"
495 ),
496 )
497
498rendering:
499
500.. sourcecode:: sql
501
502 CREATE TABLE some_table (
503 id INTEGER NOT NULL,
504 data INTEGER,
505 PRIMARY KEY (id),
506 UNIQUE (data) ON CONFLICT IGNORE
507 )
508
509To apply the FAIL algorithm for a NOT NULL constraint,
510``sqlite_on_conflict_not_null`` is used::
511
512 some_table = Table(
513 "some_table",
514 metadata,
515 Column("id", Integer, primary_key=True),
516 Column(
517 "data", Integer, nullable=False, sqlite_on_conflict_not_null="FAIL"
518 ),
519 )
520
521this renders the column inline ON CONFLICT phrase:
522
523.. sourcecode:: sql
524
525 CREATE TABLE some_table (
526 id INTEGER NOT NULL,
527 data INTEGER NOT NULL ON CONFLICT FAIL,
528 PRIMARY KEY (id)
529 )
530
531
532Similarly, for an inline primary key, use ``sqlite_on_conflict_primary_key``::
533
534 some_table = Table(
535 "some_table",
536 metadata,
537 Column(
538 "id",
539 Integer,
540 primary_key=True,
541 sqlite_on_conflict_primary_key="FAIL",
542 ),
543 )
544
545SQLAlchemy renders the PRIMARY KEY constraint separately, so the conflict
546resolution algorithm is applied to the constraint itself:
547
548.. sourcecode:: sql
549
550 CREATE TABLE some_table (
551 id INTEGER NOT NULL,
552 PRIMARY KEY (id) ON CONFLICT FAIL
553 )
554
555.. _sqlite_on_conflict_insert:
556
557INSERT...ON CONFLICT (Upsert)
558-----------------------------
559
560.. seealso:: This section describes the :term:`DML` version of "ON CONFLICT" for
561 SQLite, which occurs within an INSERT statement. For "ON CONFLICT" as
562 applied to a CREATE TABLE statement, see :ref:`sqlite_on_conflict_ddl`.
563
564From version 3.24.0 onwards, SQLite supports "upserts" (update or insert)
565of rows into a table via the ``ON CONFLICT`` clause of the ``INSERT``
566statement. A candidate row will only be inserted if that row does not violate
567any unique or primary key constraints. In the case of a unique constraint violation, a
568secondary action can occur which can be either "DO UPDATE", indicating that
569the data in the target row should be updated, or "DO NOTHING", which indicates
570to silently skip this row.
571
572Conflicts are determined using columns that are part of existing unique
573constraints and indexes. These constraints are identified by stating the
574columns and conditions that comprise the indexes.
575
576SQLAlchemy provides ``ON CONFLICT`` support via the SQLite-specific
577:func:`_sqlite.insert()` function, which provides
578the generative methods :meth:`_sqlite.Insert.on_conflict_do_update`
579and :meth:`_sqlite.Insert.on_conflict_do_nothing`:
580
581.. sourcecode:: pycon+sql
582
583 >>> from sqlalchemy.dialects.sqlite import insert
584
585 >>> insert_stmt = insert(my_table).values(
586 ... id="some_existing_id", data="inserted value"
587 ... )
588
589 >>> do_update_stmt = insert_stmt.on_conflict_do_update(
590 ... index_elements=["id"], set_=dict(data="updated value")
591 ... )
592
593 >>> print(do_update_stmt)
594 {printsql}INSERT INTO my_table (id, data) VALUES (?, ?)
595 ON CONFLICT (id) DO UPDATE SET data = ?{stop}
596
597 >>> do_nothing_stmt = insert_stmt.on_conflict_do_nothing(index_elements=["id"])
598
599 >>> print(do_nothing_stmt)
600 {printsql}INSERT INTO my_table (id, data) VALUES (?, ?)
601 ON CONFLICT (id) DO NOTHING
602
603.. versionadded:: 1.4
604
605.. seealso::
606
607 `Upsert
608 <https://sqlite.org/lang_UPSERT.html>`_
609 - in the SQLite documentation.
610
611
612Specifying the Target
613^^^^^^^^^^^^^^^^^^^^^
614
615Both methods supply the "target" of the conflict using column inference:
616
617* The :paramref:`_sqlite.Insert.on_conflict_do_update.index_elements` argument
618 specifies a sequence containing string column names, :class:`_schema.Column`
619 objects, and/or SQL expression elements, which would identify a unique index
620 or unique constraint.
621
622* When using :paramref:`_sqlite.Insert.on_conflict_do_update.index_elements`
623 to infer an index, a partial index can be inferred by also specifying the
624 :paramref:`_sqlite.Insert.on_conflict_do_update.index_where` parameter:
625
626 .. sourcecode:: pycon+sql
627
628 >>> stmt = insert(my_table).values(user_email="a@b.com", data="inserted data")
629
630 >>> do_update_stmt = stmt.on_conflict_do_update(
631 ... index_elements=[my_table.c.user_email],
632 ... index_where=my_table.c.user_email.like("%@gmail.com"),
633 ... set_=dict(data=stmt.excluded.data),
634 ... )
635
636 >>> print(do_update_stmt)
637 {printsql}INSERT INTO my_table (data, user_email) VALUES (?, ?)
638 ON CONFLICT (user_email)
639 WHERE user_email LIKE '%@gmail.com'
640 DO UPDATE SET data = excluded.data
641
642The SET Clause
643^^^^^^^^^^^^^^^
644
645``ON CONFLICT...DO UPDATE`` is used to perform an update of the already
646existing row, using any combination of new values as well as values
647from the proposed insertion. These values are specified using the
648:paramref:`_sqlite.Insert.on_conflict_do_update.set_` parameter. This
649parameter accepts a dictionary which consists of direct values
650for UPDATE:
651
652.. sourcecode:: pycon+sql
653
654 >>> stmt = insert(my_table).values(id="some_id", data="inserted value")
655
656 >>> do_update_stmt = stmt.on_conflict_do_update(
657 ... index_elements=["id"], set_=dict(data="updated value")
658 ... )
659
660 >>> print(do_update_stmt)
661 {printsql}INSERT INTO my_table (id, data) VALUES (?, ?)
662 ON CONFLICT (id) DO UPDATE SET data = ?
663
664.. warning::
665
666 The :meth:`_sqlite.Insert.on_conflict_do_update` method does **not** take
667 into account Python-side default UPDATE values or generation functions,
668 e.g. those specified using :paramref:`_schema.Column.onupdate`. These
669 values will not be exercised for an ON CONFLICT style of UPDATE, unless
670 they are manually specified in the
671 :paramref:`_sqlite.Insert.on_conflict_do_update.set_` dictionary.
672
673Updating using the Excluded INSERT Values
674^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
675
676In order to refer to the proposed insertion row, the special alias
677:attr:`~.sqlite.Insert.excluded` is available as an attribute on
678the :class:`_sqlite.Insert` object; this object creates an "excluded." prefix
679on a column, that informs the DO UPDATE to update the row with the value that
680would have been inserted had the constraint not failed:
681
682.. sourcecode:: pycon+sql
683
684 >>> stmt = insert(my_table).values(
685 ... id="some_id", data="inserted value", author="jlh"
686 ... )
687
688 >>> do_update_stmt = stmt.on_conflict_do_update(
689 ... index_elements=["id"],
690 ... set_=dict(data="updated value", author=stmt.excluded.author),
691 ... )
692
693 >>> print(do_update_stmt)
694 {printsql}INSERT INTO my_table (id, data, author) VALUES (?, ?, ?)
695 ON CONFLICT (id) DO UPDATE SET data = ?, author = excluded.author
696
697Additional WHERE Criteria
698^^^^^^^^^^^^^^^^^^^^^^^^^
699
700The :meth:`_sqlite.Insert.on_conflict_do_update` method also accepts
701a WHERE clause using the :paramref:`_sqlite.Insert.on_conflict_do_update.where`
702parameter, which will limit those rows which receive an UPDATE:
703
704.. sourcecode:: pycon+sql
705
706 >>> stmt = insert(my_table).values(
707 ... id="some_id", data="inserted value", author="jlh"
708 ... )
709
710 >>> on_update_stmt = stmt.on_conflict_do_update(
711 ... index_elements=["id"],
712 ... set_=dict(data="updated value", author=stmt.excluded.author),
713 ... where=(my_table.c.status == 2),
714 ... )
715 >>> print(on_update_stmt)
716 {printsql}INSERT INTO my_table (id, data, author) VALUES (?, ?, ?)
717 ON CONFLICT (id) DO UPDATE SET data = ?, author = excluded.author
718 WHERE my_table.status = ?
719
720
721Skipping Rows with DO NOTHING
722^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
723
724``ON CONFLICT`` may be used to skip inserting a row entirely
725if any conflict with a unique constraint occurs; below this is illustrated
726using the :meth:`_sqlite.Insert.on_conflict_do_nothing` method:
727
728.. sourcecode:: pycon+sql
729
730 >>> stmt = insert(my_table).values(id="some_id", data="inserted value")
731 >>> stmt = stmt.on_conflict_do_nothing(index_elements=["id"])
732 >>> print(stmt)
733 {printsql}INSERT INTO my_table (id, data) VALUES (?, ?) ON CONFLICT (id) DO NOTHING
734
735
736If ``DO NOTHING`` is used without specifying any columns or constraint,
737it has the effect of skipping the INSERT for any unique violation which
738occurs:
739
740.. sourcecode:: pycon+sql
741
742 >>> stmt = insert(my_table).values(id="some_id", data="inserted value")
743 >>> stmt = stmt.on_conflict_do_nothing()
744 >>> print(stmt)
745 {printsql}INSERT INTO my_table (id, data) VALUES (?, ?) ON CONFLICT DO NOTHING
746
747.. _sqlite_type_reflection:
748
749Type Reflection
750---------------
751
752SQLite types are unlike those of most other database backends, in that
753the string name of the type usually does not correspond to a "type" in a
754one-to-one fashion. Instead, SQLite links per-column typing behavior
755to one of five so-called "type affinities" based on a string matching
756pattern for the type.
757
758SQLAlchemy's reflection process, when inspecting types, uses a simple
759lookup table to link the keywords returned to provided SQLAlchemy types.
760This lookup table is present within the SQLite dialect as it is for all
761other dialects. However, the SQLite dialect has a different "fallback"
762routine for when a particular type name is not located in the lookup map;
763it instead implements the SQLite "type affinity" scheme located at
764https://www.sqlite.org/datatype3.html section 2.1.
765
766The provided typemap will make direct associations from an exact string
767name match for the following types:
768
769:class:`_types.BIGINT`, :class:`_types.BLOB`,
770:class:`_types.BOOLEAN`, :class:`_types.BOOLEAN`,
771:class:`_types.CHAR`, :class:`_types.DATE`,
772:class:`_types.DATETIME`, :class:`_types.FLOAT`,
773:class:`_types.DECIMAL`, :class:`_types.FLOAT`,
774:class:`_types.INTEGER`, :class:`_types.INTEGER`,
775:class:`_types.NUMERIC`, :class:`_types.REAL`,
776:class:`_types.SMALLINT`, :class:`_types.TEXT`,
777:class:`_types.TIME`, :class:`_types.TIMESTAMP`,
778:class:`_types.VARCHAR`, :class:`_types.NVARCHAR`,
779:class:`_types.NCHAR`
780
781When a type name does not match one of the above types, the "type affinity"
782lookup is used instead:
783
784* :class:`_types.INTEGER` is returned if the type name includes the
785 string ``INT``
786* :class:`_types.TEXT` is returned if the type name includes the
787 string ``CHAR``, ``CLOB`` or ``TEXT``
788* :class:`_types.NullType` is returned if the type name includes the
789 string ``BLOB``
790* :class:`_types.REAL` is returned if the type name includes the string
791 ``REAL``, ``FLOA`` or ``DOUB``.
792* Otherwise, the :class:`_types.NUMERIC` type is used.
793
794.. _sqlite_partial_index:
795
796Partial Indexes
797---------------
798
799A partial index, e.g. one which uses a WHERE clause, can be specified
800with the DDL system using the argument ``sqlite_where``::
801
802 tbl = Table("testtbl", m, Column("data", Integer))
803 idx = Index(
804 "test_idx1",
805 tbl.c.data,
806 sqlite_where=and_(tbl.c.data > 5, tbl.c.data < 10),
807 )
808
809The index will be rendered at create time as:
810
811.. sourcecode:: sql
812
813 CREATE INDEX test_idx1 ON testtbl (data)
814 WHERE data > 5 AND data < 10
815
816.. _sqlite_dotted_column_names:
817
818Dotted Column Names
819-------------------
820
Using table or column names that explicitly have periods in them is
**not recommended**. While this is a bad idea for relational databases
in general, as the dot is a syntactically significant character,
the SQLite driver up until version **3.10.0** of SQLite has a bug which
requires that SQLAlchemy filter out these dots in result sets.
826
827The bug, entirely outside of SQLAlchemy, can be illustrated thusly::
828
829 import sqlite3
830
831 assert sqlite3.sqlite_version_info < (
832 3,
833 10,
834 0,
835 ), "bug is fixed in this version"
836
837 conn = sqlite3.connect(":memory:")
838 cursor = conn.cursor()
839
840 cursor.execute("create table x (a integer, b integer)")
841 cursor.execute("insert into x (a, b) values (1, 1)")
842 cursor.execute("insert into x (a, b) values (2, 2)")
843
844 cursor.execute("select x.a, x.b from x")
845 assert [c[0] for c in cursor.description] == ["a", "b"]
846
847 cursor.execute("""
848 select x.a, x.b from x where a=1
849 union
850 select x.a, x.b from x where a=2
851 """)
852 assert [c[0] for c in cursor.description] == ["a", "b"], [
853 c[0] for c in cursor.description
854 ]
855
856The second assertion fails:
857
858.. sourcecode:: text
859
860 Traceback (most recent call last):
861 File "test.py", line 19, in <module>
862 [c[0] for c in cursor.description]
863 AssertionError: ['x.a', 'x.b']
864
865Where above, the driver incorrectly reports the names of the columns
866including the name of the table, which is entirely inconsistent vs.
867when the UNION is not present.
868
869SQLAlchemy relies upon column names being predictable in how they match
870to the original statement, so the SQLAlchemy dialect has no choice but
871to filter these out::
872
873
874 from sqlalchemy import create_engine
875
876 eng = create_engine("sqlite://")
877 conn = eng.connect()
878
879 conn.exec_driver_sql("create table x (a integer, b integer)")
880 conn.exec_driver_sql("insert into x (a, b) values (1, 1)")
881 conn.exec_driver_sql("insert into x (a, b) values (2, 2)")
882
883 result = conn.exec_driver_sql("select x.a, x.b from x")
884 assert result.keys() == ["a", "b"]
885
886 result = conn.exec_driver_sql("""
887 select x.a, x.b from x where a=1
888 union
889 select x.a, x.b from x where a=2
890 """)
891 assert result.keys() == ["a", "b"]
892
893Note that above, even though SQLAlchemy filters out the dots, *both
894names are still addressable*::
895
896 >>> row = result.first()
897 >>> row["a"]
898 1
899 >>> row["x.a"]
900 1
901 >>> row["b"]
902 1
903 >>> row["x.b"]
904 1
905
906Therefore, the workaround applied by SQLAlchemy only impacts
907:meth:`_engine.CursorResult.keys` and :meth:`.Row.keys()` in the public API. In
908the very specific case where an application is forced to use column names that
909contain dots, and the functionality of :meth:`_engine.CursorResult.keys` and
910:meth:`.Row.keys()` is required to return these dotted names unmodified,
911the ``sqlite_raw_colnames`` execution option may be provided, either on a
912per-:class:`_engine.Connection` basis::
913
914 result = conn.execution_options(sqlite_raw_colnames=True).exec_driver_sql(
915 """
916 select x.a, x.b from x where a=1
917 union
918 select x.a, x.b from x where a=2
919 """
920 )
921 assert result.keys() == ["x.a", "x.b"]
922
923or on a per-:class:`_engine.Engine` basis::
924
925 engine = create_engine(
926 "sqlite://", execution_options={"sqlite_raw_colnames": True}
927 )
928
929When using the per-:class:`_engine.Engine` execution option, note that
930**Core and ORM queries that use UNION may not function properly**.
931
932SQLite-specific table options
933-----------------------------
934
Two options for CREATE TABLE are supported directly by the SQLite
dialect in conjunction with the :class:`_schema.Table` construct:
937
938* ``WITHOUT ROWID``::
939
940 Table("some_table", metadata, ..., sqlite_with_rowid=False)
941
942*
943 ``STRICT``::
944
945 Table("some_table", metadata, ..., sqlite_strict=True)
946
947 .. versionadded:: 2.0.37
948
949.. seealso::
950
951 `SQLite CREATE TABLE options
952 <https://www.sqlite.org/lang_createtable.html>`_
953
954.. _sqlite_include_internal:
955
956Reflecting internal schema tables
957----------------------------------
958
959Reflection methods that return lists of tables will omit so-called
960"SQLite internal schema object" names, which are considered by SQLite
961as any object name that is prefixed with ``sqlite_``. An example of
962such an object is the ``sqlite_sequence`` table that's generated when
963the ``AUTOINCREMENT`` column parameter is used. In order to return
964these objects, the parameter ``sqlite_include_internal=True`` may be
965passed to methods such as :meth:`_schema.MetaData.reflect` or
966:meth:`.Inspector.get_table_names`.
967
968.. versionadded:: 2.0 Added the ``sqlite_include_internal=True`` parameter.
969 Previously, these tables were not ignored by SQLAlchemy reflection
970 methods.
971
972.. note::
973
    The ``sqlite_include_internal`` parameter does not refer to the
    "system" tables, such as ``sqlite_master``, that are present in schemas.
976
977.. seealso::
978
979 `SQLite Internal Schema Objects <https://www.sqlite.org/fileformat2.html#intschema>`_ - in the SQLite
980 documentation.
981
982''' # noqa
983
984from __future__ import annotations
985
986import datetime
987import numbers
988import re
989from typing import Any
990from typing import Callable
991from typing import Optional
992from typing import TYPE_CHECKING
993
994from .json import JSON
995from .json import JSONB
996from .json import JSONIndexType
997from .json import JSONPathType
998from ... import exc
999from ... import schema as sa_schema
1000from ... import sql
1001from ... import text
1002from ... import types as sqltypes
1003from ... import util
1004from ...engine import default
1005from ...engine import processors
1006from ...engine import reflection
1007from ...engine.reflection import ReflectionDefaults
1008from ...sql import coercions
1009from ...sql import compiler
1010from ...sql import ddl as sa_ddl
1011from ...sql import elements
1012from ...sql import roles
1013from ...sql import schema
1014from ...types import BLOB # noqa
1015from ...types import BOOLEAN # noqa
1016from ...types import CHAR # noqa
1017from ...types import DECIMAL # noqa
1018from ...types import FLOAT # noqa
1019from ...types import INTEGER # noqa
1020from ...types import NUMERIC # noqa
1021from ...types import REAL # noqa
1022from ...types import SMALLINT # noqa
1023from ...types import TEXT # noqa
1024from ...types import TIMESTAMP # noqa
1025from ...types import VARCHAR # noqa
1026
1027if TYPE_CHECKING:
1028 from ...engine.interfaces import DBAPIConnection
1029 from ...engine.interfaces import Dialect
1030 from ...engine.interfaces import IsolationLevel
1031 from ...sql.sqltypes import _JSON_VALUE
1032 from ...sql.type_api import _BindProcessorType
1033 from ...sql.type_api import _ResultProcessorType
1034
1035
class _SQliteJson(JSON):
    """JSON type whose result processor passes bare numbers through."""

    def result_processor(self, dialect, coltype):
        """Return a result processor wrapping the inherited JSON processor.

        When the inherited processor raises ``TypeError`` for a value that
        is a ``numbers.Number``, that value is returned unchanged.  A
        ``TypeError`` raised for any other kind of value propagates.
        """
        base_process = super().result_processor(dialect, coltype)

        def process(value):
            try:
                return base_process(value)
            except TypeError:
                # only numeric values are handed back as-is; everything
                # else is a genuine error
                if not isinstance(value, numbers.Number):
                    raise
                return value

        return process
1050
1051
1052class _DateTimeMixin:
1053 _reg = None
1054 _storage_format = None
1055
1056 def __init__(self, storage_format=None, regexp=None, **kw):
1057 super().__init__(**kw)
1058 if regexp is not None:
1059 self._reg = re.compile(regexp)
1060 if storage_format is not None:
1061 self._storage_format = storage_format
1062
1063 @property
1064 def format_is_text_affinity(self):
1065 """return True if the storage format will automatically imply
1066 a TEXT affinity.
1067
1068 If the storage format contains no non-numeric characters,
1069 it will imply a NUMERIC storage format on SQLite; in this case,
1070 the type will generate its DDL as DATE_CHAR, DATETIME_CHAR,
1071 TIME_CHAR.
1072
1073 """
1074 spec = self._storage_format % {
1075 "year": 0,
1076 "month": 0,
1077 "day": 0,
1078 "hour": 0,
1079 "minute": 0,
1080 "second": 0,
1081 "microsecond": 0,
1082 }
1083 return bool(re.search(r"[^0-9]", spec))
1084
1085 def adapt(self, cls, **kw):
1086 if issubclass(cls, _DateTimeMixin):
1087 if self._storage_format:
1088 kw["storage_format"] = self._storage_format
1089 if self._reg:
1090 kw["regexp"] = self._reg
1091 return super().adapt(cls, **kw)
1092
1093 def literal_processor(self, dialect):
1094 bp = self.bind_processor(dialect)
1095
1096 def process(value):
1097 return "'%s'" % bp(value)
1098
1099 return process
1100
1101
class DATETIME(_DateTimeMixin, sqltypes.DateTime):
    r"""Represent a Python datetime object in SQLite using a string.

    The default string storage format is::

        "%(year)04d-%(month)02d-%(day)02d %(hour)02d:%(minute)02d:%(second)02d.%(microsecond)06d"

    e.g.:

    .. sourcecode:: text

        2021-03-15 12:05:57.105542

    The incoming storage format is by default parsed using the
    Python ``datetime.fromisoformat()`` function.

    .. versionchanged:: 2.0  ``datetime.fromisoformat()`` is used for default
       datetime string parsing.

    The storage format can be customized to some degree using the
    ``storage_format`` and ``regexp`` parameters, such as::

        import re
        from sqlalchemy.dialects.sqlite import DATETIME

        dt = DATETIME(
            storage_format=(
                "%(year)04d/%(month)02d/%(day)02d %(hour)02d:%(minute)02d:%(second)02d"
            ),
            regexp=r"(\d+)/(\d+)/(\d+) (\d+):(\d+):(\d+)",
        )

    :param truncate_microseconds: when ``True`` microseconds will be truncated
     from the datetime. Can't be specified together with ``storage_format``
     or ``regexp``.

    :param storage_format: format string which will be applied to the dict
     with keys year, month, day, hour, minute, second, and microsecond.

    :param regexp: regular expression which will be applied to incoming result
     rows, replacing the use of ``datetime.fromisoformat()`` to parse incoming
     strings. If the regexp contains named groups, the resulting match dict is
     applied to the Python datetime() constructor as keyword arguments.
     Otherwise, if positional groups are used, the datetime() constructor
     is called with positional arguments via
     ``*map(int, match_obj.groups(0))``.

    """  # noqa

    _storage_format = (
        "%(year)04d-%(month)02d-%(day)02d "
        "%(hour)02d:%(minute)02d:%(second)02d.%(microsecond)06d"
    )

    def __init__(self, *args, **kwargs):
        """Construct the type; see the class docstring for parameters."""
        truncate_microseconds = kwargs.pop("truncate_microseconds", False)
        super().__init__(*args, **kwargs)
        if truncate_microseconds:
            # these remain assertions so that the exception type raised
            # for a conflicting combination is unchanged
            assert "storage_format" not in kwargs, (
                "You can specify only "
                "one of truncate_microseconds or storage_format."
            )
            assert "regexp" not in kwargs, (
                "You can specify only one of "
                "truncate_microseconds or regexp."
            )
            self._storage_format = (
                "%(year)04d-%(month)02d-%(day)02d "
                "%(hour)02d:%(minute)02d:%(second)02d"
            )

    def bind_processor(
        self, dialect: Dialect
    ) -> Optional[_BindProcessorType[Any]]:
        """Return a function formatting datetimes with the storage format.

        The returned function passes ``None`` through, formats
        ``datetime.datetime`` values with all fields, formats plain
        ``datetime.date`` values with the time fields set to zero, and
        raises ``TypeError`` for anything else.
        """
        datetime_datetime = datetime.datetime
        datetime_date = datetime.date
        format_ = self._storage_format

        def process(value):
            if value is None:
                return None

            # datetime must be tested first as it subclasses date
            if isinstance(value, datetime_datetime):
                hour = value.hour
                minute = value.minute
                second = value.second
                microsecond = value.microsecond
            elif isinstance(value, datetime_date):
                hour = minute = second = microsecond = 0
            else:
                raise TypeError(
                    "SQLite DateTime type only accepts Python "
                    "datetime and date objects as input."
                )

            return format_ % {
                "year": value.year,
                "month": value.month,
                "day": value.day,
                "hour": hour,
                "minute": minute,
                "second": second,
                "microsecond": microsecond,
            }

        return process

    def result_processor(
        self, dialect: Dialect, coltype: object
    ) -> Optional[_ResultProcessorType[Any]]:
        """Return the string-to-datetime converter for result rows.

        A regexp-based processor is built when a custom ``regexp`` was
        given; otherwise ``processors.str_to_datetime`` is used.
        """
        if self._reg:
            return processors.str_to_datetime_processor_factory(
                self._reg, datetime.datetime
            )
        else:
            return processors.str_to_datetime
1220
1221
class DATE(_DateTimeMixin, sqltypes.Date):
    r"""Represent a Python date object in SQLite using a string.

    The default string storage format is::

        "%(year)04d-%(month)02d-%(day)02d"

    e.g.:

    .. sourcecode:: text

        2011-03-15

    The incoming storage format is by default parsed using the
    Python ``date.fromisoformat()`` function.

    .. versionchanged:: 2.0  ``date.fromisoformat()`` is used for default
       date string parsing.


    The storage format can be customized to some degree using the
    ``storage_format`` and ``regexp`` parameters, such as::

        import re
        from sqlalchemy.dialects.sqlite import DATE

        d = DATE(
            storage_format="%(month)02d/%(day)02d/%(year)04d",
            regexp=re.compile(r"(?P<month>\d+)/(?P<day>\d+)/(?P<year>\d+)"),
        )

    :param storage_format: format string which will be applied to the
     dict with keys year, month, and day.

    :param regexp: regular expression which will be applied to
     incoming result rows, replacing the use of ``date.fromisoformat()`` to
     parse incoming strings. If the regexp contains named groups, the resulting
     match dict is applied to the Python date() constructor as keyword
     arguments. Otherwise, if positional groups are used, the date()
     constructor is called with positional arguments via
     ``*map(int, match_obj.groups(0))``.

    """

    _storage_format = "%(year)04d-%(month)02d-%(day)02d"

    def bind_processor(
        self, dialect: Dialect
    ) -> Optional[_BindProcessorType[Any]]:
        """Return a function formatting dates with the storage format.

        The returned function passes ``None`` through, formats
        ``datetime.date`` values, and raises ``TypeError`` for any other
        input.
        """
        datetime_date = datetime.date
        format_ = self._storage_format

        def process(value):
            if value is None:
                return None
            elif isinstance(value, datetime_date):
                return format_ % {
                    "year": value.year,
                    "month": value.month,
                    "day": value.day,
                }
            else:
                raise TypeError(
                    "SQLite Date type only accepts Python "
                    "date objects as input."
                )

        return process

    def result_processor(
        self, dialect: Dialect, coltype: object
    ) -> Optional[_ResultProcessorType[Any]]:
        """Return the string-to-date converter for result rows.

        A regexp-based processor is built when a custom ``regexp`` was
        given; otherwise ``processors.str_to_date`` is used.
        """
        if self._reg:
            return processors.str_to_datetime_processor_factory(
                self._reg, datetime.date
            )
        else:
            return processors.str_to_date
1300
1301
class TIME(_DateTimeMixin, sqltypes.Time):
    r"""Represent a Python time object in SQLite using a string.

    The default string storage format is::

        "%(hour)02d:%(minute)02d:%(second)02d.%(microsecond)06d"

    e.g.:

    .. sourcecode:: text

        12:05:57.105580

    The incoming storage format is by default parsed using the
    Python ``time.fromisoformat()`` function.

    .. versionchanged:: 2.0  ``time.fromisoformat()`` is used for default
       time string parsing.

    The storage format can be customized to some degree using the
    ``storage_format`` and ``regexp`` parameters, such as::

        import re
        from sqlalchemy.dialects.sqlite import TIME

        t = TIME(
            storage_format="%(hour)02d-%(minute)02d-%(second)02d-%(microsecond)06d",
            regexp=re.compile(r"(\d+)-(\d+)-(\d+)(?:-(\d+))?"),
        )

    :param truncate_microseconds: when ``True`` microseconds will be truncated
     from the time. Can't be specified together with ``storage_format``
     or ``regexp``.

    :param storage_format: format string which will be applied to the dict
     with keys hour, minute, second, and microsecond.

    :param regexp: regular expression which will be applied to incoming result
     rows, replacing the use of ``time.fromisoformat()`` to parse incoming
     strings. If the regexp contains named groups, the resulting match dict is
     applied to the Python time() constructor as keyword arguments. Otherwise,
     if positional groups are used, the time() constructor is called with
     positional arguments via ``*map(int, match_obj.groups(0))``.

    """  # noqa

    _storage_format = "%(hour)02d:%(minute)02d:%(second)02d.%(microsecond)06d"

    def __init__(self, *args, **kwargs):
        """Construct the type; see the class docstring for parameters."""
        truncate_microseconds = kwargs.pop("truncate_microseconds", False)
        super().__init__(*args, **kwargs)
        if truncate_microseconds:
            # these remain assertions so that the exception type raised
            # for a conflicting combination is unchanged
            assert "storage_format" not in kwargs, (
                "You can specify only "
                "one of truncate_microseconds or storage_format."
            )
            assert "regexp" not in kwargs, (
                "You can specify only one of "
                "truncate_microseconds or regexp."
            )
            self._storage_format = "%(hour)02d:%(minute)02d:%(second)02d"

    def bind_processor(
        self, dialect: Dialect
    ) -> Optional[_BindProcessorType[Any]]:
        """Return a function formatting times with the storage format.

        The returned function passes ``None`` through, formats
        ``datetime.time`` values, and raises ``TypeError`` for any other
        input.
        """
        datetime_time = datetime.time
        format_ = self._storage_format

        def process(value):
            if value is None:
                return None
            elif isinstance(value, datetime_time):
                return format_ % {
                    "hour": value.hour,
                    "minute": value.minute,
                    "second": value.second,
                    "microsecond": value.microsecond,
                }
            else:
                raise TypeError(
                    "SQLite Time type only accepts Python "
                    "time objects as input."
                )

        return process

    def result_processor(
        self, dialect: Dialect, coltype: object
    ) -> Optional[_ResultProcessorType[Any]]:
        """Return the string-to-time converter for result rows.

        A regexp-based processor is built when a custom ``regexp`` was
        given; otherwise ``processors.str_to_time`` is used.
        """
        if self._reg:
            return processors.str_to_datetime_processor_factory(
                self._reg, datetime.time
            )
        else:
            return processors.str_to_time
1393
1394
# Maps generic SQLAlchemy types to the SQLite-specific implementation
# classes (the date/time and JSON types defined or imported in this
# module).
# NOTE(review): presumably consumed by the dialect class as its
# ``colspecs`` attribute -- confirm against the dialect definition.
colspecs = {
    sqltypes.Date: DATE,
    sqltypes.DateTime: DATETIME,
    sqltypes.JSON: _SQliteJson,
    sqltypes.JSON.JSONIndexType: JSONIndexType,
    sqltypes.JSON.JSONPathType: JSONPathType,
    sqltypes.Time: TIME,
    JSONB: JSONB,
}

# Exact-match lookup from an upper case type name to a SQLAlchemy type.
# Several names map to the same type: BOOL/BOOLEAN, INT/INTEGER, and the
# ``*_CHAR`` spellings of the date/time names.
# NOTE(review): presumably this is the reflection lookup table described
# in the module docstring, where names not found here fall back to the
# SQLite "type affinity" rules -- confirm against the reflection code.
ischema_names = {
    "BIGINT": sqltypes.BIGINT,
    "BLOB": sqltypes.BLOB,
    "BOOL": sqltypes.BOOLEAN,
    "BOOLEAN": sqltypes.BOOLEAN,
    "CHAR": sqltypes.CHAR,
    "DATE": sqltypes.DATE,
    "DATE_CHAR": sqltypes.DATE,
    "DATETIME": sqltypes.DATETIME,
    "DATETIME_CHAR": sqltypes.DATETIME,
    "DOUBLE": sqltypes.DOUBLE,
    "DECIMAL": sqltypes.DECIMAL,
    "FLOAT": sqltypes.FLOAT,
    "INT": sqltypes.INTEGER,
    "INTEGER": sqltypes.INTEGER,
    "JSON": JSON,
    "JSONB": JSONB,
    "NUMERIC": sqltypes.NUMERIC,
    "REAL": sqltypes.REAL,
    "SMALLINT": sqltypes.SMALLINT,
    "TEXT": sqltypes.TEXT,
    "TIME": sqltypes.TIME,
    "TIME_CHAR": sqltypes.TIME,
    "TIMESTAMP": sqltypes.TIMESTAMP,
    "VARCHAR": sqltypes.VARCHAR,
    "NVARCHAR": sqltypes.NVARCHAR,
    "NCHAR": sqltypes.NCHAR,
}
1433
1434
class SQLiteCompiler(compiler.SQLCompiler):
    """SQL statement compiler rendering SQLite-specific syntax."""

    # EXTRACT field names mapped to the strftime() format codes that
    # visit_extract() renders.
    extract_map = util.update_copy(
        compiler.SQLCompiler.extract_map,
        {
            "month": "%m",
            "day": "%d",
            "year": "%Y",
            "second": "%S",
            "hour": "%H",
            "doy": "%j",
            "minute": "%M",
            "epoch": "%s",
            "dow": "%w",
            "week": "%W",
        },
    )

    def visit_truediv_binary(self, binary, operator, **kw):
        """Render true division as ``<left> / (<right> + 0.0)``."""
        # presumably the "+ 0.0" coerces the divisor so that SQLite does
        # not perform integer division
        return (
            self.process(binary.left, **kw)
            + " / "
            + "(%s + 0.0)" % self.process(binary.right, **kw)
        )

    def visit_now_func(self, fn, **kw):
        """Render ``now()`` as ``CURRENT_TIMESTAMP``."""
        return "CURRENT_TIMESTAMP"

    def visit_localtimestamp_func(self, func, **kw):
        """Render ``localtimestamp`` using the ``'localtime'`` modifier."""
        return "DATETIME(CURRENT_TIMESTAMP, 'localtime')"

    def visit_true(self, expr, **kw):
        """Render boolean true as ``1``."""
        return "1"

    def visit_false(self, expr, **kw):
        """Render boolean false as ``0``."""
        return "0"

    def visit_char_length_func(self, fn, **kw):
        """Render ``char_length`` as the ``length`` function."""
        return "length%s" % self.function_argspec(fn)

    def visit_aggregate_strings_func(self, fn, **kw):
        """Render ``aggregate_strings`` using ``group_concat``."""
        return super().visit_aggregate_strings_func(
            fn, use_function_name="group_concat", **kw
        )

    def visit_cast(self, cast, **kwargs):
        """Render CAST, or only the inner clause if CAST is unsupported."""
        if self.dialect.supports_cast:
            return super().visit_cast(cast, **kwargs)
        else:
            return self.process(cast.clause, **kwargs)

    def visit_extract(self, extract, **kw):
        """Render EXTRACT as ``CAST(STRFTIME(<code>, <expr>) AS INTEGER)``.

        :raises CompileError: if the extract field has no entry in
         :attr:`.extract_map`.
        """
        # only the field lookup is guarded, so that a KeyError raised
        # while compiling the inner expression is not misreported as an
        # invalid extract argument
        try:
            strftime_code = self.extract_map[extract.field]
        except KeyError as err:
            raise exc.CompileError(
                "%s is not a valid extract argument." % extract.field
            ) from err

        return "CAST(STRFTIME('%s', %s) AS INTEGER)" % (
            strftime_code,
            self.process(extract.expr, **kw),
        )

    def returning_clause(
        self,
        stmt,
        returning_cols,
        *,
        populate_result_map,
        **kw,
    ):
        """Render RETURNING with column names not qualified by table."""
        kw["include_table"] = False
        return super().returning_clause(
            stmt, returning_cols, populate_result_map=populate_result_map, **kw
        )

    def limit_clause(self, select, **kw):
        """Render the LIMIT / OFFSET portion of a SELECT.

        An OFFSET without a LIMIT is rendered with ``LIMIT -1``; a LIMIT
        without an OFFSET is rendered with ``OFFSET 0``.
        """
        text = ""
        if select._limit_clause is not None:
            text += "\n LIMIT " + self.process(select._limit_clause, **kw)
        if select._offset_clause is not None:
            if select._limit_clause is None:
                # kw is passed along so that compile flags such as
                # literal_binds apply to the synthesized -1 as well
                text += "\n LIMIT " + self.process(sql.literal(-1), **kw)
            text += " OFFSET " + self.process(select._offset_clause, **kw)
        else:
            text += " OFFSET " + self.process(sql.literal(0), **kw)
        return text

    def for_update_clause(self, select, **kw):
        """Render nothing for FOR UPDATE."""
        # sqlite has no "FOR UPDATE" AFAICT
        return ""

    def update_from_clause(
        self, update_stmt, from_table, extra_froms, from_hints, **kw
    ):
        """Render the ``FROM`` clause of an UPDATE from ``extra_froms``."""
        kw["asfrom"] = True
        return "FROM " + ", ".join(
            t._compiler_dispatch(self, fromhints=from_hints, **kw)
            for t in extra_froms
        )

    def visit_is_distinct_from_binary(self, binary, operator, **kw):
        """Render IS DISTINCT FROM as ``<left> IS NOT <right>``."""
        # kw is forwarded so that compile flags reach both operands
        return "%s IS NOT %s" % (
            self.process(binary.left, **kw),
            self.process(binary.right, **kw),
        )

    def visit_is_not_distinct_from_binary(self, binary, operator, **kw):
        """Render IS NOT DISTINCT FROM as ``<left> IS <right>``."""
        # kw is forwarded so that compile flags reach both operands
        return "%s IS %s" % (
            self.process(binary.left, **kw),
            self.process(binary.right, **kw),
        )

    def visit_json_getitem_op_binary(
        self, binary, operator, _cast_applied=False, **kw
    ):
        """Render JSON index access using ``JSON_EXTRACT``.

        A non-JSON result type is first wrapped in a CAST to that type;
        ``_cast_applied`` marks the nested call made while rendering that
        CAST so it is not wrapped again.  A JSON result type is wrapped
        in ``JSON_QUOTE``.
        """
        if (
            not _cast_applied
            and binary.type._type_affinity is not sqltypes.JSON
        ):
            kw["_cast_applied"] = True
            return self.process(sql.cast(binary, binary.type), **kw)

        if binary.type._type_affinity is sqltypes.JSON:
            expr = "JSON_QUOTE(JSON_EXTRACT(%s, %s))"
        else:
            expr = "JSON_EXTRACT(%s, %s)"

        return expr % (
            self.process(binary.left, **kw),
            self.process(binary.right, **kw),
        )

    def visit_json_path_getitem_op_binary(
        self, binary, operator, _cast_applied=False, **kw
    ):
        """Render JSON path access using ``JSON_EXTRACT``.

        Follows the same CAST / ``JSON_QUOTE`` rules as
        :meth:`.visit_json_getitem_op_binary`.
        """
        if (
            not _cast_applied
            and binary.type._type_affinity is not sqltypes.JSON
        ):
            kw["_cast_applied"] = True
            return self.process(sql.cast(binary, binary.type), **kw)

        if binary.type._type_affinity is sqltypes.JSON:
            expr = "JSON_QUOTE(JSON_EXTRACT(%s, %s))"
        else:
            expr = "JSON_EXTRACT(%s, %s)"

        return expr % (
            self.process(binary.left, **kw),
            self.process(binary.right, **kw),
        )

    def visit_empty_set_op_expr(self, type_, expand_op, **kw):
        """Render an empty set for an operator expression."""
        # slightly old SQLite versions don't seem to be able to handle
        # the empty set impl
        return self.visit_empty_set_expr(type_)

    def visit_empty_set_expr(self, element_types, **kw):
        """Render a SELECT that returns no rows.

        One ``1`` column is rendered per element type; a single column
        is rendered when ``element_types`` is empty.
        """
        ones = ", ".join("1" for _ in element_types or [INTEGER()])
        return "SELECT %s FROM (SELECT %s) WHERE 1!=1" % (ones, ones)

    def visit_regexp_match_op_binary(self, binary, operator, **kw):
        """Render a regular expression match with ``REGEXP``."""
        return self._generate_generic_binary(binary, " REGEXP ", **kw)

    def visit_not_regexp_match_op_binary(self, binary, operator, **kw):
        """Render a negated regular expression match with ``NOT REGEXP``."""
        return self._generate_generic_binary(binary, " NOT REGEXP ", **kw)

    def _on_conflict_target(self, clause, **kw):
        """Render the conflict target of an ON CONFLICT clause.

        Returns ``"(<elements>)"``, followed by ``" WHERE <criteria>"``
        when a target WHERE clause is present, or an empty string when
        the clause has no inferred target elements.
        """
        if clause.inferred_target_elements is not None:
            target_text = "(%s)" % ", ".join(
                (
                    self.preparer.quote(c)
                    if isinstance(c, str)
                    else self.process(c, include_table=False, use_schema=False)
                )
                for c in clause.inferred_target_elements
            )
            if clause.inferred_target_whereclause is not None:
                whereclause_kw = dict(kw)
                whereclause_kw.update(
                    include_table=False,
                    use_schema=False,
                    literal_execute=True,
                )
                target_text += " WHERE %s" % self.process(
                    clause.inferred_target_whereclause,
                    **whereclause_kw,
                )

        else:
            target_text = ""

        return target_text

    def visit_on_conflict_do_nothing(self, on_conflict, **kw):
        """Render ``ON CONFLICT [target] DO NOTHING``."""
        target_text = self._on_conflict_target(on_conflict, **kw)

        if target_text:
            return "ON CONFLICT %s DO NOTHING" % target_text
        else:
            return "ON CONFLICT DO NOTHING"

    def visit_on_conflict_do_update(self, on_conflict, **kw):
        """Render ``ON CONFLICT <target> DO UPDATE SET ... [WHERE ...]``.

        SET entries for the table's columns are rendered first, in table
        column order.  Remaining keys that match no column produce a
        warning and are then rendered as given.
        """
        clause = on_conflict

        target_text = self._on_conflict_target(on_conflict, **kw)

        action_set_ops = []

        # copied, as matched entries are popped below
        set_parameters = dict(clause.update_values_to_set)

        insert_statement = self.stack[-1]["selectable"]
        cols = insert_statement.table.c
        set_kw = dict(kw)
        set_kw.update(use_schema=False)
        for c in cols:
            col_key = c.key

            # a value may be keyed by column key or by Column object
            if col_key in set_parameters:
                value = set_parameters.pop(col_key)
            elif c in set_parameters:
                value = set_parameters.pop(c)
            else:
                continue

            # an untyped bound parameter takes on the column's type
            if (
                isinstance(value, elements.BindParameter)
                and value.type._isnull
            ):
                value = value._with_binary_element_type(c.type)

            value_text = self.process(
                value.self_group(), is_upsert_set=True, **set_kw
            )

            key_text = self.preparer.quote(c.name)
            action_set_ops.append("%s = %s" % (key_text, value_text))

        # check for names that don't match columns
        if set_parameters:
            util.warn(
                "Additional column names not matching "
                "any column keys in table '%s': %s"
                % (
                    self.current_executable.table.name,
                    (", ".join("'%s'" % c for c in set_parameters)),
                )
            )
            for k, v in set_parameters.items():
                key_text = (
                    self.preparer.quote(k)
                    if isinstance(k, str)
                    else self.process(k, **set_kw)
                )
                value_text = self.process(
                    coercions.expect(roles.ExpressionElementRole, v),
                    is_upsert_set=True,
                    **set_kw,
                )
                action_set_ops.append("%s = %s" % (key_text, value_text))

        action_text = ", ".join(action_set_ops)
        if clause.update_whereclause is not None:
            where_kw = dict(kw)
            where_kw.update(include_table=True, use_schema=False)
            action_text += " WHERE %s" % self.process(
                clause.update_whereclause, **where_kw
            )

        return "ON CONFLICT %s DO UPDATE SET %s" % (target_text, action_text)

    def visit_bitwise_xor_op_binary(self, binary, operator, **kw):
        """Render bitwise XOR as ``(a | b) - (a & b)``."""
        # sqlite has no xor. Use "a XOR b" = "(a | b) - (a & b)".
        kw["eager_grouping"] = True
        or_ = self._generate_generic_binary(binary, " | ", **kw)
        and_ = self._generate_generic_binary(binary, " & ", **kw)
        return f"({or_} - {and_})"
1714
1715
class SQLiteDDLCompiler(compiler.DDLCompiler):
    """DDL compiler rendering SQLite-specific CREATE syntax."""

    def get_column_specification(self, column, **kwargs):
        """Render the column portion of CREATE TABLE.

        The pieces are emitted in this order: name and type, DEFAULT,
        NOT NULL with its optional ON CONFLICT, inline PRIMARY KEY with
        its optional ON CONFLICT plus AUTOINCREMENT, and the computed
        clause.

        :raises CompileError: if ``autoincrement=True`` is set on a
         column of a composite primary key.
        """
        type_text = self.dialect.type_compiler_instance.process(
            column.type, type_expression=column
        )
        pieces = [self.preparer.format_column(column) + " " + type_text]

        default_text = self.get_column_default_string(column)
        if default_text is not None:
            # a default that does not already start with a quote or
            # parenthesis, yet contains a non-word character, is
            # parenthesized
            if (
                re.match(r"""^\s*[\'\"\(]""", default_text) is None
                and re.match(r".*\W.*", default_text) is not None
            ):
                pieces.append(f"DEFAULT ({default_text})")
            else:
                pieces.append(f"DEFAULT {default_text}")

        if not column.nullable:
            pieces.append("NOT NULL")

            not_null_conflict = column.dialect_options["sqlite"][
                "on_conflict_not_null"
            ]
            if not_null_conflict is not None:
                pieces.append("ON CONFLICT " + not_null_conflict)

        if column.primary_key:
            if (
                column.autoincrement is True
                and len(column.table.primary_key.columns) != 1
            ):
                raise exc.CompileError(
                    "SQLite does not support autoincrement for "
                    "composite primary keys"
                )

            if (
                column.table.dialect_options["sqlite"]["autoincrement"]
                and len(column.table.primary_key.columns) == 1
                and issubclass(column.type._type_affinity, sqltypes.Integer)
                and not column.foreign_keys
            ):
                pieces.append("PRIMARY KEY")

                pk_conflict = column.dialect_options["sqlite"][
                    "on_conflict_primary_key"
                ]
                if pk_conflict is not None:
                    pieces.append("ON CONFLICT " + pk_conflict)

                pieces.append("AUTOINCREMENT")

        if column.computed is not None:
            pieces.append(self.process(column.computed))

        return " ".join(pieces)

    def visit_primary_key_constraint(self, constraint, **kw):
        """Render a table-level PRIMARY KEY constraint.

        Returns ``None`` for the single-column case that
        :meth:`.get_column_specification` renders inline with
        AUTOINCREMENT.
        """
        # for columns with sqlite_autoincrement=True,
        # the PRIMARY KEY constraint can only be inline
        # with the column itself.
        if len(constraint.columns) == 1:
            only_col = list(constraint)[0]
            if (
                only_col.primary_key
                and only_col.table.dialect_options["sqlite"]["autoincrement"]
                and issubclass(only_col.type._type_affinity, sqltypes.Integer)
                and not only_col.foreign_keys
            ):
                return None

        ddl = super().visit_primary_key_constraint(constraint)

        conflict = constraint.dialect_options["sqlite"]["on_conflict"]
        if conflict is None and len(constraint.columns) == 1:
            # fall back to the option given on the column itself
            conflict = list(constraint)[0].dialect_options["sqlite"][
                "on_conflict_primary_key"
            ]

        if conflict is None:
            return ddl
        return ddl + " ON CONFLICT " + conflict

    def visit_unique_constraint(self, constraint, **kw):
        """Render a UNIQUE constraint with an optional ON CONFLICT."""
        ddl = super().visit_unique_constraint(constraint)

        conflict = constraint.dialect_options["sqlite"]["on_conflict"]
        if conflict is None and len(constraint.columns) == 1:
            first = list(constraint)[0]
            # only schema items carry dialect options
            if isinstance(first, schema.SchemaItem):
                conflict = first.dialect_options["sqlite"][
                    "on_conflict_unique"
                ]

        if conflict is None:
            return ddl
        return ddl + " ON CONFLICT " + conflict

    def visit_check_constraint(self, constraint, **kw):
        """Render a CHECK constraint with an optional ON CONFLICT."""
        ddl = super().visit_check_constraint(constraint)

        conflict = constraint.dialect_options["sqlite"]["on_conflict"]
        if conflict is None:
            return ddl
        return ddl + " ON CONFLICT " + conflict

    def visit_column_check_constraint(self, constraint, **kw):
        """Render a column-level CHECK constraint.

        :raises CompileError: if an ``on_conflict`` option is set on the
         constraint.
        """
        ddl = super().visit_column_check_constraint(constraint)

        if constraint.dialect_options["sqlite"]["on_conflict"] is not None:
            raise exc.CompileError(
                "SQLite does not support on conflict clause for "
                "column check constraint"
            )

        return ddl

    def visit_foreign_key_constraint(self, constraint, **kw):
        """Render a FOREIGN KEY constraint.

        Returns ``None`` when the local and remote tables have different
        schemas.
        """
        first_element = constraint.elements[0]
        local_schema = first_element.parent.table.schema
        remote_schema = first_element.column.table.schema

        if local_schema != remote_schema:
            return None
        return super().visit_foreign_key_constraint(constraint)

    def define_constraint_remote_table(self, constraint, table, preparer):
        """Format the remote table clause of a CREATE CONSTRAINT clause."""

        return preparer.format_table(table, use_schema=False)

    def visit_create_index(
        self, create, include_schema=False, include_table_schema=True, **kw
    ):
        """Render CREATE INDEX with the optional ``sqlite_where`` clause.

        The index name is rendered with its schema and the table name
        without one; index expressions and WHERE criteria are rendered
        with literal values inline.
        """
        index = create.element
        self._verify_index_table(index)
        preparer = self.preparer

        parts = ["CREATE "]
        if index.unique:
            parts.append("UNIQUE ")
        parts.append("INDEX ")
        if create.if_not_exists:
            parts.append("IF NOT EXISTS ")

        parts.append(
            "%s ON %s (%s)"
            % (
                self._prepared_index_name(index, include_schema=True),
                preparer.format_table(index.table, use_schema=False),
                ", ".join(
                    self.sql_compiler.process(
                        expr, include_table=False, literal_binds=True
                    )
                    for expr in index.expressions
                ),
            )
        )

        criteria = index.dialect_options["sqlite"]["where"]
        if criteria is not None:
            parts.append(
                " WHERE "
                + self.sql_compiler.process(
                    criteria, include_table=False, literal_binds=True
                )
            )

        return "".join(parts)

    def post_create_table(self, table):
        """Render the table options that follow the column list.

        ``WITHOUT ROWID`` and ``STRICT`` are rendered per the table's
        ``sqlite`` dialect options; an empty string is returned when
        neither applies.
        """
        sqlite_opts = table.dialect_options["sqlite"]
        table_options = []

        if not sqlite_opts["with_rowid"]:
            table_options.append("WITHOUT ROWID")

        if sqlite_opts["strict"]:
            table_options.append("STRICT")

        if not table_options:
            return ""
        return "\n " + ",\n ".join(table_options)

    def visit_create_view(self, create, **kw):
        """Handle SQLite if_not_exists dialect option for CREATE VIEW."""
        # the option is read from the CreateView construct and handed to
        # the parent implementation by keyword
        kw["if_not_exists"] = create.dialect_options["sqlite"].get(
            "if_not_exists", False
        )
        return super().visit_create_view(create, **kw)
1915
1916
class SQLiteTypeCompiler(compiler.GenericTypeCompiler):
    """Type compiler rendering SQLite-specific DDL type names."""

    def visit_large_binary(self, type_, **kw):
        """Render large binary types as BLOB."""
        return self.visit_BLOB(type_)

    def visit_DATETIME(self, type_, **kw):
        """Render ``DATETIME``, or ``DATETIME_CHAR`` when the type is a
        ``_DateTimeMixin`` whose ``format_is_text_affinity`` is falsy."""
        if (
            isinstance(type_, _DateTimeMixin)
            and not type_.format_is_text_affinity
        ):
            return "DATETIME_CHAR"
        return super().visit_DATETIME(type_)

    def visit_DATE(self, type_, **kw):
        """Render ``DATE``, or ``DATE_CHAR`` when the type is a
        ``_DateTimeMixin`` whose ``format_is_text_affinity`` is falsy."""
        if (
            isinstance(type_, _DateTimeMixin)
            and not type_.format_is_text_affinity
        ):
            return "DATE_CHAR"
        return super().visit_DATE(type_)

    def visit_TIME(self, type_, **kw):
        """Render ``TIME``, or ``TIME_CHAR`` when the type is a
        ``_DateTimeMixin`` whose ``format_is_text_affinity`` is falsy."""
        if (
            isinstance(type_, _DateTimeMixin)
            and not type_.format_is_text_affinity
        ):
            return "TIME_CHAR"
        return super().visit_TIME(type_)

    def visit_JSON(self, type_, **kw):
        """Render the JSON type name."""
        # note this name provides NUMERIC affinity, not TEXT.
        # should not be an issue unless the JSON value consists of a single
        # numeric value.  JSONTEXT can be used if this case is required.
        return "JSON"

    def visit_JSONB(self, type_, **kw):
        """Render the JSONB type name."""
        return "JSONB"
1956
1957
class SQLiteIdentifierPreparer(compiler.IdentifierPreparer):
    """Identifier preparer carrying the SQLite reserved word list."""

    # Lower-case reserved words, written as a whitespace-separated block
    # (one initial letter per row) and split into a set.
    reserved_words = set(
        """
        add after all alter analyze and as asc attach autoincrement
        before begin between by
        cascade case cast check collate column commit conflict constraint
        create cross current_date current_time current_timestamp
        database default deferrable deferred delete desc detach distinct drop
        each else end escape except exclusive exists explain
        false fail for foreign from full
        glob group
        having
        if ignore immediate in index indexed initially inner insert instead
        intersect into is isnull
        join
        key
        left like limit
        match
        natural not notnull null
        of offset on or order outer
        plan pragma primary
        query
        raise references reindex rename replace restrict right rollback row
        select set
        table temp temporary then to transaction trigger true
        union unique update using
        vacuum values view virtual
        when where
        """.split()
    )
2078
2079
class SQLiteExecutionContext(default.DefaultExecutionContext):
    """Execution context that adjusts dotted column names in results."""

    @util.memoized_property
    def _preserve_raw_colnames(self):
        """Whether column names are passed through untranslated.

        Truthy when the dialect's ``_broken_dotted_colnames`` flag is
        falsy; otherwise the value of the ``sqlite_raw_colnames``
        execution option (default ``False``).
        """
        if not self.dialect._broken_dotted_colnames:
            return True
        return self.execution_options.get("sqlite_raw_colnames", False)

    def _translate_colname(self, colname):
        """Return ``(name, original_or_None)`` for a result column name."""
        # TODO: detect SQLite version 3.10.0 or greater;
        # see [ticket:3633]

        # adjust for dotted column names.  SQLite
        # in the case of UNION may store col names as
        # "tablename.colname", or if using an attached database,
        # "database.tablename.colname", in cursor.description
        if self._preserve_raw_colnames or "." not in colname:
            return colname, None

        # keep only the text after the final dot
        return colname.rpartition(".")[2], colname
2100
2101
class SQLiteDialect(default.DefaultDialect):
    name = "sqlite"
    supports_alter = False

    # SQLite itself supports "DEFAULT VALUES" but *does not* support
    # "VALUES (DEFAULT)"; see supports_default_metavalue and
    # default_metavalue_token below for the token rendered in its place.
    supports_default_values = True

    # sqlite issue:
    # https://github.com/python/cpython/issues/93421
    # note this parameter is no longer used by the ORM or default dialect
    # see #9414
    supports_sane_rowcount_returning = False

    supports_empty_insert = False
    supports_cast = True
    supports_multivalues_insert = True
    use_insertmanyvalues = True
    tuple_in_values = True
    supports_statement_cache = True
    insert_null_pk_still_autoincrements = True
    insert_returning = True
    update_returning = True
    update_returning_multifrom = True
    delete_returning = True

    supports_default_metavalue = True
    """dialect supports INSERT... VALUES (DEFAULT) syntax"""

    default_metavalue_token = "NULL"
    """for INSERT... VALUES (DEFAULT) syntax, the token to put in the
    parenthesis."""

    default_paramstyle = "qmark"
    execution_ctx_cls = SQLiteExecutionContext
    statement_compiler = SQLiteCompiler
    ddl_compiler = SQLiteDDLCompiler
    type_compiler_cls = SQLiteTypeCompiler
    preparer = SQLiteIdentifierPreparer
    ischema_names = ischema_names
    colspecs = colspecs

    # dialect-specific keyword arguments and their defaults, keyed by
    # the schema / DDL construct they apply to
    construct_arguments = [
        (
            sa_schema.Table,
            {
                "autoincrement": False,
                "with_rowid": True,
                "strict": False,
            },
        ),
        (sa_schema.Index, {"where": None}),
        (
            sa_schema.Column,
            {
                "on_conflict_primary_key": None,
                "on_conflict_not_null": None,
                "on_conflict_unique": None,
            },
        ),
        (sa_schema.Constraint, {"on_conflict": None}),
        (sa_ddl.CreateView, {"if_not_exists": False}),
    ]

    # class-level defaults; __init__ recomputes both from the DBAPI's
    # sqlite_version_info when a DBAPI is present
    _broken_fk_pragma_quotes = False
    _broken_dotted_colnames = False
2170
2171 def __init__(
2172 self,
2173 native_datetime: bool = False,
2174 json_serializer: Callable[[_JSON_VALUE], str] | None = None,
2175 json_deserializer: Callable[[str], _JSON_VALUE] | None = None,
2176 **kwargs: Any,
2177 ) -> None:
2178 default.DefaultDialect.__init__(self, **kwargs)
2179
2180 self._json_serializer = json_serializer
2181 self._json_deserializer = json_deserializer
2182
2183 # this flag used by pysqlite dialect, and perhaps others in the
2184 # future, to indicate the driver is handling date/timestamp
2185 # conversions (and perhaps datetime/time as well on some hypothetical
2186 # driver ?)
2187 self.native_datetime = native_datetime
2188
2189 if self.dbapi is not None:
2190 if self.dbapi.sqlite_version_info < (3, 7, 16):
2191 util.warn(
2192 "SQLite version %s is older than 3.7.16, and will not "
2193 "support right nested joins, as are sometimes used in "
2194 "more complex ORM scenarios. SQLAlchemy 1.4 and above "
2195 "no longer tries to rewrite these joins."
2196 % (self.dbapi.sqlite_version_info,)
2197 )
2198
2199 # NOTE: python 3.7 on fedora for me has SQLite 3.34.1. These
2200 # version checks are getting very stale.
2201 self._broken_dotted_colnames = self.dbapi.sqlite_version_info < (
2202 3,
2203 10,
2204 0,
2205 )
2206 self.supports_default_values = self.dbapi.sqlite_version_info >= (
2207 3,
2208 3,
2209 8,
2210 )
2211 self.supports_cast = self.dbapi.sqlite_version_info >= (3, 2, 3)
2212 self.supports_multivalues_insert = (
2213 # https://www.sqlite.org/releaselog/3_7_11.html
2214 self.dbapi.sqlite_version_info
2215 >= (3, 7, 11)
2216 )
2217 # see https://www.sqlalchemy.org/trac/ticket/2568
2218 # as well as https://www.sqlite.org/src/info/600482d161
2219 self._broken_fk_pragma_quotes = self.dbapi.sqlite_version_info < (
2220 3,
2221 6,
2222 14,
2223 )
2224
2225 if self.dbapi.sqlite_version_info < (3, 35) or util.pypy:
2226 self.update_returning = self.delete_returning = (
2227 self.insert_returning
2228 ) = False
2229
2230 if self.dbapi.sqlite_version_info < (3, 32, 0):
2231 # https://www.sqlite.org/limits.html
2232 self.insertmanyvalues_max_parameters = 999
2233
    # Maps each isolation level name to the integer value that
    # set_isolation_level() sends with ``PRAGMA read_uncommitted``.
    _isolation_lookup = util.immutabledict(
        {"READ UNCOMMITTED": 1, "SERIALIZABLE": 0}
    )
2237
2238 def get_isolation_level_values(self, dbapi_connection):
2239 return list(self._isolation_lookup)
2240
2241 def set_isolation_level(
2242 self, dbapi_connection: DBAPIConnection, level: IsolationLevel
2243 ) -> None:
2244 isolation_level = self._isolation_lookup[level]
2245
2246 cursor = dbapi_connection.cursor()
2247 cursor.execute(f"PRAGMA read_uncommitted = {isolation_level}")
2248 cursor.close()
2249
2250 def get_isolation_level(self, dbapi_connection):
2251 cursor = dbapi_connection.cursor()
2252 cursor.execute("PRAGMA read_uncommitted")
2253 res = cursor.fetchone()
2254 if res:
2255 value = res[0]
2256 else:
2257 # https://www.sqlite.org/changes.html#version_3_3_3
2258 # "Optional READ UNCOMMITTED isolation (instead of the
2259 # default isolation level of SERIALIZABLE) and
2260 # table level locking when database connections
2261 # share a common cache.""
2262 # pre-SQLite 3.3.0 default to 0
2263 value = 0
2264 cursor.close()
2265 if value == 0:
2266 return "SERIALIZABLE"
2267 elif value == 1:
2268 return "READ UNCOMMITTED"
2269 else:
2270 assert False, "Unknown isolation level %s" % value
2271
2272 @reflection.cache
2273 def get_schema_names(self, connection, **kw):
2274 s = "PRAGMA database_list"
2275 dl = connection.exec_driver_sql(s)
2276
2277 return [db[1] for db in dl if db[1] != "temp"]
2278
2279 def _format_schema(self, schema, table_name):
2280 if schema is not None:
2281 qschema = self.identifier_preparer.quote_identifier(schema)
2282 name = f"{qschema}.{table_name}"
2283 else:
2284 name = table_name
2285 return name
2286
2287 def _sqlite_main_query(
2288 self,
2289 table: str,
2290 type_: str,
2291 schema: Optional[str],
2292 sqlite_include_internal: bool,
2293 ):
2294 main = self._format_schema(schema, table)
2295 if not sqlite_include_internal:
2296 filter_table = " AND name NOT LIKE 'sqlite~_%' ESCAPE '~'"
2297 else:
2298 filter_table = ""
2299 query = (
2300 f"SELECT name FROM {main} "
2301 f"WHERE type='{type_}'{filter_table} "
2302 "ORDER BY name"
2303 )
2304 return query
2305
2306 @reflection.cache
2307 def get_table_names(
2308 self, connection, schema=None, sqlite_include_internal=False, **kw
2309 ):
2310 query = self._sqlite_main_query(
2311 "sqlite_master", "table", schema, sqlite_include_internal
2312 )
2313 names = connection.exec_driver_sql(query).scalars().all()
2314 return names
2315
2316 @reflection.cache
2317 def get_temp_table_names(
2318 self, connection, sqlite_include_internal=False, **kw
2319 ):
2320 query = self._sqlite_main_query(
2321 "sqlite_temp_master", "table", None, sqlite_include_internal
2322 )
2323 names = connection.exec_driver_sql(query).scalars().all()
2324 return names
2325
2326 @reflection.cache
2327 def get_temp_view_names(
2328 self, connection, sqlite_include_internal=False, **kw
2329 ):
2330 query = self._sqlite_main_query(
2331 "sqlite_temp_master", "view", None, sqlite_include_internal
2332 )
2333 names = connection.exec_driver_sql(query).scalars().all()
2334 return names
2335
2336 @reflection.cache
2337 def has_table(self, connection, table_name, schema=None, **kw):
2338 self._ensure_has_table_connection(connection)
2339
2340 if schema is not None and schema not in self.get_schema_names(
2341 connection, **kw
2342 ):
2343 return False
2344
2345 info = self._get_table_pragma(
2346 connection, "table_info", table_name, schema=schema
2347 )
2348 return bool(info)
2349
2350 def _get_default_schema_name(self, connection):
2351 return "main"
2352
2353 @reflection.cache
2354 def get_view_names(
2355 self, connection, schema=None, sqlite_include_internal=False, **kw
2356 ):
2357 query = self._sqlite_main_query(
2358 "sqlite_master", "view", schema, sqlite_include_internal
2359 )
2360 names = connection.exec_driver_sql(query).scalars().all()
2361 return names
2362
2363 @reflection.cache
2364 def get_view_definition(self, connection, view_name, schema=None, **kw):
2365 if schema is not None:
2366 qschema = self.identifier_preparer.quote_identifier(schema)
2367 master = f"{qschema}.sqlite_master"
2368 s = ("SELECT sql FROM %s WHERE name = ? AND type='view'") % (
2369 master,
2370 )
2371 rs = connection.exec_driver_sql(s, (view_name,))
2372 else:
2373 try:
2374 s = (
2375 "SELECT sql FROM "
2376 " (SELECT * FROM sqlite_master UNION ALL "
2377 " SELECT * FROM sqlite_temp_master) "
2378 "WHERE name = ? "
2379 "AND type='view'"
2380 )
2381 rs = connection.exec_driver_sql(s, (view_name,))
2382 except exc.DBAPIError:
2383 s = (
2384 "SELECT sql FROM sqlite_master WHERE name = ? "
2385 "AND type='view'"
2386 )
2387 rs = connection.exec_driver_sql(s, (view_name,))
2388
2389 result = rs.fetchall()
2390 if result:
2391 return result[0].sql
2392 else:
2393 raise exc.NoSuchTableError(
2394 f"{schema}.{view_name}" if schema else view_name
2395 )
2396
    @reflection.cache
    def get_columns(self, connection, table_name, schema=None, **kw):
        """Return column information for ``table_name``.

        Rows come from ``PRAGMA table_info``, or ``PRAGMA table_xinfo``
        when ``server_version_info`` is at least 3.31, and each non-hidden
        row is converted by :meth:`_get_column_info`.

        :raises NoSuchTableError: if no columns were collected and
         :meth:`has_table` reports the table is absent.
        """
        pragma = "table_info"
        # computed columns are treated as hidden, they require table_xinfo
        if self.server_version_info >= (3, 31):
            pragma = "table_xinfo"
        info = self._get_table_pragma(
            connection, pragma, table_name, schema=schema
        )
        columns = []
        # body of the CREATE TABLE statement; fetched lazily, only once a
        # generated column is encountered
        tablesql = None
        for row in info:
            name = row[1]
            type_ = row[2].upper()
            nullable = not row[3]
            default = row[4]
            primary_key = row[5]
            hidden = row[6] if pragma == "table_xinfo" else 0

            # hidden has value 0 for normal columns, 1 for hidden columns,
            # 2 for computed virtual columns and 3 for computed stored columns
            # https://www.sqlite.org/src/info/069351b85f9a706f60d3e98fbc8aaf40c374356b967c0464aede30ead3d9d18b
            if hidden == 1:
                continue

            generated = bool(hidden)
            persisted = hidden == 3

            if tablesql is None and generated:
                tablesql = self._get_table_sql(
                    connection, table_name, schema, **kw
                )
                # remove create table
                match = re.match(
                    (
                        r"create table .*?\((.*)\)"
                        r"(?:\s*,?\s*(?:WITHOUT\s+ROWID|STRICT))*$"
                    ),
                    tablesql.strip(),
                    re.DOTALL | re.IGNORECASE,
                )
                assert match, f"create table not found in {tablesql}"
                # keep only the text between the outermost parentheses
                tablesql = match.group(1).strip()

            columns.append(
                self._get_column_info(
                    name,
                    type_,
                    nullable,
                    default,
                    primary_key,
                    generated,
                    persisted,
                    tablesql,
                )
            )
        if columns:
            return columns
        elif not self.has_table(connection, table_name, schema):
            raise exc.NoSuchTableError(
                f"{schema}.{table_name}" if schema else table_name
            )
        else:
            return ReflectionDefaults.columns()
2461
2462 def _get_column_info(
2463 self,
2464 name,
2465 type_,
2466 nullable,
2467 default,
2468 primary_key,
2469 generated,
2470 persisted,
2471 tablesql,
2472 ):
2473 if generated:
2474 # the type of a column "cc INTEGER GENERATED ALWAYS AS (1 + 42)"
2475 # somehow is "INTEGER GENERATED ALWAYS"
2476 type_ = re.sub("generated", "", type_, flags=re.IGNORECASE)
2477 type_ = re.sub("always", "", type_, flags=re.IGNORECASE).strip()
2478
2479 coltype = self._resolve_type_affinity(type_)
2480
2481 if default is not None:
2482 default = str(default)
2483
2484 colspec = {
2485 "name": name,
2486 "type": coltype,
2487 "nullable": nullable,
2488 "default": default,
2489 "primary_key": primary_key,
2490 }
2491 if generated:
2492 sqltext = ""
2493 if tablesql:
2494 pattern = (
2495 r"[^,]*\s+GENERATED\s+ALWAYS\s+AS"
2496 r"\s+\((.*)\)\s*(?:virtual|stored)?"
2497 )
2498 match = re.search(
2499 re.escape(name) + pattern, tablesql, re.IGNORECASE
2500 )
2501 if match:
2502 sqltext = match.group(1)
2503 colspec["computed"] = {"sqltext": sqltext, "persisted": persisted}
2504 return colspec
2505
2506 def _resolve_type_affinity(self, type_):
2507 """Return a data type from a reflected column, using affinity rules.
2508
2509 SQLite's goal for universal compatibility introduces some complexity
2510 during reflection, as a column's defined type might not actually be a
2511 type that SQLite understands - or indeed, my not be defined *at all*.
2512 Internally, SQLite handles this with a 'data type affinity' for each
2513 column definition, mapping to one of 'TEXT', 'NUMERIC', 'INTEGER',
2514 'REAL', or 'NONE' (raw bits). The algorithm that determines this is
2515 listed in https://www.sqlite.org/datatype3.html section 2.1.
2516
2517 This method allows SQLAlchemy to support that algorithm, while still
2518 providing access to smarter reflection utilities by recognizing
2519 column definitions that SQLite only supports through affinity (like
2520 DATE and DOUBLE).
2521
2522 """
2523 match = re.match(r"([\w ]+)(\(.*?\))?", type_)
2524 if match:
2525 coltype = match.group(1)
2526 args = match.group(2)
2527 else:
2528 coltype = ""
2529 args = ""
2530
2531 if coltype in self.ischema_names:
2532 coltype = self.ischema_names[coltype]
2533 elif "INT" in coltype:
2534 coltype = sqltypes.INTEGER
2535 elif "CHAR" in coltype or "CLOB" in coltype or "TEXT" in coltype:
2536 coltype = sqltypes.TEXT
2537 elif "BLOB" in coltype or not coltype:
2538 coltype = sqltypes.NullType
2539 elif "REAL" in coltype or "FLOA" in coltype or "DOUB" in coltype:
2540 coltype = sqltypes.REAL
2541 else:
2542 coltype = sqltypes.NUMERIC
2543
2544 if args is not None:
2545 args = re.findall(r"(\d+)", args)
2546 try:
2547 coltype = coltype(*[int(a) for a in args])
2548 except TypeError:
2549 util.warn(
2550 "Could not instantiate type %s with "
2551 "reflected arguments %s; using no arguments."
2552 % (coltype, args)
2553 )
2554 coltype = coltype()
2555 else:
2556 coltype = coltype()
2557
2558 return coltype
2559
2560 @reflection.cache
2561 def get_pk_constraint(self, connection, table_name, schema=None, **kw):
2562 constraint_name = None
2563 table_data = self._get_table_sql(connection, table_name, schema=schema)
2564 if table_data:
2565 PK_PATTERN = r'CONSTRAINT +(?:"(.+?)"|(\w+)) +PRIMARY KEY'
2566 result = re.search(PK_PATTERN, table_data, re.I)
2567 if result:
2568 constraint_name = result.group(1) or result.group(2)
2569 else:
2570 constraint_name = None
2571
2572 cols = self.get_columns(connection, table_name, schema, **kw)
2573 # consider only pk columns. This also avoids sorting the cached
2574 # value returned by get_columns
2575 cols = [col for col in cols if col.get("primary_key", 0) > 0]
2576 cols.sort(key=lambda col: col.get("primary_key"))
2577 pkeys = [col["name"] for col in cols]
2578
2579 if pkeys:
2580 return {"constrained_columns": pkeys, "name": constraint_name}
2581 else:
2582 return ReflectionDefaults.pk_constraint()
2583
    @reflection.cache
    def get_foreign_keys(self, connection, table_name, schema=None, **kw):
        """Return the foreign key constraints of ``table_name``.

        The constraints themselves come from ``PRAGMA foreign_key_list``.
        The table's stored SQL is then parsed with a regular expression to
        recover each constraint's name and options, which are matched to
        the pragma results by a (constrained columns, referred table,
        referred columns) signature.  Pragma entries with no parsed match
        are returned with ``name`` ``None`` and empty ``options``.
        """
        # sqlite makes this *extremely difficult*.
        # First, use the pragma to get the actual FKs.
        pragma_fks = self._get_table_pragma(
            connection, "foreign_key_list", table_name, schema=schema
        )

        # constraint dictionaries keyed by the pragma's numeric id; one
        # pragma row is produced per constrained column
        fks = {}

        for row in pragma_fks:
            numerical_id, rtbl, lcol, rcol = (row[0], row[2], row[3], row[4])

            if not rcol:
                # no referred column, which means it was not named in the
                # original DDL.  The referred columns of the foreign key
                # constraint are therefore the primary key of the referred
                # table.
                try:
                    referred_pk = self.get_pk_constraint(
                        connection, rtbl, schema=schema, **kw
                    )
                    referred_columns = referred_pk["constrained_columns"]
                except exc.NoSuchTableError:
                    # ignore not existing parents
                    referred_columns = []
            else:
                # note we use this list only if this is the first column
                # in the constraint.  for subsequent columns we ignore the
                # list and append "rcol" if present.
                referred_columns = []

            if self._broken_fk_pragma_quotes:
                # strip one leading and one trailing quote character
                rtbl = re.sub(r"^[\"\[`\']|[\"\]`\']$", "", rtbl)

            if numerical_id in fks:
                fk = fks[numerical_id]
            else:
                fk = fks[numerical_id] = {
                    "name": None,
                    "constrained_columns": [],
                    "referred_schema": schema,
                    "referred_table": rtbl,
                    "referred_columns": referred_columns,
                    "options": {},
                }
                fks[numerical_id] = fk

            fk["constrained_columns"].append(lcol)

            if rcol:
                fk["referred_columns"].append(rcol)

        def fk_sig(constrained_columns, referred_table, referred_columns):
            # flat tuple used to match pragma results to parsed DDL
            return (
                tuple(constrained_columns)
                + (referred_table,)
                + tuple(referred_columns)
            )

        # then, parse the actual SQL and attempt to find DDL that matches
        # the names as well.   SQLite saves the DDL in whatever format
        # it was typed in as, so need to be liberal here.

        keys_by_signature = {
            fk_sig(
                fk["constrained_columns"],
                fk["referred_table"],
                fk["referred_columns"],
            ): fk
            for fk in fks.values()
        }

        table_data = self._get_table_sql(connection, table_name, schema=schema)

        def parse_fks():
            # yields (name, constrained columns, referred table,
            # referred columns, options) for each FOREIGN KEY clause found
            if table_data is None:
                # system tables, etc.
                return

            # note that we already have the FKs from PRAGMA above.  This whole
            # regexp thing is trying to locate additional detail about the
            # FKs, namely the name of the constraint and other options.
            # so parsing the columns is really about matching it up to what
            # we already have.
            FK_PATTERN = (
                r'(?:CONSTRAINT +(?:"(.+?)"|(\w+)) +)?'
                r"FOREIGN KEY *\( *(.+?) *\) +"
                r'REFERENCES +(?:(?:"(.+?)")|([a-z0-9_]+)) *\( *((?:(?:"[^"]+"|[a-z0-9_]+) *(?:, *)?)+)\) *'  # noqa: E501
                r"((?:ON (?:DELETE|UPDATE) "
                r"(?:SET NULL|SET DEFAULT|CASCADE|RESTRICT|NO ACTION) *)*)"
                r"((?:NOT +)?DEFERRABLE)?"
                r"(?: +INITIALLY +(DEFERRED|IMMEDIATE))?"
            )
            for match in re.finditer(FK_PATTERN, table_data, re.I):
                (
                    constraint_quoted_name,
                    constraint_name,
                    constrained_columns,
                    referred_quoted_name,
                    referred_name,
                    referred_columns,
                    onupdatedelete,
                    deferrable,
                    initially,
                ) = match.group(1, 2, 3, 4, 5, 6, 7, 8, 9)
                constraint_name = constraint_quoted_name or constraint_name
                constrained_columns = list(
                    self._find_cols_in_sig(constrained_columns)
                )
                if not referred_columns:
                    referred_columns = constrained_columns
                else:
                    referred_columns = list(
                        self._find_cols_in_sig(referred_columns)
                    )
                referred_name = referred_quoted_name or referred_name
                options = {}

                # split "ON DELETE x ON UPDATE y" into its two actions;
                # NO ACTION is not recorded
                for token in re.split(r" *\bON\b *", onupdatedelete.upper()):
                    if token.startswith("DELETE"):
                        ondelete = token[6:].strip()
                        if ondelete and ondelete != "NO ACTION":
                            options["ondelete"] = ondelete
                    elif token.startswith("UPDATE"):
                        onupdate = token[6:].strip()
                        if onupdate and onupdate != "NO ACTION":
                            options["onupdate"] = onupdate

                if deferrable:
                    options["deferrable"] = "NOT" not in deferrable.upper()
                if initially:
                    options["initially"] = initially.upper()

                yield (
                    constraint_name,
                    constrained_columns,
                    referred_name,
                    referred_columns,
                    options,
                )

        fkeys = []

        for (
            constraint_name,
            constrained_columns,
            referred_name,
            referred_columns,
            options,
        ) in parse_fks():
            sig = fk_sig(constrained_columns, referred_name, referred_columns)
            if sig not in keys_by_signature:
                util.warn(
                    "WARNING: SQL-parsed foreign key constraint "
                    "'%s' could not be located in PRAGMA "
                    "foreign_keys for table %s" % (sig, table_name)
                )
                continue
            # pop so that each pragma entry is matched at most once
            key = keys_by_signature.pop(sig)
            key["name"] = constraint_name
            key["options"] = options
            fkeys.append(key)
        # assume the remainders are the unnamed, inline constraints, just
        # use them as is as it's extremely difficult to parse inline
        # constraints
        fkeys.extend(keys_by_signature.values())
        if fkeys:
            return fkeys
        else:
            return ReflectionDefaults.foreign_keys()
2755
2756 def _find_cols_in_sig(self, sig):
2757 for match in re.finditer(r'(?:"(.+?)")|([a-z0-9_]+)', sig, re.I):
2758 yield match.group(1) or match.group(2)
2759
2760 @reflection.cache
2761 def get_unique_constraints(
2762 self, connection, table_name, schema=None, **kw
2763 ):
2764 auto_index_by_sig = {}
2765 for idx in self.get_indexes(
2766 connection,
2767 table_name,
2768 schema=schema,
2769 include_auto_indexes=True,
2770 **kw,
2771 ):
2772 if not idx["name"].startswith("sqlite_autoindex"):
2773 continue
2774 sig = tuple(idx["column_names"])
2775 auto_index_by_sig[sig] = idx
2776
2777 table_data = self._get_table_sql(
2778 connection, table_name, schema=schema, **kw
2779 )
2780 unique_constraints = []
2781
2782 def parse_uqs():
2783 if table_data is None:
2784 return
2785 UNIQUE_PATTERN = (
2786 r'(?:CONSTRAINT +(?:"(.+?)"|(\w+)) +)?UNIQUE *\((.+?)\)'
2787 )
2788 INLINE_UNIQUE_PATTERN = (
2789 r'(?:(".+?")|(?:[\[`])?([a-z0-9_]+)(?:[\]`])?)[\t ]'
2790 r"+[a-z0-9_ ]+?[\t ]+UNIQUE"
2791 )
2792
2793 for match in re.finditer(UNIQUE_PATTERN, table_data, re.I):
2794 quoted_name, unquoted_name, cols = match.group(1, 2, 3)
2795 name = quoted_name or unquoted_name
2796 yield name, list(self._find_cols_in_sig(cols))
2797
2798 # we need to match inlines as well, as we seek to differentiate
2799 # a UNIQUE constraint from a UNIQUE INDEX, even though these
2800 # are kind of the same thing :)
2801 for match in re.finditer(INLINE_UNIQUE_PATTERN, table_data, re.I):
2802 cols = list(
2803 self._find_cols_in_sig(match.group(1) or match.group(2))
2804 )
2805 yield None, cols
2806
2807 for name, cols in parse_uqs():
2808 sig = tuple(cols)
2809 if sig in auto_index_by_sig:
2810 auto_index_by_sig.pop(sig)
2811 parsed_constraint = {"name": name, "column_names": cols}
2812 unique_constraints.append(parsed_constraint)
2813 # NOTE: auto_index_by_sig might not be empty here,
2814 # the PRIMARY KEY may have an entry.
2815 if unique_constraints:
2816 return unique_constraints
2817 else:
2818 return ReflectionDefaults.unique_constraints()
2819
    @reflection.cache
    def get_check_constraints(self, connection, table_name, schema=None, **kw):
        """Return the CHECK constraints parsed from the table's stored SQL.

        Each entry is a dictionary with ``sqltext`` (the text between the
        parentheses of ``CHECK (...)``) and ``name`` (``None`` when
        unnamed).  Entries are sorted by name with unnamed ones last.
        """
        table_data = self._get_table_sql(
            connection, table_name, schema=schema, **kw
        )

        # Extract CHECK constraints by properly handling balanced parentheses
        # and avoiding false matches when CHECK/CONSTRAINT appear in table
        # names. See #12924 for context.
        #
        # SQLite supports 4 identifier quote styles (see
        # sqlite.org/lang_keywords.html):
        # - Double quotes "..." (standard SQL)
        # - Brackets [...] (MS Access/SQL Server compatibility)
        # - Backticks `...` (MySQL compatibility)
        # - Single quotes '...' (SQLite extension)
        #
        # NOTE: there is not currently a way to parse CHECK constraints that
        # contain newlines as the approach here relies upon each individual
        # CHECK constraint being on a single line by itself. This necessarily
        # makes assumptions as to how the CREATE TABLE was emitted.
        CHECK_PATTERN = re.compile(
            r"""
            (?<![A-Za-z0-9_]) # Negative lookbehind: ensure CHECK is not
            # part of an identifier (e.g., table name
            # like "tableCHECK")

            (?: # Optional CONSTRAINT clause
            CONSTRAINT\s+
            ( # Group 1: Constraint name (quoted or unquoted)
            "(?:[^"]|"")+" # Double-quoted: "name" or "na""me"
            |'(?:[^']|'')+' # Single-quoted: 'name' or 'na''me'
            |\[(?:[^\]]|\]\])+\] # Bracket-quoted: [name] or [na]]me]
            |`(?:[^`]|``)+` # Backtick-quoted: `name` or `na``me`
            |\S+ # Unquoted: simple_name
            )
            \s+
            )?

            CHECK\s*\( # CHECK keyword followed by opening paren
            """,
            re.VERBOSE | re.IGNORECASE,
        )
        cks = []

        # "or ''" keeps the loop empty when no table SQL was returned
        for match in re.finditer(CHECK_PATTERN, table_data or ""):
            constraint_name = match.group(1)

            if constraint_name:
                # Remove surrounding quotes if present
                # Double quotes: "name" -> name
                # Single quotes: 'name' -> name
                # Brackets: [name] -> name
                # Backticks: `name` -> name
                constraint_name = re.sub(
                    r'^(["\'`])(.+)\1$|^\[(.+)\]$',
                    lambda m: m.group(2) or m.group(3),
                    constraint_name,
                    flags=re.DOTALL,
                )

            # Find the matching closing parenthesis by counting balanced parens
            # Must track string context to ignore parens inside string literals
            start = match.end()  # Position after 'CHECK ('
            paren_count = 1
            in_single_quote = False
            in_double_quote = False

            # if the text ends before the parentheses balance, no entry is
            # recorded for this match
            for pos, char in enumerate(table_data[start:], start):
                # Track string literal context
                if char == "'" and not in_double_quote:
                    in_single_quote = not in_single_quote
                elif char == '"' and not in_single_quote:
                    in_double_quote = not in_double_quote
                # Only count parens when not inside a string literal
                elif not in_single_quote and not in_double_quote:
                    if char == "(":
                        paren_count += 1
                    elif char == ")":
                        paren_count -= 1
                        if paren_count == 0:
                            # Successfully found matching closing parenthesis
                            sqltext = table_data[start:pos].strip()
                            cks.append(
                                {"sqltext": sqltext, "name": constraint_name}
                            )
                            break

        cks.sort(key=lambda d: d["name"] or "~")  # sort None as last
        if cks:
            return cks
        else:
            return ReflectionDefaults.check_constraints()
2913
    @reflection.cache
    def get_indexes(self, connection, table_name, schema=None, **kw):
        """Return the indexes of ``table_name``.

        Index names and uniqueness come from ``PRAGMA index_list`` and
        column names from ``PRAGMA index_info``.  Indexes whose name starts
        with ``sqlite_autoindex`` are skipped unless the keyword
        ``include_auto_indexes`` is truthy.  Expression-based indexes are
        dropped with a warning.  For partial indexes the predicate is
        stored under ``dialect_options["sqlite_where"]``.

        :raises NoSuchTableError: if no indexes were collected and
         :meth:`has_table` reports the table is absent.
        """
        pragma_indexes = self._get_table_pragma(
            connection, "index_list", table_name, schema=schema
        )
        indexes = []

        # regular expression to extract the filter predicate of a partial
        # index. this could fail to extract the predicate correctly on
        # indexes created like
        # CREATE INDEX i ON t (col || ') where') WHERE col <> ''
        # but as this function does not support expression-based indexes
        # this case does not occur.
        partial_pred_re = re.compile(r"\)\s+where\s+(.+)", re.IGNORECASE)

        if schema:
            schema_expr = "%s." % self.identifier_preparer.quote_identifier(
                schema
            )
        else:
            schema_expr = ""

        include_auto_indexes = kw.pop("include_auto_indexes", False)
        for row in pragma_indexes:
            # ignore implicit primary key index.
            # https://www.mail-archive.com/sqlite-users@sqlite.org/msg30517.html
            if not include_auto_indexes and row[1].startswith(
                "sqlite_autoindex"
            ):
                continue
            indexes.append(
                dict(
                    name=row[1],
                    column_names=[],
                    unique=row[2],
                    dialect_options={},
                )
            )

            # check partial indexes
            # (a truthy fifth field, when the row has one, marks the index
            # as partial)
            if len(row) >= 5 and row[4]:
                s = (
                    "SELECT sql FROM %(schema)ssqlite_master "
                    "WHERE name = ? "
                    "AND type = 'index'" % {"schema": schema_expr}
                )
                rs = connection.exec_driver_sql(s, (row[1],))
                index_sql = rs.scalar()
                predicate_match = partial_pred_re.search(index_sql)
                if predicate_match is None:
                    # unless the regex is broken this case shouldn't happen
                    # because we know this is a partial index, so the
                    # definition sql should match the regex
                    util.warn(
                        "Failed to look up filter predicate of "
                        "partial index %s" % row[1]
                    )
                else:
                    predicate = predicate_match.group(1)
                    indexes[-1]["dialect_options"]["sqlite_where"] = text(
                        predicate
                    )

        # loop thru unique indexes to get the column names.
        # iterate over a copy since entries may be removed from "indexes"
        for idx in list(indexes):
            pragma_index = self._get_table_pragma(
                connection, "index_info", idx["name"], schema=schema
            )

            for row in pragma_index:
                # a None column name is treated as an expression-based index
                if row[2] is None:
                    util.warn(
                        "Skipped unsupported reflection of "
                        "expression-based index %s" % idx["name"]
                    )
                    indexes.remove(idx)
                    break
                else:
                    idx["column_names"].append(row[2])

        indexes.sort(key=lambda d: d["name"] or "~")  # sort None as last
        if indexes:
            return indexes
        elif not self.has_table(connection, table_name, schema):
            raise exc.NoSuchTableError(
                f"{schema}.{table_name}" if schema else table_name
            )
        else:
            return ReflectionDefaults.indexes()
3003
3004 def _is_sys_table(self, table_name):
3005 return table_name in {
3006 "sqlite_schema",
3007 "sqlite_master",
3008 "sqlite_temp_schema",
3009 "sqlite_temp_master",
3010 }
3011
@reflection.cache
def _get_table_sql(self, connection, table_name, schema=None, **kw):
    """Return the stored ``sql`` text for a table or view.

    The lookup first queries the union of ``sqlite_master`` and
    ``sqlite_temp_master``; if that statement raises ``DBAPIError`` it is
    retried against ``sqlite_master`` alone.

    :param connection: connection used to run the SELECT.
    :param table_name: name of the table or view to look up.
    :param schema: optional schema name; when given, it is quoted and
     used to qualify the schema tables.
    :return: the first column of the first matching row, which may be
     ``None`` for the schema tables recognised by ``_is_sys_table()``.
    :raises NoSuchTableError: when nothing is found and ``table_name``
     is not one of those schema tables.
    """
    schema_expr = (
        "%s." % (self.identifier_preparer.quote_identifier(schema))
        if schema
        else ""
    )
    params = (table_name,)

    try:
        union_stmt = (
            "SELECT sql FROM "
            " (SELECT * FROM %(schema)ssqlite_master UNION ALL "
            " SELECT * FROM %(schema)ssqlite_temp_master) "
            "WHERE name = ? "
            "AND type in ('table', 'view')" % {"schema": schema_expr}
        )
        rs = connection.exec_driver_sql(union_stmt, params)
    except exc.DBAPIError:
        # NOTE(review): presumably the temp schema table is not always
        # queryable; fall back to the main schema table only.
        master_stmt = (
            "SELECT sql FROM %(schema)ssqlite_master "
            "WHERE name = ? "
            "AND type in ('table', 'view')" % {"schema": schema_expr}
        )
        rs = connection.exec_driver_sql(master_stmt, params)

    table_sql = rs.scalar()
    if table_sql is not None or self._is_sys_table(table_name):
        return table_sql
    raise exc.NoSuchTableError(f"{schema_expr}{table_name}")
3040
3041 def _get_table_pragma(self, connection, pragma, table_name, schema=None):
3042 quote = self.identifier_preparer.quote_identifier
3043 if schema is not None:
3044 statements = [f"PRAGMA {quote(schema)}."]
3045 else:
3046 # because PRAGMA looks in all attached databases if no schema
3047 # given, need to specify "main" schema, however since we want
3048 # 'temp' tables in the same namespace as 'main', need to run
3049 # the PRAGMA twice
3050 statements = ["PRAGMA main.", "PRAGMA temp."]
3051
3052 qtable = quote(table_name)
3053 for statement in statements:
3054 statement = f"{statement}{pragma}({qtable})"
3055 cursor = connection.exec_driver_sql(statement)
3056 if not cursor._soft_closed:
3057 # work around SQLite issue whereby cursor.description
3058 # is blank when PRAGMA returns no rows:
3059 # https://www.sqlite.org/cvstrac/tktview?tn=1884
3060 result = cursor.fetchall()
3061 else:
3062 result = []
3063 if result:
3064 return result
3065 else:
3066 return []