1# dialects/sqlite/base.py
2# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: ignore-errors
8
9
10r'''
11.. dialect:: sqlite
12 :name: SQLite
13 :normal_support: 3.12+
14 :best_effort: 3.7.16+
15
16.. _sqlite_datetime:
17
18Date and Time Types
19-------------------
20
21SQLite does not have built-in DATE, TIME, or DATETIME types, and pysqlite does
22not provide out of the box functionality for translating values between Python
23`datetime` objects and a SQLite-supported format. SQLAlchemy's own
24:class:`~sqlalchemy.types.DateTime` and related types provide date formatting
25and parsing functionality when SQLite is used. The implementation classes are
26:class:`_sqlite.DATETIME`, :class:`_sqlite.DATE` and :class:`_sqlite.TIME`.
27These types represent dates and times as ISO formatted strings, which also
28nicely support ordering. There's no reliance on typical "libc" internals for
29these functions so historical dates are fully supported.
30
31Ensuring Text affinity
32^^^^^^^^^^^^^^^^^^^^^^
33
34The DDL rendered for these types is the standard ``DATE``, ``TIME``
35and ``DATETIME`` indicators. However, custom storage formats can also be
36applied to these types. When the
37storage format is detected as containing no alpha characters, the DDL for
38these types is rendered as ``DATE_CHAR``, ``TIME_CHAR``, and ``DATETIME_CHAR``,
39so that the column continues to have textual affinity.
40
41.. seealso::
42
43 `Type Affinity <https://www.sqlite.org/datatype3.html#affinity>`_ -
44 in the SQLite documentation
45
46.. _sqlite_autoincrement:
47
48SQLite Auto Incrementing Behavior
49----------------------------------
50
51Background on SQLite's autoincrement is at: https://sqlite.org/autoinc.html
52
53Key concepts:
54
55* SQLite has an implicit "auto increment" feature that takes place for any
56 non-composite primary-key column that is specifically created using
57 "INTEGER PRIMARY KEY" for the type + primary key.
58
59* SQLite also has an explicit "AUTOINCREMENT" keyword, that is **not**
60 equivalent to the implicit autoincrement feature; this keyword is not
61 recommended for general use. SQLAlchemy does not render this keyword
62 unless a special SQLite-specific directive is used (see below). However,
63 it still requires that the column's type is named "INTEGER".
64
65Using the AUTOINCREMENT Keyword
66^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
67
68To specifically render the AUTOINCREMENT keyword on the primary key column
69when rendering DDL, add the flag ``sqlite_autoincrement=True`` to the Table
70construct::
71
72 Table(
73 "sometable",
74 metadata,
75 Column("id", Integer, primary_key=True),
76 sqlite_autoincrement=True,
77 )
78
79Allowing autoincrement behavior in SQLAlchemy types other than Integer/INTEGER
80^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
81
82SQLite's typing model is based on naming conventions. Among other things, this
83means that any type name which contains the substring ``"INT"`` will be
84determined to be of "integer affinity". A type named ``"BIGINT"``,
85``"SPECIAL_INT"`` or even ``"XYZINTQPR"``, will be considered by SQLite to be
86of "integer" affinity. However, **the SQLite autoincrement feature, whether
87implicitly or explicitly enabled, requires that the name of the column's type
88is exactly the string "INTEGER"**. Therefore, if an application uses a type
89like :class:`.BigInteger` for a primary key, on SQLite this type will need to
90be rendered as the name ``"INTEGER"`` when emitting the initial ``CREATE
91TABLE`` statement in order for the autoincrement behavior to be available.
92
93One approach to achieve this is to use :class:`.Integer` on SQLite
94only using :meth:`.TypeEngine.with_variant`::
95
96 table = Table(
97 "my_table",
98 metadata,
99 Column(
100 "id",
101 BigInteger().with_variant(Integer, "sqlite"),
102 primary_key=True,
103 ),
104 )
105
106Another is to use a subclass of :class:`.BigInteger` that overrides its DDL
107name to be ``INTEGER`` when compiled against SQLite::
108
109 from sqlalchemy import BigInteger
110 from sqlalchemy.ext.compiler import compiles
111
112
113 class SLBigInteger(BigInteger):
114 pass
115
116
117 @compiles(SLBigInteger, "sqlite")
118 def bi_c(element, compiler, **kw):
119 return "INTEGER"
120
121
122 @compiles(SLBigInteger)
123 def bi_c(element, compiler, **kw):
124 return compiler.visit_BIGINT(element, **kw)
125
126
127 table = Table(
128 "my_table", metadata, Column("id", SLBigInteger(), primary_key=True)
129 )
130
131.. seealso::
132
133 :meth:`.TypeEngine.with_variant`
134
135 :ref:`sqlalchemy.ext.compiler_toplevel`
136
137 `Datatypes In SQLite Version 3 <https://sqlite.org/datatype3.html>`_
138
139.. _sqlite_transactions:
140
141Transactions with SQLite and the sqlite3 driver
142-----------------------------------------------
143
144As a file-based database, SQLite's approach to transactions differs from
145traditional databases in many ways. Additionally, the ``sqlite3`` driver
146standard with Python (as well as the async version ``aiosqlite`` which builds
147on top of it) has several quirks, workarounds, and API features in the
148area of transaction control, all of which generally need to be addressed when
149constructing a SQLAlchemy application that uses SQLite.
150
151Legacy Transaction Mode with the sqlite3 driver
152^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
153
154The most important aspect of transaction handling with the sqlite3 driver is
155that it defaults (which will continue through Python 3.15 before being
156removed in Python 3.16) to legacy transactional behavior which does
157not strictly follow :pep:`249`. The way in which the driver diverges from the
158PEP is that it does not "begin" a transaction automatically as dictated by
159:pep:`249` except in the case of DML statements, e.g. INSERT, UPDATE, and
160DELETE. Normally, :pep:`249` dictates that a BEGIN must be emitted upon
161the first SQL statement of any kind, so that all subsequent operations will
162be established within a transaction until ``connection.commit()`` has been
163called. The ``sqlite3`` driver, in an effort to be easier to use in
164highly concurrent environments, skips this step for DQL (e.g. SELECT) statements,
165and also skips it for DDL (e.g. CREATE TABLE etc.) statements for more legacy
166reasons. Statements such as SAVEPOINT are also skipped.
167
168In modern versions of the ``sqlite3`` driver as of Python 3.12, this legacy
169mode of operation is referred to as
170`"legacy transaction control" <https://docs.python.org/3/library/sqlite3.html#sqlite3-transaction-control-isolation-level>`_, and is in
171effect by default due to the ``Connection.autocommit`` parameter being set to
172the constant ``sqlite3.LEGACY_TRANSACTION_CONTROL``. Prior to Python 3.12,
173the ``Connection.autocommit`` attribute did not exist.
174
175The implications of legacy transaction mode include:
176
177* **Incorrect support for transactional DDL** - statements like CREATE TABLE, ALTER TABLE,
178 CREATE INDEX etc. will not automatically BEGIN a transaction if one were not
179 started already, leading to the changes by each statement being
180 "autocommitted" immediately unless BEGIN were otherwise emitted first. Very
181 old (pre Python 3.6) versions of SQLite would also force a COMMIT for these
182 operations even if a transaction were present, however this is no longer the
183 case.
184* **SERIALIZABLE behavior not fully functional** - SQLite's transaction isolation
185 behavior is normally consistent with SERIALIZABLE isolation, as it is a file-
186 based system that locks the database file entirely for write operations,
187 preventing COMMIT until all reader transactions (and associated file locks)
188 have completed. However, sqlite3's legacy transaction mode fails to emit BEGIN for SELECT
189 statements, which causes these SELECT statements to no longer be "repeatable",
190 failing one of the consistency guarantees of SERIALIZABLE.
191* **Incorrect behavior for SAVEPOINT** - as the SAVEPOINT statement does not
192 imply a BEGIN, a new SAVEPOINT emitted before a BEGIN will function on its
193 own but fails to participate in the enclosing transaction, meaning a ROLLBACK
194 of the transaction will not rollback elements that were part of a released
195 savepoint.
196
197Legacy transaction mode first existed in order to facilitate working around
198SQLite's file locks. Because SQLite relies upon whole-file locks, it is easy to
199get "database is locked" errors, particularly when newer features like "write
200ahead logging" are disabled. This is a key reason why ``sqlite3``'s legacy
201transaction mode is still the default mode of operation; disabling it will
202produce behavior that is more susceptible to locked database errors. However
203note that **legacy transaction mode will no longer be the default** in a future
204Python version (3.16 as of this writing).
205
206.. _sqlite_enabling_transactions:
207
208Enabling Non-Legacy SQLite Transactional Modes with the sqlite3 or aiosqlite driver
209^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
210
211Current SQLAlchemy support allows either for setting the
212``.Connection.autocommit`` attribute, most directly by using a
213:func:`._sa.create_engine` parameter, or if on an older version of Python where
214the attribute is not available, using event hooks to control the behavior of
215BEGIN.
216
217* **Enabling modern sqlite3 transaction control via the autocommit connect parameter** (Python 3.12 and above)
218
219 To use SQLite in the mode described at `Transaction control via the autocommit attribute <https://docs.python.org/3/library/sqlite3.html#transaction-control-via-the-autocommit-attribute>`_,
220 the most straightforward approach is to set the attribute to its recommended value
221 of ``False`` at the connect level using :paramref:`_sa.create_engine.connect_args`::
222
223 from sqlalchemy import create_engine
224
225 engine = create_engine(
226 "sqlite:///myfile.db", connect_args={"autocommit": False}
227 )
228
229 This parameter is also passed through when using the aiosqlite driver::
230
231 from sqlalchemy.ext.asyncio import create_async_engine
232
233 engine = create_async_engine(
234 "sqlite+aiosqlite:///myfile.db", connect_args={"autocommit": False}
235 )
236
237 The parameter can also be set at the attribute level using the :meth:`.PoolEvents.connect`
238 event hook, however this will only work for sqlite3, as aiosqlite does not yet expose this
239 attribute on its ``Connection`` object::
240
241 from sqlalchemy import create_engine, event
242
243 engine = create_engine("sqlite:///myfile.db")
244
245
246 @event.listens_for(engine, "connect")
247 def do_connect(dbapi_connection, connection_record):
248 # enable autocommit=False mode
249 dbapi_connection.autocommit = False
250
251* **Using SQLAlchemy to emit BEGIN in lieu of SQLite's transaction control** (all Python versions, sqlite3 and aiosqlite)
252
253 For older versions of ``sqlite3`` or for cross-compatibility with older and
254 newer versions, SQLAlchemy can also take over the job of transaction control.
255 This is achieved by using the :meth:`.ConnectionEvents.begin` hook
256 to emit the "BEGIN" command directly, while also disabling SQLite's control
257 of this command using the :meth:`.PoolEvents.connect` event hook to set the
258 ``Connection.isolation_level`` attribute to ``None``::
259
260
261 from sqlalchemy import create_engine, event
262
263 engine = create_engine("sqlite:///myfile.db")
264
265
266 @event.listens_for(engine, "connect")
267 def do_connect(dbapi_connection, connection_record):
268 # disable sqlite3's emitting of the BEGIN statement entirely.
269 dbapi_connection.isolation_level = None
270
271
272 @event.listens_for(engine, "begin")
273 def do_begin(conn):
274 # emit our own BEGIN. sqlite3 still emits COMMIT/ROLLBACK correctly
275 conn.exec_driver_sql("BEGIN")
276
277 When using the asyncio variant ``aiosqlite``, refer to ``engine.sync_engine``
278 as in the example below::
279
280 from sqlalchemy import create_engine, event
281 from sqlalchemy.ext.asyncio import create_async_engine
282
283 engine = create_async_engine("sqlite+aiosqlite:///myfile.db")
284
285
286 @event.listens_for(engine.sync_engine, "connect")
287 def do_connect(dbapi_connection, connection_record):
288 # disable aiosqlite's emitting of the BEGIN statement entirely.
289 dbapi_connection.isolation_level = None
290
291
292 @event.listens_for(engine.sync_engine, "begin")
293 def do_begin(conn):
294 # emit our own BEGIN. aiosqlite still emits COMMIT/ROLLBACK correctly
295 conn.exec_driver_sql("BEGIN")
296
297.. _sqlite_isolation_level:
298
299Using SQLAlchemy's Driver Level AUTOCOMMIT Feature with SQLite
300^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
301
302SQLAlchemy has a comprehensive database isolation feature with optional
303autocommit support that is introduced in the section :ref:`dbapi_autocommit`.
304
305For the ``sqlite3`` and ``aiosqlite`` drivers, SQLAlchemy only includes
306built-in support for "AUTOCOMMIT". Note that this mode is currently incompatible
307with the non-legacy isolation mode hooks documented in the previous
308section at :ref:`sqlite_enabling_transactions`.
309
310To use the ``sqlite3`` driver with SQLAlchemy driver-level autocommit,
311create an engine setting the :paramref:`_sa.create_engine.isolation_level`
312parameter to "AUTOCOMMIT"::
313
314 eng = create_engine("sqlite:///myfile.db", isolation_level="AUTOCOMMIT")
315
316When using the above mode, any event hooks that set the sqlite3 ``Connection.autocommit``
317parameter away from its default of ``sqlite3.LEGACY_TRANSACTION_CONTROL``
318as well as hooks that emit ``BEGIN`` should be disabled.
319
320Additional Reading for SQLite / sqlite3 transaction control
321^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
322
323Links with important information on SQLite, the sqlite3 driver,
324as well as long historical conversations on how things got to their current state:
325
326* `Isolation in SQLite <https://www.sqlite.org/isolation.html>`_ - on the SQLite website
327* `Transaction control <https://docs.python.org/3/library/sqlite3.html#transaction-control>`_ - describes the sqlite3 autocommit attribute as well
328 as the legacy isolation_level attribute.
329* `sqlite3 SELECT does not BEGIN a transaction, but should according to spec <https://github.com/python/cpython/issues/54133>`_ - imported Python standard library issue on github
330* `sqlite3 module breaks transactions and potentially corrupts data <https://github.com/python/cpython/issues/54949>`_ - imported Python standard library issue on github
331
332
333INSERT/UPDATE/DELETE...RETURNING
334---------------------------------
335
336The SQLite dialect supports SQLite 3.35's ``INSERT|UPDATE|DELETE..RETURNING``
337syntax. ``INSERT..RETURNING`` may be used
338automatically in some cases in order to fetch newly generated identifiers in
339place of the traditional approach of using ``cursor.lastrowid``, however
340``cursor.lastrowid`` is currently still preferred for simple single-statement
341cases for its better performance.
342
343To specify an explicit ``RETURNING`` clause, use the
344:meth:`._UpdateBase.returning` method on a per-statement basis::
345
346 # INSERT..RETURNING
347 result = connection.execute(
348 table.insert().values(name="foo").returning(table.c.col1, table.c.col2)
349 )
350 print(result.all())
351
352 # UPDATE..RETURNING
353 result = connection.execute(
354 table.update()
355 .where(table.c.name == "foo")
356 .values(name="bar")
357 .returning(table.c.col1, table.c.col2)
358 )
359 print(result.all())
360
361 # DELETE..RETURNING
362 result = connection.execute(
363 table.delete()
364 .where(table.c.name == "foo")
365 .returning(table.c.col1, table.c.col2)
366 )
367 print(result.all())
368
369.. versionadded:: 2.0 Added support for SQLite RETURNING
370
371
372.. _sqlite_foreign_keys:
373
374Foreign Key Support
375-------------------
376
377SQLite supports FOREIGN KEY syntax when emitting CREATE statements for tables,
378however by default these constraints have no effect on the operation of the
379table.
380
381Constraint checking on SQLite has three prerequisites:
382
383* At least version 3.6.19 of SQLite must be in use
384* The SQLite library must be compiled *without* the SQLITE_OMIT_FOREIGN_KEY
385 or SQLITE_OMIT_TRIGGER symbols enabled.
386* The ``PRAGMA foreign_keys = ON`` statement must be emitted on all
387 connections before use -- including the initial call to
388 :meth:`sqlalchemy.schema.MetaData.create_all`.
389
390SQLAlchemy allows for the ``PRAGMA`` statement to be emitted automatically for
391new connections through the usage of events::
392
393 from sqlalchemy.engine import Engine
394 from sqlalchemy import event
395
396
397 @event.listens_for(Engine, "connect")
398 def set_sqlite_pragma(dbapi_connection, connection_record):
399 # the sqlite3 driver will not set PRAGMA foreign_keys
400 # if autocommit=False; set to True temporarily
401 ac = dbapi_connection.autocommit
402 dbapi_connection.autocommit = True
403
404 cursor = dbapi_connection.cursor()
405 cursor.execute("PRAGMA foreign_keys=ON")
406 cursor.close()
407
408 # restore previous autocommit setting
409 dbapi_connection.autocommit = ac
410
411.. warning::
412
413 When SQLite foreign keys are enabled, it is **not possible**
414 to emit CREATE or DROP statements for tables that contain
415 mutually-dependent foreign key constraints;
416 to emit the DDL for these tables requires that ALTER TABLE be used to
417 create or drop these constraints separately, for which SQLite has
418 no support.
419
420.. seealso::
421
422 `SQLite Foreign Key Support <https://www.sqlite.org/foreignkeys.html>`_
423 - on the SQLite web site.
424
425 :ref:`event_toplevel` - SQLAlchemy event API.
426
427 :ref:`use_alter` - more information on SQLAlchemy's facilities for handling
428 mutually-dependent foreign key constraints.
429
430.. _sqlite_on_conflict_ddl:
431
432ON CONFLICT support for constraints
433-----------------------------------
434
435.. seealso:: This section describes the :term:`DDL` version of "ON CONFLICT" for
436 SQLite, which occurs within a CREATE TABLE statement. For "ON CONFLICT" as
437 applied to an INSERT statement, see :ref:`sqlite_on_conflict_insert`.
438
439SQLite supports a non-standard DDL clause known as ON CONFLICT which can be applied
440to primary key, unique, check, and not null constraints. In DDL, it is
441rendered either within the "CONSTRAINT" clause or within the column definition
442itself depending on the location of the target constraint. To render this
443clause within DDL, the extension parameter ``sqlite_on_conflict`` can be
444specified with a string conflict resolution algorithm within the
445:class:`.PrimaryKeyConstraint`, :class:`.UniqueConstraint`,
446:class:`.CheckConstraint` objects. Within the :class:`_schema.Column` object,
447there
448are individual parameters ``sqlite_on_conflict_not_null``,
449``sqlite_on_conflict_primary_key``, ``sqlite_on_conflict_unique`` which each
450correspond to the three types of relevant constraint types that can be
451indicated from a :class:`_schema.Column` object.
452
453.. seealso::
454
455 `ON CONFLICT <https://www.sqlite.org/lang_conflict.html>`_ - in the SQLite
456 documentation
457
458.. versionadded:: 1.3
459
460
461The ``sqlite_on_conflict`` parameters accept a string argument which is just
462the resolution name to be chosen, which on SQLite can be one of ROLLBACK,
463ABORT, FAIL, IGNORE, and REPLACE. For example, to add a UNIQUE constraint
464that specifies the IGNORE algorithm::
465
466 some_table = Table(
467 "some_table",
468 metadata,
469 Column("id", Integer, primary_key=True),
470 Column("data", Integer),
471 UniqueConstraint("id", "data", sqlite_on_conflict="IGNORE"),
472 )
473
474The above renders CREATE TABLE DDL as:
475
476.. sourcecode:: sql
477
478 CREATE TABLE some_table (
479 id INTEGER NOT NULL,
480 data INTEGER,
481 PRIMARY KEY (id),
482 UNIQUE (id, data) ON CONFLICT IGNORE
483 )
484
485
486When using the :paramref:`_schema.Column.unique`
487flag to add a UNIQUE constraint
488to a single column, the ``sqlite_on_conflict_unique`` parameter can
489be added to the :class:`_schema.Column` as well, which will be added to the
490UNIQUE constraint in the DDL::
491
492 some_table = Table(
493 "some_table",
494 metadata,
495 Column("id", Integer, primary_key=True),
496 Column(
497 "data", Integer, unique=True, sqlite_on_conflict_unique="IGNORE"
498 ),
499 )
500
501rendering:
502
503.. sourcecode:: sql
504
505 CREATE TABLE some_table (
506 id INTEGER NOT NULL,
507 data INTEGER,
508 PRIMARY KEY (id),
509 UNIQUE (data) ON CONFLICT IGNORE
510 )
511
512To apply the FAIL algorithm for a NOT NULL constraint,
513``sqlite_on_conflict_not_null`` is used::
514
515 some_table = Table(
516 "some_table",
517 metadata,
518 Column("id", Integer, primary_key=True),
519 Column(
520 "data", Integer, nullable=False, sqlite_on_conflict_not_null="FAIL"
521 ),
522 )
523
524this renders the column inline ON CONFLICT phrase:
525
526.. sourcecode:: sql
527
528 CREATE TABLE some_table (
529 id INTEGER NOT NULL,
530 data INTEGER NOT NULL ON CONFLICT FAIL,
531 PRIMARY KEY (id)
532 )
533
534
535Similarly, for an inline primary key, use ``sqlite_on_conflict_primary_key``::
536
537 some_table = Table(
538 "some_table",
539 metadata,
540 Column(
541 "id",
542 Integer,
543 primary_key=True,
544 sqlite_on_conflict_primary_key="FAIL",
545 ),
546 )
547
548SQLAlchemy renders the PRIMARY KEY constraint separately, so the conflict
549resolution algorithm is applied to the constraint itself:
550
551.. sourcecode:: sql
552
553 CREATE TABLE some_table (
554 id INTEGER NOT NULL,
555 PRIMARY KEY (id) ON CONFLICT FAIL
556 )
557
558.. _sqlite_on_conflict_insert:
559
560INSERT...ON CONFLICT (Upsert)
561-----------------------------
562
563.. seealso:: This section describes the :term:`DML` version of "ON CONFLICT" for
564 SQLite, which occurs within an INSERT statement. For "ON CONFLICT" as
565 applied to a CREATE TABLE statement, see :ref:`sqlite_on_conflict_ddl`.
566
567From version 3.24.0 onwards, SQLite supports "upserts" (update or insert)
568of rows into a table via the ``ON CONFLICT`` clause of the ``INSERT``
569statement. A candidate row will only be inserted if that row does not violate
570any unique or primary key constraints. In the case of a unique constraint violation, a
571secondary action can occur which can be either "DO UPDATE", indicating that
572the data in the target row should be updated, or "DO NOTHING", which indicates
573to silently skip this row.
574
575Conflicts are determined using columns that are part of existing unique
576constraints and indexes. These constraints are identified by stating the
577columns and conditions that comprise the indexes.
578
579SQLAlchemy provides ``ON CONFLICT`` support via the SQLite-specific
580:func:`_sqlite.insert()` function, which provides
581the generative methods :meth:`_sqlite.Insert.on_conflict_do_update`
582and :meth:`_sqlite.Insert.on_conflict_do_nothing`:
583
584.. sourcecode:: pycon+sql
585
586 >>> from sqlalchemy.dialects.sqlite import insert
587
588 >>> insert_stmt = insert(my_table).values(
589 ... id="some_existing_id", data="inserted value"
590 ... )
591
592 >>> do_update_stmt = insert_stmt.on_conflict_do_update(
593 ... index_elements=["id"], set_=dict(data="updated value")
594 ... )
595
596 >>> print(do_update_stmt)
597 {printsql}INSERT INTO my_table (id, data) VALUES (?, ?)
598 ON CONFLICT (id) DO UPDATE SET data = ?{stop}
599
600 >>> do_nothing_stmt = insert_stmt.on_conflict_do_nothing(index_elements=["id"])
601
602 >>> print(do_nothing_stmt)
603 {printsql}INSERT INTO my_table (id, data) VALUES (?, ?)
604 ON CONFLICT (id) DO NOTHING
605
606.. versionadded:: 1.4
607
608.. seealso::
609
610 `Upsert
611 <https://sqlite.org/lang_UPSERT.html>`_
612 - in the SQLite documentation.
613
614
615Specifying the Target
616^^^^^^^^^^^^^^^^^^^^^
617
618Both methods supply the "target" of the conflict using column inference:
619
620* The :paramref:`_sqlite.Insert.on_conflict_do_update.index_elements` argument
621 specifies a sequence containing string column names, :class:`_schema.Column`
622 objects, and/or SQL expression elements, which would identify a unique index
623 or unique constraint.
624
625* When using :paramref:`_sqlite.Insert.on_conflict_do_update.index_elements`
626 to infer an index, a partial index can be inferred by also specifying the
627 :paramref:`_sqlite.Insert.on_conflict_do_update.index_where` parameter:
628
629 .. sourcecode:: pycon+sql
630
631 >>> stmt = insert(my_table).values(user_email="a@b.com", data="inserted data")
632
633 >>> do_update_stmt = stmt.on_conflict_do_update(
634 ... index_elements=[my_table.c.user_email],
635 ... index_where=my_table.c.user_email.like("%@gmail.com"),
636 ... set_=dict(data=stmt.excluded.data),
637 ... )
638
639 >>> print(do_update_stmt)
640 {printsql}INSERT INTO my_table (data, user_email) VALUES (?, ?)
641 ON CONFLICT (user_email)
642 WHERE user_email LIKE '%@gmail.com'
643 DO UPDATE SET data = excluded.data
644
645The SET Clause
646^^^^^^^^^^^^^^^
647
648``ON CONFLICT...DO UPDATE`` is used to perform an update of the already
649existing row, using any combination of new values as well as values
650from the proposed insertion. These values are specified using the
651:paramref:`_sqlite.Insert.on_conflict_do_update.set_` parameter. This
652parameter accepts a dictionary which consists of direct values
653for UPDATE:
654
655.. sourcecode:: pycon+sql
656
657 >>> stmt = insert(my_table).values(id="some_id", data="inserted value")
658
659 >>> do_update_stmt = stmt.on_conflict_do_update(
660 ... index_elements=["id"], set_=dict(data="updated value")
661 ... )
662
663 >>> print(do_update_stmt)
664 {printsql}INSERT INTO my_table (id, data) VALUES (?, ?)
665 ON CONFLICT (id) DO UPDATE SET data = ?
666
667.. warning::
668
669 The :meth:`_sqlite.Insert.on_conflict_do_update` method does **not** take
670 into account Python-side default UPDATE values or generation functions,
671 e.g. those specified using :paramref:`_schema.Column.onupdate`. These
672 values will not be exercised for an ON CONFLICT style of UPDATE, unless
673 they are manually specified in the
674 :paramref:`_sqlite.Insert.on_conflict_do_update.set_` dictionary.
675
676Updating using the Excluded INSERT Values
677^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
678
679In order to refer to the proposed insertion row, the special alias
680:attr:`~.sqlite.Insert.excluded` is available as an attribute on
681the :class:`_sqlite.Insert` object; this object creates an "excluded." prefix
682on a column, that informs the DO UPDATE to update the row with the value that
683would have been inserted had the constraint not failed:
684
685.. sourcecode:: pycon+sql
686
687 >>> stmt = insert(my_table).values(
688 ... id="some_id", data="inserted value", author="jlh"
689 ... )
690
691 >>> do_update_stmt = stmt.on_conflict_do_update(
692 ... index_elements=["id"],
693 ... set_=dict(data="updated value", author=stmt.excluded.author),
694 ... )
695
696 >>> print(do_update_stmt)
697 {printsql}INSERT INTO my_table (id, data, author) VALUES (?, ?, ?)
698 ON CONFLICT (id) DO UPDATE SET data = ?, author = excluded.author
699
700Additional WHERE Criteria
701^^^^^^^^^^^^^^^^^^^^^^^^^
702
703The :meth:`_sqlite.Insert.on_conflict_do_update` method also accepts
704a WHERE clause using the :paramref:`_sqlite.Insert.on_conflict_do_update.where`
705parameter, which will limit those rows which receive an UPDATE:
706
707.. sourcecode:: pycon+sql
708
709 >>> stmt = insert(my_table).values(
710 ... id="some_id", data="inserted value", author="jlh"
711 ... )
712
713 >>> on_update_stmt = stmt.on_conflict_do_update(
714 ... index_elements=["id"],
715 ... set_=dict(data="updated value", author=stmt.excluded.author),
716 ... where=(my_table.c.status == 2),
717 ... )
718 >>> print(on_update_stmt)
719 {printsql}INSERT INTO my_table (id, data, author) VALUES (?, ?, ?)
720 ON CONFLICT (id) DO UPDATE SET data = ?, author = excluded.author
721 WHERE my_table.status = ?
722
723
724Skipping Rows with DO NOTHING
725^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
726
727``ON CONFLICT`` may be used to skip inserting a row entirely
728if any conflict with a unique constraint occurs; below this is illustrated
729using the :meth:`_sqlite.Insert.on_conflict_do_nothing` method:
730
731.. sourcecode:: pycon+sql
732
733 >>> stmt = insert(my_table).values(id="some_id", data="inserted value")
734 >>> stmt = stmt.on_conflict_do_nothing(index_elements=["id"])
735 >>> print(stmt)
736 {printsql}INSERT INTO my_table (id, data) VALUES (?, ?) ON CONFLICT (id) DO NOTHING
737
738
739If ``DO NOTHING`` is used without specifying any columns or constraint,
740it has the effect of skipping the INSERT for any unique violation which
741occurs:
742
743.. sourcecode:: pycon+sql
744
745 >>> stmt = insert(my_table).values(id="some_id", data="inserted value")
746 >>> stmt = stmt.on_conflict_do_nothing()
747 >>> print(stmt)
748 {printsql}INSERT INTO my_table (id, data) VALUES (?, ?) ON CONFLICT DO NOTHING
749
750.. _sqlite_type_reflection:
751
752Type Reflection
753---------------
754
755SQLite types are unlike those of most other database backends, in that
756the string name of the type usually does not correspond to a "type" in a
757one-to-one fashion. Instead, SQLite links per-column typing behavior
758to one of five so-called "type affinities" based on a string matching
759pattern for the type.
760
761SQLAlchemy's reflection process, when inspecting types, uses a simple
762lookup table to link the keywords returned to provided SQLAlchemy types.
763This lookup table is present within the SQLite dialect as it is for all
764other dialects. However, the SQLite dialect has a different "fallback"
765routine for when a particular type name is not located in the lookup map;
766it instead implements the SQLite "type affinity" scheme located at
767https://www.sqlite.org/datatype3.html section 2.1.
768
769The provided typemap will make direct associations from an exact string
770name match for the following types:
771
772:class:`_types.BIGINT`, :class:`_types.BLOB`,
773:class:`_types.BOOLEAN`, :class:`_types.CHAR`,
774:class:`_types.DATE`, :class:`_types.DATETIME`,
775:class:`_types.DECIMAL`, :class:`_types.FLOAT`,
776:class:`_types.INTEGER`, :class:`_types.NUMERIC`,
777:class:`_types.REAL`, :class:`_types.SMALLINT`,
778:class:`_types.TEXT`, :class:`_types.TIME`,
779:class:`_types.TIMESTAMP`, :class:`_types.VARCHAR`,
780:class:`_types.NVARCHAR`, :class:`_types.NCHAR`
783
784When a type name does not match one of the above types, the "type affinity"
785lookup is used instead:
786
787* :class:`_types.INTEGER` is returned if the type name includes the
788 string ``INT``
789* :class:`_types.TEXT` is returned if the type name includes the
790 string ``CHAR``, ``CLOB`` or ``TEXT``
791* :class:`_types.NullType` is returned if the type name includes the
792 string ``BLOB``
793* :class:`_types.REAL` is returned if the type name includes the string
794 ``REAL``, ``FLOA`` or ``DOUB``.
795* Otherwise, the :class:`_types.NUMERIC` type is used.
796
797.. _sqlite_partial_index:
798
799Partial Indexes
800---------------
801
802A partial index, e.g. one which uses a WHERE clause, can be specified
803with the DDL system using the argument ``sqlite_where``::
804
805 tbl = Table("testtbl", m, Column("data", Integer))
806 idx = Index(
807 "test_idx1",
808 tbl.c.data,
809 sqlite_where=and_(tbl.c.data > 5, tbl.c.data < 10),
810 )
811
812The index will be rendered at create time as:
813
814.. sourcecode:: sql
815
816 CREATE INDEX test_idx1 ON testtbl (data)
817 WHERE data > 5 AND data < 10
818
819.. _sqlite_dotted_column_names:
820
821Dotted Column Names
822-------------------
823
824Using table or column names that explicitly have periods in them is
825**not recommended**. While this is generally a bad idea for relational
826databases in general, as the dot is a syntactically significant character,
827the SQLite driver up until version **3.10.0** of SQLite has a bug which
828requires that SQLAlchemy filter out these dots in result sets.
829
830The bug, entirely outside of SQLAlchemy, can be illustrated thusly::
831
832 import sqlite3
833
834 assert sqlite3.sqlite_version_info < (
835 3,
836 10,
837 0,
838 ), "bug is fixed in this version"
839
840 conn = sqlite3.connect(":memory:")
841 cursor = conn.cursor()
842
843 cursor.execute("create table x (a integer, b integer)")
844 cursor.execute("insert into x (a, b) values (1, 1)")
845 cursor.execute("insert into x (a, b) values (2, 2)")
846
847 cursor.execute("select x.a, x.b from x")
848 assert [c[0] for c in cursor.description] == ["a", "b"]
849
850 cursor.execute(
851 """
852 select x.a, x.b from x where a=1
853 union
854 select x.a, x.b from x where a=2
855 """
856 )
857 assert [c[0] for c in cursor.description] == ["a", "b"], [
858 c[0] for c in cursor.description
859 ]
860
861The second assertion fails:
862
863.. sourcecode:: text
864
865 Traceback (most recent call last):
866 File "test.py", line 19, in <module>
867 [c[0] for c in cursor.description]
868 AssertionError: ['x.a', 'x.b']
869
870Where above, the driver incorrectly reports the names of the columns
871including the name of the table, which is entirely inconsistent vs.
872when the UNION is not present.
873
874SQLAlchemy relies upon column names being predictable in how they match
875to the original statement, so the SQLAlchemy dialect has no choice but
876to filter these out::
877
878
879 from sqlalchemy import create_engine
880
881 eng = create_engine("sqlite://")
882 conn = eng.connect()
883
884 conn.exec_driver_sql("create table x (a integer, b integer)")
885 conn.exec_driver_sql("insert into x (a, b) values (1, 1)")
886 conn.exec_driver_sql("insert into x (a, b) values (2, 2)")
887
888 result = conn.exec_driver_sql("select x.a, x.b from x")
889 assert result.keys() == ["a", "b"]
890
891 result = conn.exec_driver_sql(
892 """
893 select x.a, x.b from x where a=1
894 union
895 select x.a, x.b from x where a=2
896 """
897 )
898 assert result.keys() == ["a", "b"]
899
900Note that above, even though SQLAlchemy filters out the dots, *both
901names are still addressable*::
902
903 >>> row = result.first()
904 >>> row["a"]
905 1
906 >>> row["x.a"]
907 1
908 >>> row["b"]
909 1
910 >>> row["x.b"]
911 1
912
913Therefore, the workaround applied by SQLAlchemy only impacts
914:meth:`_engine.CursorResult.keys` and :meth:`.Row.keys()` in the public API. In
915the very specific case where an application is forced to use column names that
916contain dots, and the functionality of :meth:`_engine.CursorResult.keys` and
917:meth:`.Row.keys()` is required to return these dotted names unmodified,
918the ``sqlite_raw_colnames`` execution option may be provided, either on a
919per-:class:`_engine.Connection` basis::
920
921 result = conn.execution_options(sqlite_raw_colnames=True).exec_driver_sql(
922 """
923 select x.a, x.b from x where a=1
924 union
925 select x.a, x.b from x where a=2
926 """
927 )
928 assert result.keys() == ["x.a", "x.b"]
929
930or on a per-:class:`_engine.Engine` basis::
931
932 engine = create_engine(
933 "sqlite://", execution_options={"sqlite_raw_colnames": True}
934 )
935
936When using the per-:class:`_engine.Engine` execution option, note that
937**Core and ORM queries that use UNION may not function properly**.
938
939SQLite-specific table options
940-----------------------------
941
Two options for CREATE TABLE are supported directly by the SQLite
dialect in conjunction with the :class:`_schema.Table` construct:
944
945* ``WITHOUT ROWID``::
946
947 Table("some_table", metadata, ..., sqlite_with_rowid=False)
948
949*
950 ``STRICT``::
951
952 Table("some_table", metadata, ..., sqlite_strict=True)
953
954 .. versionadded:: 2.0.37
955
956.. seealso::
957
958 `SQLite CREATE TABLE options
959 <https://www.sqlite.org/lang_createtable.html>`_
960
961.. _sqlite_include_internal:
962
963Reflecting internal schema tables
964----------------------------------
965
966Reflection methods that return lists of tables will omit so-called
967"SQLite internal schema object" names, which are considered by SQLite
968as any object name that is prefixed with ``sqlite_``. An example of
969such an object is the ``sqlite_sequence`` table that's generated when
970the ``AUTOINCREMENT`` column parameter is used. In order to return
971these objects, the parameter ``sqlite_include_internal=True`` may be
972passed to methods such as :meth:`_schema.MetaData.reflect` or
973:meth:`.Inspector.get_table_names`.
974
975.. versionadded:: 2.0 Added the ``sqlite_include_internal=True`` parameter.
976 Previously, these tables were not ignored by SQLAlchemy reflection
977 methods.
978
979.. note::
980
981 The ``sqlite_include_internal`` parameter does not refer to the
982 "system" tables that are present in schemas such as ``sqlite_master``.
983
984.. seealso::
985
986 `SQLite Internal Schema Objects <https://www.sqlite.org/fileformat2.html#intschema>`_ - in the SQLite
987 documentation.
988
989''' # noqa
990from __future__ import annotations
991
992import datetime
993import numbers
994import re
995from typing import Any
996from typing import Callable
997from typing import Optional
998from typing import TYPE_CHECKING
999
1000from .json import JSON
1001from .json import JSONIndexType
1002from .json import JSONPathType
1003from ... import exc
1004from ... import schema as sa_schema
1005from ... import sql
1006from ... import text
1007from ... import types as sqltypes
1008from ... import util
1009from ...engine import default
1010from ...engine import processors
1011from ...engine import reflection
1012from ...engine.reflection import ReflectionDefaults
1013from ...sql import coercions
1014from ...sql import compiler
1015from ...sql import elements
1016from ...sql import roles
1017from ...sql import schema
1018from ...types import BLOB # noqa
1019from ...types import BOOLEAN # noqa
1020from ...types import CHAR # noqa
1021from ...types import DECIMAL # noqa
1022from ...types import FLOAT # noqa
1023from ...types import INTEGER # noqa
1024from ...types import NUMERIC # noqa
1025from ...types import REAL # noqa
1026from ...types import SMALLINT # noqa
1027from ...types import TEXT # noqa
1028from ...types import TIMESTAMP # noqa
1029from ...types import VARCHAR # noqa
1030
1031if TYPE_CHECKING:
1032 from ...engine.interfaces import DBAPIConnection
1033 from ...engine.interfaces import Dialect
1034 from ...engine.interfaces import IsolationLevel
1035 from ...sql.type_api import _BindProcessorType
1036 from ...sql.type_api import _ResultProcessorType
1037
1038
class _SQliteJson(JSON):
    """SQLite-specific JSON type variant.

    SQLite's ``JSON_EXTRACT`` may hand back a bare numeric scalar
    rather than a JSON-encoded string; such values are passed through
    unchanged instead of being fed to the JSON deserializer.
    """

    def result_processor(self, dialect, coltype):
        json_deserialize = super().result_processor(dialect, coltype)

        def process(value):
            try:
                return json_deserialize(value)
            except TypeError:
                # a plain number is already the final value
                if isinstance(value, numbers.Number):
                    return value
                raise

        return process
1053
1054
1055class _DateTimeMixin:
1056 _reg = None
1057 _storage_format = None
1058
1059 def __init__(self, storage_format=None, regexp=None, **kw):
1060 super().__init__(**kw)
1061 if regexp is not None:
1062 self._reg = re.compile(regexp)
1063 if storage_format is not None:
1064 self._storage_format = storage_format
1065
1066 @property
1067 def format_is_text_affinity(self):
1068 """return True if the storage format will automatically imply
1069 a TEXT affinity.
1070
1071 If the storage format contains no non-numeric characters,
1072 it will imply a NUMERIC storage format on SQLite; in this case,
1073 the type will generate its DDL as DATE_CHAR, DATETIME_CHAR,
1074 TIME_CHAR.
1075
1076 """
1077 spec = self._storage_format % {
1078 "year": 0,
1079 "month": 0,
1080 "day": 0,
1081 "hour": 0,
1082 "minute": 0,
1083 "second": 0,
1084 "microsecond": 0,
1085 }
1086 return bool(re.search(r"[^0-9]", spec))
1087
1088 def adapt(self, cls, **kw):
1089 if issubclass(cls, _DateTimeMixin):
1090 if self._storage_format:
1091 kw["storage_format"] = self._storage_format
1092 if self._reg:
1093 kw["regexp"] = self._reg
1094 return super().adapt(cls, **kw)
1095
1096 def literal_processor(self, dialect):
1097 bp = self.bind_processor(dialect)
1098
1099 def process(value):
1100 return "'%s'" % bp(value)
1101
1102 return process
1103
1104
class DATETIME(_DateTimeMixin, sqltypes.DateTime):
    r"""Represent a Python datetime object in SQLite using a string.

    The default string storage format is::

        "%(year)04d-%(month)02d-%(day)02d %(hour)02d:%(minute)02d:%(second)02d.%(microsecond)06d"

    e.g.:

    .. sourcecode:: text

        2021-03-15 12:05:57.105542

    Incoming strings are parsed by default with the Python
    ``datetime.fromisoformat()`` function.

    .. versionchanged:: 2.0 ``datetime.fromisoformat()`` is used for default
       datetime string parsing.

    The storage format can be customized to some degree using the
    ``storage_format`` and ``regexp`` parameters, such as::

        import re
        from sqlalchemy.dialects.sqlite import DATETIME

        dt = DATETIME(
            storage_format=(
                "%(year)04d/%(month)02d/%(day)02d %(hour)02d:%(minute)02d:%(second)02d"
            ),
            regexp=r"(\d+)/(\d+)/(\d+) (\d+)-(\d+)-(\d+)",
        )

    :param truncate_microseconds: when ``True`` microseconds will be truncated
     from the datetime. Can't be specified together with ``storage_format``
     or ``regexp``.

    :param storage_format: format string which will be applied to the dict
     with keys year, month, day, hour, minute, second, and microsecond.

    :param regexp: regular expression which will be applied to incoming result
     rows, replacing the use of ``datetime.fromisoformat()`` to parse incoming
     strings. If the regexp contains named groups, the resulting match dict is
     applied to the Python datetime() constructor as keyword arguments.
     Otherwise, if positional groups are used, the datetime() constructor
     is called with positional arguments via
     ``*map(int, match_obj.groups(0))``.

    """ # noqa

    _storage_format = (
        "%(year)04d-%(month)02d-%(day)02d "
        "%(hour)02d:%(minute)02d:%(second)02d.%(microsecond)06d"
    )

    def __init__(self, *args, **kwargs):
        drop_microseconds = kwargs.pop("truncate_microseconds", False)
        super().__init__(*args, **kwargs)
        if drop_microseconds:
            assert "storage_format" not in kwargs, (
                "You can specify only "
                "one of truncate_microseconds or storage_format."
            )
            assert "regexp" not in kwargs, (
                "You can specify only one of "
                "truncate_microseconds or regexp."
            )
            # same as the default format, minus the fractional seconds
            self._storage_format = (
                "%(year)04d-%(month)02d-%(day)02d "
                "%(hour)02d:%(minute)02d:%(second)02d"
            )

    def bind_processor(
        self, dialect: Dialect
    ) -> Optional[_BindProcessorType[Any]]:
        fmt = self._storage_format

        def process(value):
            if value is None:
                return None
            # datetime must be tested before date, as datetime is a
            # subclass of date
            if isinstance(value, datetime.datetime):
                return fmt % {
                    "year": value.year,
                    "month": value.month,
                    "day": value.day,
                    "hour": value.hour,
                    "minute": value.minute,
                    "second": value.second,
                    "microsecond": value.microsecond,
                }
            if isinstance(value, datetime.date):
                # plain date: render with a zeroed time portion
                return fmt % {
                    "year": value.year,
                    "month": value.month,
                    "day": value.day,
                    "hour": 0,
                    "minute": 0,
                    "second": 0,
                    "microsecond": 0,
                }
            raise TypeError(
                "SQLite DateTime type only accepts Python "
                "datetime and date objects as input."
            )

        return process

    def result_processor(
        self, dialect: Dialect, coltype: object
    ) -> Optional[_ResultProcessorType[Any]]:
        if not self._reg:
            return processors.str_to_datetime
        return processors.str_to_datetime_processor_factory(
            self._reg, datetime.datetime
        )
1223
1224
class DATE(_DateTimeMixin, sqltypes.Date):
    r"""Represent a Python date object in SQLite using a string.

    The default string storage format is::

        "%(year)04d-%(month)02d-%(day)02d"

    e.g.:

    .. sourcecode:: text

        2011-03-15

    Incoming strings are parsed by default with the Python
    ``date.fromisoformat()`` function.

    .. versionchanged:: 2.0 ``date.fromisoformat()`` is used for default
       date string parsing.


    The storage format can be customized to some degree using the
    ``storage_format`` and ``regexp`` parameters, such as::

        import re
        from sqlalchemy.dialects.sqlite import DATE

        d = DATE(
            storage_format="%(month)02d/%(day)02d/%(year)04d",
            regexp=re.compile("(?P<month>\d+)/(?P<day>\d+)/(?P<year>\d+)"),
        )

    :param storage_format: format string which will be applied to the
     dict with keys year, month, and day.

    :param regexp: regular expression which will be applied to
     incoming result rows, replacing the use of ``date.fromisoformat()`` to
     parse incoming strings. If the regexp contains named groups, the resulting
     match dict is applied to the Python date() constructor as keyword
     arguments. Otherwise, if positional groups are used, the date()
     constructor is called with positional arguments via
     ``*map(int, match_obj.groups(0))``.

    """

    _storage_format = "%(year)04d-%(month)02d-%(day)02d"

    def bind_processor(
        self, dialect: Dialect
    ) -> Optional[_BindProcessorType[Any]]:
        fmt = self._storage_format

        def process(value):
            if value is None:
                return None
            if isinstance(value, datetime.date):
                return fmt % {
                    "year": value.year,
                    "month": value.month,
                    "day": value.day,
                }
            raise TypeError(
                "SQLite Date type only accepts Python "
                "date objects as input."
            )

        return process

    def result_processor(
        self, dialect: Dialect, coltype: object
    ) -> Optional[_ResultProcessorType[Any]]:
        if not self._reg:
            return processors.str_to_date
        return processors.str_to_datetime_processor_factory(
            self._reg, datetime.date
        )
1303
1304
class TIME(_DateTimeMixin, sqltypes.Time):
    r"""Represent a Python time object in SQLite using a string.

    The default string storage format is::

        "%(hour)02d:%(minute)02d:%(second)02d.%(microsecond)06d"

    e.g.:

    .. sourcecode:: text

        12:05:57.105542

    Incoming strings are parsed by default with the Python
    ``time.fromisoformat()`` function.

    .. versionchanged:: 2.0 ``time.fromisoformat()`` is used for default
       time string parsing.

    The storage format can be customized to some degree using the
    ``storage_format`` and ``regexp`` parameters, such as::

        import re
        from sqlalchemy.dialects.sqlite import TIME

        t = TIME(
            storage_format="%(hour)02d-%(minute)02d-%(second)02d-%(microsecond)06d",
            regexp=re.compile("(\d+)-(\d+)-(\d+)-(?:-(\d+))?"),
        )

    :param truncate_microseconds: when ``True`` microseconds will be truncated
     from the time. Can't be specified together with ``storage_format``
     or ``regexp``.

    :param storage_format: format string which will be applied to the dict
     with keys hour, minute, second, and microsecond.

    :param regexp: regular expression which will be applied to incoming result
     rows, replacing the use of ``time.fromisoformat()`` to parse incoming
     strings. If the regexp contains named groups, the resulting match dict is
     applied to the Python time() constructor as keyword arguments. Otherwise,
     if positional groups are used, the time() constructor is called with
     positional arguments via ``*map(int, match_obj.groups(0))``.

    """

    _storage_format = "%(hour)02d:%(minute)02d:%(second)02d.%(microsecond)06d"

    def __init__(self, *args, **kwargs):
        drop_microseconds = kwargs.pop("truncate_microseconds", False)
        super().__init__(*args, **kwargs)
        if drop_microseconds:
            assert "storage_format" not in kwargs, (
                "You can specify only "
                "one of truncate_microseconds or storage_format."
            )
            assert "regexp" not in kwargs, (
                "You can specify only one of "
                "truncate_microseconds or regexp."
            )
            # default format minus the fractional seconds
            self._storage_format = "%(hour)02d:%(minute)02d:%(second)02d"

    def bind_processor(self, dialect):
        fmt = self._storage_format

        def process(value):
            if value is None:
                return None
            if isinstance(value, datetime.time):
                return fmt % {
                    "hour": value.hour,
                    "minute": value.minute,
                    "second": value.second,
                    "microsecond": value.microsecond,
                }
            raise TypeError(
                "SQLite Time type only accepts Python "
                "time objects as input."
            )

        return process

    def result_processor(self, dialect, coltype):
        if not self._reg:
            return processors.str_to_time
        return processors.str_to_datetime_processor_factory(
            self._reg, datetime.time
        )
1396
1397
# Map of generic SQLAlchemy types to the SQLite-specific implementations
# that provide string-based date/time storage and the JSON scalar
# pass-through behavior.
colspecs = {
    sqltypes.Date: DATE,
    sqltypes.DateTime: DATETIME,
    sqltypes.JSON: _SQliteJson,
    sqltypes.JSON.JSONIndexType: JSONIndexType,
    sqltypes.JSON.JSONPathType: JSONPathType,
    sqltypes.Time: TIME,
}

# Exact-string type-name lookup used during reflection; names not found
# here fall back to SQLite's "type affinity" rules (see module docstring).
# The *_CHAR names are the TEXT-affinity DDL variants emitted by
# SQLiteTypeCompiler for custom date/time storage formats.
ischema_names = {
    "BIGINT": sqltypes.BIGINT,
    "BLOB": sqltypes.BLOB,
    "BOOL": sqltypes.BOOLEAN,
    "BOOLEAN": sqltypes.BOOLEAN,
    "CHAR": sqltypes.CHAR,
    "DATE": sqltypes.DATE,
    "DATE_CHAR": sqltypes.DATE,
    "DATETIME": sqltypes.DATETIME,
    "DATETIME_CHAR": sqltypes.DATETIME,
    "DOUBLE": sqltypes.DOUBLE,
    "DECIMAL": sqltypes.DECIMAL,
    "FLOAT": sqltypes.FLOAT,
    "INT": sqltypes.INTEGER,
    "INTEGER": sqltypes.INTEGER,
    "JSON": JSON,
    "NUMERIC": sqltypes.NUMERIC,
    "REAL": sqltypes.REAL,
    "SMALLINT": sqltypes.SMALLINT,
    "TEXT": sqltypes.TEXT,
    "TIME": sqltypes.TIME,
    "TIME_CHAR": sqltypes.TIME,
    "TIMESTAMP": sqltypes.TIMESTAMP,
    "VARCHAR": sqltypes.VARCHAR,
    "NVARCHAR": sqltypes.NVARCHAR,
    "NCHAR": sqltypes.NCHAR,
}
1434
1435
class SQLiteCompiler(compiler.SQLCompiler):
    """SQL statement (query/DML) compiler for the SQLite dialect."""

    # EXTRACT fields mapped to strftime() format codes; consumed by
    # visit_extract() below.
    extract_map = util.update_copy(
        compiler.SQLCompiler.extract_map,
        {
            "month": "%m",
            "day": "%d",
            "year": "%Y",
            "second": "%S",
            "hour": "%H",
            "doy": "%j",
            "minute": "%M",
            "epoch": "%s",
            "dow": "%w",
            "week": "%W",
        },
    )

    def visit_truediv_binary(self, binary, operator, **kw):
        # "/" on two SQLite integers is integer division; adding 0.0 to
        # the divisor coerces the result to floating point.
        return (
            self.process(binary.left, **kw)
            + " / "
            + "(%s + 0.0)" % self.process(binary.right, **kw)
        )

    def visit_now_func(self, fn, **kw):
        return "CURRENT_TIMESTAMP"

    def visit_localtimestamp_func(self, func, **kw):
        return "DATETIME(CURRENT_TIMESTAMP, 'localtime')"

    def visit_true(self, expr, **kw):
        # render boolean literals as 1/0
        return "1"

    def visit_false(self, expr, **kw):
        return "0"

    def visit_char_length_func(self, fn, **kw):
        return "length%s" % self.function_argspec(fn)

    def visit_aggregate_strings_func(self, fn, **kw):
        return "group_concat%s" % self.function_argspec(fn)

    def visit_cast(self, cast, **kwargs):
        if self.dialect.supports_cast:
            return super().visit_cast(cast, **kwargs)
        else:
            # no CAST support: render the bare inner expression
            return self.process(cast.clause, **kwargs)

    def visit_extract(self, extract, **kw):
        """Render EXTRACT(field FROM expr) via CAST(STRFTIME(...))."""
        try:
            return "CAST(STRFTIME('%s', %s) AS INTEGER)" % (
                self.extract_map[extract.field],
                self.process(extract.expr, **kw),
            )
        except KeyError as err:
            raise exc.CompileError(
                "%s is not a valid extract argument." % extract.field
            ) from err

    def returning_clause(
        self,
        stmt,
        returning_cols,
        *,
        populate_result_map,
        **kw,
    ):
        # table-qualified column names are not accepted in SQLite's
        # RETURNING clause
        kw["include_table"] = False
        return super().returning_clause(
            stmt, returning_cols, populate_result_map=populate_result_map, **kw
        )

    def limit_clause(self, select, **kw):
        text = ""
        if select._limit_clause is not None:
            text += "\n LIMIT " + self.process(select._limit_clause, **kw)
        if select._offset_clause is not None:
            if select._limit_clause is None:
                # SQLite requires a LIMIT before OFFSET; -1 means
                # "no limit"
                text += "\n LIMIT " + self.process(sql.literal(-1))
            text += " OFFSET " + self.process(select._offset_clause, **kw)
        else:
            # NOTE(review): an OFFSET 0 is always rendered alongside a
            # bare LIMIT — presumably to keep the statement shape
            # uniform; confirm before changing
            text += " OFFSET " + self.process(sql.literal(0), **kw)
        return text

    def for_update_clause(self, select, **kw):
        # sqlite has no "FOR UPDATE" AFAICT
        return ""

    def update_from_clause(
        self, update_stmt, from_table, extra_froms, from_hints, **kw
    ):
        """Render the FROM clause of an UPDATE..FROM statement."""
        kw["asfrom"] = True
        return "FROM " + ", ".join(
            t._compiler_dispatch(self, fromhints=from_hints, **kw)
            for t in extra_froms
        )

    def visit_is_distinct_from_binary(self, binary, operator, **kw):
        # SQLite's IS NOT operator is NULL-safe, matching
        # IS DISTINCT FROM semantics
        return "%s IS NOT %s" % (
            self.process(binary.left),
            self.process(binary.right),
        )

    def visit_is_not_distinct_from_binary(self, binary, operator, **kw):
        return "%s IS %s" % (
            self.process(binary.left),
            self.process(binary.right),
        )

    def visit_json_getitem_op_binary(self, binary, operator, **kw):
        # JSON_QUOTE re-serializes the extracted value when the result
        # type is itself JSON, so nested documents round-trip as JSON
        # strings
        if binary.type._type_affinity is sqltypes.JSON:
            expr = "JSON_QUOTE(JSON_EXTRACT(%s, %s))"
        else:
            expr = "JSON_EXTRACT(%s, %s)"

        return expr % (
            self.process(binary.left, **kw),
            self.process(binary.right, **kw),
        )

    def visit_json_path_getitem_op_binary(self, binary, operator, **kw):
        # same rendering as visit_json_getitem_op_binary; the path form
        # differs only in how the right operand was coerced
        if binary.type._type_affinity is sqltypes.JSON:
            expr = "JSON_QUOTE(JSON_EXTRACT(%s, %s))"
        else:
            expr = "JSON_EXTRACT(%s, %s)"

        return expr % (
            self.process(binary.left, **kw),
            self.process(binary.right, **kw),
        )

    def visit_empty_set_op_expr(self, type_, expand_op, **kw):
        # slightly old SQLite versions don't seem to be able to handle
        # the empty set impl
        return self.visit_empty_set_expr(type_)

    def visit_empty_set_expr(self, element_types, **kw):
        # a SELECT guaranteed to return zero rows, with one column per
        # element type
        return "SELECT %s FROM (SELECT %s) WHERE 1!=1" % (
            ", ".join("1" for type_ in element_types or [INTEGER()]),
            ", ".join("1" for type_ in element_types or [INTEGER()]),
        )

    def visit_regexp_match_op_binary(self, binary, operator, **kw):
        return self._generate_generic_binary(binary, " REGEXP ", **kw)

    def visit_not_regexp_match_op_binary(self, binary, operator, **kw):
        return self._generate_generic_binary(binary, " NOT REGEXP ", **kw)

    def _on_conflict_target(self, clause, **kw):
        """Render the conflict target portion (column list plus optional
        partial-index WHERE) of an ON CONFLICT clause."""
        if clause.inferred_target_elements is not None:
            target_text = "(%s)" % ", ".join(
                (
                    self.preparer.quote(c)
                    if isinstance(c, str)
                    else self.process(c, include_table=False, use_schema=False)
                )
                for c in clause.inferred_target_elements
            )
            if clause.inferred_target_whereclause is not None:
                whereclause_kw = dict(kw)
                whereclause_kw.update(
                    include_table=False,
                    use_schema=False,
                    # render bound values inline so the target matches
                    # the partial index definition
                    literal_execute=True,
                )
                target_text += " WHERE %s" % self.process(
                    clause.inferred_target_whereclause,
                    **whereclause_kw,
                )

        else:
            target_text = ""

        return target_text

    def visit_on_conflict_do_nothing(self, on_conflict, **kw):
        target_text = self._on_conflict_target(on_conflict, **kw)

        if target_text:
            return "ON CONFLICT %s DO NOTHING" % target_text
        else:
            return "ON CONFLICT DO NOTHING"

    def visit_on_conflict_do_update(self, on_conflict, **kw):
        """Render ON CONFLICT ... DO UPDATE SET ... for SQLite upserts."""
        clause = on_conflict

        target_text = self._on_conflict_target(on_conflict, **kw)

        action_set_ops = []

        set_parameters = dict(clause.update_values_to_set)
        # create a list of column assignment clauses as tuples

        insert_statement = self.stack[-1]["selectable"]
        cols = insert_statement.table.c
        set_kw = dict(kw)
        set_kw.update(use_schema=False)
        # iterate table columns in order so the SET clause ordering is
        # deterministic; parameters may be keyed by string or by column
        for c in cols:
            col_key = c.key

            if col_key in set_parameters:
                value = set_parameters.pop(col_key)
            elif c in set_parameters:
                value = set_parameters.pop(c)
            else:
                continue

            if coercions._is_literal(value):
                value = elements.BindParameter(None, value, type_=c.type)

            else:
                if (
                    isinstance(value, elements.BindParameter)
                    and value.type._isnull
                ):
                    # untyped bind: adopt the target column's type
                    value = value._clone()
                    value.type = c.type
            value_text = self.process(
                value.self_group(), is_upsert_set=True, **set_kw
            )

            key_text = self.preparer.quote(c.name)
            action_set_ops.append("%s = %s" % (key_text, value_text))

        # check for names that don't match columns
        if set_parameters:
            util.warn(
                "Additional column names not matching "
                "any column keys in table '%s': %s"
                % (
                    self.current_executable.table.name,
                    (", ".join("'%s'" % c for c in set_parameters)),
                )
            )
            # render the leftover entries anyway, after the warning
            for k, v in set_parameters.items():
                key_text = (
                    self.preparer.quote(k)
                    if isinstance(k, str)
                    else self.process(k, **set_kw)
                )
                value_text = self.process(
                    coercions.expect(roles.ExpressionElementRole, v),
                    is_upsert_set=True,
                    **set_kw,
                )
                action_set_ops.append("%s = %s" % (key_text, value_text))

        action_text = ", ".join(action_set_ops)
        if clause.update_whereclause is not None:
            where_kw = dict(kw)
            where_kw.update(include_table=True, use_schema=False)
            action_text += " WHERE %s" % self.process(
                clause.update_whereclause, **where_kw
            )

        return "ON CONFLICT %s DO UPDATE SET %s" % (target_text, action_text)

    def visit_bitwise_xor_op_binary(self, binary, operator, **kw):
        # sqlite has no xor. Use "a XOR b" = "(a | b) - (a & b)".
        kw["eager_grouping"] = True
        or_ = self._generate_generic_binary(binary, " | ", **kw)
        and_ = self._generate_generic_binary(binary, " & ", **kw)
        return f"({or_} - {and_})"
1699
1700
class SQLiteDDLCompiler(compiler.DDLCompiler):
    """DDL compiler for the SQLite dialect; handles ON CONFLICT column
    options, AUTOINCREMENT, and SQLite-specific table options."""

    def get_column_specification(self, column, **kwargs):
        """Render one column clause of CREATE TABLE: name, type,
        DEFAULT, NOT NULL, PRIMARY KEY / AUTOINCREMENT, and computed
        expression, with SQLite ON CONFLICT options interleaved."""
        coltype = self.dialect.type_compiler_instance.process(
            column.type, type_expression=column
        )
        colspec = self.preparer.format_column(column) + " " + coltype
        default = self.get_column_default_string(column)
        if default is not None:

            # a default that is not already quoted/parenthesized and
            # contains non-word characters (e.g. an expression) must be
            # wrapped in parenthesis for SQLite
            if not re.match(r"""^\s*[\'\"\(]""", default) and re.match(
                r".*\W.*", default
            ):
                colspec += f" DEFAULT ({default})"
            else:
                colspec += f" DEFAULT {default}"

        if not column.nullable:
            colspec += " NOT NULL"

            # ON CONFLICT for the NOT NULL constraint only applies when
            # the column is in fact NOT NULL
            on_conflict_clause = column.dialect_options["sqlite"][
                "on_conflict_not_null"
            ]
            if on_conflict_clause is not None:
                colspec += " ON CONFLICT " + on_conflict_clause

        if column.primary_key:
            if (
                column.autoincrement is True
                and len(column.table.primary_key.columns) != 1
            ):
                raise exc.CompileError(
                    "SQLite does not support autoincrement for "
                    "composite primary keys"
                )

            # AUTOINCREMENT requires an inline "INTEGER PRIMARY KEY"
            # on a single-column, non-FK primary key
            if (
                column.table.dialect_options["sqlite"]["autoincrement"]
                and len(column.table.primary_key.columns) == 1
                and issubclass(column.type._type_affinity, sqltypes.Integer)
                and not column.foreign_keys
            ):
                colspec += " PRIMARY KEY"

                on_conflict_clause = column.dialect_options["sqlite"][
                    "on_conflict_primary_key"
                ]
                if on_conflict_clause is not None:
                    colspec += " ON CONFLICT " + on_conflict_clause

                colspec += " AUTOINCREMENT"

        if column.computed is not None:
            colspec += " " + self.process(column.computed)

        return colspec

    def visit_primary_key_constraint(self, constraint, **kw):
        # for columns with sqlite_autoincrement=True,
        # the PRIMARY KEY constraint can only be inline
        # with the column itself.
        if len(constraint.columns) == 1:
            c = list(constraint)[0]
            if (
                c.primary_key
                and c.table.dialect_options["sqlite"]["autoincrement"]
                and issubclass(c.type._type_affinity, sqltypes.Integer)
                and not c.foreign_keys
            ):
                # rendered inline by get_column_specification(); skip
                # the table-level constraint entirely
                return None

        text = super().visit_primary_key_constraint(constraint)

        on_conflict_clause = constraint.dialect_options["sqlite"][
            "on_conflict"
        ]
        # fall back to the single column's own on_conflict setting
        if on_conflict_clause is None and len(constraint.columns) == 1:
            on_conflict_clause = list(constraint)[0].dialect_options["sqlite"][
                "on_conflict_primary_key"
            ]

        if on_conflict_clause is not None:
            text += " ON CONFLICT " + on_conflict_clause

        return text

    def visit_unique_constraint(self, constraint, **kw):
        text = super().visit_unique_constraint(constraint)

        on_conflict_clause = constraint.dialect_options["sqlite"][
            "on_conflict"
        ]
        if on_conflict_clause is None and len(constraint.columns) == 1:
            col1 = list(constraint)[0]
            # the constraint member may be a plain expression rather
            # than a Column; only SchemaItems carry dialect options
            if isinstance(col1, schema.SchemaItem):
                on_conflict_clause = list(constraint)[0].dialect_options[
                    "sqlite"
                ]["on_conflict_unique"]

        if on_conflict_clause is not None:
            text += " ON CONFLICT " + on_conflict_clause

        return text

    def visit_check_constraint(self, constraint, **kw):
        text = super().visit_check_constraint(constraint)

        on_conflict_clause = constraint.dialect_options["sqlite"][
            "on_conflict"
        ]

        if on_conflict_clause is not None:
            text += " ON CONFLICT " + on_conflict_clause

        return text

    def visit_column_check_constraint(self, constraint, **kw):
        text = super().visit_column_check_constraint(constraint)

        # ON CONFLICT is only valid on table-level check constraints
        if constraint.dialect_options["sqlite"]["on_conflict"] is not None:
            raise exc.CompileError(
                "SQLite does not support on conflict clause for "
                "column check constraint"
            )

        return text

    def visit_foreign_key_constraint(self, constraint, **kw):
        local_table = constraint.elements[0].parent.table
        remote_table = constraint.elements[0].column.table

        # cross-schema foreign keys are omitted from the DDL entirely;
        # presumably SQLite cannot enforce them across attached
        # databases
        if local_table.schema != remote_table.schema:
            return None
        else:
            return super().visit_foreign_key_constraint(constraint)

    def define_constraint_remote_table(self, constraint, table, preparer):
        """Format the remote table clause of a CREATE CONSTRAINT clause."""

        return preparer.format_table(table, use_schema=False)

    def visit_create_index(
        self, create, include_schema=False, include_table_schema=True, **kw
    ):
        """Render CREATE [UNIQUE] INDEX, including the sqlite_where
        partial-index clause when present."""
        index = create.element
        self._verify_index_table(index)
        preparer = self.preparer
        text = "CREATE "
        if index.unique:
            text += "UNIQUE "

        text += "INDEX "

        if create.if_not_exists:
            text += "IF NOT EXISTS "

        # the index name carries the schema; the table name must not
        text += "%s ON %s (%s)" % (
            self._prepared_index_name(index, include_schema=True),
            preparer.format_table(index.table, use_schema=False),
            ", ".join(
                self.sql_compiler.process(
                    expr, include_table=False, literal_binds=True
                )
                for expr in index.expressions
            ),
        )

        whereclause = index.dialect_options["sqlite"]["where"]
        if whereclause is not None:
            where_compiled = self.sql_compiler.process(
                whereclause, include_table=False, literal_binds=True
            )
            text += " WHERE " + where_compiled

        return text

    def post_create_table(self, table):
        """Render trailing CREATE TABLE options: WITHOUT ROWID, STRICT."""
        table_options = []

        if not table.dialect_options["sqlite"]["with_rowid"]:
            table_options.append("WITHOUT ROWID")

        if table.dialect_options["sqlite"]["strict"]:
            table_options.append("STRICT")

        if table_options:
            return "\n " + ",\n ".join(table_options)
        else:
            return ""
1889
1890
class SQLiteTypeCompiler(compiler.GenericTypeCompiler):
    """Renders SQLite type DDL, substituting ``*_CHAR`` names for
    date/time types whose custom storage format would otherwise lose
    TEXT affinity."""

    def visit_large_binary(self, type_, **kw):
        # LargeBinary maps directly onto SQLite's BLOB storage class
        return self.visit_BLOB(type_)

    def _needs_text_affinity(self, type_):
        # a _DateTimeMixin whose storage format contains no alpha
        # characters must be rendered as DATETIME_CHAR / DATE_CHAR /
        # TIME_CHAR so the column keeps textual affinity
        return (
            isinstance(type_, _DateTimeMixin)
            and not type_.format_is_text_affinity
        )

    def visit_DATETIME(self, type_, **kw):
        if self._needs_text_affinity(type_):
            return "DATETIME_CHAR"
        return super().visit_DATETIME(type_)

    def visit_DATE(self, type_, **kw):
        if self._needs_text_affinity(type_):
            return "DATE_CHAR"
        return super().visit_DATE(type_)

    def visit_TIME(self, type_, **kw):
        if self._needs_text_affinity(type_):
            return "TIME_CHAR"
        return super().visit_TIME(type_)

    def visit_JSON(self, type_, **kw):
        # note this name provides NUMERIC affinity, not TEXT.
        # should not be an issue unless the JSON value consists of a single
        # numeric value. JSONTEXT can be used if this case is required.
        return "JSON"
1927
1928
class SQLiteIdentifierPreparer(compiler.IdentifierPreparer):
    # Keywords reserved by SQLite that must be quoted when used as
    # identifiers; see https://www.sqlite.org/lang_keywords.html
    reserved_words = {
        "add",
        "after",
        "all",
        "alter",
        "analyze",
        "and",
        "as",
        "asc",
        "attach",
        "autoincrement",
        "before",
        "begin",
        "between",
        "by",
        "cascade",
        "case",
        "cast",
        "check",
        "collate",
        "column",
        "commit",
        "conflict",
        "constraint",
        "create",
        "cross",
        "current_date",
        "current_time",
        "current_timestamp",
        "database",
        "default",
        "deferrable",
        "deferred",
        "delete",
        "desc",
        "detach",
        "distinct",
        "drop",
        "each",
        "else",
        "end",
        "escape",
        "except",
        "exclusive",
        "exists",
        "explain",
        "false",
        "fail",
        "for",
        "foreign",
        "from",
        "full",
        "glob",
        "group",
        "having",
        "if",
        "ignore",
        "immediate",
        "in",
        "index",
        "indexed",
        "initially",
        "inner",
        "insert",
        "instead",
        "intersect",
        "into",
        "is",
        "isnull",
        "join",
        "key",
        "left",
        "like",
        "limit",
        "match",
        "natural",
        "not",
        "notnull",
        "null",
        "of",
        "offset",
        "on",
        "or",
        "order",
        "outer",
        "plan",
        "pragma",
        "primary",
        "query",
        "raise",
        "references",
        "reindex",
        "rename",
        "replace",
        "restrict",
        "right",
        "rollback",
        "row",
        "select",
        "set",
        "table",
        "temp",
        "temporary",
        "then",
        "to",
        "transaction",
        "trigger",
        "true",
        "union",
        "unique",
        "update",
        "using",
        "vacuum",
        "values",
        "view",
        "virtual",
        "when",
        "where",
    }
2049
2050
class SQLiteExecutionContext(default.DefaultExecutionContext):
    """Execution context that compensates for dotted column names left
    in cursor.description by older SQLite versions."""

    @util.memoized_property
    def _preserve_raw_colnames(self):
        """True when dotted column names should be passed through
        unchanged, either because the dialect is not affected or the
        ``sqlite_raw_colnames`` execution option was set."""
        if not self.dialect._broken_dotted_colnames:
            return True
        return self.execution_options.get("sqlite_raw_colnames", False)

    def _translate_colname(self, colname):
        # TODO: detect SQLite version 3.10.0 or greater;
        # see [ticket:3633]

        # adjust for dotted column names. SQLite
        # in the case of UNION may store col names as
        # "tablename.colname", or if using an attached database,
        # "database.tablename.colname", in cursor.description
        if self._preserve_raw_colnames or "." not in colname:
            return colname, None
        return colname.split(".")[-1], colname
2071
2072
class SQLiteDialect(default.DefaultDialect):
    name = "sqlite"
    supports_alter = False

    # "DEFAULT VALUES" support; may be downgraded in __init__ based on
    # the runtime SQLite library version
    supports_default_values = True

    # sqlite issue:
    # https://github.com/python/cpython/issues/93421
    # note this parameter is no longer used by the ORM or default dialect
    # see #9414
    supports_sane_rowcount_returning = False

    supports_empty_insert = False
    supports_cast = True
    supports_multivalues_insert = True
    use_insertmanyvalues = True
    tuple_in_values = True
    supports_statement_cache = True
    insert_null_pk_still_autoincrements = True
    insert_returning = True
    update_returning = True
    # NOTE: was previously assigned twice; a single assignment is kept
    update_returning_multifrom = True
    delete_returning = True

    # was previously first set False and then immediately shadowed by
    # True; only the effective value is kept
    supports_default_metavalue = True
    """dialect supports INSERT... VALUES (DEFAULT) syntax"""

    default_metavalue_token = "NULL"
    """for INSERT... VALUES (DEFAULT) syntax, the token to put in the
    parenthesis."""

    default_paramstyle = "qmark"
    execution_ctx_cls = SQLiteExecutionContext
    statement_compiler = SQLiteCompiler
    ddl_compiler = SQLiteDDLCompiler
    type_compiler_cls = SQLiteTypeCompiler
    preparer = SQLiteIdentifierPreparer
    ischema_names = ischema_names
    colspecs = colspecs

    # dialect-specific keyword arguments accepted by schema constructs,
    # e.g. Table(..., sqlite_with_rowid=False)
    construct_arguments = [
        (
            sa_schema.Table,
            {
                "autoincrement": False,
                "with_rowid": True,
                "strict": False,
            },
        ),
        (sa_schema.Index, {"where": None}),
        (
            sa_schema.Column,
            {
                "on_conflict_primary_key": None,
                "on_conflict_not_null": None,
                "on_conflict_unique": None,
            },
        ),
        (sa_schema.Constraint, {"on_conflict": None}),
    ]

    # set in __init__ for SQLite < 3.6.14 / < 3.10.0 respectively
    _broken_fk_pragma_quotes = False
    _broken_dotted_colnames = False
2140
    @util.deprecated_params(
        _json_serializer=(
            "1.3.7",
            "The _json_serializer argument to the SQLite dialect has "
            "been renamed to the correct name of json_serializer. The old "
            "argument name will be removed in a future release.",
        ),
        _json_deserializer=(
            "1.3.7",
            "The _json_deserializer argument to the SQLite dialect has "
            "been renamed to the correct name of json_deserializer. The old "
            "argument name will be removed in a future release.",
        ),
    )
    def __init__(
        self,
        native_datetime: bool = False,
        json_serializer: Optional[Callable[..., Any]] = None,
        json_deserializer: Optional[Callable[..., Any]] = None,
        _json_serializer: Optional[Callable[..., Any]] = None,
        _json_deserializer: Optional[Callable[..., Any]] = None,
        **kwargs: Any,
    ) -> None:
        """Construct a SQLiteDialect.

        :param native_datetime: when True, the driver is expected to
         handle date/timestamp conversions itself (see comment below).
        :param json_serializer: callable used to render JSON values.
        :param json_deserializer: callable used to parse JSON strings.
        :param _json_serializer: deprecated alias for ``json_serializer``.
        :param _json_deserializer: deprecated alias for
         ``json_deserializer``.

        When a DBAPI is present, feature flags on the dialect are
        additionally downgraded based on the runtime SQLite library
        version.
        """
        default.DefaultDialect.__init__(self, **kwargs)

        # deprecated underscore-prefixed names take effect only when
        # actually passed
        if _json_serializer:
            json_serializer = _json_serializer
        if _json_deserializer:
            json_deserializer = _json_deserializer
        self._json_serializer = json_serializer
        self._json_deserializer = json_deserializer

        # this flag used by pysqlite dialect, and perhaps others in the
        # future, to indicate the driver is handling date/timestamp
        # conversions (and perhaps datetime/time as well on some hypothetical
        # driver ?)
        self.native_datetime = native_datetime

        if self.dbapi is not None:
            if self.dbapi.sqlite_version_info < (3, 7, 16):
                util.warn(
                    "SQLite version %s is older than 3.7.16, and will not "
                    "support right nested joins, as are sometimes used in "
                    "more complex ORM scenarios. SQLAlchemy 1.4 and above "
                    "no longer tries to rewrite these joins."
                    % (self.dbapi.sqlite_version_info,)
                )

            # NOTE: python 3.7 on fedora for me has SQLite 3.34.1. These
            # version checks are getting very stale.
            self._broken_dotted_colnames = self.dbapi.sqlite_version_info < (
                3,
                10,
                0,
            )
            self.supports_default_values = self.dbapi.sqlite_version_info >= (
                3,
                3,
                8,
            )
            self.supports_cast = self.dbapi.sqlite_version_info >= (3, 2, 3)
            self.supports_multivalues_insert = (
                # https://www.sqlite.org/releaselog/3_7_11.html
                self.dbapi.sqlite_version_info
                >= (3, 7, 11)
            )
            # see https://www.sqlalchemy.org/trac/ticket/2568
            # as well as https://www.sqlite.org/src/info/600482d161
            self._broken_fk_pragma_quotes = self.dbapi.sqlite_version_info < (
                3,
                6,
                14,
            )

            # RETURNING requires SQLite 3.35+; also unavailable on pypy
            if self.dbapi.sqlite_version_info < (3, 35) or util.pypy:
                self.update_returning = self.delete_returning = (
                    self.insert_returning
                ) = False

            if self.dbapi.sqlite_version_info < (3, 32, 0):
                # https://www.sqlite.org/limits.html
                self.insertmanyvalues_max_parameters = 999
2223
    # PRAGMA read_uncommitted value for each supported isolation level
    _isolation_lookup = util.immutabledict(
        {"READ UNCOMMITTED": 1, "SERIALIZABLE": 0}
    )

    def get_isolation_level_values(self, dbapi_connection):
        # both levels are supported regardless of connection state
        return list(self._isolation_lookup)
2230
2231 def set_isolation_level(
2232 self, dbapi_connection: DBAPIConnection, level: IsolationLevel
2233 ) -> None:
2234 isolation_level = self._isolation_lookup[level]
2235
2236 cursor = dbapi_connection.cursor()
2237 cursor.execute(f"PRAGMA read_uncommitted = {isolation_level}")
2238 cursor.close()
2239
2240 def get_isolation_level(self, dbapi_connection):
2241 cursor = dbapi_connection.cursor()
2242 cursor.execute("PRAGMA read_uncommitted")
2243 res = cursor.fetchone()
2244 if res:
2245 value = res[0]
2246 else:
2247 # https://www.sqlite.org/changes.html#version_3_3_3
2248 # "Optional READ UNCOMMITTED isolation (instead of the
2249 # default isolation level of SERIALIZABLE) and
2250 # table level locking when database connections
2251 # share a common cache.""
2252 # pre-SQLite 3.3.0 default to 0
2253 value = 0
2254 cursor.close()
2255 if value == 0:
2256 return "SERIALIZABLE"
2257 elif value == 1:
2258 return "READ UNCOMMITTED"
2259 else:
2260 assert False, "Unknown isolation level %s" % value
2261
2262 @reflection.cache
2263 def get_schema_names(self, connection, **kw):
2264 s = "PRAGMA database_list"
2265 dl = connection.exec_driver_sql(s)
2266
2267 return [db[1] for db in dl if db[1] != "temp"]
2268
2269 def _format_schema(self, schema, table_name):
2270 if schema is not None:
2271 qschema = self.identifier_preparer.quote_identifier(schema)
2272 name = f"{qschema}.{table_name}"
2273 else:
2274 name = table_name
2275 return name
2276
2277 def _sqlite_main_query(
2278 self,
2279 table: str,
2280 type_: str,
2281 schema: Optional[str],
2282 sqlite_include_internal: bool,
2283 ):
2284 main = self._format_schema(schema, table)
2285 if not sqlite_include_internal:
2286 filter_table = " AND name NOT LIKE 'sqlite~_%' ESCAPE '~'"
2287 else:
2288 filter_table = ""
2289 query = (
2290 f"SELECT name FROM {main} "
2291 f"WHERE type='{type_}'{filter_table} "
2292 "ORDER BY name"
2293 )
2294 return query
2295
2296 @reflection.cache
2297 def get_table_names(
2298 self, connection, schema=None, sqlite_include_internal=False, **kw
2299 ):
2300 query = self._sqlite_main_query(
2301 "sqlite_master", "table", schema, sqlite_include_internal
2302 )
2303 names = connection.exec_driver_sql(query).scalars().all()
2304 return names
2305
2306 @reflection.cache
2307 def get_temp_table_names(
2308 self, connection, sqlite_include_internal=False, **kw
2309 ):
2310 query = self._sqlite_main_query(
2311 "sqlite_temp_master", "table", None, sqlite_include_internal
2312 )
2313 names = connection.exec_driver_sql(query).scalars().all()
2314 return names
2315
2316 @reflection.cache
2317 def get_temp_view_names(
2318 self, connection, sqlite_include_internal=False, **kw
2319 ):
2320 query = self._sqlite_main_query(
2321 "sqlite_temp_master", "view", None, sqlite_include_internal
2322 )
2323 names = connection.exec_driver_sql(query).scalars().all()
2324 return names
2325
2326 @reflection.cache
2327 def has_table(self, connection, table_name, schema=None, **kw):
2328 self._ensure_has_table_connection(connection)
2329
2330 if schema is not None and schema not in self.get_schema_names(
2331 connection, **kw
2332 ):
2333 return False
2334
2335 info = self._get_table_pragma(
2336 connection, "table_info", table_name, schema=schema
2337 )
2338 return bool(info)
2339
2340 def _get_default_schema_name(self, connection):
2341 return "main"
2342
2343 @reflection.cache
2344 def get_view_names(
2345 self, connection, schema=None, sqlite_include_internal=False, **kw
2346 ):
2347 query = self._sqlite_main_query(
2348 "sqlite_master", "view", schema, sqlite_include_internal
2349 )
2350 names = connection.exec_driver_sql(query).scalars().all()
2351 return names
2352
2353 @reflection.cache
2354 def get_view_definition(self, connection, view_name, schema=None, **kw):
2355 if schema is not None:
2356 qschema = self.identifier_preparer.quote_identifier(schema)
2357 master = f"{qschema}.sqlite_master"
2358 s = ("SELECT sql FROM %s WHERE name = ? AND type='view'") % (
2359 master,
2360 )
2361 rs = connection.exec_driver_sql(s, (view_name,))
2362 else:
2363 try:
2364 s = (
2365 "SELECT sql FROM "
2366 " (SELECT * FROM sqlite_master UNION ALL "
2367 " SELECT * FROM sqlite_temp_master) "
2368 "WHERE name = ? "
2369 "AND type='view'"
2370 )
2371 rs = connection.exec_driver_sql(s, (view_name,))
2372 except exc.DBAPIError:
2373 s = (
2374 "SELECT sql FROM sqlite_master WHERE name = ? "
2375 "AND type='view'"
2376 )
2377 rs = connection.exec_driver_sql(s, (view_name,))
2378
2379 result = rs.fetchall()
2380 if result:
2381 return result[0].sql
2382 else:
2383 raise exc.NoSuchTableError(
2384 f"{schema}.{view_name}" if schema else view_name
2385 )
2386
    @reflection.cache
    def get_columns(self, connection, table_name, schema=None, **kw):
        """Reflect column information via the table_info pragma (or
        table_xinfo on SQLite 3.31+, which additionally reports
        hidden/computed columns).

        Raises NoSuchTableError when the table does not exist; returns
        the reflection default when it exists but yields no columns.
        """
        pragma = "table_info"
        # computed columns are treated as hidden, they require table_xinfo
        if self.server_version_info >= (3, 31):
            pragma = "table_xinfo"
        info = self._get_table_pragma(
            connection, pragma, table_name, schema=schema
        )
        columns = []
        # raw CREATE TABLE interior; fetched lazily only if a computed
        # column is encountered
        tablesql = None
        for row in info:
            name = row[1]
            type_ = row[2].upper()
            nullable = not row[3]
            default = row[4]
            primary_key = row[5]
            hidden = row[6] if pragma == "table_xinfo" else 0

            # hidden has value 0 for normal columns, 1 for hidden columns,
            # 2 for computed virtual columns and 3 for computed stored columns
            # https://www.sqlite.org/src/info/069351b85f9a706f60d3e98fbc8aaf40c374356b967c0464aede30ead3d9d18b
            if hidden == 1:
                continue

            generated = bool(hidden)
            persisted = hidden == 3

            if tablesql is None and generated:
                tablesql = self._get_table_sql(
                    connection, table_name, schema, **kw
                )
                # remove create table
                match = re.match(
                    (
                        r"create table .*?\((.*)\)"
                        r"(?:\s*,?\s*(?:WITHOUT\s+ROWID|STRICT))*$"
                    ),
                    tablesql.strip(),
                    re.DOTALL | re.IGNORECASE,
                )
                assert match, f"create table not found in {tablesql}"
                tablesql = match.group(1).strip()

            columns.append(
                self._get_column_info(
                    name,
                    type_,
                    nullable,
                    default,
                    primary_key,
                    generated,
                    persisted,
                    tablesql,
                )
            )
        if columns:
            return columns
        elif not self.has_table(connection, table_name, schema):
            raise exc.NoSuchTableError(
                f"{schema}.{table_name}" if schema else table_name
            )
        else:
            return ReflectionDefaults.columns()
2451
2452 def _get_column_info(
2453 self,
2454 name,
2455 type_,
2456 nullable,
2457 default,
2458 primary_key,
2459 generated,
2460 persisted,
2461 tablesql,
2462 ):
2463 if generated:
2464 # the type of a column "cc INTEGER GENERATED ALWAYS AS (1 + 42)"
2465 # somehow is "INTEGER GENERATED ALWAYS"
2466 type_ = re.sub("generated", "", type_, flags=re.IGNORECASE)
2467 type_ = re.sub("always", "", type_, flags=re.IGNORECASE).strip()
2468
2469 coltype = self._resolve_type_affinity(type_)
2470
2471 if default is not None:
2472 default = str(default)
2473
2474 colspec = {
2475 "name": name,
2476 "type": coltype,
2477 "nullable": nullable,
2478 "default": default,
2479 "primary_key": primary_key,
2480 }
2481 if generated:
2482 sqltext = ""
2483 if tablesql:
2484 pattern = (
2485 r"[^,]*\s+GENERATED\s+ALWAYS\s+AS"
2486 r"\s+\((.*)\)\s*(?:virtual|stored)?"
2487 )
2488 match = re.search(
2489 re.escape(name) + pattern, tablesql, re.IGNORECASE
2490 )
2491 if match:
2492 sqltext = match.group(1)
2493 colspec["computed"] = {"sqltext": sqltext, "persisted": persisted}
2494 return colspec
2495
2496 def _resolve_type_affinity(self, type_):
2497 """Return a data type from a reflected column, using affinity rules.
2498
2499 SQLite's goal for universal compatibility introduces some complexity
2500 during reflection, as a column's defined type might not actually be a
2501 type that SQLite understands - or indeed, my not be defined *at all*.
2502 Internally, SQLite handles this with a 'data type affinity' for each
2503 column definition, mapping to one of 'TEXT', 'NUMERIC', 'INTEGER',
2504 'REAL', or 'NONE' (raw bits). The algorithm that determines this is
2505 listed in https://www.sqlite.org/datatype3.html section 2.1.
2506
2507 This method allows SQLAlchemy to support that algorithm, while still
2508 providing access to smarter reflection utilities by recognizing
2509 column definitions that SQLite only supports through affinity (like
2510 DATE and DOUBLE).
2511
2512 """
2513 match = re.match(r"([\w ]+)(\(.*?\))?", type_)
2514 if match:
2515 coltype = match.group(1)
2516 args = match.group(2)
2517 else:
2518 coltype = ""
2519 args = ""
2520
2521 if coltype in self.ischema_names:
2522 coltype = self.ischema_names[coltype]
2523 elif "INT" in coltype:
2524 coltype = sqltypes.INTEGER
2525 elif "CHAR" in coltype or "CLOB" in coltype or "TEXT" in coltype:
2526 coltype = sqltypes.TEXT
2527 elif "BLOB" in coltype or not coltype:
2528 coltype = sqltypes.NullType
2529 elif "REAL" in coltype or "FLOA" in coltype or "DOUB" in coltype:
2530 coltype = sqltypes.REAL
2531 else:
2532 coltype = sqltypes.NUMERIC
2533
2534 if args is not None:
2535 args = re.findall(r"(\d+)", args)
2536 try:
2537 coltype = coltype(*[int(a) for a in args])
2538 except TypeError:
2539 util.warn(
2540 "Could not instantiate type %s with "
2541 "reflected arguments %s; using no arguments."
2542 % (coltype, args)
2543 )
2544 coltype = coltype()
2545 else:
2546 coltype = coltype()
2547
2548 return coltype
2549
2550 @reflection.cache
2551 def get_pk_constraint(self, connection, table_name, schema=None, **kw):
2552 constraint_name = None
2553 table_data = self._get_table_sql(connection, table_name, schema=schema)
2554 if table_data:
2555 PK_PATTERN = r'CONSTRAINT +(?:"(.+?)"|(\w+)) +PRIMARY KEY'
2556 result = re.search(PK_PATTERN, table_data, re.I)
2557 if result:
2558 constraint_name = result.group(1) or result.group(2)
2559 else:
2560 constraint_name = None
2561
2562 cols = self.get_columns(connection, table_name, schema, **kw)
2563 # consider only pk columns. This also avoids sorting the cached
2564 # value returned by get_columns
2565 cols = [col for col in cols if col.get("primary_key", 0) > 0]
2566 cols.sort(key=lambda col: col.get("primary_key"))
2567 pkeys = [col["name"] for col in cols]
2568
2569 if pkeys:
2570 return {"constrained_columns": pkeys, "name": constraint_name}
2571 else:
2572 return ReflectionDefaults.pk_constraint()
2573
    @reflection.cache
    def get_foreign_keys(self, connection, table_name, schema=None, **kw):
        """Reflect foreign keys for *table_name*.

        The foreign_key_list pragma supplies the constrained and
        referred columns; the raw CREATE TABLE text is then parsed to
        recover constraint names and options (ON DELETE/UPDATE,
        DEFERRABLE, INITIALLY), which the pragma does not expose.
        """
        # sqlite makes this *extremely difficult*.
        # First, use the pragma to get the actual FKs.
        pragma_fks = self._get_table_pragma(
            connection, "foreign_key_list", table_name, schema=schema
        )

        # constraints keyed by the pragma's numeric constraint id
        fks = {}

        for row in pragma_fks:
            (numerical_id, rtbl, lcol, rcol) = (row[0], row[2], row[3], row[4])

            if not rcol:
                # no referred column, which means it was not named in the
                # original DDL. The referred columns of the foreign key
                # constraint are therefore the primary key of the referred
                # table.
                try:
                    referred_pk = self.get_pk_constraint(
                        connection, rtbl, schema=schema, **kw
                    )
                    referred_columns = referred_pk["constrained_columns"]
                except exc.NoSuchTableError:
                    # ignore not existing parents
                    referred_columns = []
            else:
                # note we use this list only if this is the first column
                # in the constraint. for subsequent columns we ignore the
                # list and append "rcol" if present.
                referred_columns = []

            if self._broken_fk_pragma_quotes:
                # strip quoting characters older SQLite left on the
                # referred table name
                rtbl = re.sub(r"^[\"\[`\']|[\"\]`\']$", "", rtbl)

            if numerical_id in fks:
                fk = fks[numerical_id]
            else:
                fk = fks[numerical_id] = {
                    "name": None,
                    "constrained_columns": [],
                    "referred_schema": schema,
                    "referred_table": rtbl,
                    "referred_columns": referred_columns,
                    "options": {},
                }
                fks[numerical_id] = fk

            fk["constrained_columns"].append(lcol)

            if rcol:
                fk["referred_columns"].append(rcol)

        def fk_sig(constrained_columns, referred_table, referred_columns):
            # hashable signature used to match pragma rows to parsed DDL
            return (
                tuple(constrained_columns)
                + (referred_table,)
                + tuple(referred_columns)
            )

        # then, parse the actual SQL and attempt to find DDL that matches
        # the names as well. SQLite saves the DDL in whatever format
        # it was typed in as, so need to be liberal here.

        keys_by_signature = {
            fk_sig(
                fk["constrained_columns"],
                fk["referred_table"],
                fk["referred_columns"],
            ): fk
            for fk in fks.values()
        }

        table_data = self._get_table_sql(connection, table_name, schema=schema)

        def parse_fks():
            # yields (name, constrained_cols, referred_name,
            # referred_cols, options) tuples parsed from the DDL text
            if table_data is None:
                # system tables, etc.
                return

            # note that we already have the FKs from PRAGMA above. This whole
            # regexp thing is trying to locate additional detail about the
            # FKs, namely the name of the constraint and other options.
            # so parsing the columns is really about matching it up to what
            # we already have.
            FK_PATTERN = (
                r'(?:CONSTRAINT +(?:"(.+?)"|(\w+)) +)?'
                r"FOREIGN KEY *\( *(.+?) *\) +"
                r'REFERENCES +(?:(?:"(.+?)")|([a-z0-9_]+)) *\( *((?:(?:"[^"]+"|[a-z0-9_]+) *(?:, *)?)+)\) *'  # noqa: E501
                r"((?:ON (?:DELETE|UPDATE) "
                r"(?:SET NULL|SET DEFAULT|CASCADE|RESTRICT|NO ACTION) *)*)"
                r"((?:NOT +)?DEFERRABLE)?"
                r"(?: +INITIALLY +(DEFERRED|IMMEDIATE))?"
            )
            for match in re.finditer(FK_PATTERN, table_data, re.I):
                (
                    constraint_quoted_name,
                    constraint_name,
                    constrained_columns,
                    referred_quoted_name,
                    referred_name,
                    referred_columns,
                    onupdatedelete,
                    deferrable,
                    initially,
                ) = match.group(1, 2, 3, 4, 5, 6, 7, 8, 9)
                constraint_name = constraint_quoted_name or constraint_name
                constrained_columns = list(
                    self._find_cols_in_sig(constrained_columns)
                )
                if not referred_columns:
                    referred_columns = constrained_columns
                else:
                    referred_columns = list(
                        self._find_cols_in_sig(referred_columns)
                    )
                referred_name = referred_quoted_name or referred_name
                options = {}

                for token in re.split(r" *\bON\b *", onupdatedelete.upper()):
                    if token.startswith("DELETE"):
                        ondelete = token[6:].strip()
                        if ondelete and ondelete != "NO ACTION":
                            options["ondelete"] = ondelete
                    elif token.startswith("UPDATE"):
                        onupdate = token[6:].strip()
                        if onupdate and onupdate != "NO ACTION":
                            options["onupdate"] = onupdate

                if deferrable:
                    options["deferrable"] = "NOT" not in deferrable.upper()
                if initially:
                    options["initially"] = initially.upper()

                yield (
                    constraint_name,
                    constrained_columns,
                    referred_name,
                    referred_columns,
                    options,
                )

        fkeys = []

        for (
            constraint_name,
            constrained_columns,
            referred_name,
            referred_columns,
            options,
        ) in parse_fks():
            sig = fk_sig(constrained_columns, referred_name, referred_columns)
            if sig not in keys_by_signature:
                util.warn(
                    "WARNING: SQL-parsed foreign key constraint "
                    "'%s' could not be located in PRAGMA "
                    "foreign_keys for table %s" % (sig, table_name)
                )
                continue
            key = keys_by_signature.pop(sig)
            key["name"] = constraint_name
            key["options"] = options
            fkeys.append(key)
        # assume the remainders are the unnamed, inline constraints, just
        # use them as is as it's extremely difficult to parse inline
        # constraints
        fkeys.extend(keys_by_signature.values())
        if fkeys:
            return fkeys
        else:
            return ReflectionDefaults.foreign_keys()
2745
2746 def _find_cols_in_sig(self, sig):
2747 for match in re.finditer(r'(?:"(.+?)")|([a-z0-9_]+)', sig, re.I):
2748 yield match.group(1) or match.group(2)
2749
2750 @reflection.cache
2751 def get_unique_constraints(
2752 self, connection, table_name, schema=None, **kw
2753 ):
2754 auto_index_by_sig = {}
2755 for idx in self.get_indexes(
2756 connection,
2757 table_name,
2758 schema=schema,
2759 include_auto_indexes=True,
2760 **kw,
2761 ):
2762 if not idx["name"].startswith("sqlite_autoindex"):
2763 continue
2764 sig = tuple(idx["column_names"])
2765 auto_index_by_sig[sig] = idx
2766
2767 table_data = self._get_table_sql(
2768 connection, table_name, schema=schema, **kw
2769 )
2770 unique_constraints = []
2771
2772 def parse_uqs():
2773 if table_data is None:
2774 return
2775 UNIQUE_PATTERN = (
2776 r'(?:CONSTRAINT +(?:"(.+?)"|(\w+)) +)?UNIQUE *\((.+?)\)'
2777 )
2778 INLINE_UNIQUE_PATTERN = (
2779 r'(?:(".+?")|(?:[\[`])?([a-z0-9_]+)(?:[\]`])?)[\t ]'
2780 r"+[a-z0-9_ ]+?[\t ]+UNIQUE"
2781 )
2782
2783 for match in re.finditer(UNIQUE_PATTERN, table_data, re.I):
2784 quoted_name, unquoted_name, cols = match.group(1, 2, 3)
2785 name = quoted_name or unquoted_name
2786 yield name, list(self._find_cols_in_sig(cols))
2787
2788 # we need to match inlines as well, as we seek to differentiate
2789 # a UNIQUE constraint from a UNIQUE INDEX, even though these
2790 # are kind of the same thing :)
2791 for match in re.finditer(INLINE_UNIQUE_PATTERN, table_data, re.I):
2792 cols = list(
2793 self._find_cols_in_sig(match.group(1) or match.group(2))
2794 )
2795 yield None, cols
2796
2797 for name, cols in parse_uqs():
2798 sig = tuple(cols)
2799 if sig in auto_index_by_sig:
2800 auto_index_by_sig.pop(sig)
2801 parsed_constraint = {"name": name, "column_names": cols}
2802 unique_constraints.append(parsed_constraint)
2803 # NOTE: auto_index_by_sig might not be empty here,
2804 # the PRIMARY KEY may have an entry.
2805 if unique_constraints:
2806 return unique_constraints
2807 else:
2808 return ReflectionDefaults.unique_constraints()
2809
    @reflection.cache
    def get_check_constraints(self, connection, table_name, schema=None, **kw):
        """Reflect CHECK constraints by scanning the raw CREATE TABLE
        text, since SQLite exposes no pragma for them."""
        table_data = self._get_table_sql(
            connection, table_name, schema=schema, **kw
        )

        # Extract CHECK constraints by properly handling balanced parentheses
        # and avoiding false matches when CHECK/CONSTRAINT appear in table
        # names. See #12924 for context.
        #
        # SQLite supports 4 identifier quote styles (see
        # sqlite.org/lang_keywords.html):
        # - Double quotes "..." (standard SQL)
        # - Brackets [...] (MS Access/SQL Server compatibility)
        # - Backticks `...` (MySQL compatibility)
        # - Single quotes '...' (SQLite extension)
        #
        # NOTE: there is not currently a way to parse CHECK constraints that
        # contain newlines as the approach here relies upon each individual
        # CHECK constraint being on a single line by itself. This necessarily
        # makes assumptions as to how the CREATE TABLE was emitted.
        CHECK_PATTERN = re.compile(
            r"""
            (?<![A-Za-z0-9_])  # Negative lookbehind: ensure CHECK is not
                               # part of an identifier (e.g., table name
                               # like "tableCHECK")

            (?:                # Optional CONSTRAINT clause
                CONSTRAINT\s+
                (              # Group 1: Constraint name (quoted or unquoted)
                    "(?:[^"]|"")+"        # Double-quoted: "name" or "na""me"
                    |'(?:[^']|'')+'       # Single-quoted: 'name' or 'na''me'
                    |\[(?:[^\]]|\]\])+\]  # Bracket-quoted: [name] or [na]]me]
                    |`(?:[^`]|``)+`       # Backtick-quoted: `name` or `na``me`
                    |\S+                  # Unquoted: simple_name
                )
                \s+
            )?

            CHECK\s*\(         # CHECK keyword followed by opening paren
            """,
            re.VERBOSE | re.IGNORECASE,
        )
        cks = []

        for match in re.finditer(CHECK_PATTERN, table_data or ""):
            constraint_name = match.group(1)

            if constraint_name:
                # Remove surrounding quotes if present
                # Double quotes: "name" -> name
                # Single quotes: 'name' -> name
                # Brackets: [name] -> name
                # Backticks: `name` -> name
                constraint_name = re.sub(
                    r'^(["\'`])(.+)\1$|^\[(.+)\]$',
                    lambda m: m.group(2) or m.group(3),
                    constraint_name,
                    flags=re.DOTALL,
                )

            # Find the matching closing parenthesis by counting balanced parens
            # Must track string context to ignore parens inside string literals
            start = match.end()  # Position after 'CHECK ('
            paren_count = 1
            in_single_quote = False
            in_double_quote = False

            for pos, char in enumerate(table_data[start:], start):
                # Track string literal context
                if char == "'" and not in_double_quote:
                    in_single_quote = not in_single_quote
                elif char == '"' and not in_single_quote:
                    in_double_quote = not in_double_quote
                # Only count parens when not inside a string literal
                elif not in_single_quote and not in_double_quote:
                    if char == "(":
                        paren_count += 1
                    elif char == ")":
                        paren_count -= 1
                        if paren_count == 0:
                            # Successfully found matching closing parenthesis
                            sqltext = table_data[start:pos].strip()
                            cks.append(
                                {"sqltext": sqltext, "name": constraint_name}
                            )
                            break

        cks.sort(key=lambda d: d["name"] or "~")  # sort None as last
        if cks:
            return cks
        else:
            return ReflectionDefaults.check_constraints()
2903
    @reflection.cache
    def get_indexes(self, connection, table_name, schema=None, **kw):
        """Reflect indexes via the index_list / index_info pragmas.

        Supports an ``include_auto_indexes`` keyword to also return
        sqlite_autoindex entries.  Partial-index predicates are returned
        under ``dialect_options["sqlite_where"]``; expression-based
        indexes are skipped with a warning.
        """
        pragma_indexes = self._get_table_pragma(
            connection, "index_list", table_name, schema=schema
        )
        indexes = []

        # regular expression to extract the filter predicate of a partial
        # index. this could fail to extract the predicate correctly on
        # indexes created like
        # CREATE INDEX i ON t (col || ') where') WHERE col <> ''
        # but as this function does not support expression-based indexes
        # this case does not occur.
        partial_pred_re = re.compile(r"\)\s+where\s+(.+)", re.IGNORECASE)

        if schema:
            schema_expr = "%s." % self.identifier_preparer.quote_identifier(
                schema
            )
        else:
            schema_expr = ""

        include_auto_indexes = kw.pop("include_auto_indexes", False)
        for row in pragma_indexes:
            # ignore implicit primary key index.
            # https://www.mail-archive.com/sqlite-users@sqlite.org/msg30517.html
            if not include_auto_indexes and row[1].startswith(
                "sqlite_autoindex"
            ):
                continue
            indexes.append(
                dict(
                    name=row[1],
                    column_names=[],
                    unique=row[2],
                    dialect_options={},
                )
            )

            # check partial indexes; row[4] is the "partial" flag on
            # SQLite versions that report it
            if len(row) >= 5 and row[4]:
                s = (
                    "SELECT sql FROM %(schema)ssqlite_master "
                    "WHERE name = ? "
                    "AND type = 'index'" % {"schema": schema_expr}
                )
                rs = connection.exec_driver_sql(s, (row[1],))
                index_sql = rs.scalar()
                predicate_match = partial_pred_re.search(index_sql)
                if predicate_match is None:
                    # unless the regex is broken this case shouldn't happen
                    # because we know this is a partial index, so the
                    # definition sql should match the regex
                    util.warn(
                        "Failed to look up filter predicate of "
                        "partial index %s" % row[1]
                    )
                else:
                    predicate = predicate_match.group(1)
                    indexes[-1]["dialect_options"]["sqlite_where"] = text(
                        predicate
                    )

        # loop thru unique indexes to get the column names.
        # (iterate over a copy so entries can be removed mid-loop)
        for idx in list(indexes):
            pragma_index = self._get_table_pragma(
                connection, "index_info", idx["name"], schema=schema
            )

            for row in pragma_index:
                if row[2] is None:
                    util.warn(
                        "Skipped unsupported reflection of "
                        "expression-based index %s" % idx["name"]
                    )
                    indexes.remove(idx)
                    break
                else:
                    idx["column_names"].append(row[2])

        indexes.sort(key=lambda d: d["name"] or "~")  # sort None as last
        if indexes:
            return indexes
        elif not self.has_table(connection, table_name, schema):
            raise exc.NoSuchTableError(
                f"{schema}.{table_name}" if schema else table_name
            )
        else:
            return ReflectionDefaults.indexes()
2993
2994 def _is_sys_table(self, table_name):
2995 return table_name in {
2996 "sqlite_schema",
2997 "sqlite_master",
2998 "sqlite_temp_schema",
2999 "sqlite_temp_master",
3000 }
3001
3002 @reflection.cache
3003 def _get_table_sql(self, connection, table_name, schema=None, **kw):
3004 if schema:
3005 schema_expr = "%s." % (
3006 self.identifier_preparer.quote_identifier(schema)
3007 )
3008 else:
3009 schema_expr = ""
3010 try:
3011 s = (
3012 "SELECT sql FROM "
3013 " (SELECT * FROM %(schema)ssqlite_master UNION ALL "
3014 " SELECT * FROM %(schema)ssqlite_temp_master) "
3015 "WHERE name = ? "
3016 "AND type in ('table', 'view')" % {"schema": schema_expr}
3017 )
3018 rs = connection.exec_driver_sql(s, (table_name,))
3019 except exc.DBAPIError:
3020 s = (
3021 "SELECT sql FROM %(schema)ssqlite_master "
3022 "WHERE name = ? "
3023 "AND type in ('table', 'view')" % {"schema": schema_expr}
3024 )
3025 rs = connection.exec_driver_sql(s, (table_name,))
3026 value = rs.scalar()
3027 if value is None and not self._is_sys_table(table_name):
3028 raise exc.NoSuchTableError(f"{schema_expr}{table_name}")
3029 return value
3030
3031 def _get_table_pragma(self, connection, pragma, table_name, schema=None):
3032 quote = self.identifier_preparer.quote_identifier
3033 if schema is not None:
3034 statements = [f"PRAGMA {quote(schema)}."]
3035 else:
3036 # because PRAGMA looks in all attached databases if no schema
3037 # given, need to specify "main" schema, however since we want
3038 # 'temp' tables in the same namespace as 'main', need to run
3039 # the PRAGMA twice
3040 statements = ["PRAGMA main.", "PRAGMA temp."]
3041
3042 qtable = quote(table_name)
3043 for statement in statements:
3044 statement = f"{statement}{pragma}({qtable})"
3045 cursor = connection.exec_driver_sql(statement)
3046 if not cursor._soft_closed:
3047 # work around SQLite issue whereby cursor.description
3048 # is blank when PRAGMA returns no rows:
3049 # https://www.sqlite.org/cvstrac/tktview?tn=1884
3050 result = cursor.fetchall()
3051 else:
3052 result = []
3053 if result:
3054 return result
3055 else:
3056 return []