1# dialects/sqlite/base.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: ignore-errors
8
9
10r'''
11.. dialect:: sqlite
12 :name: SQLite
13 :normal_support: 3.12+
14 :best_effort: 3.7.16+
15
16.. _sqlite_datetime:
17
18Date and Time Types
19-------------------
20
21SQLite does not have built-in DATE, TIME, or DATETIME types, and pysqlite does
22not provide out of the box functionality for translating values between Python
23`datetime` objects and a SQLite-supported format. SQLAlchemy's own
24:class:`~sqlalchemy.types.DateTime` and related types provide date formatting
25and parsing functionality when SQLite is used. The implementation classes are
26:class:`_sqlite.DATETIME`, :class:`_sqlite.DATE` and :class:`_sqlite.TIME`.
27These types represent dates and times as ISO formatted strings, which also
28nicely support ordering. There's no reliance on typical "libc" internals for
29these functions so historical dates are fully supported.
30
31Ensuring Text affinity
32^^^^^^^^^^^^^^^^^^^^^^
33
34The DDL rendered for these types is the standard ``DATE``, ``TIME``
35and ``DATETIME`` indicators. However, custom storage formats can also be
36applied to these types. When the
37storage format is detected as containing no alpha characters, the DDL for
38these types is rendered as ``DATE_CHAR``, ``TIME_CHAR``, and ``DATETIME_CHAR``,
39so that the column continues to have textual affinity.
40
41.. seealso::
42
43 `Type Affinity <https://www.sqlite.org/datatype3.html#affinity>`_ -
44 in the SQLite documentation
45
46.. _sqlite_autoincrement:
47
48SQLite Auto Incrementing Behavior
49----------------------------------
50
51Background on SQLite's autoincrement is at: https://sqlite.org/autoinc.html
52
53Key concepts:
54
55* SQLite has an implicit "auto increment" feature that takes place for any
56 non-composite primary-key column that is specifically created using
57 "INTEGER PRIMARY KEY" for the type + primary key.
58
59* SQLite also has an explicit "AUTOINCREMENT" keyword, that is **not**
60 equivalent to the implicit autoincrement feature; this keyword is not
61 recommended for general use. SQLAlchemy does not render this keyword
62 unless a special SQLite-specific directive is used (see below). However,
63 it still requires that the column's type is named "INTEGER".
64
65Using the AUTOINCREMENT Keyword
66^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
67
68To specifically render the AUTOINCREMENT keyword on the primary key column
69when rendering DDL, add the flag ``sqlite_autoincrement=True`` to the Table
70construct::
71
72 Table(
73 "sometable",
74 metadata,
75 Column("id", Integer, primary_key=True),
76 sqlite_autoincrement=True,
77 )
78
79Allowing autoincrement behavior SQLAlchemy types other than Integer/INTEGER
80^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
81
82SQLite's typing model is based on naming conventions. Among other things, this
83means that any type name which contains the substring ``"INT"`` will be
84determined to be of "integer affinity". A type named ``"BIGINT"``,
85``"SPECIAL_INT"`` or even ``"XYZINTQPR"``, will be considered by SQLite to be
86of "integer" affinity. However, **the SQLite autoincrement feature, whether
87implicitly or explicitly enabled, requires that the name of the column's type
88is exactly the string "INTEGER"**. Therefore, if an application uses a type
89like :class:`.BigInteger` for a primary key, on SQLite this type will need to
90be rendered as the name ``"INTEGER"`` when emitting the initial ``CREATE
91TABLE`` statement in order for the autoincrement behavior to be available.
92
93One approach to achieve this is to use :class:`.Integer` on SQLite
94only using :meth:`.TypeEngine.with_variant`::
95
96 table = Table(
97 "my_table",
98 metadata,
99 Column(
100 "id",
101 BigInteger().with_variant(Integer, "sqlite"),
102 primary_key=True,
103 ),
104 )
105
106Another is to use a subclass of :class:`.BigInteger` that overrides its DDL
107name to be ``INTEGER`` when compiled against SQLite::
108
109 from sqlalchemy import BigInteger
110 from sqlalchemy.ext.compiler import compiles
111
112
113 class SLBigInteger(BigInteger):
114 pass
115
116
117 @compiles(SLBigInteger, "sqlite")
118 def bi_c(element, compiler, **kw):
119 return "INTEGER"
120
121
122 @compiles(SLBigInteger)
123 def bi_c(element, compiler, **kw):
124 return compiler.visit_BIGINT(element, **kw)
125
126
127 table = Table(
128 "my_table", metadata, Column("id", SLBigInteger(), primary_key=True)
129 )
130
131.. seealso::
132
133 :meth:`.TypeEngine.with_variant`
134
135 :ref:`sqlalchemy.ext.compiler_toplevel`
136
137 `Datatypes In SQLite Version 3 <https://sqlite.org/datatype3.html>`_
138
139.. _sqlite_transactions:
140
141Transactions with SQLite and the sqlite3 driver
142-----------------------------------------------
143
144As a file-based database, SQLite's approach to transactions differs from
145traditional databases in many ways. Additionally, the ``sqlite3`` driver
146standard with Python (as well as the async version ``aiosqlite`` which builds
147on top of it) has several quirks, workarounds, and API features in the
148area of transaction control, all of which generally need to be addressed when
149constructing a SQLAlchemy application that uses SQLite.
150
151Legacy Transaction Mode with the sqlite3 driver
152^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
153
154The most important aspect of transaction handling with the sqlite3 driver is
155that it defaults (which will continue through Python 3.15 before being
156removed in Python 3.16) to legacy transactional behavior which does
157not strictly follow :pep:`249`. The way in which the driver diverges from the
158PEP is that it does not "begin" a transaction automatically as dictated by
159:pep:`249` except in the case of DML statements, e.g. INSERT, UPDATE, and
160DELETE. Normally, :pep:`249` dictates that a BEGIN must be emitted upon
161the first SQL statement of any kind, so that all subsequent operations will
162be established within a transaction until ``connection.commit()`` has been
163called. The ``sqlite3`` driver, in an effort to be easier to use in
164highly concurrent environments, skips this step for DQL (e.g. SELECT) statements,
165and also skips it for DDL (e.g. CREATE TABLE etc.) statements for more legacy
166reasons. Statements such as SAVEPOINT are also skipped.
167
168In modern versions of the ``sqlite3`` driver as of Python 3.12, this legacy
169mode of operation is referred to as
170`"legacy transaction control" <https://docs.python.org/3/library/sqlite3.html#sqlite3-transaction-control-isolation-level>`_, and is in
171effect by default due to the ``Connection.autocommit`` parameter being set to
172the constant ``sqlite3.LEGACY_TRANSACTION_CONTROL``. Prior to Python 3.12,
173the ``Connection.autocommit`` attribute did not exist.
174
175The implications of legacy transaction mode include:
176
177* **Incorrect support for transactional DDL** - statements like CREATE TABLE, ALTER TABLE,
178 CREATE INDEX etc. will not automatically BEGIN a transaction if one were not
179 started already, leading to the changes by each statement being
180 "autocommitted" immediately unless BEGIN were otherwise emitted first. Very
181 old (pre Python 3.6) versions of SQLite would also force a COMMIT for these
182 operations even if a transaction were present, however this is no longer the
183 case.
184* **SERIALIZABLE behavior not fully functional** - SQLite's transaction isolation
185 behavior is normally consistent with SERIALIZABLE isolation, as it is a file-
186 based system that locks the database file entirely for write operations,
187 preventing COMMIT until all reader transactions (and associated file locks)
188 have completed. However, sqlite3's legacy transaction mode fails to emit BEGIN for SELECT
189 statements, which causes these SELECT statements to no longer be "repeatable",
190 failing one of the consistency guarantees of SERIALIZABLE.
191* **Incorrect behavior for SAVEPOINT** - as the SAVEPOINT statement does not
192 imply a BEGIN, a new SAVEPOINT emitted before a BEGIN will function on its
193 own but fails to participate in the enclosing transaction, meaning a ROLLBACK
194 of the transaction will not rollback elements that were part of a released
195 savepoint.
196
197Legacy transaction mode first existed in order to faciliate working around
198SQLite's file locks. Because SQLite relies upon whole-file locks, it is easy to
199get "database is locked" errors, particularly when newer features like "write
200ahead logging" are disabled. This is a key reason why ``sqlite3``'s legacy
201transaction mode is still the default mode of operation; disabling it will
202produce behavior that is more susceptible to locked database errors. However
203note that **legacy transaction mode will no longer be the default** in a future
204Python version (3.16 as of this writing).
205
206.. _sqlite_enabling_transactions:
207
208Enabling Non-Legacy SQLite Transactional Modes with the sqlite3 or aiosqlite driver
209^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
210
211Current SQLAlchemy support allows either for setting the
212``.Connection.autocommit`` attribute, most directly by using a
213:func:`._sa.create_engine` parameter, or if on an older version of Python where
214the attribute is not available, using event hooks to control the behavior of
215BEGIN.
216
217* **Enabling modern sqlite3 transaction control via the autocommit connect parameter** (Python 3.12 and above)
218
219 To use SQLite in the mode described at `Transaction control via the autocommit attribute <https://docs.python.org/3/library/sqlite3.html#transaction-control-via-the-autocommit-attribute>`_,
220 the most straightforward approach is to set the attribute to its recommended value
221 of ``False`` at the connect level using :paramref:`_sa.create_engine.connect_args``::
222
223 from sqlalchemy import create_engine
224
225 engine = create_engine(
226 "sqlite:///myfile.db", connect_args={"autocommit": False}
227 )
228
229 This parameter is also passed through when using the aiosqlite driver::
230
231 from sqlalchemy.ext.asyncio import create_async_engine
232
233 engine = create_async_engine(
234 "sqlite+aiosqlite:///myfile.db", connect_args={"autocommit": False}
235 )
236
237 The parameter can also be set at the attribute level using the :meth:`.PoolEvents.connect`
238 event hook, however this will only work for sqlite3, as aiosqlite does not yet expose this
239 attribute on its ``Connection`` object::
240
241 from sqlalchemy import create_engine, event
242
243 engine = create_engine("sqlite:///myfile.db")
244
245
246 @event.listens_for(engine, "connect")
247 def do_connect(dbapi_connection, connection_record):
248 # enable autocommit=False mode
249 dbapi_connection.autocommit = False
250
251* **Using SQLAlchemy to emit BEGIN in lieu of SQLite's transaction control** (all Python versions, sqlite3 and aiosqlite)
252
253 For older versions of ``sqlite3`` or for cross-compatiblity with older and
254 newer versions, SQLAlchemy can also take over the job of transaction control.
255 This is achieved by using the :meth:`.ConnectionEvents.begin` hook
256 to emit the "BEGIN" command directly, while also disabling SQLite's control
257 of this command using the :meth:`.PoolEvents.connect` event hook to set the
258 ``Connection.isolation_level`` attribute to ``None``::
259
260
261 from sqlalchemy import create_engine, event
262
263 engine = create_engine("sqlite:///myfile.db")
264
265
266 @event.listens_for(engine, "connect")
267 def do_connect(dbapi_connection, connection_record):
268 # disable sqlite3's emitting of the BEGIN statement entirely.
269 dbapi_connection.isolation_level = None
270
271
272 @event.listens_for(engine, "begin")
273 def do_begin(conn):
274 # emit our own BEGIN. sqlite3 still emits COMMIT/ROLLBACK correctly
275 conn.exec_driver_sql("BEGIN")
276
277 When using the asyncio variant ``aiosqlite``, refer to ``engine.sync_engine``
278 as in the example below::
279
280 from sqlalchemy import create_engine, event
281 from sqlalchemy.ext.asyncio import create_async_engine
282
283 engine = create_async_engine("sqlite+aiosqlite:///myfile.db")
284
285
286 @event.listens_for(engine.sync_engine, "connect")
287 def do_connect(dbapi_connection, connection_record):
288 # disable aiosqlite's emitting of the BEGIN statement entirely.
289 dbapi_connection.isolation_level = None
290
291
292 @event.listens_for(engine.sync_engine, "begin")
293 def do_begin(conn):
294 # emit our own BEGIN. aiosqlite still emits COMMIT/ROLLBACK correctly
295 conn.exec_driver_sql("BEGIN")
296
297.. _sqlite_isolation_level:
298
299Using SQLAlchemy's Driver Level AUTOCOMMIT Feature with SQLite
300^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
301
302SQLAlchemy has a comprehensive database isolation feature with optional
303autocommit support that is introduced in the section :ref:`dbapi_autocommit`.
304
305For the ``sqlite3`` and ``aiosqlite`` drivers, SQLAlchemy only includes
306built-in support for "AUTOCOMMIT". Note that this mode is currently incompatible
307with the non-legacy isolation mode hooks documented in the previous
308section at :ref:`sqlite_enabling_transactions`.
309
310To use the ``sqlite3`` driver with SQLAlchemy driver-level autocommit,
311create an engine setting the :paramref:`_sa.create_engine.isolation_level`
312parameter to "AUTOCOMMIT"::
313
314 eng = create_engine("sqlite:///myfile.db", isolation_level="AUTOCOMMIT")
315
316When using the above mode, any event hooks that set the sqlite3 ``Connection.autocommit``
317parameter away from its default of ``sqlite3.LEGACY_TRANSACTION_CONTROL``
318as well as hooks that emit ``BEGIN`` should be disabled.
319
320Additional Reading for SQLite / sqlite3 transaction control
321^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
322
323Links with important information on SQLite, the sqlite3 driver,
324as well as long historical conversations on how things got to their current state:
325
326* `Isolation in SQLite <https://www.sqlite.org/isolation.html>`_ - on the SQLite website
327* `Transaction control <https://docs.python.org/3/library/sqlite3.html#transaction-control>`_ - describes the sqlite3 autocommit attribute as well
328 as the legacy isolation_level attribute.
329* `sqlite3 SELECT does not BEGIN a transaction, but should according to spec <https://github.com/python/cpython/issues/54133>`_ - imported Python standard library issue on github
330* `sqlite3 module breaks transactions and potentially corrupts data <https://github.com/python/cpython/issues/54949>`_ - imported Python standard library issue on github
331
332
333INSERT/UPDATE/DELETE...RETURNING
334---------------------------------
335
336The SQLite dialect supports SQLite 3.35's ``INSERT|UPDATE|DELETE..RETURNING``
337syntax. ``INSERT..RETURNING`` may be used
338automatically in some cases in order to fetch newly generated identifiers in
339place of the traditional approach of using ``cursor.lastrowid``, however
340``cursor.lastrowid`` is currently still preferred for simple single-statement
341cases for its better performance.
342
343To specify an explicit ``RETURNING`` clause, use the
344:meth:`._UpdateBase.returning` method on a per-statement basis::
345
346 # INSERT..RETURNING
347 result = connection.execute(
348 table.insert().values(name="foo").returning(table.c.col1, table.c.col2)
349 )
350 print(result.all())
351
352 # UPDATE..RETURNING
353 result = connection.execute(
354 table.update()
355 .where(table.c.name == "foo")
356 .values(name="bar")
357 .returning(table.c.col1, table.c.col2)
358 )
359 print(result.all())
360
361 # DELETE..RETURNING
362 result = connection.execute(
363 table.delete()
364 .where(table.c.name == "foo")
365 .returning(table.c.col1, table.c.col2)
366 )
367 print(result.all())
368
369.. versionadded:: 2.0 Added support for SQLite RETURNING
370
371
372.. _sqlite_foreign_keys:
373
374Foreign Key Support
375-------------------
376
377SQLite supports FOREIGN KEY syntax when emitting CREATE statements for tables,
378however by default these constraints have no effect on the operation of the
379table.
380
381Constraint checking on SQLite has three prerequisites:
382
383* At least version 3.6.19 of SQLite must be in use
384* The SQLite library must be compiled *without* the SQLITE_OMIT_FOREIGN_KEY
385 or SQLITE_OMIT_TRIGGER symbols enabled.
386* The ``PRAGMA foreign_keys = ON`` statement must be emitted on all
387 connections before use -- including the initial call to
388 :meth:`sqlalchemy.schema.MetaData.create_all`.
389
390SQLAlchemy allows for the ``PRAGMA`` statement to be emitted automatically for
391new connections through the usage of events::
392
393 from sqlalchemy.engine import Engine
394 from sqlalchemy import event
395
396
397 @event.listens_for(Engine, "connect")
398 def set_sqlite_pragma(dbapi_connection, connection_record):
399 # the sqlite3 driver will not set PRAGMA foreign_keys
400 # if autocommit=False; set to True temporarily
401 ac = dbapi_connection.autocommit
402 dbapi_connection.autocommit = True
403
404 cursor = dbapi_connection.cursor()
405 cursor.execute("PRAGMA foreign_keys=ON")
406 cursor.close()
407
408 # restore previous autocommit setting
409 dbapi_connection.autocommit = ac
410
411.. warning::
412
413 When SQLite foreign keys are enabled, it is **not possible**
414 to emit CREATE or DROP statements for tables that contain
415 mutually-dependent foreign key constraints;
416 to emit the DDL for these tables requires that ALTER TABLE be used to
417 create or drop these constraints separately, for which SQLite has
418 no support.
419
420.. seealso::
421
422 `SQLite Foreign Key Support <https://www.sqlite.org/foreignkeys.html>`_
423 - on the SQLite web site.
424
425 :ref:`event_toplevel` - SQLAlchemy event API.
426
427 :ref:`use_alter` - more information on SQLAlchemy's facilities for handling
428 mutually-dependent foreign key constraints.
429
430.. _sqlite_on_conflict_ddl:
431
432ON CONFLICT support for constraints
433-----------------------------------
434
435.. seealso:: This section describes the :term:`DDL` version of "ON CONFLICT" for
436 SQLite, which occurs within a CREATE TABLE statement. For "ON CONFLICT" as
437 applied to an INSERT statement, see :ref:`sqlite_on_conflict_insert`.
438
439SQLite supports a non-standard DDL clause known as ON CONFLICT which can be applied
440to primary key, unique, check, and not null constraints. In DDL, it is
441rendered either within the "CONSTRAINT" clause or within the column definition
442itself depending on the location of the target constraint. To render this
443clause within DDL, the extension parameter ``sqlite_on_conflict`` can be
444specified with a string conflict resolution algorithm within the
445:class:`.PrimaryKeyConstraint`, :class:`.UniqueConstraint`,
446:class:`.CheckConstraint` objects. Within the :class:`_schema.Column` object,
447there
448are individual parameters ``sqlite_on_conflict_not_null``,
449``sqlite_on_conflict_primary_key``, ``sqlite_on_conflict_unique`` which each
450correspond to the three types of relevant constraint types that can be
451indicated from a :class:`_schema.Column` object.
452
453.. seealso::
454
455 `ON CONFLICT <https://www.sqlite.org/lang_conflict.html>`_ - in the SQLite
456 documentation
457
458The ``sqlite_on_conflict`` parameters accept a string argument which is just
459the resolution name to be chosen, which on SQLite can be one of ROLLBACK,
460ABORT, FAIL, IGNORE, and REPLACE. For example, to add a UNIQUE constraint
461that specifies the IGNORE algorithm::
462
463 some_table = Table(
464 "some_table",
465 metadata,
466 Column("id", Integer, primary_key=True),
467 Column("data", Integer),
468 UniqueConstraint("id", "data", sqlite_on_conflict="IGNORE"),
469 )
470
471The above renders CREATE TABLE DDL as:
472
473.. sourcecode:: sql
474
475 CREATE TABLE some_table (
476 id INTEGER NOT NULL,
477 data INTEGER,
478 PRIMARY KEY (id),
479 UNIQUE (id, data) ON CONFLICT IGNORE
480 )
481
482
483When using the :paramref:`_schema.Column.unique`
484flag to add a UNIQUE constraint
485to a single column, the ``sqlite_on_conflict_unique`` parameter can
486be added to the :class:`_schema.Column` as well, which will be added to the
487UNIQUE constraint in the DDL::
488
489 some_table = Table(
490 "some_table",
491 metadata,
492 Column("id", Integer, primary_key=True),
493 Column(
494 "data", Integer, unique=True, sqlite_on_conflict_unique="IGNORE"
495 ),
496 )
497
498rendering:
499
500.. sourcecode:: sql
501
502 CREATE TABLE some_table (
503 id INTEGER NOT NULL,
504 data INTEGER,
505 PRIMARY KEY (id),
506 UNIQUE (data) ON CONFLICT IGNORE
507 )
508
509To apply the FAIL algorithm for a NOT NULL constraint,
510``sqlite_on_conflict_not_null`` is used::
511
512 some_table = Table(
513 "some_table",
514 metadata,
515 Column("id", Integer, primary_key=True),
516 Column(
517 "data", Integer, nullable=False, sqlite_on_conflict_not_null="FAIL"
518 ),
519 )
520
521this renders the column inline ON CONFLICT phrase:
522
523.. sourcecode:: sql
524
525 CREATE TABLE some_table (
526 id INTEGER NOT NULL,
527 data INTEGER NOT NULL ON CONFLICT FAIL,
528 PRIMARY KEY (id)
529 )
530
531
532Similarly, for an inline primary key, use ``sqlite_on_conflict_primary_key``::
533
534 some_table = Table(
535 "some_table",
536 metadata,
537 Column(
538 "id",
539 Integer,
540 primary_key=True,
541 sqlite_on_conflict_primary_key="FAIL",
542 ),
543 )
544
545SQLAlchemy renders the PRIMARY KEY constraint separately, so the conflict
546resolution algorithm is applied to the constraint itself:
547
548.. sourcecode:: sql
549
550 CREATE TABLE some_table (
551 id INTEGER NOT NULL,
552 PRIMARY KEY (id) ON CONFLICT FAIL
553 )
554
555.. _sqlite_on_conflict_insert:
556
557INSERT...ON CONFLICT (Upsert)
558-----------------------------
559
560.. seealso:: This section describes the :term:`DML` version of "ON CONFLICT" for
561 SQLite, which occurs within an INSERT statement. For "ON CONFLICT" as
562 applied to a CREATE TABLE statement, see :ref:`sqlite_on_conflict_ddl`.
563
564From version 3.24.0 onwards, SQLite supports "upserts" (update or insert)
565of rows into a table via the ``ON CONFLICT`` clause of the ``INSERT``
566statement. A candidate row will only be inserted if that row does not violate
567any unique or primary key constraints. In the case of a unique constraint violation, a
568secondary action can occur which can be either "DO UPDATE", indicating that
569the data in the target row should be updated, or "DO NOTHING", which indicates
570to silently skip this row.
571
572Conflicts are determined using columns that are part of existing unique
573constraints and indexes. These constraints are identified by stating the
574columns and conditions that comprise the indexes.
575
576SQLAlchemy provides ``ON CONFLICT`` support via the SQLite-specific
577:func:`_sqlite.insert()` function, which provides
578the generative methods :meth:`_sqlite.Insert.on_conflict_do_update`
579and :meth:`_sqlite.Insert.on_conflict_do_nothing`:
580
581.. sourcecode:: pycon+sql
582
583 >>> from sqlalchemy.dialects.sqlite import insert
584
585 >>> insert_stmt = insert(my_table).values(
586 ... id="some_existing_id", data="inserted value"
587 ... )
588
589 >>> do_update_stmt = insert_stmt.on_conflict_do_update(
590 ... index_elements=["id"], set_=dict(data="updated value")
591 ... )
592
593 >>> print(do_update_stmt)
594 {printsql}INSERT INTO my_table (id, data) VALUES (?, ?)
595 ON CONFLICT (id) DO UPDATE SET data = ?{stop}
596
597 >>> do_nothing_stmt = insert_stmt.on_conflict_do_nothing(index_elements=["id"])
598
599 >>> print(do_nothing_stmt)
600 {printsql}INSERT INTO my_table (id, data) VALUES (?, ?)
601 ON CONFLICT (id) DO NOTHING
602
603.. versionadded:: 1.4
604
605.. seealso::
606
607 `Upsert
608 <https://sqlite.org/lang_UPSERT.html>`_
609 - in the SQLite documentation.
610
611
612Specifying the Target
613^^^^^^^^^^^^^^^^^^^^^
614
615Both methods supply the "target" of the conflict using column inference:
616
617* The :paramref:`_sqlite.Insert.on_conflict_do_update.index_elements` argument
618 specifies a sequence containing string column names, :class:`_schema.Column`
619 objects, and/or SQL expression elements, which would identify a unique index
620 or unique constraint.
621
622* When using :paramref:`_sqlite.Insert.on_conflict_do_update.index_elements`
623 to infer an index, a partial index can be inferred by also specifying the
624 :paramref:`_sqlite.Insert.on_conflict_do_update.index_where` parameter:
625
626 .. sourcecode:: pycon+sql
627
628 >>> stmt = insert(my_table).values(user_email="a@b.com", data="inserted data")
629
630 >>> do_update_stmt = stmt.on_conflict_do_update(
631 ... index_elements=[my_table.c.user_email],
632 ... index_where=my_table.c.user_email.like("%@gmail.com"),
633 ... set_=dict(data=stmt.excluded.data),
634 ... )
635
636 >>> print(do_update_stmt)
637 {printsql}INSERT INTO my_table (data, user_email) VALUES (?, ?)
638 ON CONFLICT (user_email)
639 WHERE user_email LIKE '%@gmail.com'
640 DO UPDATE SET data = excluded.data
641
642The SET Clause
643^^^^^^^^^^^^^^^
644
645``ON CONFLICT...DO UPDATE`` is used to perform an update of the already
646existing row, using any combination of new values as well as values
647from the proposed insertion. These values are specified using the
648:paramref:`_sqlite.Insert.on_conflict_do_update.set_` parameter. This
649parameter accepts a dictionary which consists of direct values
650for UPDATE:
651
652.. sourcecode:: pycon+sql
653
654 >>> stmt = insert(my_table).values(id="some_id", data="inserted value")
655
656 >>> do_update_stmt = stmt.on_conflict_do_update(
657 ... index_elements=["id"], set_=dict(data="updated value")
658 ... )
659
660 >>> print(do_update_stmt)
661 {printsql}INSERT INTO my_table (id, data) VALUES (?, ?)
662 ON CONFLICT (id) DO UPDATE SET data = ?
663
664.. warning::
665
666 The :meth:`_sqlite.Insert.on_conflict_do_update` method does **not** take
667 into account Python-side default UPDATE values or generation functions,
668 e.g. those specified using :paramref:`_schema.Column.onupdate`. These
669 values will not be exercised for an ON CONFLICT style of UPDATE, unless
670 they are manually specified in the
671 :paramref:`_sqlite.Insert.on_conflict_do_update.set_` dictionary.
672
673Updating using the Excluded INSERT Values
674^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
675
676In order to refer to the proposed insertion row, the special alias
677:attr:`~.sqlite.Insert.excluded` is available as an attribute on
678the :class:`_sqlite.Insert` object; this object creates an "excluded." prefix
679on a column, that informs the DO UPDATE to update the row with the value that
680would have been inserted had the constraint not failed:
681
682.. sourcecode:: pycon+sql
683
684 >>> stmt = insert(my_table).values(
685 ... id="some_id", data="inserted value", author="jlh"
686 ... )
687
688 >>> do_update_stmt = stmt.on_conflict_do_update(
689 ... index_elements=["id"],
690 ... set_=dict(data="updated value", author=stmt.excluded.author),
691 ... )
692
693 >>> print(do_update_stmt)
694 {printsql}INSERT INTO my_table (id, data, author) VALUES (?, ?, ?)
695 ON CONFLICT (id) DO UPDATE SET data = ?, author = excluded.author
696
697Additional WHERE Criteria
698^^^^^^^^^^^^^^^^^^^^^^^^^
699
700The :meth:`_sqlite.Insert.on_conflict_do_update` method also accepts
701a WHERE clause using the :paramref:`_sqlite.Insert.on_conflict_do_update.where`
702parameter, which will limit those rows which receive an UPDATE:
703
704.. sourcecode:: pycon+sql
705
706 >>> stmt = insert(my_table).values(
707 ... id="some_id", data="inserted value", author="jlh"
708 ... )
709
710 >>> on_update_stmt = stmt.on_conflict_do_update(
711 ... index_elements=["id"],
712 ... set_=dict(data="updated value", author=stmt.excluded.author),
713 ... where=(my_table.c.status == 2),
714 ... )
715 >>> print(on_update_stmt)
716 {printsql}INSERT INTO my_table (id, data, author) VALUES (?, ?, ?)
717 ON CONFLICT (id) DO UPDATE SET data = ?, author = excluded.author
718 WHERE my_table.status = ?
719
720
721Skipping Rows with DO NOTHING
722^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
723
724``ON CONFLICT`` may be used to skip inserting a row entirely
725if any conflict with a unique constraint occurs; below this is illustrated
726using the :meth:`_sqlite.Insert.on_conflict_do_nothing` method:
727
728.. sourcecode:: pycon+sql
729
730 >>> stmt = insert(my_table).values(id="some_id", data="inserted value")
731 >>> stmt = stmt.on_conflict_do_nothing(index_elements=["id"])
732 >>> print(stmt)
733 {printsql}INSERT INTO my_table (id, data) VALUES (?, ?) ON CONFLICT (id) DO NOTHING
734
735
736If ``DO NOTHING`` is used without specifying any columns or constraint,
737it has the effect of skipping the INSERT for any unique violation which
738occurs:
739
740.. sourcecode:: pycon+sql
741
742 >>> stmt = insert(my_table).values(id="some_id", data="inserted value")
743 >>> stmt = stmt.on_conflict_do_nothing()
744 >>> print(stmt)
745 {printsql}INSERT INTO my_table (id, data) VALUES (?, ?) ON CONFLICT DO NOTHING
746
747.. _sqlite_type_reflection:
748
749Type Reflection
750---------------
751
752SQLite types are unlike those of most other database backends, in that
753the string name of the type usually does not correspond to a "type" in a
754one-to-one fashion. Instead, SQLite links per-column typing behavior
755to one of five so-called "type affinities" based on a string matching
756pattern for the type.
757
758SQLAlchemy's reflection process, when inspecting types, uses a simple
759lookup table to link the keywords returned to provided SQLAlchemy types.
760This lookup table is present within the SQLite dialect as it is for all
761other dialects. However, the SQLite dialect has a different "fallback"
762routine for when a particular type name is not located in the lookup map;
763it instead implements the SQLite "type affinity" scheme located at
764https://www.sqlite.org/datatype3.html section 2.1.
765
766The provided typemap will make direct associations from an exact string
767name match for the following types:
768
769:class:`_types.BIGINT`, :class:`_types.BLOB`,
770:class:`_types.BOOLEAN`, :class:`_types.BOOLEAN`,
771:class:`_types.CHAR`, :class:`_types.DATE`,
772:class:`_types.DATETIME`, :class:`_types.FLOAT`,
773:class:`_types.DECIMAL`, :class:`_types.FLOAT`,
774:class:`_types.INTEGER`, :class:`_types.INTEGER`,
775:class:`_types.NUMERIC`, :class:`_types.REAL`,
776:class:`_types.SMALLINT`, :class:`_types.TEXT`,
777:class:`_types.TIME`, :class:`_types.TIMESTAMP`,
778:class:`_types.VARCHAR`, :class:`_types.NVARCHAR`,
779:class:`_types.NCHAR`
780
781When a type name does not match one of the above types, the "type affinity"
782lookup is used instead:
783
784* :class:`_types.INTEGER` is returned if the type name includes the
785 string ``INT``
786* :class:`_types.TEXT` is returned if the type name includes the
787 string ``CHAR``, ``CLOB`` or ``TEXT``
788* :class:`_types.NullType` is returned if the type name includes the
789 string ``BLOB``
790* :class:`_types.REAL` is returned if the type name includes the string
791 ``REAL``, ``FLOA`` or ``DOUB``.
792* Otherwise, the :class:`_types.NUMERIC` type is used.
793
794.. _sqlite_partial_index:
795
796Partial Indexes
797---------------
798
799A partial index, e.g. one which uses a WHERE clause, can be specified
800with the DDL system using the argument ``sqlite_where``::
801
802 tbl = Table("testtbl", m, Column("data", Integer))
803 idx = Index(
804 "test_idx1",
805 tbl.c.data,
806 sqlite_where=and_(tbl.c.data > 5, tbl.c.data < 10),
807 )
808
809The index will be rendered at create time as:
810
811.. sourcecode:: sql
812
813 CREATE INDEX test_idx1 ON testtbl (data)
814 WHERE data > 5 AND data < 10
815
816.. _sqlite_dotted_column_names:
817
818Dotted Column Names
819-------------------
820
821Using table or column names that explicitly have periods in them is
822**not recommended**. While this is generally a bad idea for relational
823databases in general, as the dot is a syntactically significant character,
824the SQLite driver up until version **3.10.0** of SQLite has a bug which
825requires that SQLAlchemy filter out these dots in result sets.
826
827The bug, entirely outside of SQLAlchemy, can be illustrated thusly::
828
829 import sqlite3
830
831 assert sqlite3.sqlite_version_info < (
832 3,
833 10,
834 0,
835 ), "bug is fixed in this version"
836
837 conn = sqlite3.connect(":memory:")
838 cursor = conn.cursor()
839
840 cursor.execute("create table x (a integer, b integer)")
841 cursor.execute("insert into x (a, b) values (1, 1)")
842 cursor.execute("insert into x (a, b) values (2, 2)")
843
844 cursor.execute("select x.a, x.b from x")
845 assert [c[0] for c in cursor.description] == ["a", "b"]
846
847 cursor.execute(
848 """
849 select x.a, x.b from x where a=1
850 union
851 select x.a, x.b from x where a=2
852 """
853 )
854 assert [c[0] for c in cursor.description] == ["a", "b"], [
855 c[0] for c in cursor.description
856 ]
857
858The second assertion fails:
859
860.. sourcecode:: text
861
862 Traceback (most recent call last):
863 File "test.py", line 19, in <module>
864 [c[0] for c in cursor.description]
865 AssertionError: ['x.a', 'x.b']
866
867Where above, the driver incorrectly reports the names of the columns
868including the name of the table, which is entirely inconsistent vs.
869when the UNION is not present.
870
871SQLAlchemy relies upon column names being predictable in how they match
872to the original statement, so the SQLAlchemy dialect has no choice but
873to filter these out::
874
875
876 from sqlalchemy import create_engine
877
878 eng = create_engine("sqlite://")
879 conn = eng.connect()
880
881 conn.exec_driver_sql("create table x (a integer, b integer)")
882 conn.exec_driver_sql("insert into x (a, b) values (1, 1)")
883 conn.exec_driver_sql("insert into x (a, b) values (2, 2)")
884
885 result = conn.exec_driver_sql("select x.a, x.b from x")
886 assert result.keys() == ["a", "b"]
887
888 result = conn.exec_driver_sql(
889 """
890 select x.a, x.b from x where a=1
891 union
892 select x.a, x.b from x where a=2
893 """
894 )
895 assert result.keys() == ["a", "b"]
896
897Note that above, even though SQLAlchemy filters out the dots, *both
898names are still addressable*::
899
900 >>> row = result.first()
901 >>> row["a"]
902 1
903 >>> row["x.a"]
904 1
905 >>> row["b"]
906 1
907 >>> row["x.b"]
908 1
909
910Therefore, the workaround applied by SQLAlchemy only impacts
911:meth:`_engine.CursorResult.keys` and :meth:`.Row.keys()` in the public API. In
912the very specific case where an application is forced to use column names that
913contain dots, and the functionality of :meth:`_engine.CursorResult.keys` and
914:meth:`.Row.keys()` is required to return these dotted names unmodified,
915the ``sqlite_raw_colnames`` execution option may be provided, either on a
916per-:class:`_engine.Connection` basis::
917
918 result = conn.execution_options(sqlite_raw_colnames=True).exec_driver_sql(
919 """
920 select x.a, x.b from x where a=1
921 union
922 select x.a, x.b from x where a=2
923 """
924 )
925 assert result.keys() == ["x.a", "x.b"]
926
927or on a per-:class:`_engine.Engine` basis::
928
929 engine = create_engine(
930 "sqlite://", execution_options={"sqlite_raw_colnames": True}
931 )
932
933When using the per-:class:`_engine.Engine` execution option, note that
934**Core and ORM queries that use UNION may not function properly**.
935
936SQLite-specific table options
937-----------------------------
938
939One option for CREATE TABLE is supported directly by the SQLite
940dialect in conjunction with the :class:`_schema.Table` construct:
941
942* ``WITHOUT ROWID``::
943
944 Table("some_table", metadata, ..., sqlite_with_rowid=False)
945
946*
947 ``STRICT``::
948
949 Table("some_table", metadata, ..., sqlite_strict=True)
950
951 .. versionadded:: 2.0.37
952
953.. seealso::
954
955 `SQLite CREATE TABLE options
956 <https://www.sqlite.org/lang_createtable.html>`_
957
958.. _sqlite_include_internal:
959
960Reflecting internal schema tables
961----------------------------------
962
963Reflection methods that return lists of tables will omit so-called
964"SQLite internal schema object" names, which are considered by SQLite
965as any object name that is prefixed with ``sqlite_``. An example of
966such an object is the ``sqlite_sequence`` table that's generated when
967the ``AUTOINCREMENT`` column parameter is used. In order to return
968these objects, the parameter ``sqlite_include_internal=True`` may be
969passed to methods such as :meth:`_schema.MetaData.reflect` or
970:meth:`.Inspector.get_table_names`.
971
972.. versionadded:: 2.0 Added the ``sqlite_include_internal=True`` parameter.
973 Previously, these tables were not ignored by SQLAlchemy reflection
974 methods.
975
976.. note::
977
978 The ``sqlite_include_internal`` parameter does not refer to the
979 "system" tables that are present in schemas such as ``sqlite_master``.
980
981.. seealso::
982
983 `SQLite Internal Schema Objects <https://www.sqlite.org/fileformat2.html#intschema>`_ - in the SQLite
984 documentation.
985
986''' # noqa
987from __future__ import annotations
988
989import datetime
990import numbers
991import re
992from typing import Any
993from typing import Callable
994from typing import Optional
995from typing import TYPE_CHECKING
996
997from .json import JSON
998from .json import JSONIndexType
999from .json import JSONPathType
1000from ... import exc
1001from ... import schema as sa_schema
1002from ... import sql
1003from ... import text
1004from ... import types as sqltypes
1005from ... import util
1006from ...engine import default
1007from ...engine import processors
1008from ...engine import reflection
1009from ...engine.reflection import ReflectionDefaults
1010from ...sql import coercions
1011from ...sql import compiler
1012from ...sql import elements
1013from ...sql import roles
1014from ...sql import schema
1015from ...types import BLOB # noqa
1016from ...types import BOOLEAN # noqa
1017from ...types import CHAR # noqa
1018from ...types import DECIMAL # noqa
1019from ...types import FLOAT # noqa
1020from ...types import INTEGER # noqa
1021from ...types import NUMERIC # noqa
1022from ...types import REAL # noqa
1023from ...types import SMALLINT # noqa
1024from ...types import TEXT # noqa
1025from ...types import TIMESTAMP # noqa
1026from ...types import VARCHAR # noqa
1027
1028if TYPE_CHECKING:
1029 from ...engine.interfaces import DBAPIConnection
1030 from ...engine.interfaces import Dialect
1031 from ...engine.interfaces import IsolationLevel
1032 from ...sql.type_api import _BindProcessorType
1033 from ...sql.type_api import _ResultProcessorType
1034
1035
1036class _SQliteJson(JSON):
1037 def result_processor(self, dialect, coltype):
1038 default_processor = super().result_processor(dialect, coltype)
1039
1040 def process(value):
1041 try:
1042 return default_processor(value)
1043 except TypeError:
1044 if isinstance(value, numbers.Number):
1045 return value
1046 else:
1047 raise
1048
1049 return process
1050
1051
1052class _DateTimeMixin:
1053 _reg = None
1054 _storage_format = None
1055
1056 def __init__(self, storage_format=None, regexp=None, **kw):
1057 super().__init__(**kw)
1058 if regexp is not None:
1059 self._reg = re.compile(regexp)
1060 if storage_format is not None:
1061 self._storage_format = storage_format
1062
1063 @property
1064 def format_is_text_affinity(self):
1065 """return True if the storage format will automatically imply
1066 a TEXT affinity.
1067
1068 If the storage format contains no non-numeric characters,
1069 it will imply a NUMERIC storage format on SQLite; in this case,
1070 the type will generate its DDL as DATE_CHAR, DATETIME_CHAR,
1071 TIME_CHAR.
1072
1073 """
1074 spec = self._storage_format % {
1075 "year": 0,
1076 "month": 0,
1077 "day": 0,
1078 "hour": 0,
1079 "minute": 0,
1080 "second": 0,
1081 "microsecond": 0,
1082 }
1083 return bool(re.search(r"[^0-9]", spec))
1084
1085 def adapt(self, cls, **kw):
1086 if issubclass(cls, _DateTimeMixin):
1087 if self._storage_format:
1088 kw["storage_format"] = self._storage_format
1089 if self._reg:
1090 kw["regexp"] = self._reg
1091 return super().adapt(cls, **kw)
1092
1093 def literal_processor(self, dialect):
1094 bp = self.bind_processor(dialect)
1095
1096 def process(value):
1097 return "'%s'" % bp(value)
1098
1099 return process
1100
1101
1102class DATETIME(_DateTimeMixin, sqltypes.DateTime):
1103 r"""Represent a Python datetime object in SQLite using a string.
1104
1105 The default string storage format is::
1106
1107 "%(year)04d-%(month)02d-%(day)02d %(hour)02d:%(minute)02d:%(second)02d.%(microsecond)06d"
1108
1109 e.g.:
1110
1111 .. sourcecode:: text
1112
1113 2021-03-15 12:05:57.105542
1114
1115 The incoming storage format is by default parsed using the
1116 Python ``datetime.fromisoformat()`` function.
1117
1118 .. versionchanged:: 2.0 ``datetime.fromisoformat()`` is used for default
1119 datetime string parsing.
1120
1121 The storage format can be customized to some degree using the
1122 ``storage_format`` and ``regexp`` parameters, such as::
1123
1124 import re
1125 from sqlalchemy.dialects.sqlite import DATETIME
1126
1127 dt = DATETIME(
1128 storage_format=(
1129 "%(year)04d/%(month)02d/%(day)02d %(hour)02d:%(minute)02d:%(second)02d"
1130 ),
1131 regexp=r"(\d+)/(\d+)/(\d+) (\d+)-(\d+)-(\d+)",
1132 )
1133
1134 :param truncate_microseconds: when ``True`` microseconds will be truncated
1135 from the datetime. Can't be specified together with ``storage_format``
1136 or ``regexp``.
1137
1138 :param storage_format: format string which will be applied to the dict
1139 with keys year, month, day, hour, minute, second, and microsecond.
1140
1141 :param regexp: regular expression which will be applied to incoming result
1142 rows, replacing the use of ``datetime.fromisoformat()`` to parse incoming
1143 strings. If the regexp contains named groups, the resulting match dict is
1144 applied to the Python datetime() constructor as keyword arguments.
1145 Otherwise, if positional groups are used, the datetime() constructor
1146 is called with positional arguments via
1147 ``*map(int, match_obj.groups(0))``.
1148
1149 """ # noqa
1150
1151 _storage_format = (
1152 "%(year)04d-%(month)02d-%(day)02d "
1153 "%(hour)02d:%(minute)02d:%(second)02d.%(microsecond)06d"
1154 )
1155
1156 def __init__(self, *args, **kwargs):
1157 truncate_microseconds = kwargs.pop("truncate_microseconds", False)
1158 super().__init__(*args, **kwargs)
1159 if truncate_microseconds:
1160 assert "storage_format" not in kwargs, (
1161 "You can specify only "
1162 "one of truncate_microseconds or storage_format."
1163 )
1164 assert "regexp" not in kwargs, (
1165 "You can specify only one of "
1166 "truncate_microseconds or regexp."
1167 )
1168 self._storage_format = (
1169 "%(year)04d-%(month)02d-%(day)02d "
1170 "%(hour)02d:%(minute)02d:%(second)02d"
1171 )
1172
1173 def bind_processor(
1174 self, dialect: Dialect
1175 ) -> Optional[_BindProcessorType[Any]]:
1176 datetime_datetime = datetime.datetime
1177 datetime_date = datetime.date
1178 format_ = self._storage_format
1179
1180 def process(value):
1181 if value is None:
1182 return None
1183 elif isinstance(value, datetime_datetime):
1184 return format_ % {
1185 "year": value.year,
1186 "month": value.month,
1187 "day": value.day,
1188 "hour": value.hour,
1189 "minute": value.minute,
1190 "second": value.second,
1191 "microsecond": value.microsecond,
1192 }
1193 elif isinstance(value, datetime_date):
1194 return format_ % {
1195 "year": value.year,
1196 "month": value.month,
1197 "day": value.day,
1198 "hour": 0,
1199 "minute": 0,
1200 "second": 0,
1201 "microsecond": 0,
1202 }
1203 else:
1204 raise TypeError(
1205 "SQLite DateTime type only accepts Python "
1206 "datetime and date objects as input."
1207 )
1208
1209 return process
1210
1211 def result_processor(
1212 self, dialect: Dialect, coltype: object
1213 ) -> Optional[_ResultProcessorType[Any]]:
1214 if self._reg:
1215 return processors.str_to_datetime_processor_factory(
1216 self._reg, datetime.datetime
1217 )
1218 else:
1219 return processors.str_to_datetime
1220
1221
1222class DATE(_DateTimeMixin, sqltypes.Date):
1223 r"""Represent a Python date object in SQLite using a string.
1224
1225 The default string storage format is::
1226
1227 "%(year)04d-%(month)02d-%(day)02d"
1228
1229 e.g.:
1230
1231 .. sourcecode:: text
1232
1233 2011-03-15
1234
1235 The incoming storage format is by default parsed using the
1236 Python ``date.fromisoformat()`` function.
1237
1238 .. versionchanged:: 2.0 ``date.fromisoformat()`` is used for default
1239 date string parsing.
1240
1241
1242 The storage format can be customized to some degree using the
1243 ``storage_format`` and ``regexp`` parameters, such as::
1244
1245 import re
1246 from sqlalchemy.dialects.sqlite import DATE
1247
1248 d = DATE(
1249 storage_format="%(month)02d/%(day)02d/%(year)04d",
1250 regexp=re.compile("(?P<month>\d+)/(?P<day>\d+)/(?P<year>\d+)"),
1251 )
1252
1253 :param storage_format: format string which will be applied to the
1254 dict with keys year, month, and day.
1255
1256 :param regexp: regular expression which will be applied to
1257 incoming result rows, replacing the use of ``date.fromisoformat()`` to
1258 parse incoming strings. If the regexp contains named groups, the resulting
1259 match dict is applied to the Python date() constructor as keyword
1260 arguments. Otherwise, if positional groups are used, the date()
1261 constructor is called with positional arguments via
1262 ``*map(int, match_obj.groups(0))``.
1263
1264 """
1265
1266 _storage_format = "%(year)04d-%(month)02d-%(day)02d"
1267
1268 def bind_processor(
1269 self, dialect: Dialect
1270 ) -> Optional[_BindProcessorType[Any]]:
1271 datetime_date = datetime.date
1272 format_ = self._storage_format
1273
1274 def process(value):
1275 if value is None:
1276 return None
1277 elif isinstance(value, datetime_date):
1278 return format_ % {
1279 "year": value.year,
1280 "month": value.month,
1281 "day": value.day,
1282 }
1283 else:
1284 raise TypeError(
1285 "SQLite Date type only accepts Python "
1286 "date objects as input."
1287 )
1288
1289 return process
1290
1291 def result_processor(
1292 self, dialect: Dialect, coltype: object
1293 ) -> Optional[_ResultProcessorType[Any]]:
1294 if self._reg:
1295 return processors.str_to_datetime_processor_factory(
1296 self._reg, datetime.date
1297 )
1298 else:
1299 return processors.str_to_date
1300
1301
1302class TIME(_DateTimeMixin, sqltypes.Time):
1303 r"""Represent a Python time object in SQLite using a string.
1304
1305 The default string storage format is::
1306
1307 "%(hour)02d:%(minute)02d:%(second)02d.%(microsecond)06d"
1308
1309 e.g.:
1310
1311 .. sourcecode:: text
1312
1313 12:05:57.10558
1314
1315 The incoming storage format is by default parsed using the
1316 Python ``time.fromisoformat()`` function.
1317
1318 .. versionchanged:: 2.0 ``time.fromisoformat()`` is used for default
1319 time string parsing.
1320
1321 The storage format can be customized to some degree using the
1322 ``storage_format`` and ``regexp`` parameters, such as::
1323
1324 import re
1325 from sqlalchemy.dialects.sqlite import TIME
1326
1327 t = TIME(
1328 storage_format="%(hour)02d-%(minute)02d-%(second)02d-%(microsecond)06d",
1329 regexp=re.compile("(\d+)-(\d+)-(\d+)-(?:-(\d+))?"),
1330 )
1331
1332 :param truncate_microseconds: when ``True`` microseconds will be truncated
1333 from the time. Can't be specified together with ``storage_format``
1334 or ``regexp``.
1335
1336 :param storage_format: format string which will be applied to the dict
1337 with keys hour, minute, second, and microsecond.
1338
1339 :param regexp: regular expression which will be applied to incoming result
1340 rows, replacing the use of ``datetime.fromisoformat()`` to parse incoming
1341 strings. If the regexp contains named groups, the resulting match dict is
1342 applied to the Python time() constructor as keyword arguments. Otherwise,
1343 if positional groups are used, the time() constructor is called with
1344 positional arguments via ``*map(int, match_obj.groups(0))``.
1345
1346 """
1347
1348 _storage_format = "%(hour)02d:%(minute)02d:%(second)02d.%(microsecond)06d"
1349
1350 def __init__(self, *args, **kwargs):
1351 truncate_microseconds = kwargs.pop("truncate_microseconds", False)
1352 super().__init__(*args, **kwargs)
1353 if truncate_microseconds:
1354 assert "storage_format" not in kwargs, (
1355 "You can specify only "
1356 "one of truncate_microseconds or storage_format."
1357 )
1358 assert "regexp" not in kwargs, (
1359 "You can specify only one of "
1360 "truncate_microseconds or regexp."
1361 )
1362 self._storage_format = "%(hour)02d:%(minute)02d:%(second)02d"
1363
1364 def bind_processor(self, dialect):
1365 datetime_time = datetime.time
1366 format_ = self._storage_format
1367
1368 def process(value):
1369 if value is None:
1370 return None
1371 elif isinstance(value, datetime_time):
1372 return format_ % {
1373 "hour": value.hour,
1374 "minute": value.minute,
1375 "second": value.second,
1376 "microsecond": value.microsecond,
1377 }
1378 else:
1379 raise TypeError(
1380 "SQLite Time type only accepts Python "
1381 "time objects as input."
1382 )
1383
1384 return process
1385
1386 def result_processor(self, dialect, coltype):
1387 if self._reg:
1388 return processors.str_to_datetime_processor_factory(
1389 self._reg, datetime.time
1390 )
1391 else:
1392 return processors.str_to_time
1393
1394
1395colspecs = {
1396 sqltypes.Date: DATE,
1397 sqltypes.DateTime: DATETIME,
1398 sqltypes.JSON: _SQliteJson,
1399 sqltypes.JSON.JSONIndexType: JSONIndexType,
1400 sqltypes.JSON.JSONPathType: JSONPathType,
1401 sqltypes.Time: TIME,
1402}
1403
1404ischema_names = {
1405 "BIGINT": sqltypes.BIGINT,
1406 "BLOB": sqltypes.BLOB,
1407 "BOOL": sqltypes.BOOLEAN,
1408 "BOOLEAN": sqltypes.BOOLEAN,
1409 "CHAR": sqltypes.CHAR,
1410 "DATE": sqltypes.DATE,
1411 "DATE_CHAR": sqltypes.DATE,
1412 "DATETIME": sqltypes.DATETIME,
1413 "DATETIME_CHAR": sqltypes.DATETIME,
1414 "DOUBLE": sqltypes.DOUBLE,
1415 "DECIMAL": sqltypes.DECIMAL,
1416 "FLOAT": sqltypes.FLOAT,
1417 "INT": sqltypes.INTEGER,
1418 "INTEGER": sqltypes.INTEGER,
1419 "JSON": JSON,
1420 "NUMERIC": sqltypes.NUMERIC,
1421 "REAL": sqltypes.REAL,
1422 "SMALLINT": sqltypes.SMALLINT,
1423 "TEXT": sqltypes.TEXT,
1424 "TIME": sqltypes.TIME,
1425 "TIME_CHAR": sqltypes.TIME,
1426 "TIMESTAMP": sqltypes.TIMESTAMP,
1427 "VARCHAR": sqltypes.VARCHAR,
1428 "NVARCHAR": sqltypes.NVARCHAR,
1429 "NCHAR": sqltypes.NCHAR,
1430}
1431
1432
1433class SQLiteCompiler(compiler.SQLCompiler):
1434 extract_map = util.update_copy(
1435 compiler.SQLCompiler.extract_map,
1436 {
1437 "month": "%m",
1438 "day": "%d",
1439 "year": "%Y",
1440 "second": "%S",
1441 "hour": "%H",
1442 "doy": "%j",
1443 "minute": "%M",
1444 "epoch": "%s",
1445 "dow": "%w",
1446 "week": "%W",
1447 },
1448 )
1449
1450 def visit_truediv_binary(self, binary, operator, **kw):
1451 return (
1452 self.process(binary.left, **kw)
1453 + " / "
1454 + "(%s + 0.0)" % self.process(binary.right, **kw)
1455 )
1456
1457 def visit_now_func(self, fn, **kw):
1458 return "CURRENT_TIMESTAMP"
1459
1460 def visit_localtimestamp_func(self, func, **kw):
1461 return "DATETIME(CURRENT_TIMESTAMP, 'localtime')"
1462
1463 def visit_true(self, expr, **kw):
1464 return "1"
1465
1466 def visit_false(self, expr, **kw):
1467 return "0"
1468
1469 def visit_char_length_func(self, fn, **kw):
1470 return "length%s" % self.function_argspec(fn)
1471
1472 def visit_aggregate_strings_func(self, fn, **kw):
1473 return super().visit_aggregate_strings_func(
1474 fn, use_function_name="group_concat", **kw
1475 )
1476
1477 def visit_cast(self, cast, **kwargs):
1478 if self.dialect.supports_cast:
1479 return super().visit_cast(cast, **kwargs)
1480 else:
1481 return self.process(cast.clause, **kwargs)
1482
1483 def visit_extract(self, extract, **kw):
1484 try:
1485 return "CAST(STRFTIME('%s', %s) AS INTEGER)" % (
1486 self.extract_map[extract.field],
1487 self.process(extract.expr, **kw),
1488 )
1489 except KeyError as err:
1490 raise exc.CompileError(
1491 "%s is not a valid extract argument." % extract.field
1492 ) from err
1493
1494 def returning_clause(
1495 self,
1496 stmt,
1497 returning_cols,
1498 *,
1499 populate_result_map,
1500 **kw,
1501 ):
1502 kw["include_table"] = False
1503 return super().returning_clause(
1504 stmt, returning_cols, populate_result_map=populate_result_map, **kw
1505 )
1506
1507 def limit_clause(self, select, **kw):
1508 text = ""
1509 if select._limit_clause is not None:
1510 text += "\n LIMIT " + self.process(select._limit_clause, **kw)
1511 if select._offset_clause is not None:
1512 if select._limit_clause is None:
1513 text += "\n LIMIT " + self.process(sql.literal(-1))
1514 text += " OFFSET " + self.process(select._offset_clause, **kw)
1515 else:
1516 text += " OFFSET " + self.process(sql.literal(0), **kw)
1517 return text
1518
1519 def for_update_clause(self, select, **kw):
1520 # sqlite has no "FOR UPDATE" AFAICT
1521 return ""
1522
1523 def update_from_clause(
1524 self, update_stmt, from_table, extra_froms, from_hints, **kw
1525 ):
1526 kw["asfrom"] = True
1527 return "FROM " + ", ".join(
1528 t._compiler_dispatch(self, fromhints=from_hints, **kw)
1529 for t in extra_froms
1530 )
1531
1532 def visit_is_distinct_from_binary(self, binary, operator, **kw):
1533 return "%s IS NOT %s" % (
1534 self.process(binary.left),
1535 self.process(binary.right),
1536 )
1537
1538 def visit_is_not_distinct_from_binary(self, binary, operator, **kw):
1539 return "%s IS %s" % (
1540 self.process(binary.left),
1541 self.process(binary.right),
1542 )
1543
1544 def visit_json_getitem_op_binary(
1545 self, binary, operator, _cast_applied=False, **kw
1546 ):
1547 if (
1548 not _cast_applied
1549 and binary.type._type_affinity is not sqltypes.JSON
1550 ):
1551 kw["_cast_applied"] = True
1552 return self.process(sql.cast(binary, binary.type), **kw)
1553
1554 if binary.type._type_affinity is sqltypes.JSON:
1555 expr = "JSON_QUOTE(JSON_EXTRACT(%s, %s))"
1556 else:
1557 expr = "JSON_EXTRACT(%s, %s)"
1558
1559 return expr % (
1560 self.process(binary.left, **kw),
1561 self.process(binary.right, **kw),
1562 )
1563
1564 def visit_json_path_getitem_op_binary(
1565 self, binary, operator, _cast_applied=False, **kw
1566 ):
1567 if (
1568 not _cast_applied
1569 and binary.type._type_affinity is not sqltypes.JSON
1570 ):
1571 kw["_cast_applied"] = True
1572 return self.process(sql.cast(binary, binary.type), **kw)
1573
1574 if binary.type._type_affinity is sqltypes.JSON:
1575 expr = "JSON_QUOTE(JSON_EXTRACT(%s, %s))"
1576 else:
1577 expr = "JSON_EXTRACT(%s, %s)"
1578
1579 return expr % (
1580 self.process(binary.left, **kw),
1581 self.process(binary.right, **kw),
1582 )
1583
1584 def visit_empty_set_op_expr(self, type_, expand_op, **kw):
1585 # slightly old SQLite versions don't seem to be able to handle
1586 # the empty set impl
1587 return self.visit_empty_set_expr(type_)
1588
1589 def visit_empty_set_expr(self, element_types, **kw):
1590 return "SELECT %s FROM (SELECT %s) WHERE 1!=1" % (
1591 ", ".join("1" for type_ in element_types or [INTEGER()]),
1592 ", ".join("1" for type_ in element_types or [INTEGER()]),
1593 )
1594
1595 def visit_regexp_match_op_binary(self, binary, operator, **kw):
1596 return self._generate_generic_binary(binary, " REGEXP ", **kw)
1597
1598 def visit_not_regexp_match_op_binary(self, binary, operator, **kw):
1599 return self._generate_generic_binary(binary, " NOT REGEXP ", **kw)
1600
1601 def _on_conflict_target(self, clause, **kw):
1602 if clause.inferred_target_elements is not None:
1603 target_text = "(%s)" % ", ".join(
1604 (
1605 self.preparer.quote(c)
1606 if isinstance(c, str)
1607 else self.process(c, include_table=False, use_schema=False)
1608 )
1609 for c in clause.inferred_target_elements
1610 )
1611 if clause.inferred_target_whereclause is not None:
1612 target_text += " WHERE %s" % self.process(
1613 clause.inferred_target_whereclause,
1614 include_table=False,
1615 use_schema=False,
1616 literal_execute=True,
1617 )
1618
1619 else:
1620 target_text = ""
1621
1622 return target_text
1623
1624 def visit_on_conflict_do_nothing(self, on_conflict, **kw):
1625 target_text = self._on_conflict_target(on_conflict, **kw)
1626
1627 if target_text:
1628 return "ON CONFLICT %s DO NOTHING" % target_text
1629 else:
1630 return "ON CONFLICT DO NOTHING"
1631
1632 def visit_on_conflict_do_update(self, on_conflict, **kw):
1633 clause = on_conflict
1634
1635 target_text = self._on_conflict_target(on_conflict, **kw)
1636
1637 action_set_ops = []
1638
1639 set_parameters = dict(clause.update_values_to_set)
1640 # create a list of column assignment clauses as tuples
1641
1642 insert_statement = self.stack[-1]["selectable"]
1643 cols = insert_statement.table.c
1644 for c in cols:
1645 col_key = c.key
1646
1647 if col_key in set_parameters:
1648 value = set_parameters.pop(col_key)
1649 elif c in set_parameters:
1650 value = set_parameters.pop(c)
1651 else:
1652 continue
1653
1654 if (
1655 isinstance(value, elements.BindParameter)
1656 and value.type._isnull
1657 ):
1658 value = value._with_binary_element_type(c.type)
1659 value_text = self.process(value.self_group(), use_schema=False)
1660
1661 key_text = self.preparer.quote(c.name)
1662 action_set_ops.append("%s = %s" % (key_text, value_text))
1663
1664 # check for names that don't match columns
1665 if set_parameters:
1666 util.warn(
1667 "Additional column names not matching "
1668 "any column keys in table '%s': %s"
1669 % (
1670 self.current_executable.table.name,
1671 (", ".join("'%s'" % c for c in set_parameters)),
1672 )
1673 )
1674 for k, v in set_parameters.items():
1675 key_text = (
1676 self.preparer.quote(k)
1677 if isinstance(k, str)
1678 else self.process(k, use_schema=False)
1679 )
1680 value_text = self.process(
1681 coercions.expect(roles.ExpressionElementRole, v),
1682 use_schema=False,
1683 )
1684 action_set_ops.append("%s = %s" % (key_text, value_text))
1685
1686 action_text = ", ".join(action_set_ops)
1687 if clause.update_whereclause is not None:
1688 action_text += " WHERE %s" % self.process(
1689 clause.update_whereclause, include_table=True, use_schema=False
1690 )
1691
1692 return "ON CONFLICT %s DO UPDATE SET %s" % (target_text, action_text)
1693
1694 def visit_bitwise_xor_op_binary(self, binary, operator, **kw):
1695 # sqlite has no xor. Use "a XOR b" = "(a | b) - (a & b)".
1696 kw["eager_grouping"] = True
1697 or_ = self._generate_generic_binary(binary, " | ", **kw)
1698 and_ = self._generate_generic_binary(binary, " & ", **kw)
1699 return f"({or_} - {and_})"
1700
1701
1702class SQLiteDDLCompiler(compiler.DDLCompiler):
1703 def get_column_specification(self, column, **kwargs):
1704 coltype = self.dialect.type_compiler_instance.process(
1705 column.type, type_expression=column
1706 )
1707 colspec = self.preparer.format_column(column) + " " + coltype
1708 default = self.get_column_default_string(column)
1709 if default is not None:
1710
1711 if not re.match(r"""^\s*[\'\"\(]""", default) and re.match(
1712 r".*\W.*", default
1713 ):
1714 colspec += f" DEFAULT ({default})"
1715 else:
1716 colspec += f" DEFAULT {default}"
1717
1718 if not column.nullable:
1719 colspec += " NOT NULL"
1720
1721 on_conflict_clause = column.dialect_options["sqlite"][
1722 "on_conflict_not_null"
1723 ]
1724 if on_conflict_clause is not None:
1725 colspec += " ON CONFLICT " + on_conflict_clause
1726
1727 if column.primary_key:
1728 if (
1729 column.autoincrement is True
1730 and len(column.table.primary_key.columns) != 1
1731 ):
1732 raise exc.CompileError(
1733 "SQLite does not support autoincrement for "
1734 "composite primary keys"
1735 )
1736
1737 if (
1738 column.table.dialect_options["sqlite"]["autoincrement"]
1739 and len(column.table.primary_key.columns) == 1
1740 and issubclass(column.type._type_affinity, sqltypes.Integer)
1741 and not column.foreign_keys
1742 ):
1743 colspec += " PRIMARY KEY"
1744
1745 on_conflict_clause = column.dialect_options["sqlite"][
1746 "on_conflict_primary_key"
1747 ]
1748 if on_conflict_clause is not None:
1749 colspec += " ON CONFLICT " + on_conflict_clause
1750
1751 colspec += " AUTOINCREMENT"
1752
1753 if column.computed is not None:
1754 colspec += " " + self.process(column.computed)
1755
1756 return colspec
1757
1758 def visit_primary_key_constraint(self, constraint, **kw):
1759 # for columns with sqlite_autoincrement=True,
1760 # the PRIMARY KEY constraint can only be inline
1761 # with the column itself.
1762 if len(constraint.columns) == 1:
1763 c = list(constraint)[0]
1764 if (
1765 c.primary_key
1766 and c.table.dialect_options["sqlite"]["autoincrement"]
1767 and issubclass(c.type._type_affinity, sqltypes.Integer)
1768 and not c.foreign_keys
1769 ):
1770 return None
1771
1772 text = super().visit_primary_key_constraint(constraint)
1773
1774 on_conflict_clause = constraint.dialect_options["sqlite"][
1775 "on_conflict"
1776 ]
1777 if on_conflict_clause is None and len(constraint.columns) == 1:
1778 on_conflict_clause = list(constraint)[0].dialect_options["sqlite"][
1779 "on_conflict_primary_key"
1780 ]
1781
1782 if on_conflict_clause is not None:
1783 text += " ON CONFLICT " + on_conflict_clause
1784
1785 return text
1786
1787 def visit_unique_constraint(self, constraint, **kw):
1788 text = super().visit_unique_constraint(constraint)
1789
1790 on_conflict_clause = constraint.dialect_options["sqlite"][
1791 "on_conflict"
1792 ]
1793 if on_conflict_clause is None and len(constraint.columns) == 1:
1794 col1 = list(constraint)[0]
1795 if isinstance(col1, schema.SchemaItem):
1796 on_conflict_clause = list(constraint)[0].dialect_options[
1797 "sqlite"
1798 ]["on_conflict_unique"]
1799
1800 if on_conflict_clause is not None:
1801 text += " ON CONFLICT " + on_conflict_clause
1802
1803 return text
1804
1805 def visit_check_constraint(self, constraint, **kw):
1806 text = super().visit_check_constraint(constraint)
1807
1808 on_conflict_clause = constraint.dialect_options["sqlite"][
1809 "on_conflict"
1810 ]
1811
1812 if on_conflict_clause is not None:
1813 text += " ON CONFLICT " + on_conflict_clause
1814
1815 return text
1816
1817 def visit_column_check_constraint(self, constraint, **kw):
1818 text = super().visit_column_check_constraint(constraint)
1819
1820 if constraint.dialect_options["sqlite"]["on_conflict"] is not None:
1821 raise exc.CompileError(
1822 "SQLite does not support on conflict clause for "
1823 "column check constraint"
1824 )
1825
1826 return text
1827
1828 def visit_foreign_key_constraint(self, constraint, **kw):
1829 local_table = constraint.elements[0].parent.table
1830 remote_table = constraint.elements[0].column.table
1831
1832 if local_table.schema != remote_table.schema:
1833 return None
1834 else:
1835 return super().visit_foreign_key_constraint(constraint)
1836
1837 def define_constraint_remote_table(self, constraint, table, preparer):
1838 """Format the remote table clause of a CREATE CONSTRAINT clause."""
1839
1840 return preparer.format_table(table, use_schema=False)
1841
1842 def visit_create_index(
1843 self, create, include_schema=False, include_table_schema=True, **kw
1844 ):
1845 index = create.element
1846 self._verify_index_table(index)
1847 preparer = self.preparer
1848 text = "CREATE "
1849 if index.unique:
1850 text += "UNIQUE "
1851
1852 text += "INDEX "
1853
1854 if create.if_not_exists:
1855 text += "IF NOT EXISTS "
1856
1857 text += "%s ON %s (%s)" % (
1858 self._prepared_index_name(index, include_schema=True),
1859 preparer.format_table(index.table, use_schema=False),
1860 ", ".join(
1861 self.sql_compiler.process(
1862 expr, include_table=False, literal_binds=True
1863 )
1864 for expr in index.expressions
1865 ),
1866 )
1867
1868 whereclause = index.dialect_options["sqlite"]["where"]
1869 if whereclause is not None:
1870 where_compiled = self.sql_compiler.process(
1871 whereclause, include_table=False, literal_binds=True
1872 )
1873 text += " WHERE " + where_compiled
1874
1875 return text
1876
1877 def post_create_table(self, table):
1878 table_options = []
1879
1880 if not table.dialect_options["sqlite"]["with_rowid"]:
1881 table_options.append("WITHOUT ROWID")
1882
1883 if table.dialect_options["sqlite"]["strict"]:
1884 table_options.append("STRICT")
1885
1886 if table_options:
1887 return "\n " + ",\n ".join(table_options)
1888 else:
1889 return ""
1890
1891
1892class SQLiteTypeCompiler(compiler.GenericTypeCompiler):
1893 def visit_large_binary(self, type_, **kw):
1894 return self.visit_BLOB(type_)
1895
1896 def visit_DATETIME(self, type_, **kw):
1897 if (
1898 not isinstance(type_, _DateTimeMixin)
1899 or type_.format_is_text_affinity
1900 ):
1901 return super().visit_DATETIME(type_)
1902 else:
1903 return "DATETIME_CHAR"
1904
1905 def visit_DATE(self, type_, **kw):
1906 if (
1907 not isinstance(type_, _DateTimeMixin)
1908 or type_.format_is_text_affinity
1909 ):
1910 return super().visit_DATE(type_)
1911 else:
1912 return "DATE_CHAR"
1913
1914 def visit_TIME(self, type_, **kw):
1915 if (
1916 not isinstance(type_, _DateTimeMixin)
1917 or type_.format_is_text_affinity
1918 ):
1919 return super().visit_TIME(type_)
1920 else:
1921 return "TIME_CHAR"
1922
1923 def visit_JSON(self, type_, **kw):
1924 # note this name provides NUMERIC affinity, not TEXT.
1925 # should not be an issue unless the JSON value consists of a single
1926 # numeric value. JSONTEXT can be used if this case is required.
1927 return "JSON"
1928
1929
1930class SQLiteIdentifierPreparer(compiler.IdentifierPreparer):
1931 reserved_words = {
1932 "add",
1933 "after",
1934 "all",
1935 "alter",
1936 "analyze",
1937 "and",
1938 "as",
1939 "asc",
1940 "attach",
1941 "autoincrement",
1942 "before",
1943 "begin",
1944 "between",
1945 "by",
1946 "cascade",
1947 "case",
1948 "cast",
1949 "check",
1950 "collate",
1951 "column",
1952 "commit",
1953 "conflict",
1954 "constraint",
1955 "create",
1956 "cross",
1957 "current_date",
1958 "current_time",
1959 "current_timestamp",
1960 "database",
1961 "default",
1962 "deferrable",
1963 "deferred",
1964 "delete",
1965 "desc",
1966 "detach",
1967 "distinct",
1968 "drop",
1969 "each",
1970 "else",
1971 "end",
1972 "escape",
1973 "except",
1974 "exclusive",
1975 "exists",
1976 "explain",
1977 "false",
1978 "fail",
1979 "for",
1980 "foreign",
1981 "from",
1982 "full",
1983 "glob",
1984 "group",
1985 "having",
1986 "if",
1987 "ignore",
1988 "immediate",
1989 "in",
1990 "index",
1991 "indexed",
1992 "initially",
1993 "inner",
1994 "insert",
1995 "instead",
1996 "intersect",
1997 "into",
1998 "is",
1999 "isnull",
2000 "join",
2001 "key",
2002 "left",
2003 "like",
2004 "limit",
2005 "match",
2006 "natural",
2007 "not",
2008 "notnull",
2009 "null",
2010 "of",
2011 "offset",
2012 "on",
2013 "or",
2014 "order",
2015 "outer",
2016 "plan",
2017 "pragma",
2018 "primary",
2019 "query",
2020 "raise",
2021 "references",
2022 "reindex",
2023 "rename",
2024 "replace",
2025 "restrict",
2026 "right",
2027 "rollback",
2028 "row",
2029 "select",
2030 "set",
2031 "table",
2032 "temp",
2033 "temporary",
2034 "then",
2035 "to",
2036 "transaction",
2037 "trigger",
2038 "true",
2039 "union",
2040 "unique",
2041 "update",
2042 "using",
2043 "vacuum",
2044 "values",
2045 "view",
2046 "virtual",
2047 "when",
2048 "where",
2049 }
2050
2051
2052class SQLiteExecutionContext(default.DefaultExecutionContext):
2053 @util.memoized_property
2054 def _preserve_raw_colnames(self):
2055 return (
2056 not self.dialect._broken_dotted_colnames
2057 or self.execution_options.get("sqlite_raw_colnames", False)
2058 )
2059
2060 def _translate_colname(self, colname):
2061 # TODO: detect SQLite version 3.10.0 or greater;
2062 # see [ticket:3633]
2063
2064 # adjust for dotted column names. SQLite
2065 # in the case of UNION may store col names as
2066 # "tablename.colname", or if using an attached database,
2067 # "database.tablename.colname", in cursor.description
2068 if not self._preserve_raw_colnames and "." in colname:
2069 return colname.split(".")[-1], colname
2070 else:
2071 return colname, None
2072
2073
2074class SQLiteDialect(default.DefaultDialect):
2075 name = "sqlite"
2076 supports_alter = False
2077
2078 # SQlite supports "DEFAULT VALUES" but *does not* support
2079 # "VALUES (DEFAULT)"
2080 supports_default_values = True
2081 supports_default_metavalue = False
2082
2083 # sqlite issue:
2084 # https://github.com/python/cpython/issues/93421
2085 # note this parameter is no longer used by the ORM or default dialect
2086 # see #9414
2087 supports_sane_rowcount_returning = False
2088
2089 supports_empty_insert = False
2090 supports_cast = True
2091 supports_multivalues_insert = True
2092 use_insertmanyvalues = True
2093 tuple_in_values = True
2094 supports_statement_cache = True
2095 insert_null_pk_still_autoincrements = True
2096 insert_returning = True
2097 update_returning = True
2098 update_returning_multifrom = True
2099 delete_returning = True
2100 update_returning_multifrom = True
2101
2102 supports_default_metavalue = True
2103 """dialect supports INSERT... VALUES (DEFAULT) syntax"""
2104
2105 default_metavalue_token = "NULL"
2106 """for INSERT... VALUES (DEFAULT) syntax, the token to put in the
2107 parenthesis."""
2108
2109 default_paramstyle = "qmark"
2110 execution_ctx_cls = SQLiteExecutionContext
2111 statement_compiler = SQLiteCompiler
2112 ddl_compiler = SQLiteDDLCompiler
2113 type_compiler_cls = SQLiteTypeCompiler
2114 preparer = SQLiteIdentifierPreparer
2115 ischema_names = ischema_names
2116 colspecs = colspecs
2117
2118 construct_arguments = [
2119 (
2120 sa_schema.Table,
2121 {
2122 "autoincrement": False,
2123 "with_rowid": True,
2124 "strict": False,
2125 },
2126 ),
2127 (sa_schema.Index, {"where": None}),
2128 (
2129 sa_schema.Column,
2130 {
2131 "on_conflict_primary_key": None,
2132 "on_conflict_not_null": None,
2133 "on_conflict_unique": None,
2134 },
2135 ),
2136 (sa_schema.Constraint, {"on_conflict": None}),
2137 ]
2138
2139 _broken_fk_pragma_quotes = False
2140 _broken_dotted_colnames = False
2141
2142 def __init__(
2143 self,
2144 native_datetime: bool = False,
2145 json_serializer: Optional[Callable[..., Any]] = None,
2146 json_deserializer: Optional[Callable[..., Any]] = None,
2147 **kwargs: Any,
2148 ) -> None:
2149 default.DefaultDialect.__init__(self, **kwargs)
2150
2151 self._json_serializer = json_serializer
2152 self._json_deserializer = json_deserializer
2153
2154 # this flag used by pysqlite dialect, and perhaps others in the
2155 # future, to indicate the driver is handling date/timestamp
2156 # conversions (and perhaps datetime/time as well on some hypothetical
2157 # driver ?)
2158 self.native_datetime = native_datetime
2159
2160 if self.dbapi is not None:
2161 if self.dbapi.sqlite_version_info < (3, 7, 16):
2162 util.warn(
2163 "SQLite version %s is older than 3.7.16, and will not "
2164 "support right nested joins, as are sometimes used in "
2165 "more complex ORM scenarios. SQLAlchemy 1.4 and above "
2166 "no longer tries to rewrite these joins."
2167 % (self.dbapi.sqlite_version_info,)
2168 )
2169
2170 # NOTE: python 3.7 on fedora for me has SQLite 3.34.1. These
2171 # version checks are getting very stale.
2172 self._broken_dotted_colnames = self.dbapi.sqlite_version_info < (
2173 3,
2174 10,
2175 0,
2176 )
2177 self.supports_default_values = self.dbapi.sqlite_version_info >= (
2178 3,
2179 3,
2180 8,
2181 )
2182 self.supports_cast = self.dbapi.sqlite_version_info >= (3, 2, 3)
2183 self.supports_multivalues_insert = (
2184 # https://www.sqlite.org/releaselog/3_7_11.html
2185 self.dbapi.sqlite_version_info
2186 >= (3, 7, 11)
2187 )
2188 # see https://www.sqlalchemy.org/trac/ticket/2568
2189 # as well as https://www.sqlite.org/src/info/600482d161
2190 self._broken_fk_pragma_quotes = self.dbapi.sqlite_version_info < (
2191 3,
2192 6,
2193 14,
2194 )
2195
2196 if self.dbapi.sqlite_version_info < (3, 35) or util.pypy:
2197 self.update_returning = self.delete_returning = (
2198 self.insert_returning
2199 ) = False
2200
2201 if self.dbapi.sqlite_version_info < (3, 32, 0):
2202 # https://www.sqlite.org/limits.html
2203 self.insertmanyvalues_max_parameters = 999
2204
2205 _isolation_lookup = util.immutabledict(
2206 {"READ UNCOMMITTED": 1, "SERIALIZABLE": 0}
2207 )
2208
2209 def get_isolation_level_values(self, dbapi_connection):
2210 return list(self._isolation_lookup)
2211
2212 def set_isolation_level(
2213 self, dbapi_connection: DBAPIConnection, level: IsolationLevel
2214 ) -> None:
2215 isolation_level = self._isolation_lookup[level]
2216
2217 cursor = dbapi_connection.cursor()
2218 cursor.execute(f"PRAGMA read_uncommitted = {isolation_level}")
2219 cursor.close()
2220
2221 def get_isolation_level(self, dbapi_connection):
2222 cursor = dbapi_connection.cursor()
2223 cursor.execute("PRAGMA read_uncommitted")
2224 res = cursor.fetchone()
2225 if res:
2226 value = res[0]
2227 else:
2228 # https://www.sqlite.org/changes.html#version_3_3_3
2229 # "Optional READ UNCOMMITTED isolation (instead of the
2230 # default isolation level of SERIALIZABLE) and
2231 # table level locking when database connections
2232 # share a common cache.""
2233 # pre-SQLite 3.3.0 default to 0
2234 value = 0
2235 cursor.close()
2236 if value == 0:
2237 return "SERIALIZABLE"
2238 elif value == 1:
2239 return "READ UNCOMMITTED"
2240 else:
2241 assert False, "Unknown isolation level %s" % value
2242
2243 @reflection.cache
2244 def get_schema_names(self, connection, **kw):
2245 s = "PRAGMA database_list"
2246 dl = connection.exec_driver_sql(s)
2247
2248 return [db[1] for db in dl if db[1] != "temp"]
2249
2250 def _format_schema(self, schema, table_name):
2251 if schema is not None:
2252 qschema = self.identifier_preparer.quote_identifier(schema)
2253 name = f"{qschema}.{table_name}"
2254 else:
2255 name = table_name
2256 return name
2257
2258 def _sqlite_main_query(
2259 self,
2260 table: str,
2261 type_: str,
2262 schema: Optional[str],
2263 sqlite_include_internal: bool,
2264 ):
2265 main = self._format_schema(schema, table)
2266 if not sqlite_include_internal:
2267 filter_table = " AND name NOT LIKE 'sqlite~_%' ESCAPE '~'"
2268 else:
2269 filter_table = ""
2270 query = (
2271 f"SELECT name FROM {main} "
2272 f"WHERE type='{type_}'{filter_table} "
2273 "ORDER BY name"
2274 )
2275 return query
2276
2277 @reflection.cache
2278 def get_table_names(
2279 self, connection, schema=None, sqlite_include_internal=False, **kw
2280 ):
2281 query = self._sqlite_main_query(
2282 "sqlite_master", "table", schema, sqlite_include_internal
2283 )
2284 names = connection.exec_driver_sql(query).scalars().all()
2285 return names
2286
2287 @reflection.cache
2288 def get_temp_table_names(
2289 self, connection, sqlite_include_internal=False, **kw
2290 ):
2291 query = self._sqlite_main_query(
2292 "sqlite_temp_master", "table", None, sqlite_include_internal
2293 )
2294 names = connection.exec_driver_sql(query).scalars().all()
2295 return names
2296
2297 @reflection.cache
2298 def get_temp_view_names(
2299 self, connection, sqlite_include_internal=False, **kw
2300 ):
2301 query = self._sqlite_main_query(
2302 "sqlite_temp_master", "view", None, sqlite_include_internal
2303 )
2304 names = connection.exec_driver_sql(query).scalars().all()
2305 return names
2306
2307 @reflection.cache
2308 def has_table(self, connection, table_name, schema=None, **kw):
2309 self._ensure_has_table_connection(connection)
2310
2311 if schema is not None and schema not in self.get_schema_names(
2312 connection, **kw
2313 ):
2314 return False
2315
2316 info = self._get_table_pragma(
2317 connection, "table_info", table_name, schema=schema
2318 )
2319 return bool(info)
2320
2321 def _get_default_schema_name(self, connection):
2322 return "main"
2323
2324 @reflection.cache
2325 def get_view_names(
2326 self, connection, schema=None, sqlite_include_internal=False, **kw
2327 ):
2328 query = self._sqlite_main_query(
2329 "sqlite_master", "view", schema, sqlite_include_internal
2330 )
2331 names = connection.exec_driver_sql(query).scalars().all()
2332 return names
2333
2334 @reflection.cache
2335 def get_view_definition(self, connection, view_name, schema=None, **kw):
2336 if schema is not None:
2337 qschema = self.identifier_preparer.quote_identifier(schema)
2338 master = f"{qschema}.sqlite_master"
2339 s = ("SELECT sql FROM %s WHERE name = ? AND type='view'") % (
2340 master,
2341 )
2342 rs = connection.exec_driver_sql(s, (view_name,))
2343 else:
2344 try:
2345 s = (
2346 "SELECT sql FROM "
2347 " (SELECT * FROM sqlite_master UNION ALL "
2348 " SELECT * FROM sqlite_temp_master) "
2349 "WHERE name = ? "
2350 "AND type='view'"
2351 )
2352 rs = connection.exec_driver_sql(s, (view_name,))
2353 except exc.DBAPIError:
2354 s = (
2355 "SELECT sql FROM sqlite_master WHERE name = ? "
2356 "AND type='view'"
2357 )
2358 rs = connection.exec_driver_sql(s, (view_name,))
2359
2360 result = rs.fetchall()
2361 if result:
2362 return result[0].sql
2363 else:
2364 raise exc.NoSuchTableError(
2365 f"{schema}.{view_name}" if schema else view_name
2366 )
2367
2368 @reflection.cache
2369 def get_columns(self, connection, table_name, schema=None, **kw):
2370 pragma = "table_info"
2371 # computed columns are threaded as hidden, they require table_xinfo
2372 if self.server_version_info >= (3, 31):
2373 pragma = "table_xinfo"
2374 info = self._get_table_pragma(
2375 connection, pragma, table_name, schema=schema
2376 )
2377 columns = []
2378 tablesql = None
2379 for row in info:
2380 name = row[1]
2381 type_ = row[2].upper()
2382 nullable = not row[3]
2383 default = row[4]
2384 primary_key = row[5]
2385 hidden = row[6] if pragma == "table_xinfo" else 0
2386
2387 # hidden has value 0 for normal columns, 1 for hidden columns,
2388 # 2 for computed virtual columns and 3 for computed stored columns
2389 # https://www.sqlite.org/src/info/069351b85f9a706f60d3e98fbc8aaf40c374356b967c0464aede30ead3d9d18b
2390 if hidden == 1:
2391 continue
2392
2393 generated = bool(hidden)
2394 persisted = hidden == 3
2395
2396 if tablesql is None and generated:
2397 tablesql = self._get_table_sql(
2398 connection, table_name, schema, **kw
2399 )
2400 # remove create table
2401 match = re.match(
2402 (
2403 r"create table .*?\((.*)\)"
2404 r"(?:\s*,?\s*(?:WITHOUT\s+ROWID|STRICT))*$"
2405 ),
2406 tablesql.strip(),
2407 re.DOTALL | re.IGNORECASE,
2408 )
2409 assert match, f"create table not found in {tablesql}"
2410 tablesql = match.group(1).strip()
2411
2412 columns.append(
2413 self._get_column_info(
2414 name,
2415 type_,
2416 nullable,
2417 default,
2418 primary_key,
2419 generated,
2420 persisted,
2421 tablesql,
2422 )
2423 )
2424 if columns:
2425 return columns
2426 elif not self.has_table(connection, table_name, schema):
2427 raise exc.NoSuchTableError(
2428 f"{schema}.{table_name}" if schema else table_name
2429 )
2430 else:
2431 return ReflectionDefaults.columns()
2432
2433 def _get_column_info(
2434 self,
2435 name,
2436 type_,
2437 nullable,
2438 default,
2439 primary_key,
2440 generated,
2441 persisted,
2442 tablesql,
2443 ):
2444 if generated:
2445 # the type of a column "cc INTEGER GENERATED ALWAYS AS (1 + 42)"
2446 # somehow is "INTEGER GENERATED ALWAYS"
2447 type_ = re.sub("generated", "", type_, flags=re.IGNORECASE)
2448 type_ = re.sub("always", "", type_, flags=re.IGNORECASE).strip()
2449
2450 coltype = self._resolve_type_affinity(type_)
2451
2452 if default is not None:
2453 default = str(default)
2454
2455 colspec = {
2456 "name": name,
2457 "type": coltype,
2458 "nullable": nullable,
2459 "default": default,
2460 "primary_key": primary_key,
2461 }
2462 if generated:
2463 sqltext = ""
2464 if tablesql:
2465 pattern = (
2466 r"[^,]*\s+GENERATED\s+ALWAYS\s+AS"
2467 r"\s+\((.*)\)\s*(?:virtual|stored)?"
2468 )
2469 match = re.search(
2470 re.escape(name) + pattern, tablesql, re.IGNORECASE
2471 )
2472 if match:
2473 sqltext = match.group(1)
2474 colspec["computed"] = {"sqltext": sqltext, "persisted": persisted}
2475 return colspec
2476
2477 def _resolve_type_affinity(self, type_):
2478 """Return a data type from a reflected column, using affinity rules.
2479
2480 SQLite's goal for universal compatibility introduces some complexity
2481 during reflection, as a column's defined type might not actually be a
2482 type that SQLite understands - or indeed, my not be defined *at all*.
2483 Internally, SQLite handles this with a 'data type affinity' for each
2484 column definition, mapping to one of 'TEXT', 'NUMERIC', 'INTEGER',
2485 'REAL', or 'NONE' (raw bits). The algorithm that determines this is
2486 listed in https://www.sqlite.org/datatype3.html section 2.1.
2487
2488 This method allows SQLAlchemy to support that algorithm, while still
2489 providing access to smarter reflection utilities by recognizing
2490 column definitions that SQLite only supports through affinity (like
2491 DATE and DOUBLE).
2492
2493 """
2494 match = re.match(r"([\w ]+)(\(.*?\))?", type_)
2495 if match:
2496 coltype = match.group(1)
2497 args = match.group(2)
2498 else:
2499 coltype = ""
2500 args = ""
2501
2502 if coltype in self.ischema_names:
2503 coltype = self.ischema_names[coltype]
2504 elif "INT" in coltype:
2505 coltype = sqltypes.INTEGER
2506 elif "CHAR" in coltype or "CLOB" in coltype or "TEXT" in coltype:
2507 coltype = sqltypes.TEXT
2508 elif "BLOB" in coltype or not coltype:
2509 coltype = sqltypes.NullType
2510 elif "REAL" in coltype or "FLOA" in coltype or "DOUB" in coltype:
2511 coltype = sqltypes.REAL
2512 else:
2513 coltype = sqltypes.NUMERIC
2514
2515 if args is not None:
2516 args = re.findall(r"(\d+)", args)
2517 try:
2518 coltype = coltype(*[int(a) for a in args])
2519 except TypeError:
2520 util.warn(
2521 "Could not instantiate type %s with "
2522 "reflected arguments %s; using no arguments."
2523 % (coltype, args)
2524 )
2525 coltype = coltype()
2526 else:
2527 coltype = coltype()
2528
2529 return coltype
2530
2531 @reflection.cache
2532 def get_pk_constraint(self, connection, table_name, schema=None, **kw):
2533 constraint_name = None
2534 table_data = self._get_table_sql(connection, table_name, schema=schema)
2535 if table_data:
2536 PK_PATTERN = r"CONSTRAINT (\w+) PRIMARY KEY"
2537 result = re.search(PK_PATTERN, table_data, re.I)
2538 constraint_name = result.group(1) if result else None
2539
2540 cols = self.get_columns(connection, table_name, schema, **kw)
2541 # consider only pk columns. This also avoids sorting the cached
2542 # value returned by get_columns
2543 cols = [col for col in cols if col.get("primary_key", 0) > 0]
2544 cols.sort(key=lambda col: col.get("primary_key"))
2545 pkeys = [col["name"] for col in cols]
2546
2547 if pkeys:
2548 return {"constrained_columns": pkeys, "name": constraint_name}
2549 else:
2550 return ReflectionDefaults.pk_constraint()
2551
2552 @reflection.cache
2553 def get_foreign_keys(self, connection, table_name, schema=None, **kw):
2554 # sqlite makes this *extremely difficult*.
2555 # First, use the pragma to get the actual FKs.
2556 pragma_fks = self._get_table_pragma(
2557 connection, "foreign_key_list", table_name, schema=schema
2558 )
2559
2560 fks = {}
2561
2562 for row in pragma_fks:
2563 (numerical_id, rtbl, lcol, rcol) = (row[0], row[2], row[3], row[4])
2564
2565 if not rcol:
2566 # no referred column, which means it was not named in the
2567 # original DDL. The referred columns of the foreign key
2568 # constraint are therefore the primary key of the referred
2569 # table.
2570 try:
2571 referred_pk = self.get_pk_constraint(
2572 connection, rtbl, schema=schema, **kw
2573 )
2574 referred_columns = referred_pk["constrained_columns"]
2575 except exc.NoSuchTableError:
2576 # ignore not existing parents
2577 referred_columns = []
2578 else:
2579 # note we use this list only if this is the first column
2580 # in the constraint. for subsequent columns we ignore the
2581 # list and append "rcol" if present.
2582 referred_columns = []
2583
2584 if self._broken_fk_pragma_quotes:
2585 rtbl = re.sub(r"^[\"\[`\']|[\"\]`\']$", "", rtbl)
2586
2587 if numerical_id in fks:
2588 fk = fks[numerical_id]
2589 else:
2590 fk = fks[numerical_id] = {
2591 "name": None,
2592 "constrained_columns": [],
2593 "referred_schema": schema,
2594 "referred_table": rtbl,
2595 "referred_columns": referred_columns,
2596 "options": {},
2597 }
2598 fks[numerical_id] = fk
2599
2600 fk["constrained_columns"].append(lcol)
2601
2602 if rcol:
2603 fk["referred_columns"].append(rcol)
2604
2605 def fk_sig(constrained_columns, referred_table, referred_columns):
2606 return (
2607 tuple(constrained_columns)
2608 + (referred_table,)
2609 + tuple(referred_columns)
2610 )
2611
2612 # then, parse the actual SQL and attempt to find DDL that matches
2613 # the names as well. SQLite saves the DDL in whatever format
2614 # it was typed in as, so need to be liberal here.
2615
2616 keys_by_signature = {
2617 fk_sig(
2618 fk["constrained_columns"],
2619 fk["referred_table"],
2620 fk["referred_columns"],
2621 ): fk
2622 for fk in fks.values()
2623 }
2624
2625 table_data = self._get_table_sql(connection, table_name, schema=schema)
2626
2627 def parse_fks():
2628 if table_data is None:
2629 # system tables, etc.
2630 return
2631
2632 # note that we already have the FKs from PRAGMA above. This whole
2633 # regexp thing is trying to locate additional detail about the
2634 # FKs, namely the name of the constraint and other options.
2635 # so parsing the columns is really about matching it up to what
2636 # we already have.
2637 FK_PATTERN = (
2638 r"(?:CONSTRAINT (\w+) +)?"
2639 r"FOREIGN KEY *\( *(.+?) *\) +"
2640 r'REFERENCES +(?:(?:"(.+?)")|([a-z0-9_]+)) *\( *((?:(?:"[^"]+"|[a-z0-9_]+) *(?:, *)?)+)\) *' # noqa: E501
2641 r"((?:ON (?:DELETE|UPDATE) "
2642 r"(?:SET NULL|SET DEFAULT|CASCADE|RESTRICT|NO ACTION) *)*)"
2643 r"((?:NOT +)?DEFERRABLE)?"
2644 r"(?: +INITIALLY +(DEFERRED|IMMEDIATE))?"
2645 )
2646 for match in re.finditer(FK_PATTERN, table_data, re.I):
2647 (
2648 constraint_name,
2649 constrained_columns,
2650 referred_quoted_name,
2651 referred_name,
2652 referred_columns,
2653 onupdatedelete,
2654 deferrable,
2655 initially,
2656 ) = match.group(1, 2, 3, 4, 5, 6, 7, 8)
2657 constrained_columns = list(
2658 self._find_cols_in_sig(constrained_columns)
2659 )
2660 if not referred_columns:
2661 referred_columns = constrained_columns
2662 else:
2663 referred_columns = list(
2664 self._find_cols_in_sig(referred_columns)
2665 )
2666 referred_name = referred_quoted_name or referred_name
2667 options = {}
2668
2669 for token in re.split(r" *\bON\b *", onupdatedelete.upper()):
2670 if token.startswith("DELETE"):
2671 ondelete = token[6:].strip()
2672 if ondelete and ondelete != "NO ACTION":
2673 options["ondelete"] = ondelete
2674 elif token.startswith("UPDATE"):
2675 onupdate = token[6:].strip()
2676 if onupdate and onupdate != "NO ACTION":
2677 options["onupdate"] = onupdate
2678
2679 if deferrable:
2680 options["deferrable"] = "NOT" not in deferrable.upper()
2681 if initially:
2682 options["initially"] = initially.upper()
2683
2684 yield (
2685 constraint_name,
2686 constrained_columns,
2687 referred_name,
2688 referred_columns,
2689 options,
2690 )
2691
2692 fkeys = []
2693
2694 for (
2695 constraint_name,
2696 constrained_columns,
2697 referred_name,
2698 referred_columns,
2699 options,
2700 ) in parse_fks():
2701 sig = fk_sig(constrained_columns, referred_name, referred_columns)
2702 if sig not in keys_by_signature:
2703 util.warn(
2704 "WARNING: SQL-parsed foreign key constraint "
2705 "'%s' could not be located in PRAGMA "
2706 "foreign_keys for table %s" % (sig, table_name)
2707 )
2708 continue
2709 key = keys_by_signature.pop(sig)
2710 key["name"] = constraint_name
2711 key["options"] = options
2712 fkeys.append(key)
2713 # assume the remainders are the unnamed, inline constraints, just
2714 # use them as is as it's extremely difficult to parse inline
2715 # constraints
2716 fkeys.extend(keys_by_signature.values())
2717 if fkeys:
2718 return fkeys
2719 else:
2720 return ReflectionDefaults.foreign_keys()
2721
2722 def _find_cols_in_sig(self, sig):
2723 for match in re.finditer(r'(?:"(.+?)")|([a-z0-9_]+)', sig, re.I):
2724 yield match.group(1) or match.group(2)
2725
2726 @reflection.cache
2727 def get_unique_constraints(
2728 self, connection, table_name, schema=None, **kw
2729 ):
2730 auto_index_by_sig = {}
2731 for idx in self.get_indexes(
2732 connection,
2733 table_name,
2734 schema=schema,
2735 include_auto_indexes=True,
2736 **kw,
2737 ):
2738 if not idx["name"].startswith("sqlite_autoindex"):
2739 continue
2740 sig = tuple(idx["column_names"])
2741 auto_index_by_sig[sig] = idx
2742
2743 table_data = self._get_table_sql(
2744 connection, table_name, schema=schema, **kw
2745 )
2746 unique_constraints = []
2747
2748 def parse_uqs():
2749 if table_data is None:
2750 return
2751 UNIQUE_PATTERN = r'(?:CONSTRAINT "?(.+?)"? +)?UNIQUE *\((.+?)\)'
2752 INLINE_UNIQUE_PATTERN = (
2753 r'(?:(".+?")|(?:[\[`])?([a-z0-9_]+)(?:[\]`])?)[\t ]'
2754 r"+[a-z0-9_ ]+?[\t ]+UNIQUE"
2755 )
2756
2757 for match in re.finditer(UNIQUE_PATTERN, table_data, re.I):
2758 name, cols = match.group(1, 2)
2759 yield name, list(self._find_cols_in_sig(cols))
2760
2761 # we need to match inlines as well, as we seek to differentiate
2762 # a UNIQUE constraint from a UNIQUE INDEX, even though these
2763 # are kind of the same thing :)
2764 for match in re.finditer(INLINE_UNIQUE_PATTERN, table_data, re.I):
2765 cols = list(
2766 self._find_cols_in_sig(match.group(1) or match.group(2))
2767 )
2768 yield None, cols
2769
2770 for name, cols in parse_uqs():
2771 sig = tuple(cols)
2772 if sig in auto_index_by_sig:
2773 auto_index_by_sig.pop(sig)
2774 parsed_constraint = {"name": name, "column_names": cols}
2775 unique_constraints.append(parsed_constraint)
2776 # NOTE: auto_index_by_sig might not be empty here,
2777 # the PRIMARY KEY may have an entry.
2778 if unique_constraints:
2779 return unique_constraints
2780 else:
2781 return ReflectionDefaults.unique_constraints()
2782
2783 @reflection.cache
2784 def get_check_constraints(self, connection, table_name, schema=None, **kw):
2785 table_data = self._get_table_sql(
2786 connection, table_name, schema=schema, **kw
2787 )
2788
2789 # NOTE NOTE NOTE
2790 # DO NOT CHANGE THIS REGULAR EXPRESSION. There is no known way
2791 # to parse CHECK constraints that contain newlines themselves using
2792 # regular expressions, and the approach here relies upon each
2793 # individual
2794 # CHECK constraint being on a single line by itself. This
2795 # necessarily makes assumptions as to how the CREATE TABLE
2796 # was emitted. A more comprehensive DDL parsing solution would be
2797 # needed to improve upon the current situation. See #11840 for
2798 # background
2799 CHECK_PATTERN = r"(?:CONSTRAINT (.+) +)?CHECK *\( *(.+) *\),? *"
2800 cks = []
2801
2802 for match in re.finditer(CHECK_PATTERN, table_data or "", re.I):
2803
2804 name = match.group(1)
2805
2806 if name:
2807 name = re.sub(r'^"|"$', "", name)
2808
2809 cks.append({"sqltext": match.group(2), "name": name})
2810 cks.sort(key=lambda d: d["name"] or "~") # sort None as last
2811 if cks:
2812 return cks
2813 else:
2814 return ReflectionDefaults.check_constraints()
2815
2816 @reflection.cache
2817 def get_indexes(self, connection, table_name, schema=None, **kw):
2818 pragma_indexes = self._get_table_pragma(
2819 connection, "index_list", table_name, schema=schema
2820 )
2821 indexes = []
2822
2823 # regular expression to extract the filter predicate of a partial
2824 # index. this could fail to extract the predicate correctly on
2825 # indexes created like
2826 # CREATE INDEX i ON t (col || ') where') WHERE col <> ''
2827 # but as this function does not support expression-based indexes
2828 # this case does not occur.
2829 partial_pred_re = re.compile(r"\)\s+where\s+(.+)", re.IGNORECASE)
2830
2831 if schema:
2832 schema_expr = "%s." % self.identifier_preparer.quote_identifier(
2833 schema
2834 )
2835 else:
2836 schema_expr = ""
2837
2838 include_auto_indexes = kw.pop("include_auto_indexes", False)
2839 for row in pragma_indexes:
2840 # ignore implicit primary key index.
2841 # https://www.mail-archive.com/sqlite-users@sqlite.org/msg30517.html
2842 if not include_auto_indexes and row[1].startswith(
2843 "sqlite_autoindex"
2844 ):
2845 continue
2846 indexes.append(
2847 dict(
2848 name=row[1],
2849 column_names=[],
2850 unique=row[2],
2851 dialect_options={},
2852 )
2853 )
2854
2855 # check partial indexes
2856 if len(row) >= 5 and row[4]:
2857 s = (
2858 "SELECT sql FROM %(schema)ssqlite_master "
2859 "WHERE name = ? "
2860 "AND type = 'index'" % {"schema": schema_expr}
2861 )
2862 rs = connection.exec_driver_sql(s, (row[1],))
2863 index_sql = rs.scalar()
2864 predicate_match = partial_pred_re.search(index_sql)
2865 if predicate_match is None:
2866 # unless the regex is broken this case shouldn't happen
2867 # because we know this is a partial index, so the
2868 # definition sql should match the regex
2869 util.warn(
2870 "Failed to look up filter predicate of "
2871 "partial index %s" % row[1]
2872 )
2873 else:
2874 predicate = predicate_match.group(1)
2875 indexes[-1]["dialect_options"]["sqlite_where"] = text(
2876 predicate
2877 )
2878
2879 # loop thru unique indexes to get the column names.
2880 for idx in list(indexes):
2881 pragma_index = self._get_table_pragma(
2882 connection, "index_info", idx["name"], schema=schema
2883 )
2884
2885 for row in pragma_index:
2886 if row[2] is None:
2887 util.warn(
2888 "Skipped unsupported reflection of "
2889 "expression-based index %s" % idx["name"]
2890 )
2891 indexes.remove(idx)
2892 break
2893 else:
2894 idx["column_names"].append(row[2])
2895
2896 indexes.sort(key=lambda d: d["name"] or "~") # sort None as last
2897 if indexes:
2898 return indexes
2899 elif not self.has_table(connection, table_name, schema):
2900 raise exc.NoSuchTableError(
2901 f"{schema}.{table_name}" if schema else table_name
2902 )
2903 else:
2904 return ReflectionDefaults.indexes()
2905
2906 def _is_sys_table(self, table_name):
2907 return table_name in {
2908 "sqlite_schema",
2909 "sqlite_master",
2910 "sqlite_temp_schema",
2911 "sqlite_temp_master",
2912 }
2913
2914 @reflection.cache
2915 def _get_table_sql(self, connection, table_name, schema=None, **kw):
2916 if schema:
2917 schema_expr = "%s." % (
2918 self.identifier_preparer.quote_identifier(schema)
2919 )
2920 else:
2921 schema_expr = ""
2922 try:
2923 s = (
2924 "SELECT sql FROM "
2925 " (SELECT * FROM %(schema)ssqlite_master UNION ALL "
2926 " SELECT * FROM %(schema)ssqlite_temp_master) "
2927 "WHERE name = ? "
2928 "AND type in ('table', 'view')" % {"schema": schema_expr}
2929 )
2930 rs = connection.exec_driver_sql(s, (table_name,))
2931 except exc.DBAPIError:
2932 s = (
2933 "SELECT sql FROM %(schema)ssqlite_master "
2934 "WHERE name = ? "
2935 "AND type in ('table', 'view')" % {"schema": schema_expr}
2936 )
2937 rs = connection.exec_driver_sql(s, (table_name,))
2938 value = rs.scalar()
2939 if value is None and not self._is_sys_table(table_name):
2940 raise exc.NoSuchTableError(f"{schema_expr}{table_name}")
2941 return value
2942
2943 def _get_table_pragma(self, connection, pragma, table_name, schema=None):
2944 quote = self.identifier_preparer.quote_identifier
2945 if schema is not None:
2946 statements = [f"PRAGMA {quote(schema)}."]
2947 else:
2948 # because PRAGMA looks in all attached databases if no schema
2949 # given, need to specify "main" schema, however since we want
2950 # 'temp' tables in the same namespace as 'main', need to run
2951 # the PRAGMA twice
2952 statements = ["PRAGMA main.", "PRAGMA temp."]
2953
2954 qtable = quote(table_name)
2955 for statement in statements:
2956 statement = f"{statement}{pragma}({qtable})"
2957 cursor = connection.exec_driver_sql(statement)
2958 if not cursor._soft_closed:
2959 # work around SQLite issue whereby cursor.description
2960 # is blank when PRAGMA returns no rows:
2961 # https://www.sqlite.org/cvstrac/tktview?tn=1884
2962 result = cursor.fetchall()
2963 else:
2964 result = []
2965 if result:
2966 return result
2967 else:
2968 return []