1# dialects/sqlite/base.py
2# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: ignore-errors
8
9
10r'''
11.. dialect:: sqlite
12 :name: SQLite
13 :normal_support: 3.12+
14 :best_effort: 3.7.16+
15
16.. _sqlite_datetime:
17
18Date and Time Types
19-------------------
20
21SQLite does not have built-in DATE, TIME, or DATETIME types, and pysqlite does
22not provide out of the box functionality for translating values between Python
23`datetime` objects and a SQLite-supported format. SQLAlchemy's own
24:class:`~sqlalchemy.types.DateTime` and related types provide date formatting
25and parsing functionality when SQLite is used. The implementation classes are
26:class:`_sqlite.DATETIME`, :class:`_sqlite.DATE` and :class:`_sqlite.TIME`.
27These types represent dates and times as ISO formatted strings, which also
28nicely support ordering. There's no reliance on typical "libc" internals for
29these functions so historical dates are fully supported.
30
31Ensuring Text affinity
32^^^^^^^^^^^^^^^^^^^^^^
33
34The DDL rendered for these types is the standard ``DATE``, ``TIME``
35and ``DATETIME`` indicators. However, custom storage formats can also be
36applied to these types. When the
37storage format is detected as containing no alpha characters, the DDL for
38these types is rendered as ``DATE_CHAR``, ``TIME_CHAR``, and ``DATETIME_CHAR``,
39so that the column continues to have textual affinity.
40
41.. seealso::
42
43 `Type Affinity <https://www.sqlite.org/datatype3.html#affinity>`_ -
44 in the SQLite documentation
45
46.. _sqlite_autoincrement:
47
48SQLite Auto Incrementing Behavior
49----------------------------------
50
51Background on SQLite's autoincrement is at: https://sqlite.org/autoinc.html
52
53Key concepts:
54
55* SQLite has an implicit "auto increment" feature that takes place for any
56 non-composite primary-key column that is specifically created using
57 "INTEGER PRIMARY KEY" for the type + primary key.
58
59* SQLite also has an explicit "AUTOINCREMENT" keyword, that is **not**
60 equivalent to the implicit autoincrement feature; this keyword is not
61 recommended for general use. SQLAlchemy does not render this keyword
62 unless a special SQLite-specific directive is used (see below). However,
63 it still requires that the column's type is named "INTEGER".
64
65Using the AUTOINCREMENT Keyword
66^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
67
68To specifically render the AUTOINCREMENT keyword on the primary key column
69when rendering DDL, add the flag ``sqlite_autoincrement=True`` to the Table
70construct::
71
72 Table(
73 "sometable",
74 metadata,
75 Column("id", Integer, primary_key=True),
76 sqlite_autoincrement=True,
77 )
78
79Allowing autoincrement behavior SQLAlchemy types other than Integer/INTEGER
80^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
81
82SQLite's typing model is based on naming conventions. Among other things, this
83means that any type name which contains the substring ``"INT"`` will be
84determined to be of "integer affinity". A type named ``"BIGINT"``,
85``"SPECIAL_INT"`` or even ``"XYZINTQPR"``, will be considered by SQLite to be
86of "integer" affinity. However, **the SQLite autoincrement feature, whether
87implicitly or explicitly enabled, requires that the name of the column's type
88is exactly the string "INTEGER"**. Therefore, if an application uses a type
89like :class:`.BigInteger` for a primary key, on SQLite this type will need to
90be rendered as the name ``"INTEGER"`` when emitting the initial ``CREATE
91TABLE`` statement in order for the autoincrement behavior to be available.
92
93One approach to achieve this is to use :class:`.Integer` on SQLite
94only using :meth:`.TypeEngine.with_variant`::
95
96 table = Table(
97 "my_table",
98 metadata,
99 Column(
100 "id",
101 BigInteger().with_variant(Integer, "sqlite"),
102 primary_key=True,
103 ),
104 )
105
106Another is to use a subclass of :class:`.BigInteger` that overrides its DDL
107name to be ``INTEGER`` when compiled against SQLite::
108
109 from sqlalchemy import BigInteger
110 from sqlalchemy.ext.compiler import compiles
111
112
113 class SLBigInteger(BigInteger):
114 pass
115
116
117 @compiles(SLBigInteger, "sqlite")
118 def bi_c(element, compiler, **kw):
119 return "INTEGER"
120
121
122 @compiles(SLBigInteger)
123 def bi_c(element, compiler, **kw):
124 return compiler.visit_BIGINT(element, **kw)
125
126
127 table = Table(
128 "my_table", metadata, Column("id", SLBigInteger(), primary_key=True)
129 )
130
131.. seealso::
132
133 :meth:`.TypeEngine.with_variant`
134
135 :ref:`sqlalchemy.ext.compiler_toplevel`
136
137 `Datatypes In SQLite Version 3 <https://sqlite.org/datatype3.html>`_
138
139.. _sqlite_transactions:
140
141Transactions with SQLite and the sqlite3 driver
142-----------------------------------------------
143
144As a file-based database, SQLite's approach to transactions differs from
145traditional databases in many ways. Additionally, the ``sqlite3`` driver
146standard with Python (as well as the async version ``aiosqlite`` which builds
147on top of it) has several quirks, workarounds, and API features in the
148area of transaction control, all of which generally need to be addressed when
149constructing a SQLAlchemy application that uses SQLite.
150
151Legacy Transaction Mode with the sqlite3 driver
152^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
153
154The most important aspect of transaction handling with the sqlite3 driver is
155that it defaults (which will continue through Python 3.15 before being
156removed in Python 3.16) to legacy transactional behavior which does
157not strictly follow :pep:`249`. The way in which the driver diverges from the
158PEP is that it does not "begin" a transaction automatically as dictated by
159:pep:`249` except in the case of DML statements, e.g. INSERT, UPDATE, and
160DELETE. Normally, :pep:`249` dictates that a BEGIN must be emitted upon
161the first SQL statement of any kind, so that all subsequent operations will
162be established within a transaction until ``connection.commit()`` has been
163called. The ``sqlite3`` driver, in an effort to be easier to use in
164highly concurrent environments, skips this step for DQL (e.g. SELECT) statements,
165and also skips it for DDL (e.g. CREATE TABLE etc.) statements for more legacy
166reasons. Statements such as SAVEPOINT are also skipped.
167
168In modern versions of the ``sqlite3`` driver as of Python 3.12, this legacy
169mode of operation is referred to as
170`"legacy transaction control" <https://docs.python.org/3/library/sqlite3.html#sqlite3-transaction-control-isolation-level>`_, and is in
171effect by default due to the ``Connection.autocommit`` parameter being set to
172the constant ``sqlite3.LEGACY_TRANSACTION_CONTROL``. Prior to Python 3.12,
173the ``Connection.autocommit`` attribute did not exist.
174
175The implications of legacy transaction mode include:
176
177* **Incorrect support for transactional DDL** - statements like CREATE TABLE, ALTER TABLE,
178 CREATE INDEX etc. will not automatically BEGIN a transaction if one were not
179 started already, leading to the changes by each statement being
180 "autocommitted" immediately unless BEGIN were otherwise emitted first. Very
181 old (pre Python 3.6) versions of the ``sqlite3`` driver would also force a
182 COMMIT for these operations even if a transaction were present, however this
183 is no longer the case.
184* **SERIALIZABLE behavior not fully functional** - SQLite's transaction isolation
185 behavior is normally consistent with SERIALIZABLE isolation, as it is a file-
186 based system that locks the database file entirely for write operations,
187 preventing COMMIT until all reader transactions (and associated file locks)
188 have completed. However, sqlite3's legacy transaction mode fails to emit BEGIN for SELECT
189 statements, which causes these SELECT statements to no longer be "repeatable",
190 failing one of the consistency guarantees of SERIALIZABLE.
191* **Incorrect behavior for SAVEPOINT** - as the SAVEPOINT statement does not
192 imply a BEGIN, a new SAVEPOINT emitted before a BEGIN will function on its
193 own but fails to participate in the enclosing transaction, meaning a ROLLBACK
194 of the transaction will not roll back elements that were part of a released
195 savepoint.
196
197Legacy transaction mode first existed in order to facilitate working around
198SQLite's file locks. Because SQLite relies upon whole-file locks, it is easy to
199get "database is locked" errors, particularly when newer features like "write
200ahead logging" are disabled. This is a key reason why ``sqlite3``'s legacy
201transaction mode is still the default mode of operation; disabling it will
202produce behavior that is more susceptible to locked database errors. However
203note that **legacy transaction mode will no longer be the default** in a future
204Python version (3.16 as of this writing).
205
206.. _sqlite_enabling_transactions:
207
208Enabling Non-Legacy SQLite Transactional Modes with the sqlite3 or aiosqlite driver
209^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
210
211Current SQLAlchemy support allows either for setting the
212``Connection.autocommit`` attribute, most directly by using a
213:func:`_sa.create_engine` parameter, or if on an older version of Python where
214the attribute is not available, using event hooks to control the behavior of
215BEGIN.
216
217* **Enabling modern sqlite3 transaction control via the autocommit connect parameter** (Python 3.12 and above)
218
219 To use SQLite in the mode described at `Transaction control via the autocommit attribute <https://docs.python.org/3/library/sqlite3.html#transaction-control-via-the-autocommit-attribute>`_,
220 the most straightforward approach is to set the attribute to its recommended value
221 of ``False`` at the connect level using :paramref:`_sa.create_engine.connect_args`::
222
223 from sqlalchemy import create_engine
224
225 engine = create_engine(
226 "sqlite:///myfile.db", connect_args={"autocommit": False}
227 )
228
229 This parameter is also passed through when using the aiosqlite driver::
230
231 from sqlalchemy.ext.asyncio import create_async_engine
232
233 engine = create_async_engine(
234 "sqlite+aiosqlite:///myfile.db", connect_args={"autocommit": False}
235 )
236
237 The parameter can also be set at the attribute level using the :meth:`.PoolEvents.connect`
238 event hook, however this will only work for sqlite3, as aiosqlite does not yet expose this
239 attribute on its ``Connection`` object::
240
241 from sqlalchemy import create_engine, event
242
243 engine = create_engine("sqlite:///myfile.db")
244
245
246 @event.listens_for(engine, "connect")
247 def do_connect(dbapi_connection, connection_record):
248 # enable autocommit=False mode
249 dbapi_connection.autocommit = False
250
251* **Using SQLAlchemy to emit BEGIN in lieu of SQLite's transaction control** (all Python versions, sqlite3 and aiosqlite)
252
253 For older versions of ``sqlite3`` or for cross-compatibility with older and
254 newer versions, SQLAlchemy can also take over the job of transaction control.
255 This is achieved by using the :meth:`.ConnectionEvents.begin` hook
256 to emit the "BEGIN" command directly, while also disabling SQLite's control
257 of this command using the :meth:`.PoolEvents.connect` event hook to set the
258 ``Connection.isolation_level`` attribute to ``None``::
259
260
261 from sqlalchemy import create_engine, event
262
263 engine = create_engine("sqlite:///myfile.db")
264
265
266 @event.listens_for(engine, "connect")
267 def do_connect(dbapi_connection, connection_record):
268 # disable sqlite3's emitting of the BEGIN statement entirely.
269 dbapi_connection.isolation_level = None
270
271
272 @event.listens_for(engine, "begin")
273 def do_begin(conn):
274 # emit our own BEGIN. sqlite3 still emits COMMIT/ROLLBACK correctly
275 conn.exec_driver_sql("BEGIN")
276
277 When using the asyncio variant ``aiosqlite``, refer to ``engine.sync_engine``
278 as in the example below::
279
280 from sqlalchemy import create_engine, event
281 from sqlalchemy.ext.asyncio import create_async_engine
282
283 engine = create_async_engine("sqlite+aiosqlite:///myfile.db")
284
285
286 @event.listens_for(engine.sync_engine, "connect")
287 def do_connect(dbapi_connection, connection_record):
288 # disable aiosqlite's emitting of the BEGIN statement entirely.
289 dbapi_connection.isolation_level = None
290
291
292 @event.listens_for(engine.sync_engine, "begin")
293 def do_begin(conn):
294 # emit our own BEGIN. aiosqlite still emits COMMIT/ROLLBACK correctly
295 conn.exec_driver_sql("BEGIN")
296
297.. _sqlite_isolation_level:
298
299Using SQLAlchemy's Driver Level AUTOCOMMIT Feature with SQLite
300^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
301
302SQLAlchemy has a comprehensive database isolation feature with optional
303autocommit support that is introduced in the section :ref:`dbapi_autocommit`.
304
305For the ``sqlite3`` and ``aiosqlite`` drivers, SQLAlchemy only includes
306built-in support for "AUTOCOMMIT". Note that this mode is currently incompatible
307with the non-legacy isolation mode hooks documented in the previous
308section at :ref:`sqlite_enabling_transactions`.
309
310To use the ``sqlite3`` driver with SQLAlchemy driver-level autocommit,
311create an engine setting the :paramref:`_sa.create_engine.isolation_level`
312parameter to "AUTOCOMMIT"::
313
314 eng = create_engine("sqlite:///myfile.db", isolation_level="AUTOCOMMIT")
315
316When using the above mode, any event hooks that set the sqlite3 ``Connection.autocommit``
317parameter away from its default of ``sqlite3.LEGACY_TRANSACTION_CONTROL``
318as well as hooks that emit ``BEGIN`` should be disabled.
319
320Additional Reading for SQLite / sqlite3 transaction control
321^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
322
323Links with important information on SQLite, the sqlite3 driver,
324as well as long historical conversations on how things got to their current state:
325
326* `Isolation in SQLite <https://www.sqlite.org/isolation.html>`_ - on the SQLite website
327* `Transaction control <https://docs.python.org/3/library/sqlite3.html#transaction-control>`_ - describes the sqlite3 autocommit attribute as well
328 as the legacy isolation_level attribute.
329* `sqlite3 SELECT does not BEGIN a transaction, but should according to spec <https://github.com/python/cpython/issues/54133>`_ - imported Python standard library issue on github
330* `sqlite3 module breaks transactions and potentially corrupts data <https://github.com/python/cpython/issues/54949>`_ - imported Python standard library issue on github
331
332
333INSERT/UPDATE/DELETE...RETURNING
334---------------------------------
335
336The SQLite dialect supports SQLite 3.35's ``INSERT|UPDATE|DELETE..RETURNING``
337syntax. ``INSERT..RETURNING`` may be used
338automatically in some cases in order to fetch newly generated identifiers in
339place of the traditional approach of using ``cursor.lastrowid``, however
340``cursor.lastrowid`` is currently still preferred for simple single-statement
341cases for its better performance.
342
343To specify an explicit ``RETURNING`` clause, use the
344:meth:`._UpdateBase.returning` method on a per-statement basis::
345
346 # INSERT..RETURNING
347 result = connection.execute(
348 table.insert().values(name="foo").returning(table.c.col1, table.c.col2)
349 )
350 print(result.all())
351
352 # UPDATE..RETURNING
353 result = connection.execute(
354 table.update()
355 .where(table.c.name == "foo")
356 .values(name="bar")
357 .returning(table.c.col1, table.c.col2)
358 )
359 print(result.all())
360
361 # DELETE..RETURNING
362 result = connection.execute(
363 table.delete()
364 .where(table.c.name == "foo")
365 .returning(table.c.col1, table.c.col2)
366 )
367 print(result.all())
368
369.. versionadded:: 2.0 Added support for SQLite RETURNING
370
371
372.. _sqlite_foreign_keys:
373
374Foreign Key Support
375-------------------
376
377SQLite supports FOREIGN KEY syntax when emitting CREATE statements for tables,
378however by default these constraints have no effect on the operation of the
379table.
380
381Constraint checking on SQLite has three prerequisites:
382
383* At least version 3.6.19 of SQLite must be in use
384* The SQLite library must be compiled *without* the SQLITE_OMIT_FOREIGN_KEY
385 or SQLITE_OMIT_TRIGGER symbols enabled.
386* The ``PRAGMA foreign_keys = ON`` statement must be emitted on all
387 connections before use -- including the initial call to
388 :meth:`sqlalchemy.schema.MetaData.create_all`.
389
390SQLAlchemy allows for the ``PRAGMA`` statement to be emitted automatically for
391new connections through the usage of events::
392
393 from sqlalchemy.engine import Engine
394 from sqlalchemy import event
395
396
397 @event.listens_for(Engine, "connect")
398 def set_sqlite_pragma(dbapi_connection, connection_record):
399 # the sqlite3 driver will not set PRAGMA foreign_keys
400 # if autocommit=False; set to True temporarily
401 ac = dbapi_connection.autocommit
402 dbapi_connection.autocommit = True
403
404 cursor = dbapi_connection.cursor()
405 cursor.execute("PRAGMA foreign_keys=ON")
406 cursor.close()
407
408 # restore previous autocommit setting
409 dbapi_connection.autocommit = ac
410
411.. warning::
412
413 When SQLite foreign keys are enabled, it is **not possible**
414 to emit CREATE or DROP statements for tables that contain
415 mutually-dependent foreign key constraints;
416 to emit the DDL for these tables requires that ALTER TABLE be used to
417 create or drop these constraints separately, for which SQLite has
418 no support.
419
420.. seealso::
421
422 `SQLite Foreign Key Support <https://www.sqlite.org/foreignkeys.html>`_
423 - on the SQLite web site.
424
425 :ref:`event_toplevel` - SQLAlchemy event API.
426
427 :ref:`use_alter` - more information on SQLAlchemy's facilities for handling
428 mutually-dependent foreign key constraints.
429
430.. _sqlite_on_conflict_ddl:
431
432ON CONFLICT support for constraints
433-----------------------------------
434
435.. seealso:: This section describes the :term:`DDL` version of "ON CONFLICT" for
436 SQLite, which occurs within a CREATE TABLE statement. For "ON CONFLICT" as
437 applied to an INSERT statement, see :ref:`sqlite_on_conflict_insert`.
438
439SQLite supports a non-standard DDL clause known as ON CONFLICT which can be applied
440to primary key, unique, check, and not null constraints. In DDL, it is
441rendered either within the "CONSTRAINT" clause or within the column definition
442itself depending on the location of the target constraint. To render this
443clause within DDL, the extension parameter ``sqlite_on_conflict`` can be
444specified with a string conflict resolution algorithm within the
445:class:`.PrimaryKeyConstraint`, :class:`.UniqueConstraint`,
446:class:`.CheckConstraint` objects. Within the :class:`_schema.Column` object,
447there are individual parameters ``sqlite_on_conflict_not_null``,
448``sqlite_on_conflict_primary_key`` and ``sqlite_on_conflict_unique``,
449each of which corresponds to one of the three relevant constraint types
450that can be indicated from a
451:class:`_schema.Column` object.
452
453.. seealso::
454
455 `ON CONFLICT <https://www.sqlite.org/lang_conflict.html>`_ - in the SQLite
456 documentation
457
458The ``sqlite_on_conflict`` parameters accept a string argument which is just
459the resolution name to be chosen, which on SQLite can be one of ROLLBACK,
460ABORT, FAIL, IGNORE, and REPLACE. For example, to add a UNIQUE constraint
461that specifies the IGNORE algorithm::
462
463 some_table = Table(
464 "some_table",
465 metadata,
466 Column("id", Integer, primary_key=True),
467 Column("data", Integer),
468 UniqueConstraint("id", "data", sqlite_on_conflict="IGNORE"),
469 )
470
471The above renders CREATE TABLE DDL as:
472
473.. sourcecode:: sql
474
475 CREATE TABLE some_table (
476 id INTEGER NOT NULL,
477 data INTEGER,
478 PRIMARY KEY (id),
479 UNIQUE (id, data) ON CONFLICT IGNORE
480 )
481
482
483When using the :paramref:`_schema.Column.unique`
484flag to add a UNIQUE constraint
485to a single column, the ``sqlite_on_conflict_unique`` parameter can
486be added to the :class:`_schema.Column` as well, which will be added to the
487UNIQUE constraint in the DDL::
488
489 some_table = Table(
490 "some_table",
491 metadata,
492 Column("id", Integer, primary_key=True),
493 Column(
494 "data", Integer, unique=True, sqlite_on_conflict_unique="IGNORE"
495 ),
496 )
497
498rendering:
499
500.. sourcecode:: sql
501
502 CREATE TABLE some_table (
503 id INTEGER NOT NULL,
504 data INTEGER,
505 PRIMARY KEY (id),
506 UNIQUE (data) ON CONFLICT IGNORE
507 )
508
509To apply the FAIL algorithm for a NOT NULL constraint,
510``sqlite_on_conflict_not_null`` is used::
511
512 some_table = Table(
513 "some_table",
514 metadata,
515 Column("id", Integer, primary_key=True),
516 Column(
517 "data", Integer, nullable=False, sqlite_on_conflict_not_null="FAIL"
518 ),
519 )
520
521this renders the column inline ON CONFLICT phrase:
522
523.. sourcecode:: sql
524
525 CREATE TABLE some_table (
526 id INTEGER NOT NULL,
527 data INTEGER NOT NULL ON CONFLICT FAIL,
528 PRIMARY KEY (id)
529 )
530
531
532Similarly, for an inline primary key, use ``sqlite_on_conflict_primary_key``::
533
534 some_table = Table(
535 "some_table",
536 metadata,
537 Column(
538 "id",
539 Integer,
540 primary_key=True,
541 sqlite_on_conflict_primary_key="FAIL",
542 ),
543 )
544
545SQLAlchemy renders the PRIMARY KEY constraint separately, so the conflict
546resolution algorithm is applied to the constraint itself:
547
548.. sourcecode:: sql
549
550 CREATE TABLE some_table (
551 id INTEGER NOT NULL,
552 PRIMARY KEY (id) ON CONFLICT FAIL
553 )
554
555.. _sqlite_on_conflict_insert:
556
557INSERT...ON CONFLICT (Upsert)
558-----------------------------
559
560.. seealso:: This section describes the :term:`DML` version of "ON CONFLICT" for
561 SQLite, which occurs within an INSERT statement. For "ON CONFLICT" as
562 applied to a CREATE TABLE statement, see :ref:`sqlite_on_conflict_ddl`.
563
564From version 3.24.0 onwards, SQLite supports "upserts" (update or insert)
565of rows into a table via the ``ON CONFLICT`` clause of the ``INSERT``
566statement. A candidate row will only be inserted if that row does not violate
567any unique or primary key constraints. In the case of a unique constraint violation, a
568secondary action can occur which can be either "DO UPDATE", indicating that
569the data in the target row should be updated, or "DO NOTHING", which indicates
570to silently skip this row.
571
572Conflicts are determined using columns that are part of existing unique
573constraints and indexes. These constraints are identified by stating the
574columns and conditions that comprise the indexes.
575
576SQLAlchemy provides ``ON CONFLICT`` support via the SQLite-specific
577:func:`_sqlite.insert()` function, which provides
578the generative methods :meth:`_sqlite.Insert.on_conflict_do_update`
579and :meth:`_sqlite.Insert.on_conflict_do_nothing`:
580
581.. sourcecode:: pycon+sql
582
583 >>> from sqlalchemy.dialects.sqlite import insert
584
585 >>> insert_stmt = insert(my_table).values(
586 ... id="some_existing_id", data="inserted value"
587 ... )
588
589 >>> do_update_stmt = insert_stmt.on_conflict_do_update(
590 ... index_elements=["id"], set_=dict(data="updated value")
591 ... )
592
593 >>> print(do_update_stmt)
594 {printsql}INSERT INTO my_table (id, data) VALUES (?, ?)
595 ON CONFLICT (id) DO UPDATE SET data = ?{stop}
596
597 >>> do_nothing_stmt = insert_stmt.on_conflict_do_nothing(index_elements=["id"])
598
599 >>> print(do_nothing_stmt)
600 {printsql}INSERT INTO my_table (id, data) VALUES (?, ?)
601 ON CONFLICT (id) DO NOTHING
602
603.. versionadded:: 1.4
604
605.. seealso::
606
607 `Upsert
608 <https://sqlite.org/lang_UPSERT.html>`_
609 - in the SQLite documentation.
610
611
612Specifying the Target
613^^^^^^^^^^^^^^^^^^^^^
614
615Both methods supply the "target" of the conflict using column inference:
616
617* The :paramref:`_sqlite.Insert.on_conflict_do_update.index_elements` argument
618 specifies a sequence containing string column names, :class:`_schema.Column`
619 objects, and/or SQL expression elements, which would identify a unique index
620 or unique constraint.
621
622* When using :paramref:`_sqlite.Insert.on_conflict_do_update.index_elements`
623 to infer an index, a partial index can be inferred by also specifying the
624 :paramref:`_sqlite.Insert.on_conflict_do_update.index_where` parameter:
625
626 .. sourcecode:: pycon+sql
627
628 >>> stmt = insert(my_table).values(user_email="a@b.com", data="inserted data")
629
630 >>> do_update_stmt = stmt.on_conflict_do_update(
631 ... index_elements=[my_table.c.user_email],
632 ... index_where=my_table.c.user_email.like("%@gmail.com"),
633 ... set_=dict(data=stmt.excluded.data),
634 ... )
635
636 >>> print(do_update_stmt)
637 {printsql}INSERT INTO my_table (data, user_email) VALUES (?, ?)
638 ON CONFLICT (user_email)
639 WHERE user_email LIKE '%@gmail.com'
640 DO UPDATE SET data = excluded.data
641
642The SET Clause
643^^^^^^^^^^^^^^^
644
645``ON CONFLICT...DO UPDATE`` is used to perform an update of the already
646existing row, using any combination of new values as well as values
647from the proposed insertion. These values are specified using the
648:paramref:`_sqlite.Insert.on_conflict_do_update.set_` parameter. This
649parameter accepts a dictionary which consists of direct values
650for UPDATE:
651
652.. sourcecode:: pycon+sql
653
654 >>> stmt = insert(my_table).values(id="some_id", data="inserted value")
655
656 >>> do_update_stmt = stmt.on_conflict_do_update(
657 ... index_elements=["id"], set_=dict(data="updated value")
658 ... )
659
660 >>> print(do_update_stmt)
661 {printsql}INSERT INTO my_table (id, data) VALUES (?, ?)
662 ON CONFLICT (id) DO UPDATE SET data = ?
663
664.. warning::
665
666 The :meth:`_sqlite.Insert.on_conflict_do_update` method does **not** take
667 into account Python-side default UPDATE values or generation functions,
668 e.g. those specified using :paramref:`_schema.Column.onupdate`. These
669 values will not be exercised for an ON CONFLICT style of UPDATE, unless
670 they are manually specified in the
671 :paramref:`_sqlite.Insert.on_conflict_do_update.set_` dictionary.
672
673Updating using the Excluded INSERT Values
674^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
675
676In order to refer to the proposed insertion row, the special alias
677:attr:`~.sqlite.Insert.excluded` is available as an attribute on
678the :class:`_sqlite.Insert` object; this object creates an "excluded." prefix
679on a column, that informs the DO UPDATE to update the row with the value that
680would have been inserted had the constraint not failed:
681
682.. sourcecode:: pycon+sql
683
684 >>> stmt = insert(my_table).values(
685 ... id="some_id", data="inserted value", author="jlh"
686 ... )
687
688 >>> do_update_stmt = stmt.on_conflict_do_update(
689 ... index_elements=["id"],
690 ... set_=dict(data="updated value", author=stmt.excluded.author),
691 ... )
692
693 >>> print(do_update_stmt)
694 {printsql}INSERT INTO my_table (id, data, author) VALUES (?, ?, ?)
695 ON CONFLICT (id) DO UPDATE SET data = ?, author = excluded.author
696
697Additional WHERE Criteria
698^^^^^^^^^^^^^^^^^^^^^^^^^
699
700The :meth:`_sqlite.Insert.on_conflict_do_update` method also accepts
701a WHERE clause using the :paramref:`_sqlite.Insert.on_conflict_do_update.where`
702parameter, which will limit those rows which receive an UPDATE:
703
704.. sourcecode:: pycon+sql
705
706 >>> stmt = insert(my_table).values(
707 ... id="some_id", data="inserted value", author="jlh"
708 ... )
709
710 >>> on_update_stmt = stmt.on_conflict_do_update(
711 ... index_elements=["id"],
712 ... set_=dict(data="updated value", author=stmt.excluded.author),
713 ... where=(my_table.c.status == 2),
714 ... )
715 >>> print(on_update_stmt)
716 {printsql}INSERT INTO my_table (id, data, author) VALUES (?, ?, ?)
717 ON CONFLICT (id) DO UPDATE SET data = ?, author = excluded.author
718 WHERE my_table.status = ?
719
720
721Skipping Rows with DO NOTHING
722^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
723
724``ON CONFLICT`` may be used to skip inserting a row entirely
725if any conflict with a unique constraint occurs; below this is illustrated
726using the :meth:`_sqlite.Insert.on_conflict_do_nothing` method:
727
728.. sourcecode:: pycon+sql
729
730 >>> stmt = insert(my_table).values(id="some_id", data="inserted value")
731 >>> stmt = stmt.on_conflict_do_nothing(index_elements=["id"])
732 >>> print(stmt)
733 {printsql}INSERT INTO my_table (id, data) VALUES (?, ?) ON CONFLICT (id) DO NOTHING
734
735
736If ``DO NOTHING`` is used without specifying any columns or constraint,
737it has the effect of skipping the INSERT for any unique violation which
738occurs:
739
740.. sourcecode:: pycon+sql
741
742 >>> stmt = insert(my_table).values(id="some_id", data="inserted value")
743 >>> stmt = stmt.on_conflict_do_nothing()
744 >>> print(stmt)
745 {printsql}INSERT INTO my_table (id, data) VALUES (?, ?) ON CONFLICT DO NOTHING
746
747.. _sqlite_type_reflection:
748
749Type Reflection
750---------------
751
752SQLite types are unlike those of most other database backends, in that
753the string name of the type usually does not correspond to a "type" in a
754one-to-one fashion. Instead, SQLite links per-column typing behavior
755to one of five so-called "type affinities" based on a string matching
756pattern for the type.
757
758SQLAlchemy's reflection process, when inspecting types, uses a simple
759lookup table to link the keywords returned to provided SQLAlchemy types.
760This lookup table is present within the SQLite dialect as it is for all
761other dialects. However, the SQLite dialect has a different "fallback"
762routine for when a particular type name is not located in the lookup map;
763it instead implements the SQLite "type affinity" scheme located at
764https://www.sqlite.org/datatype3.html section 2.1.
765
766The provided typemap will make direct associations from an exact string
767name match for the following types:
768
769:class:`_types.BIGINT`, :class:`_types.BLOB`,
770:class:`_types.BOOLEAN`, :class:`_types.CHAR`,
771:class:`_types.DATE`, :class:`_types.DATETIME`,
772:class:`_types.DECIMAL`, :class:`_types.FLOAT`,
773:class:`_types.INTEGER`, :class:`_types.NUMERIC`,
774:class:`_types.REAL`, :class:`_types.SMALLINT`,
775:class:`_types.TEXT`, :class:`_types.TIME`,
776:class:`_types.TIMESTAMP`, :class:`_types.VARCHAR`,
777:class:`_types.NVARCHAR`, :class:`_types.NCHAR`
780
781When a type name does not match one of the above types, the "type affinity"
782lookup is used instead:
783
784* :class:`_types.INTEGER` is returned if the type name includes the
785 string ``INT``
786* :class:`_types.TEXT` is returned if the type name includes the
787 string ``CHAR``, ``CLOB`` or ``TEXT``
788* :class:`_types.NullType` is returned if the type name includes the
789 string ``BLOB``
790* :class:`_types.REAL` is returned if the type name includes the string
791 ``REAL``, ``FLOA`` or ``DOUB``.
792* Otherwise, the :class:`_types.NUMERIC` type is used.
793
794.. _sqlite_partial_index:
795
796Partial Indexes
797---------------
798
799A partial index, e.g. one which uses a WHERE clause, can be specified
800with the DDL system using the argument ``sqlite_where``::
801
802 tbl = Table("testtbl", m, Column("data", Integer))
803 idx = Index(
804 "test_idx1",
805 tbl.c.data,
806 sqlite_where=and_(tbl.c.data > 5, tbl.c.data < 10),
807 )
808
809The index will be rendered at create time as:
810
811.. sourcecode:: sql
812
813 CREATE INDEX test_idx1 ON testtbl (data)
814 WHERE data > 5 AND data < 10
815
816.. _sqlite_dotted_column_names:
817
818Dotted Column Names
819-------------------
820
Using table or column names that explicitly have periods in them is
**not recommended**. While this is a bad idea for relational databases in
general, as the dot is a syntactically significant character, the SQLite
driver up until version **3.10.0** of SQLite has a bug which requires that
SQLAlchemy filter out these dots in result sets.
826
827The bug, entirely outside of SQLAlchemy, can be illustrated thusly::
828
829 import sqlite3
830
831 assert sqlite3.sqlite_version_info < (
832 3,
833 10,
834 0,
835 ), "bug is fixed in this version"
836
837 conn = sqlite3.connect(":memory:")
838 cursor = conn.cursor()
839
840 cursor.execute("create table x (a integer, b integer)")
841 cursor.execute("insert into x (a, b) values (1, 1)")
842 cursor.execute("insert into x (a, b) values (2, 2)")
843
844 cursor.execute("select x.a, x.b from x")
845 assert [c[0] for c in cursor.description] == ["a", "b"]
846
847 cursor.execute(
848 """
849 select x.a, x.b from x where a=1
850 union
851 select x.a, x.b from x where a=2
852 """
853 )
854 assert [c[0] for c in cursor.description] == ["a", "b"], [
855 c[0] for c in cursor.description
856 ]
857
858The second assertion fails:
859
860.. sourcecode:: text
861
862 Traceback (most recent call last):
863 File "test.py", line 19, in <module>
864 [c[0] for c in cursor.description]
865 AssertionError: ['x.a', 'x.b']
866
867Where above, the driver incorrectly reports the names of the columns
868including the name of the table, which is entirely inconsistent vs.
869when the UNION is not present.
870
871SQLAlchemy relies upon column names being predictable in how they match
872to the original statement, so the SQLAlchemy dialect has no choice but
873to filter these out::
874
875
876 from sqlalchemy import create_engine
877
878 eng = create_engine("sqlite://")
879 conn = eng.connect()
880
881 conn.exec_driver_sql("create table x (a integer, b integer)")
882 conn.exec_driver_sql("insert into x (a, b) values (1, 1)")
883 conn.exec_driver_sql("insert into x (a, b) values (2, 2)")
884
885 result = conn.exec_driver_sql("select x.a, x.b from x")
886 assert result.keys() == ["a", "b"]
887
888 result = conn.exec_driver_sql(
889 """
890 select x.a, x.b from x where a=1
891 union
892 select x.a, x.b from x where a=2
893 """
894 )
895 assert result.keys() == ["a", "b"]
896
897Note that above, even though SQLAlchemy filters out the dots, *both
898names are still addressable*::
899
900 >>> row = result.first()
901 >>> row["a"]
902 1
903 >>> row["x.a"]
904 1
905 >>> row["b"]
906 1
907 >>> row["x.b"]
908 1
909
910Therefore, the workaround applied by SQLAlchemy only impacts
911:meth:`_engine.CursorResult.keys` and :meth:`.Row.keys()` in the public API. In
912the very specific case where an application is forced to use column names that
913contain dots, and the functionality of :meth:`_engine.CursorResult.keys` and
914:meth:`.Row.keys()` is required to return these dotted names unmodified,
915the ``sqlite_raw_colnames`` execution option may be provided, either on a
916per-:class:`_engine.Connection` basis::
917
918 result = conn.execution_options(sqlite_raw_colnames=True).exec_driver_sql(
919 """
920 select x.a, x.b from x where a=1
921 union
922 select x.a, x.b from x where a=2
923 """
924 )
925 assert result.keys() == ["x.a", "x.b"]
926
927or on a per-:class:`_engine.Engine` basis::
928
929 engine = create_engine(
930 "sqlite://", execution_options={"sqlite_raw_colnames": True}
931 )
932
933When using the per-:class:`_engine.Engine` execution option, note that
934**Core and ORM queries that use UNION may not function properly**.
935
936SQLite-specific table options
937-----------------------------
938
The following options for CREATE TABLE are supported directly by the SQLite
dialect in conjunction with the :class:`_schema.Table` construct:

* ``WITHOUT ROWID``::

    Table("some_table", metadata, ..., sqlite_with_rowid=False)

* ``STRICT``::

    Table("some_table", metadata, ..., sqlite_strict=True)

  .. versionadded:: 2.0.37
952
953.. seealso::
954
955 `SQLite CREATE TABLE options
956 <https://www.sqlite.org/lang_createtable.html>`_
957
958.. _sqlite_include_internal:
959
960Reflecting internal schema tables
961----------------------------------
962
963Reflection methods that return lists of tables will omit so-called
964"SQLite internal schema object" names, which are considered by SQLite
965as any object name that is prefixed with ``sqlite_``. An example of
966such an object is the ``sqlite_sequence`` table that's generated when
967the ``AUTOINCREMENT`` column parameter is used. In order to return
968these objects, the parameter ``sqlite_include_internal=True`` may be
969passed to methods such as :meth:`_schema.MetaData.reflect` or
970:meth:`.Inspector.get_table_names`.
971
972.. versionadded:: 2.0 Added the ``sqlite_include_internal=True`` parameter.
973 Previously, these tables were not ignored by SQLAlchemy reflection
974 methods.
975
976.. note::
977
   The ``sqlite_include_internal`` parameter does not refer to "system"
   tables, such as ``sqlite_master``, that are present in schemas.
980
981.. seealso::
982
983 `SQLite Internal Schema Objects <https://www.sqlite.org/fileformat2.html#intschema>`_ - in the SQLite
984 documentation.
985
986''' # noqa
987from __future__ import annotations
988
989import datetime
990import numbers
991import re
992from typing import Any
993from typing import Callable
994from typing import Optional
995from typing import TYPE_CHECKING
996
997from .json import JSON
998from .json import JSONIndexType
999from .json import JSONPathType
1000from ... import exc
1001from ... import schema as sa_schema
1002from ... import sql
1003from ... import text
1004from ... import types as sqltypes
1005from ... import util
1006from ...engine import default
1007from ...engine import processors
1008from ...engine import reflection
1009from ...engine.reflection import ReflectionDefaults
1010from ...sql import coercions
1011from ...sql import compiler
1012from ...sql import ddl as sa_ddl
1013from ...sql import elements
1014from ...sql import roles
1015from ...sql import schema
1016from ...types import BLOB # noqa
1017from ...types import BOOLEAN # noqa
1018from ...types import CHAR # noqa
1019from ...types import DECIMAL # noqa
1020from ...types import FLOAT # noqa
1021from ...types import INTEGER # noqa
1022from ...types import NUMERIC # noqa
1023from ...types import REAL # noqa
1024from ...types import SMALLINT # noqa
1025from ...types import TEXT # noqa
1026from ...types import TIMESTAMP # noqa
1027from ...types import VARCHAR # noqa
1028
1029if TYPE_CHECKING:
1030 from ...engine.interfaces import DBAPIConnection
1031 from ...engine.interfaces import Dialect
1032 from ...engine.interfaces import IsolationLevel
1033 from ...sql.type_api import _BindProcessorType
1034 from ...sql.type_api import _ResultProcessorType
1035
1036
class _SQliteJson(JSON):
    """JSON type variant whose result processor tolerates plain numbers.

    Behaves like the inherited JSON type, except that a numeric value which
    the inherited result processor rejects with ``TypeError`` is handed back
    unchanged instead of raising.
    """

    def result_processor(self, dialect, coltype):
        """Return a callable that converts a fetched column value.

        The callable delegates to the processor produced by the parent
        class.  When that processor raises ``TypeError`` and the value is a
        :class:`numbers.Number`, the value itself is returned; any other
        ``TypeError`` propagates to the caller.
        """
        deserialize = super().result_processor(dialect, coltype)

        def process(value):
            try:
                return deserialize(value)
            except TypeError:
                # NOTE(review): presumably covers values the driver already
                # delivered as Python numbers rather than JSON text - confirm
                if not isinstance(value, numbers.Number):
                    raise
                return value

        return process
1051
1052
1053class _DateTimeMixin:
1054 _reg = None
1055 _storage_format = None
1056
1057 def __init__(self, storage_format=None, regexp=None, **kw):
1058 super().__init__(**kw)
1059 if regexp is not None:
1060 self._reg = re.compile(regexp)
1061 if storage_format is not None:
1062 self._storage_format = storage_format
1063
1064 @property
1065 def format_is_text_affinity(self):
1066 """return True if the storage format will automatically imply
1067 a TEXT affinity.
1068
1069 If the storage format contains no non-numeric characters,
1070 it will imply a NUMERIC storage format on SQLite; in this case,
1071 the type will generate its DDL as DATE_CHAR, DATETIME_CHAR,
1072 TIME_CHAR.
1073
1074 """
1075 spec = self._storage_format % {
1076 "year": 0,
1077 "month": 0,
1078 "day": 0,
1079 "hour": 0,
1080 "minute": 0,
1081 "second": 0,
1082 "microsecond": 0,
1083 }
1084 return bool(re.search(r"[^0-9]", spec))
1085
1086 def adapt(self, cls, **kw):
1087 if issubclass(cls, _DateTimeMixin):
1088 if self._storage_format:
1089 kw["storage_format"] = self._storage_format
1090 if self._reg:
1091 kw["regexp"] = self._reg
1092 return super().adapt(cls, **kw)
1093
1094 def literal_processor(self, dialect):
1095 bp = self.bind_processor(dialect)
1096
1097 def process(value):
1098 return "'%s'" % bp(value)
1099
1100 return process
1101
1102
class DATETIME(_DateTimeMixin, sqltypes.DateTime):
    r"""Represent a Python datetime object in SQLite using a string.

    The default string storage format is::

        "%(year)04d-%(month)02d-%(day)02d %(hour)02d:%(minute)02d:%(second)02d.%(microsecond)06d"

    e.g.:

    .. sourcecode:: text

        2021-03-15 12:05:57.105542

    The incoming storage format is by default parsed using the
    Python ``datetime.fromisoformat()`` function.

    .. versionchanged:: 2.0  ``datetime.fromisoformat()`` is used for default
       datetime string parsing.

    The storage format can be customized to some degree using the
    ``storage_format`` and ``regexp`` parameters, such as::

        import re
        from sqlalchemy.dialects.sqlite import DATETIME

        dt = DATETIME(
            storage_format=(
                "%(year)04d/%(month)02d/%(day)02d %(hour)02d:%(minute)02d:%(second)02d"
            ),
            regexp=r"(\d+)/(\d+)/(\d+) (\d+):(\d+):(\d+)",
        )

    :param truncate_microseconds: when ``True`` microseconds will be truncated
     from the datetime. Can't be specified together with ``storage_format``
     or ``regexp``; doing so raises ``AssertionError``.

    :param storage_format: format string which will be applied to the dict
     with keys year, month, day, hour, minute, second, and microsecond.

    :param regexp: regular expression which will be applied to incoming result
     rows, replacing the use of ``datetime.fromisoformat()`` to parse incoming
     strings. If the regexp contains named groups, the resulting match dict is
     applied to the Python datetime() constructor as keyword arguments.
     Otherwise, if positional groups are used, the datetime() constructor
     is called with positional arguments via
     ``*map(int, match_obj.groups(0))``.

    """  # noqa

    _storage_format = (
        "%(year)04d-%(month)02d-%(day)02d "
        "%(hour)02d:%(minute)02d:%(second)02d.%(microsecond)06d"
    )

    def __init__(self, *args, **kwargs):
        """Construct the type; see the class docstring for the parameters.

        :raises AssertionError: if ``truncate_microseconds`` is combined
         with the ``storage_format`` or ``regexp`` keyword.

        """
        truncate_microseconds = kwargs.pop("truncate_microseconds", False)
        super().__init__(*args, **kwargs)
        if truncate_microseconds:
            # raised explicitly rather than through ``assert`` so that the
            # check is not stripped under ``python -O``; AssertionError is
            # kept so existing callers observe the same exception type
            if "storage_format" in kwargs:
                raise AssertionError(
                    "You can specify only "
                    "one of truncate_microseconds or storage_format."
                )
            if "regexp" in kwargs:
                raise AssertionError(
                    "You can specify only one of "
                    "truncate_microseconds or regexp."
                )
            self._storage_format = (
                "%(year)04d-%(month)02d-%(day)02d "
                "%(hour)02d:%(minute)02d:%(second)02d"
            )

    def bind_processor(
        self, dialect: Dialect
    ) -> Optional[_BindProcessorType[Any]]:
        """Return a callable that renders a datetime or date as a string
        using the storage format.

        ``None`` passes through unchanged; a plain ``date`` is rendered with
        all time fields set to zero; any other type raises ``TypeError``.

        """
        datetime_datetime = datetime.datetime
        datetime_date = datetime.date
        format_ = self._storage_format

        def process(value):
            if value is None:
                return None

            # datetime must be tested first as it is a subclass of date
            if isinstance(value, datetime_datetime):
                hour = value.hour
                minute = value.minute
                second = value.second
                microsecond = value.microsecond
            elif isinstance(value, datetime_date):
                hour = minute = second = microsecond = 0
            else:
                raise TypeError(
                    "SQLite DateTime type only accepts Python "
                    "datetime and date objects as input."
                )

            return format_ % {
                "year": value.year,
                "month": value.month,
                "day": value.day,
                "hour": hour,
                "minute": minute,
                "second": second,
                "microsecond": microsecond,
            }

        return process

    def result_processor(
        self, dialect: Dialect, coltype: object
    ) -> Optional[_ResultProcessorType[Any]]:
        """Return the callable that parses stored strings into datetimes.

        A regexp-based processor is used when a custom ``regexp`` was
        given, otherwise the default ``processors.str_to_datetime``.

        """
        if self._reg:
            return processors.str_to_datetime_processor_factory(
                self._reg, datetime.datetime
            )
        else:
            return processors.str_to_datetime
1221
1222
class DATE(_DateTimeMixin, sqltypes.Date):
    r"""Store a Python date object in SQLite as a string.

    By default values are written with the format::

        "%(year)04d-%(month)02d-%(day)02d"

    which produces, for example:

    .. sourcecode:: text

        2011-03-15

    Stored strings are parsed by default with the Python
    ``date.fromisoformat()`` function.

    .. versionchanged:: 2.0  ``date.fromisoformat()`` is used for default
       date string parsing.

    Both directions can be customized to some degree through the
    ``storage_format`` and ``regexp`` parameters, for example::

        import re
        from sqlalchemy.dialects.sqlite import DATE

        d = DATE(
            storage_format="%(month)02d/%(day)02d/%(year)04d",
            regexp=re.compile("(?P<month>\d+)/(?P<day>\d+)/(?P<year>\d+)"),
        )

    :param storage_format: format string which will be applied to the
     dict with keys year, month, and day.

    :param regexp: regular expression which will be applied to
     incoming result rows, replacing the use of ``date.fromisoformat()`` to
     parse incoming strings. If the regexp contains named groups, the
     resulting match dict is applied to the Python date() constructor as
     keyword arguments. Otherwise, if positional groups are used, the date()
     constructor is called with positional arguments via
     ``*map(int, match_obj.groups(0))``.

    """

    _storage_format = "%(year)04d-%(month)02d-%(day)02d"

    def bind_processor(
        self, dialect: Dialect
    ) -> Optional[_BindProcessorType[Any]]:
        """Return a callable rendering a date with the storage format.

        ``None`` passes through unchanged; anything that is not a
        ``datetime.date`` instance raises ``TypeError``.

        """
        template = self._storage_format
        date_type = datetime.date

        def process(value):
            if value is None:
                return None
            if not isinstance(value, date_type):
                raise TypeError(
                    "SQLite Date type only accepts Python "
                    "date objects as input."
                )
            fields = {
                "year": value.year,
                "month": value.month,
                "day": value.day,
            }
            return template % fields

        return process

    def result_processor(
        self, dialect: Dialect, coltype: object
    ) -> Optional[_ResultProcessorType[Any]]:
        """Return the callable that parses stored strings into dates.

        The default ``processors.str_to_date`` is used unless a custom
        ``regexp`` was given, in which case a regexp-based processor is
        built for ``datetime.date``.

        """
        if not self._reg:
            return processors.str_to_date
        return processors.str_to_datetime_processor_factory(
            self._reg, datetime.date
        )
1301
1302
class TIME(_DateTimeMixin, sqltypes.Time):
    r"""Represent a Python time object in SQLite using a string.

    The default string storage format is::

        "%(hour)02d:%(minute)02d:%(second)02d.%(microsecond)06d"

    e.g.:

    .. sourcecode:: text

        12:05:57.105580

    The incoming storage format is by default parsed using the
    Python ``time.fromisoformat()`` function.

    .. versionchanged:: 2.0  ``time.fromisoformat()`` is used for default
       time string parsing.

    The storage format can be customized to some degree using the
    ``storage_format`` and ``regexp`` parameters, such as::

        import re
        from sqlalchemy.dialects.sqlite import TIME

        t = TIME(
            storage_format="%(hour)02d-%(minute)02d-%(second)02d-%(microsecond)06d",
            regexp=re.compile(r"(\d+)-(\d+)-(\d+)(?:-(\d+))?"),
        )

    :param truncate_microseconds: when ``True`` microseconds will be truncated
     from the time. Can't be specified together with ``storage_format``
     or ``regexp``; doing so raises ``AssertionError``.

    :param storage_format: format string which will be applied to the dict
     with keys hour, minute, second, and microsecond.

    :param regexp: regular expression which will be applied to incoming result
     rows, replacing the use of ``time.fromisoformat()`` to parse incoming
     strings. If the regexp contains named groups, the resulting match dict is
     applied to the Python time() constructor as keyword arguments. Otherwise,
     if positional groups are used, the time() constructor is called with
     positional arguments via ``*map(int, match_obj.groups(0))``.

    """  # noqa

    _storage_format = "%(hour)02d:%(minute)02d:%(second)02d.%(microsecond)06d"

    def __init__(self, *args, **kwargs):
        """Construct the type; see the class docstring for the parameters.

        :raises AssertionError: if ``truncate_microseconds`` is combined
         with the ``storage_format`` or ``regexp`` keyword.

        """
        truncate_microseconds = kwargs.pop("truncate_microseconds", False)
        super().__init__(*args, **kwargs)
        if truncate_microseconds:
            # raised explicitly rather than through ``assert`` so that the
            # check is not stripped under ``python -O``; AssertionError is
            # kept so existing callers observe the same exception type
            if "storage_format" in kwargs:
                raise AssertionError(
                    "You can specify only "
                    "one of truncate_microseconds or storage_format."
                )
            if "regexp" in kwargs:
                raise AssertionError(
                    "You can specify only one of "
                    "truncate_microseconds or regexp."
                )
            self._storage_format = "%(hour)02d:%(minute)02d:%(second)02d"

    def bind_processor(
        self, dialect: Dialect
    ) -> Optional[_BindProcessorType[Any]]:
        """Return a callable rendering a time with the storage format.

        ``None`` passes through unchanged; anything that is not a
        ``datetime.time`` instance raises ``TypeError``.

        """
        datetime_time = datetime.time
        format_ = self._storage_format

        def process(value):
            if value is None:
                return None
            elif isinstance(value, datetime_time):
                return format_ % {
                    "hour": value.hour,
                    "minute": value.minute,
                    "second": value.second,
                    "microsecond": value.microsecond,
                }
            else:
                raise TypeError(
                    "SQLite Time type only accepts Python "
                    "time objects as input."
                )

        return process

    def result_processor(
        self, dialect: Dialect, coltype: object
    ) -> Optional[_ResultProcessorType[Any]]:
        """Return the callable that parses stored strings into times.

        A regexp-based processor is used when a custom ``regexp`` was
        given, otherwise the default ``processors.str_to_time``.

        """
        if self._reg:
            return processors.str_to_datetime_processor_factory(
                self._reg, datetime.time
            )
        else:
            return processors.str_to_time
1394
1395
# Maps generic SQLAlchemy types to the SQLite-specific implementation
# classes defined in this module (date/time types and JSON variants).
# NOTE(review): presumably consumed by the dialect's type adaptation
# machinery; the consumer is not visible in this part of the file.
colspecs = {
    sqltypes.Date: DATE,
    sqltypes.DateTime: DATETIME,
    sqltypes.JSON: _SQliteJson,
    sqltypes.JSON.JSONIndexType: JSONIndexType,
    sqltypes.JSON.JSONPathType: JSONPathType,
    sqltypes.Time: TIME,
}
1404
# Exact-match lookup from a type name string to the SQLAlchemy type class
# it represents.  Several names are aliases for the same class
# (BOOL/BOOLEAN, INT/INTEGER), and the ``*_CHAR`` names map back to the
# plain DATE / DATETIME / TIME classes.
# NOTE(review): presumably the reflection lookup table described in the
# module docstring, with the "type affinity" rules as the fallback for
# names not listed here - confirm against the reflection code.
ischema_names = {
    "BIGINT": sqltypes.BIGINT,
    "BLOB": sqltypes.BLOB,
    "BOOL": sqltypes.BOOLEAN,
    "BOOLEAN": sqltypes.BOOLEAN,
    "CHAR": sqltypes.CHAR,
    "DATE": sqltypes.DATE,
    "DATE_CHAR": sqltypes.DATE,
    "DATETIME": sqltypes.DATETIME,
    "DATETIME_CHAR": sqltypes.DATETIME,
    "DOUBLE": sqltypes.DOUBLE,
    "DECIMAL": sqltypes.DECIMAL,
    "FLOAT": sqltypes.FLOAT,
    "INT": sqltypes.INTEGER,
    "INTEGER": sqltypes.INTEGER,
    "JSON": JSON,
    "NUMERIC": sqltypes.NUMERIC,
    "REAL": sqltypes.REAL,
    "SMALLINT": sqltypes.SMALLINT,
    "TEXT": sqltypes.TEXT,
    "TIME": sqltypes.TIME,
    "TIME_CHAR": sqltypes.TIME,
    "TIMESTAMP": sqltypes.TIMESTAMP,
    "VARCHAR": sqltypes.VARCHAR,
    "NVARCHAR": sqltypes.NVARCHAR,
    "NCHAR": sqltypes.NCHAR,
}
1432
1433
class SQLiteCompiler(compiler.SQLCompiler):
    """SQL statement compiler applying SQLite-specific rendering rules."""

    # strftime() format codes used by visit_extract() for each EXTRACT field
    extract_map = util.update_copy(
        compiler.SQLCompiler.extract_map,
        {
            "month": "%m",
            "day": "%d",
            "year": "%Y",
            "second": "%S",
            "hour": "%H",
            "doy": "%j",
            "minute": "%M",
            "epoch": "%s",
            "dow": "%w",
            "week": "%W",
        },
    )

    def visit_truediv_binary(self, binary, operator, **kw):
        """Render true division as ``<left> / (<right> + 0.0)``."""
        # NOTE(review): adding 0.0 presumably forces a floating point
        # divisor so integer operands are not divided as integers
        return (
            self.process(binary.left, **kw)
            + " / "
            + "(%s + 0.0)" % self.process(binary.right, **kw)
        )

    def visit_now_func(self, fn, **kw):
        """Render ``now()`` as ``CURRENT_TIMESTAMP``."""
        return "CURRENT_TIMESTAMP"

    def visit_localtimestamp_func(self, func, **kw):
        """Render ``localtimestamp()`` using the ``'localtime'`` modifier."""
        return "DATETIME(CURRENT_TIMESTAMP, 'localtime')"

    def visit_true(self, expr, **kw):
        """Render boolean true as the integer ``1``."""
        return "1"

    def visit_false(self, expr, **kw):
        """Render boolean false as the integer ``0``."""
        return "0"

    def visit_char_length_func(self, fn, **kw):
        """Render ``char_length()`` as ``length()``."""
        return "length%s" % self.function_argspec(fn)

    def visit_aggregate_strings_func(self, fn, **kw):
        """Render ``aggregate_strings()`` as ``group_concat()``."""
        return super().visit_aggregate_strings_func(
            fn, use_function_name="group_concat", **kw
        )

    def visit_cast(self, cast, **kwargs):
        """Render CAST, or only the inner expression when the dialect
        reports ``supports_cast`` as false.

        """
        if self.dialect.supports_cast:
            return super().visit_cast(cast, **kwargs)
        else:
            return self.process(cast.clause, **kwargs)

    def visit_extract(self, extract, **kw):
        """Render EXTRACT as ``CAST(STRFTIME(<code>, <expr>) AS INTEGER)``.

        :raises CompileError: if the extract field has no entry in
         ``extract_map`` (any ``KeyError`` raised inside the block is
         reported this way).

        """
        try:
            return "CAST(STRFTIME('%s', %s) AS INTEGER)" % (
                self.extract_map[extract.field],
                self.process(extract.expr, **kw),
            )
        except KeyError as err:
            raise exc.CompileError(
                "%s is not a valid extract argument." % extract.field
            ) from err

    def returning_clause(
        self,
        stmt,
        returning_cols,
        *,
        populate_result_map,
        **kw,
    ):
        """Render RETURNING with columns not qualified by table name."""
        kw["include_table"] = False
        return super().returning_clause(
            stmt, returning_cols, populate_result_map=populate_result_map, **kw
        )

    def limit_clause(self, select, **kw):
        """Render the LIMIT / OFFSET portion of a SELECT.

        When only an offset is present, ``LIMIT -1`` is rendered in front
        of it; when no offset is present, ``OFFSET 0`` is rendered.

        """
        text = ""
        if select._limit_clause is not None:
            text += "\n LIMIT " + self.process(select._limit_clause, **kw)
        if select._offset_clause is not None:
            if select._limit_clause is None:
                # compile flags (e.g. literal_binds) must apply to the
                # implicit limit the same way as to the other values
                text += "\n LIMIT " + self.process(sql.literal(-1), **kw)
            text += " OFFSET " + self.process(select._offset_clause, **kw)
        else:
            text += " OFFSET " + self.process(sql.literal(0), **kw)
        return text

    def for_update_clause(self, select, **kw):
        """Render nothing for FOR UPDATE."""
        # sqlite has no "FOR UPDATE" AFAICT
        return ""

    def update_from_clause(
        self, update_stmt, from_table, extra_froms, from_hints, **kw
    ):
        """Render the ``FROM <table>, ...`` clause of an UPDATE..FROM
        statement from ``extra_froms``.

        """
        kw["asfrom"] = True
        return "FROM " + ", ".join(
            t._compiler_dispatch(self, fromhints=from_hints, **kw)
            for t in extra_froms
        )

    def visit_is_distinct_from_binary(self, binary, operator, **kw):
        """Render IS DISTINCT FROM as ``<left> IS NOT <right>``."""
        # compile flags are forwarded so that both operands are rendered
        # consistently with the rest of the statement
        return "%s IS NOT %s" % (
            self.process(binary.left, **kw),
            self.process(binary.right, **kw),
        )

    def visit_is_not_distinct_from_binary(self, binary, operator, **kw):
        """Render IS NOT DISTINCT FROM as ``<left> IS <right>``."""
        return "%s IS %s" % (
            self.process(binary.left, **kw),
            self.process(binary.right, **kw),
        )

    def _render_json_extract(self, binary, _cast_applied, **kw):
        """Shared rendering for the JSON index and JSON path operators.

        A non-JSON result type is first wrapped in a CAST to that type
        (once, guarded by ``_cast_applied``).  The element is then
        rendered with ``JSON_EXTRACT``, additionally wrapped in
        ``JSON_QUOTE`` when the result type is itself JSON.

        """
        returns_json = binary.type._type_affinity is sqltypes.JSON

        if not _cast_applied and not returns_json:
            kw["_cast_applied"] = True
            return self.process(sql.cast(binary, binary.type), **kw)

        if returns_json:
            expr = "JSON_QUOTE(JSON_EXTRACT(%s, %s))"
        else:
            expr = "JSON_EXTRACT(%s, %s)"

        return expr % (
            self.process(binary.left, **kw),
            self.process(binary.right, **kw),
        )

    def visit_json_getitem_op_binary(
        self, binary, operator, _cast_applied=False, **kw
    ):
        """Render JSON element access by index or key."""
        return self._render_json_extract(binary, _cast_applied, **kw)

    def visit_json_path_getitem_op_binary(
        self, binary, operator, _cast_applied=False, **kw
    ):
        """Render JSON element access by path."""
        return self._render_json_extract(binary, _cast_applied, **kw)

    def visit_empty_set_op_expr(self, type_, expand_op, **kw):
        """Render the empty-set expression regardless of the operator."""
        # slightly old SQLite versions don't seem to be able to handle
        # the empty set impl
        return self.visit_empty_set_expr(type_)

    def visit_empty_set_expr(self, element_types, **kw):
        """Render a subquery that yields no rows, with one column per
        element type (a single column when no types are given).

        """
        ones = ", ".join("1" for type_ in element_types or [INTEGER()])
        return "SELECT %s FROM (SELECT %s) WHERE 1!=1" % (ones, ones)

    def visit_regexp_match_op_binary(self, binary, operator, **kw):
        """Render a regular expression match using ``REGEXP``."""
        return self._generate_generic_binary(binary, " REGEXP ", **kw)

    def visit_not_regexp_match_op_binary(self, binary, operator, **kw):
        """Render a negated regular expression match using ``NOT REGEXP``."""
        return self._generate_generic_binary(binary, " NOT REGEXP ", **kw)

    def _on_conflict_target(self, clause, **kw):
        """Render the conflict target ``(<cols>) [WHERE <criteria>]`` of an
        ON CONFLICT clause, or an empty string when no target elements
        were given.

        """
        if clause.inferred_target_elements is not None:
            target_text = "(%s)" % ", ".join(
                (
                    self.preparer.quote(c)
                    if isinstance(c, str)
                    else self.process(c, include_table=False, use_schema=False)
                )
                for c in clause.inferred_target_elements
            )
            if clause.inferred_target_whereclause is not None:
                whereclause_kw = dict(kw)
                whereclause_kw.update(
                    include_table=False,
                    use_schema=False,
                    literal_execute=True,
                )
                target_text += " WHERE %s" % self.process(
                    clause.inferred_target_whereclause,
                    **whereclause_kw,
                )

        else:
            target_text = ""

        return target_text

    def visit_on_conflict_do_nothing(self, on_conflict, **kw):
        """Render ``ON CONFLICT [<target>] DO NOTHING``."""
        target_text = self._on_conflict_target(on_conflict, **kw)

        if target_text:
            return "ON CONFLICT %s DO NOTHING" % target_text
        else:
            return "ON CONFLICT DO NOTHING"

    def visit_on_conflict_do_update(self, on_conflict, **kw):
        """Render ``ON CONFLICT <target> DO UPDATE SET ... [WHERE ...]``.

        SET assignments follow the column order of the INSERT's table.
        Entries of the SET mapping that match no table column produce a
        warning and are rendered after the matched columns.

        """
        clause = on_conflict

        target_text = self._on_conflict_target(on_conflict, **kw)

        action_set_ops = []

        # copied, as matched entries are popped off below
        set_parameters = dict(clause.update_values_to_set)
        # create a list of column assignment clauses as tuples

        insert_statement = self.stack[-1]["selectable"]
        cols = insert_statement.table.c
        set_kw = dict(kw)
        set_kw.update(use_schema=False)
        for c in cols:
            col_key = c.key

            # the mapping may be keyed by column key or by Column object
            if col_key in set_parameters:
                value = set_parameters.pop(col_key)
            elif c in set_parameters:
                value = set_parameters.pop(c)
            else:
                continue

            if (
                isinstance(value, elements.BindParameter)
                and value.type._isnull
            ):
                # untyped bound value: take on the type of the column
                value = value._with_binary_element_type(c.type)

            value_text = self.process(
                value.self_group(), is_upsert_set=True, **set_kw
            )

            key_text = self.preparer.quote(c.name)
            action_set_ops.append("%s = %s" % (key_text, value_text))

        # check for names that don't match columns
        if set_parameters:
            util.warn(
                "Additional column names not matching "
                "any column keys in table '%s': %s"
                % (
                    self.current_executable.table.name,
                    (", ".join("'%s'" % c for c in set_parameters)),
                )
            )
            for k, v in set_parameters.items():
                key_text = (
                    self.preparer.quote(k)
                    if isinstance(k, str)
                    else self.process(k, **set_kw)
                )
                value_text = self.process(
                    coercions.expect(roles.ExpressionElementRole, v),
                    is_upsert_set=True,
                    **set_kw,
                )
                action_set_ops.append("%s = %s" % (key_text, value_text))

        action_text = ", ".join(action_set_ops)
        if clause.update_whereclause is not None:
            where_kw = dict(kw)
            where_kw.update(include_table=True, use_schema=False)
            action_text += " WHERE %s" % self.process(
                clause.update_whereclause, **where_kw
            )

        return "ON CONFLICT %s DO UPDATE SET %s" % (target_text, action_text)

    def visit_bitwise_xor_op_binary(self, binary, operator, **kw):
        """Render bitwise XOR as ``((a | b) - (a & b))``."""
        # sqlite has no xor. Use "a XOR b" = "(a | b) - (a & b)".
        kw["eager_grouping"] = True
        or_ = self._generate_generic_binary(binary, " | ", **kw)
        and_ = self._generate_generic_binary(binary, " & ", **kw)
        return f"({or_} - {and_})"
1713
1714
class SQLiteDDLCompiler(compiler.DDLCompiler):
    """DDL compiler applying SQLite-specific rules for columns,
    constraints, indexes, table options and views.

    """

    def get_column_specification(self, column, **kwargs):
        """Build the column definition used inside CREATE TABLE.

        The result is assembled in this order: name and type, DEFAULT,
        NOT NULL with its optional ON CONFLICT, inline
        ``PRIMARY KEY ... AUTOINCREMENT`` when the table's
        ``sqlite_autoincrement`` option applies, and the computed clause.

        :raises CompileError: if ``autoincrement=True`` is set on a column
         belonging to a composite primary key.

        """
        type_sql = self.dialect.type_compiler_instance.process(
            column.type, type_expression=column
        )
        fragments = [self.preparer.format_column(column) + " " + type_sql]

        default = self.get_column_default_string(column)
        if default is not None:
            # a default that does not already start with a quote or an open
            # parenthesis, yet contains a non-word character, is wrapped in
            # parentheses
            wrap_in_parens = not re.match(
                r"""^\s*[\'\"\(]""", default
            ) and re.match(r".*\W.*", default)
            if wrap_in_parens:
                fragments.append(f" DEFAULT ({default})")
            else:
                fragments.append(f" DEFAULT {default}")

        if not column.nullable:
            fragments.append(" NOT NULL")

            not_null_conflict = column.dialect_options["sqlite"][
                "on_conflict_not_null"
            ]
            if not_null_conflict is not None:
                fragments.append(" ON CONFLICT " + not_null_conflict)

        if column.primary_key:
            if (
                column.autoincrement is True
                and len(column.table.primary_key.columns) != 1
            ):
                raise exc.CompileError(
                    "SQLite does not support autoincrement for "
                    "composite primary keys"
                )

            inline_autoincrement = (
                column.table.dialect_options["sqlite"]["autoincrement"]
                and len(column.table.primary_key.columns) == 1
                and issubclass(column.type._type_affinity, sqltypes.Integer)
                and not column.foreign_keys
            )
            if inline_autoincrement:
                fragments.append(" PRIMARY KEY")

                pk_conflict = column.dialect_options["sqlite"][
                    "on_conflict_primary_key"
                ]
                if pk_conflict is not None:
                    fragments.append(" ON CONFLICT " + pk_conflict)

                fragments.append(" AUTOINCREMENT")

        if column.computed is not None:
            fragments.append(" " + self.process(column.computed))

        return "".join(fragments)

    def visit_primary_key_constraint(self, constraint, **kw):
        """Render a table-level PRIMARY KEY constraint.

        Returns ``None`` for a single integer column without foreign keys
        on a table using ``sqlite_autoincrement``; otherwise appends an
        ON CONFLICT clause taken from the constraint or, for a single
        column constraint, from that column.

        """
        single_column = len(constraint.columns) == 1

        # for columns with sqlite_autoincrement=True,
        # the PRIMARY KEY constraint can only be inline
        # with the column itself.
        if single_column:
            col = list(constraint)[0]
            rendered_inline = (
                col.primary_key
                and col.table.dialect_options["sqlite"]["autoincrement"]
                and issubclass(col.type._type_affinity, sqltypes.Integer)
                and not col.foreign_keys
            )
            if rendered_inline:
                return None

        ddl = super().visit_primary_key_constraint(constraint)

        conflict = constraint.dialect_options["sqlite"]["on_conflict"]
        if conflict is None and len(constraint.columns) == 1:
            # fall back to the option given on the column itself
            conflict = list(constraint)[0].dialect_options["sqlite"][
                "on_conflict_primary_key"
            ]

        if conflict is None:
            return ddl
        return ddl + " ON CONFLICT " + conflict

    def visit_unique_constraint(self, constraint, **kw):
        """Render a UNIQUE constraint with an optional ON CONFLICT clause
        taken from the constraint or, for a single column constraint whose
        member is a schema item, from that column.

        """
        ddl = super().visit_unique_constraint(constraint)

        conflict = constraint.dialect_options["sqlite"]["on_conflict"]
        if conflict is None and len(constraint.columns) == 1:
            members = list(constraint)
            if isinstance(members[0], schema.SchemaItem):
                conflict = list(constraint)[0].dialect_options["sqlite"][
                    "on_conflict_unique"
                ]

        if conflict is None:
            return ddl
        return ddl + " ON CONFLICT " + conflict

    def visit_check_constraint(self, constraint, **kw):
        """Render a table-level CHECK constraint with an optional
        ON CONFLICT clause.

        """
        ddl = super().visit_check_constraint(constraint)

        conflict = constraint.dialect_options["sqlite"]["on_conflict"]
        if conflict is None:
            return ddl
        return ddl + " ON CONFLICT " + conflict

    def visit_column_check_constraint(self, constraint, **kw):
        """Render a column-level CHECK constraint.

        :raises CompileError: if an ``on_conflict`` option was given.

        """
        ddl = super().visit_column_check_constraint(constraint)

        conflict = constraint.dialect_options["sqlite"]["on_conflict"]
        if conflict is not None:
            raise exc.CompileError(
                "SQLite does not support on conflict clause for "
                "column check constraint"
            )

        return ddl

    def visit_foreign_key_constraint(self, constraint, **kw):
        """Render a FOREIGN KEY constraint, or return ``None`` when the
        referencing and referenced tables are in different schemas.

        """
        referencing = constraint.elements[0].parent.table
        referenced = constraint.elements[0].column.table

        if referencing.schema == referenced.schema:
            return super().visit_foreign_key_constraint(constraint)
        return None

    def define_constraint_remote_table(self, constraint, table, preparer):
        """Return the referenced table's name without schema qualifier."""

        return preparer.format_table(table, use_schema=False)

    def visit_create_index(
        self, create, include_schema=False, include_table_schema=True, **kw
    ):
        """Render CREATE [UNIQUE] INDEX [IF NOT EXISTS], including the
        WHERE clause given through the ``sqlite_where`` index option.

        """
        index = create.element
        self._verify_index_table(index)
        preparer = self.preparer

        pieces = ["CREATE "]
        if index.unique:
            pieces.append("UNIQUE ")
        pieces.append("INDEX ")
        if create.if_not_exists:
            pieces.append("IF NOT EXISTS ")

        pieces.append(
            "%s ON %s (%s)"
            % (
                self._prepared_index_name(index, include_schema=True),
                preparer.format_table(index.table, use_schema=False),
                ", ".join(
                    self.sql_compiler.process(
                        expr, include_table=False, literal_binds=True
                    )
                    for expr in index.expressions
                ),
            )
        )

        criteria = index.dialect_options["sqlite"]["where"]
        if criteria is not None:
            pieces.append(
                " WHERE "
                + self.sql_compiler.process(
                    criteria, include_table=False, literal_binds=True
                )
            )

        return "".join(pieces)

    def post_create_table(self, table):
        """Return the table options following the closing parenthesis of
        CREATE TABLE (``WITHOUT ROWID`` and/or ``STRICT``), or an empty
        string when neither applies.

        """
        sqlite_opts = table.dialect_options["sqlite"]
        flags = []

        if not sqlite_opts["with_rowid"]:
            flags.append("WITHOUT ROWID")
        if sqlite_opts["strict"]:
            flags.append("STRICT")

        if not flags:
            return ""
        return "\n " + ",\n ".join(flags)

    def visit_create_view(self, create, **kw):
        """Render CREATE VIEW, passing the ``if_not_exists`` value from the
        construct's ``sqlite`` dialect options (default ``False``) on to
        the inherited implementation.

        """
        kw["if_not_exists"] = create.dialect_options["sqlite"].get(
            "if_not_exists", False
        )
        return super().visit_create_view(create, **kw)
1914
1915
class SQLiteTypeCompiler(compiler.GenericTypeCompiler):
    """Type compiler producing SQLite DDL type names.

    For DATETIME, DATE and TIME the ``*_CHAR`` variant is rendered when the
    type is a ``_DateTimeMixin`` whose ``format_is_text_affinity`` is
    falsy; otherwise the generic rendering is used.
    """

    def visit_large_binary(self, type_, **kw):
        """Render large binary types as BLOB."""
        return self.visit_BLOB(type_)

    def visit_DATETIME(self, type_, **kw):
        """Render ``DATETIME`` or, for custom formats, ``DATETIME_CHAR``."""
        if (
            isinstance(type_, _DateTimeMixin)
            and not type_.format_is_text_affinity
        ):
            return "DATETIME_CHAR"
        return super().visit_DATETIME(type_)

    def visit_DATE(self, type_, **kw):
        """Render ``DATE`` or, for custom formats, ``DATE_CHAR``."""
        if (
            isinstance(type_, _DateTimeMixin)
            and not type_.format_is_text_affinity
        ):
            return "DATE_CHAR"
        return super().visit_DATE(type_)

    def visit_TIME(self, type_, **kw):
        """Render ``TIME`` or, for custom formats, ``TIME_CHAR``."""
        if (
            isinstance(type_, _DateTimeMixin)
            and not type_.format_is_text_affinity
        ):
            return "TIME_CHAR"
        return super().visit_TIME(type_)

    def visit_JSON(self, type_, **kw):
        """Render the JSON type name."""
        # note this name provides NUMERIC affinity, not TEXT.
        # should not be an issue unless the JSON value consists of a single
        # numeric value.   JSONTEXT can be used if this case is required.
        type_name = "JSON"
        return type_name
1952
1953
class SQLiteIdentifierPreparer(compiler.IdentifierPreparer):
    """Identifier preparer carrying the SQLite reserved-word list."""

    # lower-case words stored in ``reserved_words`` for this dialect
    reserved_words = {
        "add",
        "after",
        "all",
        "alter",
        "analyze",
        "and",
        "as",
        "asc",
        "attach",
        "autoincrement",
        "before",
        "begin",
        "between",
        "by",
        "cascade",
        "case",
        "cast",
        "check",
        "collate",
        "column",
        "commit",
        "conflict",
        "constraint",
        "create",
        "cross",
        "current_date",
        "current_time",
        "current_timestamp",
        "database",
        "default",
        "deferrable",
        "deferred",
        "delete",
        "desc",
        "detach",
        "distinct",
        "drop",
        "each",
        "else",
        "end",
        "escape",
        "except",
        "exclusive",
        "exists",
        "explain",
        "false",
        "fail",
        "for",
        "foreign",
        "from",
        "full",
        "glob",
        "group",
        "having",
        "if",
        "ignore",
        "immediate",
        "in",
        "index",
        "indexed",
        "initially",
        "inner",
        "insert",
        "instead",
        "intersect",
        "into",
        "is",
        "isnull",
        "join",
        "key",
        "left",
        "like",
        "limit",
        "match",
        "natural",
        "not",
        "notnull",
        "null",
        "of",
        "offset",
        "on",
        "or",
        "order",
        "outer",
        "plan",
        "pragma",
        "primary",
        "query",
        "raise",
        "references",
        "reindex",
        "rename",
        "replace",
        "restrict",
        "right",
        "rollback",
        "row",
        "select",
        "set",
        "table",
        "temp",
        "temporary",
        "then",
        "to",
        "transaction",
        "trigger",
        "true",
        "union",
        "unique",
        "update",
        "using",
        "vacuum",
        "values",
        "view",
        "virtual",
        "when",
        "where",
    }
2074
2075
class SQLiteExecutionContext(default.DefaultExecutionContext):
    """Execution context that can strip dotted prefixes from column names."""

    @util.memoized_property
    def _preserve_raw_colnames(self):
        """Whether cursor column names are used exactly as delivered.

        True when the dialect does not have ``_broken_dotted_colnames``
        set; otherwise the value of the ``sqlite_raw_colnames`` execution
        option (default ``False``).
        """
        if not self.dialect._broken_dotted_colnames:
            return True
        return self.execution_options.get("sqlite_raw_colnames", False)

    def _translate_colname(self, colname):
        """Return ``(name, original_or_None)`` for a cursor column name."""
        # TODO: detect SQLite version 3.10.0 or greater;
        # see [ticket:3633]

        # adjust for dotted column names.  SQLite
        # in the case of UNION may store col names as
        # "tablename.colname", or if using an attached database,
        # "database.tablename.colname", in cursor.description
        if self._preserve_raw_colnames or "." not in colname:
            return colname, None

        short_name = colname.split(".")[-1]
        return short_name, colname
2096
2097
class SQLiteDialect(default.DefaultDialect):
    name = "sqlite"
    supports_alter = False

    # SQlite supports "DEFAULT VALUES" but *does not* support
    # a literal "VALUES (DEFAULT)"; the metavalue support declared further
    # down is paired with ``default_metavalue_token = "NULL"``.
    supports_default_values = True

    # sqlite issue:
    # https://github.com/python/cpython/issues/93421
    # note this parameter is no longer used by the ORM or default dialect
    # see #9414
    supports_sane_rowcount_returning = False

    supports_empty_insert = False
    supports_cast = True
    supports_multivalues_insert = True
    use_insertmanyvalues = True
    tuple_in_values = True
    supports_statement_cache = True
    insert_null_pk_still_autoincrements = True
    insert_returning = True
    update_returning = True
    update_returning_multifrom = True
    delete_returning = True

    # NOTE: this attribute was previously also assigned ``False`` near the
    # top of the class body; that assignment was dead because this later
    # one always won, so only the effective value is kept.
    supports_default_metavalue = True
    """dialect supports INSERT... VALUES (DEFAULT) syntax"""

    default_metavalue_token = "NULL"
    """for INSERT... VALUES (DEFAULT) syntax, the token to put in the
    parenthesis."""

    default_paramstyle = "qmark"
    execution_ctx_cls = SQLiteExecutionContext
    statement_compiler = SQLiteCompiler
    ddl_compiler = SQLiteDDLCompiler
    type_compiler_cls = SQLiteTypeCompiler
    preparer = SQLiteIdentifierPreparer
    ischema_names = ischema_names
    colspecs = colspecs

    # dialect-specific keyword arguments ("sqlite_<name>") accepted by each
    # schema construct, with their default values
    construct_arguments = [
        (
            sa_schema.Table,
            {
                "autoincrement": False,
                "with_rowid": True,
                "strict": False,
            },
        ),
        (sa_schema.Index, {"where": None}),
        (
            sa_schema.Column,
            {
                "on_conflict_primary_key": None,
                "on_conflict_not_null": None,
                "on_conflict_unique": None,
            },
        ),
        (sa_schema.Constraint, {"on_conflict": None}),
        (sa_ddl.CreateView, {"if_not_exists": False}),
    ]

    # class-level defaults; __init__ recomputes these from the DBAPI's
    # sqlite_version_info when a DBAPI is present
    _broken_fk_pragma_quotes = False
    _broken_dotted_colnames = False
2166
2167 def __init__(
2168 self,
2169 native_datetime: bool = False,
2170 json_serializer: Optional[Callable[..., Any]] = None,
2171 json_deserializer: Optional[Callable[..., Any]] = None,
2172 **kwargs: Any,
2173 ) -> None:
2174 default.DefaultDialect.__init__(self, **kwargs)
2175
2176 self._json_serializer = json_serializer
2177 self._json_deserializer = json_deserializer
2178
2179 # this flag used by pysqlite dialect, and perhaps others in the
2180 # future, to indicate the driver is handling date/timestamp
2181 # conversions (and perhaps datetime/time as well on some hypothetical
2182 # driver ?)
2183 self.native_datetime = native_datetime
2184
2185 if self.dbapi is not None:
2186 if self.dbapi.sqlite_version_info < (3, 7, 16):
2187 util.warn(
2188 "SQLite version %s is older than 3.7.16, and will not "
2189 "support right nested joins, as are sometimes used in "
2190 "more complex ORM scenarios. SQLAlchemy 1.4 and above "
2191 "no longer tries to rewrite these joins."
2192 % (self.dbapi.sqlite_version_info,)
2193 )
2194
2195 # NOTE: python 3.7 on fedora for me has SQLite 3.34.1. These
2196 # version checks are getting very stale.
2197 self._broken_dotted_colnames = self.dbapi.sqlite_version_info < (
2198 3,
2199 10,
2200 0,
2201 )
2202 self.supports_default_values = self.dbapi.sqlite_version_info >= (
2203 3,
2204 3,
2205 8,
2206 )
2207 self.supports_cast = self.dbapi.sqlite_version_info >= (3, 2, 3)
2208 self.supports_multivalues_insert = (
2209 # https://www.sqlite.org/releaselog/3_7_11.html
2210 self.dbapi.sqlite_version_info
2211 >= (3, 7, 11)
2212 )
2213 # see https://www.sqlalchemy.org/trac/ticket/2568
2214 # as well as https://www.sqlite.org/src/info/600482d161
2215 self._broken_fk_pragma_quotes = self.dbapi.sqlite_version_info < (
2216 3,
2217 6,
2218 14,
2219 )
2220
2221 if self.dbapi.sqlite_version_info < (3, 35) or util.pypy:
2222 self.update_returning = self.delete_returning = (
2223 self.insert_returning
2224 ) = False
2225
2226 if self.dbapi.sqlite_version_info < (3, 32, 0):
2227 # https://www.sqlite.org/limits.html
2228 self.insertmanyvalues_max_parameters = 999
2229
    # isolation level name -> integer interpolated into
    # "PRAGMA read_uncommitted = <n>" by set_isolation_level()
    _isolation_lookup = util.immutabledict(
        {"READ UNCOMMITTED": 1, "SERIALIZABLE": 0}
    )
2233
2234 def get_isolation_level_values(self, dbapi_connection):
2235 return list(self._isolation_lookup)
2236
2237 def set_isolation_level(
2238 self, dbapi_connection: DBAPIConnection, level: IsolationLevel
2239 ) -> None:
2240 isolation_level = self._isolation_lookup[level]
2241
2242 cursor = dbapi_connection.cursor()
2243 cursor.execute(f"PRAGMA read_uncommitted = {isolation_level}")
2244 cursor.close()
2245
2246 def get_isolation_level(self, dbapi_connection):
2247 cursor = dbapi_connection.cursor()
2248 cursor.execute("PRAGMA read_uncommitted")
2249 res = cursor.fetchone()
2250 if res:
2251 value = res[0]
2252 else:
2253 # https://www.sqlite.org/changes.html#version_3_3_3
2254 # "Optional READ UNCOMMITTED isolation (instead of the
2255 # default isolation level of SERIALIZABLE) and
2256 # table level locking when database connections
2257 # share a common cache.""
2258 # pre-SQLite 3.3.0 default to 0
2259 value = 0
2260 cursor.close()
2261 if value == 0:
2262 return "SERIALIZABLE"
2263 elif value == 1:
2264 return "READ UNCOMMITTED"
2265 else:
2266 assert False, "Unknown isolation level %s" % value
2267
2268 @reflection.cache
2269 def get_schema_names(self, connection, **kw):
2270 s = "PRAGMA database_list"
2271 dl = connection.exec_driver_sql(s)
2272
2273 return [db[1] for db in dl if db[1] != "temp"]
2274
2275 def _format_schema(self, schema, table_name):
2276 if schema is not None:
2277 qschema = self.identifier_preparer.quote_identifier(schema)
2278 name = f"{qschema}.{table_name}"
2279 else:
2280 name = table_name
2281 return name
2282
2283 def _sqlite_main_query(
2284 self,
2285 table: str,
2286 type_: str,
2287 schema: Optional[str],
2288 sqlite_include_internal: bool,
2289 ):
2290 main = self._format_schema(schema, table)
2291 if not sqlite_include_internal:
2292 filter_table = " AND name NOT LIKE 'sqlite~_%' ESCAPE '~'"
2293 else:
2294 filter_table = ""
2295 query = (
2296 f"SELECT name FROM {main} "
2297 f"WHERE type='{type_}'{filter_table} "
2298 "ORDER BY name"
2299 )
2300 return query
2301
2302 @reflection.cache
2303 def get_table_names(
2304 self, connection, schema=None, sqlite_include_internal=False, **kw
2305 ):
2306 query = self._sqlite_main_query(
2307 "sqlite_master", "table", schema, sqlite_include_internal
2308 )
2309 names = connection.exec_driver_sql(query).scalars().all()
2310 return names
2311
2312 @reflection.cache
2313 def get_temp_table_names(
2314 self, connection, sqlite_include_internal=False, **kw
2315 ):
2316 query = self._sqlite_main_query(
2317 "sqlite_temp_master", "table", None, sqlite_include_internal
2318 )
2319 names = connection.exec_driver_sql(query).scalars().all()
2320 return names
2321
2322 @reflection.cache
2323 def get_temp_view_names(
2324 self, connection, sqlite_include_internal=False, **kw
2325 ):
2326 query = self._sqlite_main_query(
2327 "sqlite_temp_master", "view", None, sqlite_include_internal
2328 )
2329 names = connection.exec_driver_sql(query).scalars().all()
2330 return names
2331
2332 @reflection.cache
2333 def has_table(self, connection, table_name, schema=None, **kw):
2334 self._ensure_has_table_connection(connection)
2335
2336 if schema is not None and schema not in self.get_schema_names(
2337 connection, **kw
2338 ):
2339 return False
2340
2341 info = self._get_table_pragma(
2342 connection, "table_info", table_name, schema=schema
2343 )
2344 return bool(info)
2345
2346 def _get_default_schema_name(self, connection):
2347 return "main"
2348
2349 @reflection.cache
2350 def get_view_names(
2351 self, connection, schema=None, sqlite_include_internal=False, **kw
2352 ):
2353 query = self._sqlite_main_query(
2354 "sqlite_master", "view", schema, sqlite_include_internal
2355 )
2356 names = connection.exec_driver_sql(query).scalars().all()
2357 return names
2358
2359 @reflection.cache
2360 def get_view_definition(self, connection, view_name, schema=None, **kw):
2361 if schema is not None:
2362 qschema = self.identifier_preparer.quote_identifier(schema)
2363 master = f"{qschema}.sqlite_master"
2364 s = ("SELECT sql FROM %s WHERE name = ? AND type='view'") % (
2365 master,
2366 )
2367 rs = connection.exec_driver_sql(s, (view_name,))
2368 else:
2369 try:
2370 s = (
2371 "SELECT sql FROM "
2372 " (SELECT * FROM sqlite_master UNION ALL "
2373 " SELECT * FROM sqlite_temp_master) "
2374 "WHERE name = ? "
2375 "AND type='view'"
2376 )
2377 rs = connection.exec_driver_sql(s, (view_name,))
2378 except exc.DBAPIError:
2379 s = (
2380 "SELECT sql FROM sqlite_master WHERE name = ? "
2381 "AND type='view'"
2382 )
2383 rs = connection.exec_driver_sql(s, (view_name,))
2384
2385 result = rs.fetchall()
2386 if result:
2387 return result[0].sql
2388 else:
2389 raise exc.NoSuchTableError(
2390 f"{schema}.{view_name}" if schema else view_name
2391 )
2392
    @reflection.cache
    def get_columns(self, connection, table_name, schema=None, **kw):
        """Return reflected column dictionaries for a table.

        Rows come from ``PRAGMA table_info`` or, when
        ``server_version_info`` is at least ``(3, 31)``, from
        ``PRAGMA table_xinfo``, which additionally provides the ``hidden``
        field.  Each row is turned into a dictionary by
        ``_get_column_info``.

        :raises NoSuchTableError: if the PRAGMA yields no columns and
         ``has_table`` reports that the table does not exist.
        """
        pragma = "table_info"
        # computed columns are treated as hidden, they require table_xinfo
        if self.server_version_info >= (3, 31):
            pragma = "table_xinfo"
        info = self._get_table_pragma(
            connection, pragma, table_name, schema=schema
        )
        columns = []
        # body of the CREATE TABLE statement; fetched lazily, the first
        # time a generated column is encountered
        tablesql = None
        for row in info:
            name = row[1]
            type_ = row[2].upper()
            nullable = not row[3]
            default = row[4]
            primary_key = row[5]
            hidden = row[6] if pragma == "table_xinfo" else 0

            # hidden has value 0 for normal columns, 1 for hidden columns,
            # 2 for computed virtual columns and 3 for computed stored columns
            # https://www.sqlite.org/src/info/069351b85f9a706f60d3e98fbc8aaf40c374356b967c0464aede30ead3d9d18b
            if hidden == 1:
                continue

            generated = bool(hidden)
            persisted = hidden == 3

            if tablesql is None and generated:
                tablesql = self._get_table_sql(
                    connection, table_name, schema, **kw
                )
                # remove create table
                match = re.match(
                    (
                        r"create table .*?\((.*)\)"
                        r"(?:\s*,?\s*(?:WITHOUT\s+ROWID|STRICT))*$"
                    ),
                    tablesql.strip(),
                    re.DOTALL | re.IGNORECASE,
                )
                assert match, f"create table not found in {tablesql}"
                # keep only the text between the outermost parentheses
                tablesql = match.group(1).strip()

            columns.append(
                self._get_column_info(
                    name,
                    type_,
                    nullable,
                    default,
                    primary_key,
                    generated,
                    persisted,
                    tablesql,
                )
            )
        if columns:
            return columns
        elif not self.has_table(connection, table_name, schema):
            raise exc.NoSuchTableError(
                f"{schema}.{table_name}" if schema else table_name
            )
        else:
            return ReflectionDefaults.columns()
2457
2458 def _get_column_info(
2459 self,
2460 name,
2461 type_,
2462 nullable,
2463 default,
2464 primary_key,
2465 generated,
2466 persisted,
2467 tablesql,
2468 ):
2469 if generated:
2470 # the type of a column "cc INTEGER GENERATED ALWAYS AS (1 + 42)"
2471 # somehow is "INTEGER GENERATED ALWAYS"
2472 type_ = re.sub("generated", "", type_, flags=re.IGNORECASE)
2473 type_ = re.sub("always", "", type_, flags=re.IGNORECASE).strip()
2474
2475 coltype = self._resolve_type_affinity(type_)
2476
2477 if default is not None:
2478 default = str(default)
2479
2480 colspec = {
2481 "name": name,
2482 "type": coltype,
2483 "nullable": nullable,
2484 "default": default,
2485 "primary_key": primary_key,
2486 }
2487 if generated:
2488 sqltext = ""
2489 if tablesql:
2490 pattern = (
2491 r"[^,]*\s+GENERATED\s+ALWAYS\s+AS"
2492 r"\s+\((.*)\)\s*(?:virtual|stored)?"
2493 )
2494 match = re.search(
2495 re.escape(name) + pattern, tablesql, re.IGNORECASE
2496 )
2497 if match:
2498 sqltext = match.group(1)
2499 colspec["computed"] = {"sqltext": sqltext, "persisted": persisted}
2500 return colspec
2501
2502 def _resolve_type_affinity(self, type_):
2503 """Return a data type from a reflected column, using affinity rules.
2504
2505 SQLite's goal for universal compatibility introduces some complexity
2506 during reflection, as a column's defined type might not actually be a
2507 type that SQLite understands - or indeed, my not be defined *at all*.
2508 Internally, SQLite handles this with a 'data type affinity' for each
2509 column definition, mapping to one of 'TEXT', 'NUMERIC', 'INTEGER',
2510 'REAL', or 'NONE' (raw bits). The algorithm that determines this is
2511 listed in https://www.sqlite.org/datatype3.html section 2.1.
2512
2513 This method allows SQLAlchemy to support that algorithm, while still
2514 providing access to smarter reflection utilities by recognizing
2515 column definitions that SQLite only supports through affinity (like
2516 DATE and DOUBLE).
2517
2518 """
2519 match = re.match(r"([\w ]+)(\(.*?\))?", type_)
2520 if match:
2521 coltype = match.group(1)
2522 args = match.group(2)
2523 else:
2524 coltype = ""
2525 args = ""
2526
2527 if coltype in self.ischema_names:
2528 coltype = self.ischema_names[coltype]
2529 elif "INT" in coltype:
2530 coltype = sqltypes.INTEGER
2531 elif "CHAR" in coltype or "CLOB" in coltype or "TEXT" in coltype:
2532 coltype = sqltypes.TEXT
2533 elif "BLOB" in coltype or not coltype:
2534 coltype = sqltypes.NullType
2535 elif "REAL" in coltype or "FLOA" in coltype or "DOUB" in coltype:
2536 coltype = sqltypes.REAL
2537 else:
2538 coltype = sqltypes.NUMERIC
2539
2540 if args is not None:
2541 args = re.findall(r"(\d+)", args)
2542 try:
2543 coltype = coltype(*[int(a) for a in args])
2544 except TypeError:
2545 util.warn(
2546 "Could not instantiate type %s with "
2547 "reflected arguments %s; using no arguments."
2548 % (coltype, args)
2549 )
2550 coltype = coltype()
2551 else:
2552 coltype = coltype()
2553
2554 return coltype
2555
2556 @reflection.cache
2557 def get_pk_constraint(self, connection, table_name, schema=None, **kw):
2558 constraint_name = None
2559 table_data = self._get_table_sql(connection, table_name, schema=schema)
2560 if table_data:
2561 PK_PATTERN = r'CONSTRAINT +(?:"(.+?)"|(\w+)) +PRIMARY KEY'
2562 result = re.search(PK_PATTERN, table_data, re.I)
2563 if result:
2564 constraint_name = result.group(1) or result.group(2)
2565 else:
2566 constraint_name = None
2567
2568 cols = self.get_columns(connection, table_name, schema, **kw)
2569 # consider only pk columns. This also avoids sorting the cached
2570 # value returned by get_columns
2571 cols = [col for col in cols if col.get("primary_key", 0) > 0]
2572 cols.sort(key=lambda col: col.get("primary_key"))
2573 pkeys = [col["name"] for col in cols]
2574
2575 if pkeys:
2576 return {"constrained_columns": pkeys, "name": constraint_name}
2577 else:
2578 return ReflectionDefaults.pk_constraint()
2579
    @reflection.cache
    def get_foreign_keys(self, connection, table_name, schema=None, **kw):
        """Return reflected foreign key dictionaries for a table.

        The foreign keys themselves come from ``PRAGMA foreign_key_list``.
        The table's stored SQL is then parsed with a regular expression to
        recover the constraint name and the ON DELETE / ON UPDATE /
        DEFERRABLE / INITIALLY options, which are matched to the PRAGMA
        entries by a signature built from constrained columns, referred
        table and referred columns.
        """
        # sqlite makes this *extremely difficult*.
        # First, use the pragma to get the actual FKs.
        pragma_fks = self._get_table_pragma(
            connection, "foreign_key_list", table_name, schema=schema
        )

        # PRAGMA id -> foreign key dictionary; the PRAGMA produces one row
        # per column of each constraint
        fks = {}

        for row in pragma_fks:
            (numerical_id, rtbl, lcol, rcol) = (row[0], row[2], row[3], row[4])

            if not rcol:
                # no referred column, which means it was not named in the
                # original DDL.  The referred columns of the foreign key
                # constraint are therefore the primary key of the referred
                # table.
                try:
                    referred_pk = self.get_pk_constraint(
                        connection, rtbl, schema=schema, **kw
                    )
                    referred_columns = referred_pk["constrained_columns"]
                except exc.NoSuchTableError:
                    # ignore not existing parents
                    referred_columns = []
            else:
                # note we use this list only if this is the first column
                # in the constraint.  for subsequent columns we ignore the
                # list and append "rcol" if present.
                referred_columns = []

            if self._broken_fk_pragma_quotes:
                # strip one leading and one trailing quote character
                rtbl = re.sub(r"^[\"\[`\']|[\"\]`\']$", "", rtbl)

            if numerical_id in fks:
                fk = fks[numerical_id]
            else:
                fk = fks[numerical_id] = {
                    "name": None,
                    "constrained_columns": [],
                    "referred_schema": schema,
                    "referred_table": rtbl,
                    "referred_columns": referred_columns,
                    "options": {},
                }
                fks[numerical_id] = fk

            fk["constrained_columns"].append(lcol)

            if rcol:
                fk["referred_columns"].append(rcol)

        def fk_sig(constrained_columns, referred_table, referred_columns):
            # flat tuple used as the key that pairs PRAGMA entries with
            # constraints parsed from the SQL text
            return (
                tuple(constrained_columns)
                + (referred_table,)
                + tuple(referred_columns)
            )

        # then, parse the actual SQL and attempt to find DDL that matches
        # the names as well.   SQLite saves the DDL in whatever format
        # it was typed in as, so need to be liberal here.

        keys_by_signature = {
            fk_sig(
                fk["constrained_columns"],
                fk["referred_table"],
                fk["referred_columns"],
            ): fk
            for fk in fks.values()
        }

        table_data = self._get_table_sql(connection, table_name, schema=schema)

        def parse_fks():
            # generator yielding (name, constrained columns, referred
            # table, referred columns, options) per match in the SQL text
            if table_data is None:
                # system tables, etc.
                return

            # note that we already have the FKs from PRAGMA above.  This whole
            # regexp thing is trying to locate additional detail about the
            # FKs, namely the name of the constraint and other options.
            # so parsing the columns is really about matching it up to what
            # we already have.
            FK_PATTERN = (
                r'(?:CONSTRAINT +(?:"(.+?)"|(\w+)) +)?'
                r"FOREIGN KEY *\( *(.+?) *\) +"
                r'REFERENCES +(?:(?:"(.+?)")|([a-z0-9_]+)) *\( *((?:(?:"[^"]+"|[a-z0-9_]+) *(?:, *)?)+)\) *'  # noqa: E501
                r"((?:ON (?:DELETE|UPDATE) "
                r"(?:SET NULL|SET DEFAULT|CASCADE|RESTRICT|NO ACTION) *)*)"
                r"((?:NOT +)?DEFERRABLE)?"
                r"(?: +INITIALLY +(DEFERRED|IMMEDIATE))?"
            )
            for match in re.finditer(FK_PATTERN, table_data, re.I):
                (
                    constraint_quoted_name,
                    constraint_name,
                    constrained_columns,
                    referred_quoted_name,
                    referred_name,
                    referred_columns,
                    onupdatedelete,
                    deferrable,
                    initially,
                ) = match.group(1, 2, 3, 4, 5, 6, 7, 8, 9)
                constraint_name = constraint_quoted_name or constraint_name
                constrained_columns = list(
                    self._find_cols_in_sig(constrained_columns)
                )
                if not referred_columns:
                    referred_columns = constrained_columns
                else:
                    referred_columns = list(
                        self._find_cols_in_sig(referred_columns)
                    )
                referred_name = referred_quoted_name or referred_name
                options = {}

                # split "ON DELETE x ON UPDATE y" into its individual
                # clauses; NO ACTION is not recorded as an option
                for token in re.split(r" *\bON\b *", onupdatedelete.upper()):
                    if token.startswith("DELETE"):
                        ondelete = token[6:].strip()
                        if ondelete and ondelete != "NO ACTION":
                            options["ondelete"] = ondelete
                    elif token.startswith("UPDATE"):
                        onupdate = token[6:].strip()
                        if onupdate and onupdate != "NO ACTION":
                            options["onupdate"] = onupdate

                if deferrable:
                    options["deferrable"] = "NOT" not in deferrable.upper()
                if initially:
                    options["initially"] = initially.upper()

                yield (
                    constraint_name,
                    constrained_columns,
                    referred_name,
                    referred_columns,
                    options,
                )

        fkeys = []

        for (
            constraint_name,
            constrained_columns,
            referred_name,
            referred_columns,
            options,
        ) in parse_fks():
            sig = fk_sig(constrained_columns, referred_name, referred_columns)
            if sig not in keys_by_signature:
                util.warn(
                    "WARNING: SQL-parsed foreign key constraint "
                    "'%s' could not be located in PRAGMA "
                    "foreign_keys for table %s" % (sig, table_name)
                )
                continue
            # pop so that each PRAGMA entry is matched at most once
            key = keys_by_signature.pop(sig)
            key["name"] = constraint_name
            key["options"] = options
            fkeys.append(key)
        # assume the remainders are the unnamed, inline constraints, just
        # use them as is as it's extremely difficult to parse inline
        # constraints
        fkeys.extend(keys_by_signature.values())
        if fkeys:
            return fkeys
        else:
            return ReflectionDefaults.foreign_keys()
2751
2752 def _find_cols_in_sig(self, sig):
2753 for match in re.finditer(r'(?:"(.+?)")|([a-z0-9_]+)', sig, re.I):
2754 yield match.group(1) or match.group(2)
2755
    @reflection.cache
    def get_unique_constraints(
        self, connection, table_name, schema=None, **kw
    ):
        """Return reflected UNIQUE constraint dictionaries for a table.

        UNIQUE clauses parsed from the table's stored SQL are reported
        only when their column tuple matches the columns of an index
        whose name starts with ``sqlite_autoindex``.
        """
        # column-name tuple -> auto index dictionary
        auto_index_by_sig = {}
        for idx in self.get_indexes(
            connection,
            table_name,
            schema=schema,
            include_auto_indexes=True,
            **kw,
        ):
            if not idx["name"].startswith("sqlite_autoindex"):
                continue
            sig = tuple(idx["column_names"])
            auto_index_by_sig[sig] = idx

        table_data = self._get_table_sql(
            connection, table_name, schema=schema, **kw
        )
        unique_constraints = []

        def parse_uqs():
            # generator yielding (constraint name or None, column names)
            if table_data is None:
                return
            UNIQUE_PATTERN = (
                r'(?:CONSTRAINT +(?:"(.+?)"|(\w+)) +)?UNIQUE *\((.+?)\)'
            )
            INLINE_UNIQUE_PATTERN = (
                r'(?:(".+?")|(?:[\[`])?([a-z0-9_]+)(?:[\]`])?)[\t ]'
                r"+[a-z0-9_ ]+?[\t ]+UNIQUE"
            )

            for match in re.finditer(UNIQUE_PATTERN, table_data, re.I):
                quoted_name, unquoted_name, cols = match.group(1, 2, 3)
                name = quoted_name or unquoted_name
                yield name, list(self._find_cols_in_sig(cols))

            # we need to match inlines as well, as we seek to differentiate
            # a UNIQUE constraint from a UNIQUE INDEX, even though these
            # are kind of the same thing :)
            for match in re.finditer(INLINE_UNIQUE_PATTERN, table_data, re.I):
                cols = list(
                    self._find_cols_in_sig(match.group(1) or match.group(2))
                )
                yield None, cols

        for name, cols in parse_uqs():
            sig = tuple(cols)
            if sig in auto_index_by_sig:
                # pop so that each auto index backs at most one constraint
                auto_index_by_sig.pop(sig)
                parsed_constraint = {"name": name, "column_names": cols}
                unique_constraints.append(parsed_constraint)
        # NOTE: auto_index_by_sig might not be empty here,
        # the PRIMARY KEY may have an entry.
        if unique_constraints:
            return unique_constraints
        else:
            return ReflectionDefaults.unique_constraints()
2815
    @reflection.cache
    def get_check_constraints(self, connection, table_name, schema=None, **kw):
        """Return reflected CHECK constraint dictionaries for a table.

        Each ``CHECK (`` occurrence in the table's stored SQL is located
        with a regular expression; the expression text is then delimited
        by scanning forward to the balancing closing parenthesis.  The
        result is sorted by constraint name, unnamed constraints last.
        """
        table_data = self._get_table_sql(
            connection, table_name, schema=schema, **kw
        )

        # Extract CHECK constraints by properly handling balanced parentheses
        # and avoiding false matches when CHECK/CONSTRAINT appear in table
        # names. See #12924 for context.
        #
        # SQLite supports 4 identifier quote styles (see
        # sqlite.org/lang_keywords.html):
        # - Double quotes "..." (standard SQL)
        # - Brackets [...] (MS Access/SQL Server compatibility)
        # - Backticks `...` (MySQL compatibility)
        # - Single quotes '...' (SQLite extension)
        #
        # NOTE: there is not currently a way to parse CHECK constraints that
        # contain newlines as the approach here relies upon each individual
        # CHECK constraint being on a single line by itself. This necessarily
        # makes assumptions as to how the CREATE TABLE was emitted.
        CHECK_PATTERN = re.compile(
            r"""
            (?<![A-Za-z0-9_]) # Negative lookbehind: ensure CHECK is not
                              # part of an identifier (e.g., table name
                              # like "tableCHECK")

            (?: # Optional CONSTRAINT clause
                CONSTRAINT\s+
                ( # Group 1: Constraint name (quoted or unquoted)
                    "(?:[^"]|"")+" # Double-quoted: "name" or "na""me"
                    |'(?:[^']|'')+' # Single-quoted: 'name' or 'na''me'
                    |\[(?:[^\]]|\]\])+\] # Bracket-quoted: [name] or [na]]me]
                    |`(?:[^`]|``)+` # Backtick-quoted: `name` or `na``me`
                    |\S+ # Unquoted: simple_name
                )
                \s+
            )?

            CHECK\s*\( # CHECK keyword followed by opening paren
            """,
            re.VERBOSE | re.IGNORECASE,
        )
        cks = []

        # "table_data or ''" lets a missing table SQL yield no matches
        for match in re.finditer(CHECK_PATTERN, table_data or ""):
            constraint_name = match.group(1)

            if constraint_name:
                # Remove surrounding quotes if present
                # Double quotes: "name" -> name
                # Single quotes: 'name' -> name
                # Brackets: [name] -> name
                # Backticks: `name` -> name
                constraint_name = re.sub(
                    r'^(["\'`])(.+)\1$|^\[(.+)\]$',
                    lambda m: m.group(2) or m.group(3),
                    constraint_name,
                    flags=re.DOTALL,
                )

            # Find the matching closing parenthesis by counting balanced parens
            # Must track string context to ignore parens inside string literals
            start = match.end()  # Position after 'CHECK ('
            paren_count = 1
            in_single_quote = False
            in_double_quote = False

            for pos, char in enumerate(table_data[start:], start):
                # Track string literal context
                if char == "'" and not in_double_quote:
                    in_single_quote = not in_single_quote
                elif char == '"' and not in_single_quote:
                    in_double_quote = not in_double_quote
                # Only count parens when not inside a string literal
                elif not in_single_quote and not in_double_quote:
                    if char == "(":
                        paren_count += 1
                    elif char == ")":
                        paren_count -= 1
                        if paren_count == 0:
                            # Successfully found matching closing parenthesis
                            sqltext = table_data[start:pos].strip()
                            cks.append(
                                {"sqltext": sqltext, "name": constraint_name}
                            )
                            break

        cks.sort(key=lambda d: d["name"] or "~")  # sort None as last
        if cks:
            return cks
        else:
            return ReflectionDefaults.check_constraints()
2909
    @reflection.cache
    def get_indexes(self, connection, table_name, schema=None, **kw):
        """Return reflected index dictionaries for a table.

        Index names and uniqueness come from ``PRAGMA index_list`` and the
        column names from ``PRAGMA index_info``.  Indexes whose name
        starts with ``sqlite_autoindex`` are skipped unless the
        ``include_auto_indexes`` keyword is true.  For partial indexes the
        WHERE predicate is extracted from the stored index SQL and placed
        under ``dialect_options["sqlite_where"]``.  Expression-based
        indexes are skipped with a warning.

        :raises NoSuchTableError: if no index is found and ``has_table``
         reports that the table does not exist.
        """
        pragma_indexes = self._get_table_pragma(
            connection, "index_list", table_name, schema=schema
        )
        indexes = []

        # regular expression to extract the filter predicate of a partial
        # index. this could fail to extract the predicate correctly on
        # indexes created like
        #   CREATE INDEX i ON t (col || ') where') WHERE col <> ''
        # but as this function does not support expression-based indexes
        # this case does not occur.
        partial_pred_re = re.compile(r"\)\s+where\s+(.+)", re.IGNORECASE)

        if schema:
            schema_expr = "%s." % self.identifier_preparer.quote_identifier(
                schema
            )
        else:
            schema_expr = ""

        include_auto_indexes = kw.pop("include_auto_indexes", False)
        for row in pragma_indexes:
            # ignore implicit primary key index.
            # https://www.mail-archive.com/sqlite-users@sqlite.org/msg30517.html
            if not include_auto_indexes and row[1].startswith(
                "sqlite_autoindex"
            ):
                continue
            indexes.append(
                dict(
                    name=row[1],
                    column_names=[],
                    unique=row[2],
                    dialect_options={},
                )
            )

            # check partial indexes
            # (the fifth PRAGMA field is only consulted when present)
            if len(row) >= 5 and row[4]:
                s = (
                    "SELECT sql FROM %(schema)ssqlite_master "
                    "WHERE name = ? "
                    "AND type = 'index'" % {"schema": schema_expr}
                )
                rs = connection.exec_driver_sql(s, (row[1],))
                index_sql = rs.scalar()
                predicate_match = partial_pred_re.search(index_sql)
                if predicate_match is None:
                    # unless the regex is broken this case shouldn't happen
                    # because we know this is a partial index, so the
                    # definition sql should match the regex
                    util.warn(
                        "Failed to look up filter predicate of "
                        "partial index %s" % row[1]
                    )
                else:
                    predicate = predicate_match.group(1)
                    indexes[-1]["dialect_options"]["sqlite_where"] = text(
                        predicate
                    )

        # loop thru unique indexes to get the column names.
        # iterate over a copy because entries may be removed from
        # ``indexes`` inside the loop
        for idx in list(indexes):
            pragma_index = self._get_table_pragma(
                connection, "index_info", idx["name"], schema=schema
            )

            for row in pragma_index:
                if row[2] is None:
                    util.warn(
                        "Skipped unsupported reflection of "
                        "expression-based index %s" % idx["name"]
                    )
                    indexes.remove(idx)
                    break
                else:
                    idx["column_names"].append(row[2])

        indexes.sort(key=lambda d: d["name"] or "~")  # sort None as last
        if indexes:
            return indexes
        elif not self.has_table(connection, table_name, schema):
            raise exc.NoSuchTableError(
                f"{schema}.{table_name}" if schema else table_name
            )
        else:
            return ReflectionDefaults.indexes()
2999
def _is_sys_table(self, table_name):
    """Return True if ``table_name`` is one of the SQLite schema tables.

    The recognized names are ``sqlite_schema``, ``sqlite_master``,
    ``sqlite_temp_schema`` and ``sqlite_temp_master``; the comparison is
    an exact, case-sensitive match.
    """
    system_tables = frozenset(
        (
            "sqlite_schema",
            "sqlite_master",
            "sqlite_temp_schema",
            "sqlite_temp_master",
        )
    )
    return table_name in system_tables
3007
@reflection.cache
def _get_table_sql(self, connection, table_name, schema=None, **kw):
    """Return the stored ``sql`` text for a table or view.

    The name is looked up in ``sqlite_master`` combined with
    ``sqlite_temp_master``; if that statement raises ``DBAPIError`` the
    lookup is retried against ``sqlite_master`` alone.

    :param connection: connection used to run the catalog query.
    :param table_name: name of the table or view to look up.
    :param schema: optional schema name; when given, it is quoted and
     used to qualify the catalog tables.
    :return: the first column of the first matching row, which may be
     ``None`` when ``table_name`` is one of the names accepted by
     ``_is_sys_table()``.
    :raises NoSuchTableError: when nothing is found and ``table_name``
     is not one of the names accepted by ``_is_sys_table()``.
    """
    schema_expr = (
        "%s." % (self.identifier_preparer.quote_identifier(schema))
        if schema
        else ""
    )
    substitutions = {"schema": schema_expr}
    lookup_args = (table_name,)

    try:
        combined_query = (
            "SELECT sql FROM "
            " (SELECT * FROM %(schema)ssqlite_master UNION ALL "
            " SELECT * FROM %(schema)ssqlite_temp_master) "
            "WHERE name = ? "
            "AND type in ('table', 'view')" % substitutions
        )
        rs = connection.exec_driver_sql(combined_query, lookup_args)
    except exc.DBAPIError:
        # the combined lookup failed at the driver level; retry using
        # only the sqlite_master catalog
        master_only_query = (
            "SELECT sql FROM %(schema)ssqlite_master "
            "WHERE name = ? "
            "AND type in ('table', 'view')" % substitutions
        )
        rs = connection.exec_driver_sql(master_only_query, lookup_args)

    value = rs.scalar()
    # a missing result is only tolerated for the schema tables themselves
    if value is not None or self._is_sys_table(table_name):
        return value
    raise exc.NoSuchTableError(f"{schema_expr}{table_name}")
3036
def _get_table_pragma(self, connection, pragma, table_name, schema=None):
    """Run ``PRAGMA <pragma>(<table_name>)`` and return its rows.

    :param connection: connection used to execute the PRAGMA.
    :param pragma: name of the PRAGMA to invoke, e.g. ``index_list``.
    :param table_name: argument passed to the PRAGMA; it is quoted with
     the dialect's identifier preparer.
    :param schema: schema to qualify the PRAGMA with.  When ``None``,
     the PRAGMA is tried against ``main`` and then ``temp``.
    :return: the rows of the first statement that produced any, or an
     empty list when none did.
    """
    quote_ident = self.identifier_preparer.quote_identifier
    if schema is None:
        # because PRAGMA looks in all attached databases if no schema
        # given, need to specify "main" schema, however since we want
        # 'temp' tables in the same namespace as 'main', need to run
        # the PRAGMA twice
        prefixes = ["PRAGMA main.", "PRAGMA temp."]
    else:
        prefixes = [f"PRAGMA {quote_ident(schema)}."]

    quoted_table = quote_ident(table_name)
    for prefix in prefixes:
        cursor = connection.exec_driver_sql(
            f"{prefix}{pragma}({quoted_table})"
        )
        # work around SQLite issue whereby cursor.description
        # is blank when PRAGMA returns no rows:
        # https://www.sqlite.org/cvstrac/tktview?tn=1884
        rows = [] if cursor._soft_closed else cursor.fetchall()
        if rows:
            return rows

    # no statement produced any rows
    return []