1# dialects/sqlite/base.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: ignore-errors
8
9
10r'''
11.. dialect:: sqlite
12 :name: SQLite
13 :normal_support: 3.12+
14 :best_effort: 3.7.16+
15
16.. _sqlite_datetime:
17
18Date and Time Types
19-------------------
20
21SQLite does not have built-in DATE, TIME, or DATETIME types, and pysqlite does
22not provide out of the box functionality for translating values between Python
23`datetime` objects and a SQLite-supported format. SQLAlchemy's own
24:class:`~sqlalchemy.types.DateTime` and related types provide date formatting
25and parsing functionality when SQLite is used. The implementation classes are
26:class:`_sqlite.DATETIME`, :class:`_sqlite.DATE` and :class:`_sqlite.TIME`.
27These types represent dates and times as ISO formatted strings, which also
28nicely support ordering. There's no reliance on typical "libc" internals for
29these functions so historical dates are fully supported.
30
31Ensuring Text affinity
32^^^^^^^^^^^^^^^^^^^^^^
33
34The DDL rendered for these types is the standard ``DATE``, ``TIME``
35and ``DATETIME`` indicators. However, custom storage formats can also be
36applied to these types. When the
37storage format is detected as containing no alpha characters, the DDL for
38these types is rendered as ``DATE_CHAR``, ``TIME_CHAR``, and ``DATETIME_CHAR``,
39so that the column continues to have textual affinity.
40
41.. seealso::
42
43 `Type Affinity <https://www.sqlite.org/datatype3.html#affinity>`_ -
44 in the SQLite documentation
45
46.. _sqlite_autoincrement:
47
48SQLite Auto Incrementing Behavior
49----------------------------------
50
51Background on SQLite's autoincrement is at: https://sqlite.org/autoinc.html
52
53Key concepts:
54
55* SQLite has an implicit "auto increment" feature that takes place for any
56 non-composite primary-key column that is specifically created using
57 "INTEGER PRIMARY KEY" for the type + primary key.
58
59* SQLite also has an explicit "AUTOINCREMENT" keyword, that is **not**
60 equivalent to the implicit autoincrement feature; this keyword is not
61 recommended for general use. SQLAlchemy does not render this keyword
62 unless a special SQLite-specific directive is used (see below). However,
63 it still requires that the column's type is named "INTEGER".
64
65Using the AUTOINCREMENT Keyword
66^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
67
68To specifically render the AUTOINCREMENT keyword on the primary key column
69when rendering DDL, add the flag ``sqlite_autoincrement=True`` to the Table
70construct::
71
72 Table(
73 "sometable",
74 metadata,
75 Column("id", Integer, primary_key=True),
76 sqlite_autoincrement=True,
77 )
78
79Allowing autoincrement behavior for SQLAlchemy types other than Integer/INTEGER
80^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
81
82SQLite's typing model is based on naming conventions. Among other things, this
83means that any type name which contains the substring ``"INT"`` will be
84determined to be of "integer affinity". A type named ``"BIGINT"``,
85``"SPECIAL_INT"`` or even ``"XYZINTQPR"``, will be considered by SQLite to be
86of "integer" affinity. However, **the SQLite autoincrement feature, whether
87implicitly or explicitly enabled, requires that the name of the column's type
88is exactly the string "INTEGER"**. Therefore, if an application uses a type
89like :class:`.BigInteger` for a primary key, on SQLite this type will need to
90be rendered as the name ``"INTEGER"`` when emitting the initial ``CREATE
91TABLE`` statement in order for the autoincrement behavior to be available.
92
93One approach to achieve this is to use :class:`.Integer` on SQLite
94only using :meth:`.TypeEngine.with_variant`::
95
96 table = Table(
97 "my_table",
98 metadata,
99 Column(
100 "id",
101 BigInteger().with_variant(Integer, "sqlite"),
102 primary_key=True,
103 ),
104 )
105
106Another is to use a subclass of :class:`.BigInteger` that overrides its DDL
107name to be ``INTEGER`` when compiled against SQLite::
108
109 from sqlalchemy import BigInteger
110 from sqlalchemy.ext.compiler import compiles
111
112
113 class SLBigInteger(BigInteger):
114 pass
115
116
117 @compiles(SLBigInteger, "sqlite")
118 def bi_c(element, compiler, **kw):
119 return "INTEGER"
120
121
122 @compiles(SLBigInteger)
123 def bi_c(element, compiler, **kw):
124 return compiler.visit_BIGINT(element, **kw)
125
126
127 table = Table(
128 "my_table", metadata, Column("id", SLBigInteger(), primary_key=True)
129 )
130
131.. seealso::
132
133 :meth:`.TypeEngine.with_variant`
134
135 :ref:`sqlalchemy.ext.compiler_toplevel`
136
137 `Datatypes In SQLite Version 3 <https://sqlite.org/datatype3.html>`_
138
139.. _sqlite_transactions:
140
141Transactions with SQLite and the sqlite3 driver
142-----------------------------------------------
143
144As a file-based database, SQLite's approach to transactions differs from
145traditional databases in many ways. Additionally, the ``sqlite3`` driver
146standard with Python (as well as the async version ``aiosqlite`` which builds
147on top of it) has several quirks, workarounds, and API features in the
148area of transaction control, all of which generally need to be addressed when
149constructing a SQLAlchemy application that uses SQLite.
150
151Legacy Transaction Mode with the sqlite3 driver
152^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
153
154The most important aspect of transaction handling with the sqlite3 driver is
155that it defaults (which will continue through Python 3.15 before being
156removed in Python 3.16) to legacy transactional behavior which does
157not strictly follow :pep:`249`. The way in which the driver diverges from the
158PEP is that it does not "begin" a transaction automatically as dictated by
159:pep:`249` except in the case of DML statements, e.g. INSERT, UPDATE, and
160DELETE. Normally, :pep:`249` dictates that a BEGIN must be emitted upon
161the first SQL statement of any kind, so that all subsequent operations will
162be established within a transaction until ``connection.commit()`` has been
163called. The ``sqlite3`` driver, in an effort to be easier to use in
164highly concurrent environments, skips this step for DQL (e.g. SELECT) statements,
165and also skips it for DDL (e.g. CREATE TABLE etc.) statements for more legacy
166reasons. Statements such as SAVEPOINT are also skipped.
167
168In modern versions of the ``sqlite3`` driver as of Python 3.12, this legacy
169mode of operation is referred to as
170`"legacy transaction control" <https://docs.python.org/3/library/sqlite3.html#sqlite3-transaction-control-isolation-level>`_, and is in
171effect by default due to the ``Connection.autocommit`` parameter being set to
172the constant ``sqlite3.LEGACY_TRANSACTION_CONTROL``. Prior to Python 3.12,
173the ``Connection.autocommit`` attribute did not exist.
174
175The implications of legacy transaction mode include:
176
177* **Incorrect support for transactional DDL** - statements like CREATE TABLE, ALTER TABLE,
178 CREATE INDEX etc. will not automatically BEGIN a transaction if one were not
179 started already, leading to the changes by each statement being
180 "autocommitted" immediately unless BEGIN were otherwise emitted first. Very
181 old (pre Python 3.6) versions of SQLite would also force a COMMIT for these
182 operations even if a transaction were present, however this is no longer the
183 case.
184* **SERIALIZABLE behavior not fully functional** - SQLite's transaction isolation
185 behavior is normally consistent with SERIALIZABLE isolation, as it is a file-
186 based system that locks the database file entirely for write operations,
187 preventing COMMIT until all reader transactions (and associated file locks)
188 have completed. However, sqlite3's legacy transaction mode fails to emit BEGIN for SELECT
189 statements, which causes these SELECT statements to no longer be "repeatable",
190 failing one of the consistency guarantees of SERIALIZABLE.
191* **Incorrect behavior for SAVEPOINT** - as the SAVEPOINT statement does not
192 imply a BEGIN, a new SAVEPOINT emitted before a BEGIN will function on its
193 own but fails to participate in the enclosing transaction, meaning a ROLLBACK
194 of the transaction will not rollback elements that were part of a released
195 savepoint.
196
197Legacy transaction mode first existed in order to facilitate working around
198SQLite's file locks. Because SQLite relies upon whole-file locks, it is easy to
199get "database is locked" errors, particularly when newer features like "write
200ahead logging" are disabled. This is a key reason why ``sqlite3``'s legacy
201transaction mode is still the default mode of operation; disabling it will
202produce behavior that is more susceptible to locked database errors. However
203note that **legacy transaction mode will no longer be the default** in a future
204Python version (3.16 as of this writing).
205
206.. _sqlite_enabling_transactions:
207
208Enabling Non-Legacy SQLite Transactional Modes with the sqlite3 or aiosqlite driver
209^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
210
211Current SQLAlchemy support allows either for setting the
212``.Connection.autocommit`` attribute, most directly by using a
213:func:`._sa.create_engine` parameter, or if on an older version of Python where
214the attribute is not available, using event hooks to control the behavior of
215BEGIN.
216
217* **Enabling modern sqlite3 transaction control via the autocommit connect parameter** (Python 3.12 and above)
218
219 To use SQLite in the mode described at `Transaction control via the autocommit attribute <https://docs.python.org/3/library/sqlite3.html#transaction-control-via-the-autocommit-attribute>`_,
220 the most straightforward approach is to set the attribute to its recommended value
221 of ``False`` at the connect level using :paramref:`_sa.create_engine.connect_args`::
222
223 from sqlalchemy import create_engine
224
225 engine = create_engine(
226 "sqlite:///myfile.db", connect_args={"autocommit": False}
227 )
228
229 This parameter is also passed through when using the aiosqlite driver::
230
231 from sqlalchemy.ext.asyncio import create_async_engine
232
233 engine = create_async_engine(
234 "sqlite+aiosqlite:///myfile.db", connect_args={"autocommit": False}
235 )
236
237 The parameter can also be set at the attribute level using the :meth:`.PoolEvents.connect`
238 event hook, however this will only work for sqlite3, as aiosqlite does not yet expose this
239 attribute on its ``Connection`` object::
240
241 from sqlalchemy import create_engine, event
242
243 engine = create_engine("sqlite:///myfile.db")
244
245
246 @event.listens_for(engine, "connect")
247 def do_connect(dbapi_connection, connection_record):
248 # enable autocommit=False mode
249 dbapi_connection.autocommit = False
250
251* **Using SQLAlchemy to emit BEGIN in lieu of SQLite's transaction control** (all Python versions, sqlite3 and aiosqlite)
252
253 For older versions of ``sqlite3`` or for cross-compatibility with older and
254 newer versions, SQLAlchemy can also take over the job of transaction control.
255 This is achieved by using the :meth:`.ConnectionEvents.begin` hook
256 to emit the "BEGIN" command directly, while also disabling SQLite's control
257 of this command using the :meth:`.PoolEvents.connect` event hook to set the
258 ``Connection.isolation_level`` attribute to ``None``::
259
260
261 from sqlalchemy import create_engine, event
262
263 engine = create_engine("sqlite:///myfile.db")
264
265
266 @event.listens_for(engine, "connect")
267 def do_connect(dbapi_connection, connection_record):
268 # disable sqlite3's emitting of the BEGIN statement entirely.
269 dbapi_connection.isolation_level = None
270
271
272 @event.listens_for(engine, "begin")
273 def do_begin(conn):
274 # emit our own BEGIN. sqlite3 still emits COMMIT/ROLLBACK correctly
275 conn.exec_driver_sql("BEGIN")
276
277 When using the asyncio variant ``aiosqlite``, refer to ``engine.sync_engine``
278 as in the example below::
279
280 from sqlalchemy import create_engine, event
281 from sqlalchemy.ext.asyncio import create_async_engine
282
283 engine = create_async_engine("sqlite+aiosqlite:///myfile.db")
284
285
286 @event.listens_for(engine.sync_engine, "connect")
287 def do_connect(dbapi_connection, connection_record):
288 # disable aiosqlite's emitting of the BEGIN statement entirely.
289 dbapi_connection.isolation_level = None
290
291
292 @event.listens_for(engine.sync_engine, "begin")
293 def do_begin(conn):
294 # emit our own BEGIN. aiosqlite still emits COMMIT/ROLLBACK correctly
295 conn.exec_driver_sql("BEGIN")
296
297.. _sqlite_isolation_level:
298
299Using SQLAlchemy's Driver Level AUTOCOMMIT Feature with SQLite
300^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
301
302SQLAlchemy has a comprehensive database isolation feature with optional
303autocommit support that is introduced in the section :ref:`dbapi_autocommit`.
304
305For the ``sqlite3`` and ``aiosqlite`` drivers, SQLAlchemy only includes
306built-in support for "AUTOCOMMIT". Note that this mode is currently incompatible
307with the non-legacy isolation mode hooks documented in the previous
308section at :ref:`sqlite_enabling_transactions`.
309
310To use the ``sqlite3`` driver with SQLAlchemy driver-level autocommit,
311create an engine setting the :paramref:`_sa.create_engine.isolation_level`
312parameter to "AUTOCOMMIT"::
313
314 eng = create_engine("sqlite:///myfile.db", isolation_level="AUTOCOMMIT")
315
316When using the above mode, any event hooks that set the sqlite3 ``Connection.autocommit``
317parameter away from its default of ``sqlite3.LEGACY_TRANSACTION_CONTROL``
318as well as hooks that emit ``BEGIN`` should be disabled.
319
320Additional Reading for SQLite / sqlite3 transaction control
321^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
322
323Links with important information on SQLite, the sqlite3 driver,
324as well as long historical conversations on how things got to their current state:
325
326* `Isolation in SQLite <https://www.sqlite.org/isolation.html>`_ - on the SQLite website
327* `Transaction control <https://docs.python.org/3/library/sqlite3.html#transaction-control>`_ - describes the sqlite3 autocommit attribute as well
328 as the legacy isolation_level attribute.
329* `sqlite3 SELECT does not BEGIN a transaction, but should according to spec <https://github.com/python/cpython/issues/54133>`_ - imported Python standard library issue on github
330* `sqlite3 module breaks transactions and potentially corrupts data <https://github.com/python/cpython/issues/54949>`_ - imported Python standard library issue on github
331
332
333INSERT/UPDATE/DELETE...RETURNING
334---------------------------------
335
336The SQLite dialect supports SQLite 3.35's ``INSERT|UPDATE|DELETE..RETURNING``
337syntax. ``INSERT..RETURNING`` may be used
338automatically in some cases in order to fetch newly generated identifiers in
339place of the traditional approach of using ``cursor.lastrowid``, however
340``cursor.lastrowid`` is currently still preferred for simple single-statement
341cases for its better performance.
342
343To specify an explicit ``RETURNING`` clause, use the
344:meth:`._UpdateBase.returning` method on a per-statement basis::
345
346 # INSERT..RETURNING
347 result = connection.execute(
348 table.insert().values(name="foo").returning(table.c.col1, table.c.col2)
349 )
350 print(result.all())
351
352 # UPDATE..RETURNING
353 result = connection.execute(
354 table.update()
355 .where(table.c.name == "foo")
356 .values(name="bar")
357 .returning(table.c.col1, table.c.col2)
358 )
359 print(result.all())
360
361 # DELETE..RETURNING
362 result = connection.execute(
363 table.delete()
364 .where(table.c.name == "foo")
365 .returning(table.c.col1, table.c.col2)
366 )
367 print(result.all())
368
369.. versionadded:: 2.0 Added support for SQLite RETURNING
370
371
372.. _sqlite_foreign_keys:
373
374Foreign Key Support
375-------------------
376
377SQLite supports FOREIGN KEY syntax when emitting CREATE statements for tables,
378however by default these constraints have no effect on the operation of the
379table.
380
381Constraint checking on SQLite has three prerequisites:
382
383* At least version 3.6.19 of SQLite must be in use
384* The SQLite library must be compiled *without* the SQLITE_OMIT_FOREIGN_KEY
385 or SQLITE_OMIT_TRIGGER symbols enabled.
386* The ``PRAGMA foreign_keys = ON`` statement must be emitted on all
387 connections before use -- including the initial call to
388 :meth:`sqlalchemy.schema.MetaData.create_all`.
389
390SQLAlchemy allows for the ``PRAGMA`` statement to be emitted automatically for
391new connections through the usage of events::
392
393 from sqlalchemy.engine import Engine
394 from sqlalchemy import event
395
396
397 @event.listens_for(Engine, "connect")
398 def set_sqlite_pragma(dbapi_connection, connection_record):
399 # the sqlite3 driver will not set PRAGMA foreign_keys
400 # if autocommit=False; set to True temporarily
401 ac = dbapi_connection.autocommit
402 dbapi_connection.autocommit = True
403
404 cursor = dbapi_connection.cursor()
405 cursor.execute("PRAGMA foreign_keys=ON")
406 cursor.close()
407
408 # restore previous autocommit setting
409 dbapi_connection.autocommit = ac
410
411.. warning::
412
413 When SQLite foreign keys are enabled, it is **not possible**
414 to emit CREATE or DROP statements for tables that contain
415 mutually-dependent foreign key constraints;
416 to emit the DDL for these tables requires that ALTER TABLE be used to
417 create or drop these constraints separately, for which SQLite has
418 no support.
419
420.. seealso::
421
422 `SQLite Foreign Key Support <https://www.sqlite.org/foreignkeys.html>`_
423 - on the SQLite web site.
424
425 :ref:`event_toplevel` - SQLAlchemy event API.
426
427 :ref:`use_alter` - more information on SQLAlchemy's facilities for handling
428 mutually-dependent foreign key constraints.
429
430.. _sqlite_on_conflict_ddl:
431
432ON CONFLICT support for constraints
433-----------------------------------
434
435.. seealso:: This section describes the :term:`DDL` version of "ON CONFLICT" for
436 SQLite, which occurs within a CREATE TABLE statement. For "ON CONFLICT" as
437 applied to an INSERT statement, see :ref:`sqlite_on_conflict_insert`.
438
439SQLite supports a non-standard DDL clause known as ON CONFLICT which can be applied
440to primary key, unique, check, and not null constraints. In DDL, it is
441rendered either within the "CONSTRAINT" clause or within the column definition
442itself depending on the location of the target constraint. To render this
443clause within DDL, the extension parameter ``sqlite_on_conflict`` can be
444specified with a string conflict resolution algorithm within the
445:class:`.PrimaryKeyConstraint`, :class:`.UniqueConstraint`,
446:class:`.CheckConstraint` objects. Within the :class:`_schema.Column` object,
447there
448are individual parameters ``sqlite_on_conflict_not_null``,
449``sqlite_on_conflict_primary_key``, ``sqlite_on_conflict_unique`` which each
450correspond to the three types of relevant constraint types that can be
451indicated from a :class:`_schema.Column` object.
452
453.. seealso::
454
455 `ON CONFLICT <https://www.sqlite.org/lang_conflict.html>`_ - in the SQLite
456 documentation
457
458The ``sqlite_on_conflict`` parameters accept a string argument which is just
459the resolution name to be chosen, which on SQLite can be one of ROLLBACK,
460ABORT, FAIL, IGNORE, and REPLACE. For example, to add a UNIQUE constraint
461that specifies the IGNORE algorithm::
462
463 some_table = Table(
464 "some_table",
465 metadata,
466 Column("id", Integer, primary_key=True),
467 Column("data", Integer),
468 UniqueConstraint("id", "data", sqlite_on_conflict="IGNORE"),
469 )
470
471The above renders CREATE TABLE DDL as:
472
473.. sourcecode:: sql
474
475 CREATE TABLE some_table (
476 id INTEGER NOT NULL,
477 data INTEGER,
478 PRIMARY KEY (id),
479 UNIQUE (id, data) ON CONFLICT IGNORE
480 )
481
482
483When using the :paramref:`_schema.Column.unique`
484flag to add a UNIQUE constraint
485to a single column, the ``sqlite_on_conflict_unique`` parameter can
486be added to the :class:`_schema.Column` as well, which will be added to the
487UNIQUE constraint in the DDL::
488
489 some_table = Table(
490 "some_table",
491 metadata,
492 Column("id", Integer, primary_key=True),
493 Column(
494 "data", Integer, unique=True, sqlite_on_conflict_unique="IGNORE"
495 ),
496 )
497
498rendering:
499
500.. sourcecode:: sql
501
502 CREATE TABLE some_table (
503 id INTEGER NOT NULL,
504 data INTEGER,
505 PRIMARY KEY (id),
506 UNIQUE (data) ON CONFLICT IGNORE
507 )
508
509To apply the FAIL algorithm for a NOT NULL constraint,
510``sqlite_on_conflict_not_null`` is used::
511
512 some_table = Table(
513 "some_table",
514 metadata,
515 Column("id", Integer, primary_key=True),
516 Column(
517 "data", Integer, nullable=False, sqlite_on_conflict_not_null="FAIL"
518 ),
519 )
520
521this renders the column inline ON CONFLICT phrase:
522
523.. sourcecode:: sql
524
525 CREATE TABLE some_table (
526 id INTEGER NOT NULL,
527 data INTEGER NOT NULL ON CONFLICT FAIL,
528 PRIMARY KEY (id)
529 )
530
531
532Similarly, for an inline primary key, use ``sqlite_on_conflict_primary_key``::
533
534 some_table = Table(
535 "some_table",
536 metadata,
537 Column(
538 "id",
539 Integer,
540 primary_key=True,
541 sqlite_on_conflict_primary_key="FAIL",
542 ),
543 )
544
545SQLAlchemy renders the PRIMARY KEY constraint separately, so the conflict
546resolution algorithm is applied to the constraint itself:
547
548.. sourcecode:: sql
549
550 CREATE TABLE some_table (
551 id INTEGER NOT NULL,
552 PRIMARY KEY (id) ON CONFLICT FAIL
553 )
554
555.. _sqlite_on_conflict_insert:
556
557INSERT...ON CONFLICT (Upsert)
558-----------------------------
559
560.. seealso:: This section describes the :term:`DML` version of "ON CONFLICT" for
561 SQLite, which occurs within an INSERT statement. For "ON CONFLICT" as
562 applied to a CREATE TABLE statement, see :ref:`sqlite_on_conflict_ddl`.
563
564From version 3.24.0 onwards, SQLite supports "upserts" (update or insert)
565of rows into a table via the ``ON CONFLICT`` clause of the ``INSERT``
566statement. A candidate row will only be inserted if that row does not violate
567any unique or primary key constraints. In the case of a unique constraint violation, a
568secondary action can occur which can be either "DO UPDATE", indicating that
569the data in the target row should be updated, or "DO NOTHING", which indicates
570to silently skip this row.
571
572Conflicts are determined using columns that are part of existing unique
573constraints and indexes. These constraints are identified by stating the
574columns and conditions that comprise the indexes.
575
576SQLAlchemy provides ``ON CONFLICT`` support via the SQLite-specific
577:func:`_sqlite.insert()` function, which provides
578the generative methods :meth:`_sqlite.Insert.on_conflict_do_update`
579and :meth:`_sqlite.Insert.on_conflict_do_nothing`:
580
581.. sourcecode:: pycon+sql
582
583 >>> from sqlalchemy.dialects.sqlite import insert
584
585 >>> insert_stmt = insert(my_table).values(
586 ... id="some_existing_id", data="inserted value"
587 ... )
588
589 >>> do_update_stmt = insert_stmt.on_conflict_do_update(
590 ... index_elements=["id"], set_=dict(data="updated value")
591 ... )
592
593 >>> print(do_update_stmt)
594 {printsql}INSERT INTO my_table (id, data) VALUES (?, ?)
595 ON CONFLICT (id) DO UPDATE SET data = ?{stop}
596
597 >>> do_nothing_stmt = insert_stmt.on_conflict_do_nothing(index_elements=["id"])
598
599 >>> print(do_nothing_stmt)
600 {printsql}INSERT INTO my_table (id, data) VALUES (?, ?)
601 ON CONFLICT (id) DO NOTHING
602
603.. versionadded:: 1.4
604
605.. seealso::
606
607 `Upsert
608 <https://sqlite.org/lang_UPSERT.html>`_
609 - in the SQLite documentation.
610
611
612Specifying the Target
613^^^^^^^^^^^^^^^^^^^^^
614
615Both methods supply the "target" of the conflict using column inference:
616
617* The :paramref:`_sqlite.Insert.on_conflict_do_update.index_elements` argument
618 specifies a sequence containing string column names, :class:`_schema.Column`
619 objects, and/or SQL expression elements, which would identify a unique index
620 or unique constraint.
621
622* When using :paramref:`_sqlite.Insert.on_conflict_do_update.index_elements`
623 to infer an index, a partial index can be inferred by also specifying the
624 :paramref:`_sqlite.Insert.on_conflict_do_update.index_where` parameter:
625
626 .. sourcecode:: pycon+sql
627
628 >>> stmt = insert(my_table).values(user_email="a@b.com", data="inserted data")
629
630 >>> do_update_stmt = stmt.on_conflict_do_update(
631 ... index_elements=[my_table.c.user_email],
632 ... index_where=my_table.c.user_email.like("%@gmail.com"),
633 ... set_=dict(data=stmt.excluded.data),
634 ... )
635
636 >>> print(do_update_stmt)
637 {printsql}INSERT INTO my_table (data, user_email) VALUES (?, ?)
638 ON CONFLICT (user_email)
639 WHERE user_email LIKE '%@gmail.com'
640 DO UPDATE SET data = excluded.data
641
642The SET Clause
643^^^^^^^^^^^^^^^
644
645``ON CONFLICT...DO UPDATE`` is used to perform an update of the already
646existing row, using any combination of new values as well as values
647from the proposed insertion. These values are specified using the
648:paramref:`_sqlite.Insert.on_conflict_do_update.set_` parameter. This
649parameter accepts a dictionary which consists of direct values
650for UPDATE:
651
652.. sourcecode:: pycon+sql
653
654 >>> stmt = insert(my_table).values(id="some_id", data="inserted value")
655
656 >>> do_update_stmt = stmt.on_conflict_do_update(
657 ... index_elements=["id"], set_=dict(data="updated value")
658 ... )
659
660 >>> print(do_update_stmt)
661 {printsql}INSERT INTO my_table (id, data) VALUES (?, ?)
662 ON CONFLICT (id) DO UPDATE SET data = ?
663
664.. warning::
665
666 The :meth:`_sqlite.Insert.on_conflict_do_update` method does **not** take
667 into account Python-side default UPDATE values or generation functions,
668 e.g. those specified using :paramref:`_schema.Column.onupdate`. These
669 values will not be exercised for an ON CONFLICT style of UPDATE, unless
670 they are manually specified in the
671 :paramref:`_sqlite.Insert.on_conflict_do_update.set_` dictionary.
672
673Updating using the Excluded INSERT Values
674^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
675
676In order to refer to the proposed insertion row, the special alias
677:attr:`~.sqlite.Insert.excluded` is available as an attribute on
678the :class:`_sqlite.Insert` object; this object creates an "excluded." prefix
679on a column, that informs the DO UPDATE to update the row with the value that
680would have been inserted had the constraint not failed:
681
682.. sourcecode:: pycon+sql
683
684 >>> stmt = insert(my_table).values(
685 ... id="some_id", data="inserted value", author="jlh"
686 ... )
687
688 >>> do_update_stmt = stmt.on_conflict_do_update(
689 ... index_elements=["id"],
690 ... set_=dict(data="updated value", author=stmt.excluded.author),
691 ... )
692
693 >>> print(do_update_stmt)
694 {printsql}INSERT INTO my_table (id, data, author) VALUES (?, ?, ?)
695 ON CONFLICT (id) DO UPDATE SET data = ?, author = excluded.author
696
697Additional WHERE Criteria
698^^^^^^^^^^^^^^^^^^^^^^^^^
699
700The :meth:`_sqlite.Insert.on_conflict_do_update` method also accepts
701a WHERE clause using the :paramref:`_sqlite.Insert.on_conflict_do_update.where`
702parameter, which will limit those rows which receive an UPDATE:
703
704.. sourcecode:: pycon+sql
705
706 >>> stmt = insert(my_table).values(
707 ... id="some_id", data="inserted value", author="jlh"
708 ... )
709
710 >>> on_update_stmt = stmt.on_conflict_do_update(
711 ... index_elements=["id"],
712 ... set_=dict(data="updated value", author=stmt.excluded.author),
713 ... where=(my_table.c.status == 2),
714 ... )
715 >>> print(on_update_stmt)
716 {printsql}INSERT INTO my_table (id, data, author) VALUES (?, ?, ?)
717 ON CONFLICT (id) DO UPDATE SET data = ?, author = excluded.author
718 WHERE my_table.status = ?
719
720
721Skipping Rows with DO NOTHING
722^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
723
724``ON CONFLICT`` may be used to skip inserting a row entirely
725if any conflict with a unique constraint occurs; below this is illustrated
726using the :meth:`_sqlite.Insert.on_conflict_do_nothing` method:
727
728.. sourcecode:: pycon+sql
729
730 >>> stmt = insert(my_table).values(id="some_id", data="inserted value")
731 >>> stmt = stmt.on_conflict_do_nothing(index_elements=["id"])
732 >>> print(stmt)
733 {printsql}INSERT INTO my_table (id, data) VALUES (?, ?) ON CONFLICT (id) DO NOTHING
734
735
736If ``DO NOTHING`` is used without specifying any columns or constraint,
737it has the effect of skipping the INSERT for any unique violation which
738occurs:
739
740.. sourcecode:: pycon+sql
741
742 >>> stmt = insert(my_table).values(id="some_id", data="inserted value")
743 >>> stmt = stmt.on_conflict_do_nothing()
744 >>> print(stmt)
745 {printsql}INSERT INTO my_table (id, data) VALUES (?, ?) ON CONFLICT DO NOTHING
746
747.. _sqlite_type_reflection:
748
749Type Reflection
750---------------
751
752SQLite types are unlike those of most other database backends, in that
753the string name of the type usually does not correspond to a "type" in a
754one-to-one fashion. Instead, SQLite links per-column typing behavior
755to one of five so-called "type affinities" based on a string matching
756pattern for the type.
757
758SQLAlchemy's reflection process, when inspecting types, uses a simple
759lookup table to link the keywords returned to provided SQLAlchemy types.
760This lookup table is present within the SQLite dialect as it is for all
761other dialects. However, the SQLite dialect has a different "fallback"
762routine for when a particular type name is not located in the lookup map;
763it instead implements the SQLite "type affinity" scheme located at
764https://www.sqlite.org/datatype3.html section 2.1.
765
766The provided typemap will make direct associations from an exact string
767name match for the following types:
768
769:class:`_types.BIGINT`, :class:`_types.BLOB`,
770:class:`_types.BOOLEAN`, :class:`_types.CHAR`,
771:class:`_types.DATE`, :class:`_types.DATETIME`,
772:class:`_types.DECIMAL`, :class:`_types.FLOAT`,
773:class:`_types.INTEGER`, :class:`_types.NUMERIC`,
774:class:`_types.REAL`, :class:`_types.SMALLINT`,
775:class:`_types.TEXT`, :class:`_types.TIME`,
776:class:`_types.TIMESTAMP`, :class:`_types.VARCHAR`,
777:class:`_types.NVARCHAR`, :class:`_types.NCHAR`
780
781When a type name does not match one of the above types, the "type affinity"
782lookup is used instead:
783
784* :class:`_types.INTEGER` is returned if the type name includes the
785 string ``INT``
786* :class:`_types.TEXT` is returned if the type name includes the
787 string ``CHAR``, ``CLOB`` or ``TEXT``
788* :class:`_types.NullType` is returned if the type name includes the
789 string ``BLOB``
790* :class:`_types.REAL` is returned if the type name includes the string
791 ``REAL``, ``FLOA`` or ``DOUB``.
792* Otherwise, the :class:`_types.NUMERIC` type is used.
793
794.. _sqlite_partial_index:
795
796Partial Indexes
797---------------
798
799A partial index, e.g. one which uses a WHERE clause, can be specified
800with the DDL system using the argument ``sqlite_where``::
801
802 tbl = Table("testtbl", m, Column("data", Integer))
803 idx = Index(
804 "test_idx1",
805 tbl.c.data,
806 sqlite_where=and_(tbl.c.data > 5, tbl.c.data < 10),
807 )
808
809The index will be rendered at create time as:
810
811.. sourcecode:: sql
812
813 CREATE INDEX test_idx1 ON testtbl (data)
814 WHERE data > 5 AND data < 10
815
816.. _sqlite_dotted_column_names:
817
818Dotted Column Names
819-------------------
820
821Using table or column names that explicitly have periods in them is
822**not recommended**. While this is generally a bad idea for relational
823databases in general, as the dot is a syntactically significant character,
824the SQLite driver up until version **3.10.0** of SQLite has a bug which
825requires that SQLAlchemy filter out these dots in result sets.
826
827The bug, entirely outside of SQLAlchemy, can be illustrated thusly::
828
829 import sqlite3
830
831 assert sqlite3.sqlite_version_info < (
832 3,
833 10,
834 0,
835 ), "bug is fixed in this version"
836
837 conn = sqlite3.connect(":memory:")
838 cursor = conn.cursor()
839
840 cursor.execute("create table x (a integer, b integer)")
841 cursor.execute("insert into x (a, b) values (1, 1)")
842 cursor.execute("insert into x (a, b) values (2, 2)")
843
844 cursor.execute("select x.a, x.b from x")
845 assert [c[0] for c in cursor.description] == ["a", "b"]
846
847 cursor.execute(
848 """
849 select x.a, x.b from x where a=1
850 union
851 select x.a, x.b from x where a=2
852 """
853 )
854 assert [c[0] for c in cursor.description] == ["a", "b"], [
855 c[0] for c in cursor.description
856 ]
857
858The second assertion fails:
859
860.. sourcecode:: text
861
862 Traceback (most recent call last):
863 File "test.py", line 19, in <module>
864 [c[0] for c in cursor.description]
865 AssertionError: ['x.a', 'x.b']
866
867Where above, the driver incorrectly reports the names of the columns
868including the name of the table, which is entirely inconsistent vs.
869when the UNION is not present.
870
871SQLAlchemy relies upon column names being predictable in how they match
872to the original statement, so the SQLAlchemy dialect has no choice but
873to filter these out::
874
875
876 from sqlalchemy import create_engine
877
878 eng = create_engine("sqlite://")
879 conn = eng.connect()
880
881 conn.exec_driver_sql("create table x (a integer, b integer)")
882 conn.exec_driver_sql("insert into x (a, b) values (1, 1)")
883 conn.exec_driver_sql("insert into x (a, b) values (2, 2)")
884
885 result = conn.exec_driver_sql("select x.a, x.b from x")
886 assert result.keys() == ["a", "b"]
887
888 result = conn.exec_driver_sql(
889 """
890 select x.a, x.b from x where a=1
891 union
892 select x.a, x.b from x where a=2
893 """
894 )
895 assert result.keys() == ["a", "b"]
896
897Note that above, even though SQLAlchemy filters out the dots, *both
898names are still addressable*::
899
900 >>> row = result.first()
901 >>> row["a"]
902 1
903 >>> row["x.a"]
904 1
905 >>> row["b"]
906 1
907 >>> row["x.b"]
908 1
909
910Therefore, the workaround applied by SQLAlchemy only impacts
911:meth:`_engine.CursorResult.keys` and :meth:`.Row.keys()` in the public API. In
912the very specific case where an application is forced to use column names that
913contain dots, and the functionality of :meth:`_engine.CursorResult.keys` and
914:meth:`.Row.keys()` is required to return these dotted names unmodified,
915the ``sqlite_raw_colnames`` execution option may be provided, either on a
916per-:class:`_engine.Connection` basis::
917
918 result = conn.execution_options(sqlite_raw_colnames=True).exec_driver_sql(
919 """
920 select x.a, x.b from x where a=1
921 union
922 select x.a, x.b from x where a=2
923 """
924 )
925 assert result.keys() == ["x.a", "x.b"]
926
927or on a per-:class:`_engine.Engine` basis::
928
929 engine = create_engine(
930 "sqlite://", execution_options={"sqlite_raw_colnames": True}
931 )
932
933When using the per-:class:`_engine.Engine` execution option, note that
934**Core and ORM queries that use UNION may not function properly**.
935
936SQLite-specific table options
937-----------------------------
938
Two options for CREATE TABLE are supported directly by the SQLite
dialect in conjunction with the :class:`_schema.Table` construct:
941
942* ``WITHOUT ROWID``::
943
944 Table("some_table", metadata, ..., sqlite_with_rowid=False)
945
946*
947 ``STRICT``::
948
949 Table("some_table", metadata, ..., sqlite_strict=True)
950
951 .. versionadded:: 2.0.37
952
953.. seealso::
954
955 `SQLite CREATE TABLE options
956 <https://www.sqlite.org/lang_createtable.html>`_
957
958.. _sqlite_include_internal:
959
960Reflecting internal schema tables
961----------------------------------
962
963Reflection methods that return lists of tables will omit so-called
964"SQLite internal schema object" names, which are considered by SQLite
965as any object name that is prefixed with ``sqlite_``. An example of
966such an object is the ``sqlite_sequence`` table that's generated when
967the ``AUTOINCREMENT`` column parameter is used. In order to return
968these objects, the parameter ``sqlite_include_internal=True`` may be
969passed to methods such as :meth:`_schema.MetaData.reflect` or
970:meth:`.Inspector.get_table_names`.
971
972.. versionadded:: 2.0 Added the ``sqlite_include_internal=True`` parameter.
973 Previously, these tables were not ignored by SQLAlchemy reflection
974 methods.
975
976.. note::
977
978 The ``sqlite_include_internal`` parameter does not refer to the
979 "system" tables that are present in schemas such as ``sqlite_master``.
980
981.. seealso::
982
983 `SQLite Internal Schema Objects <https://www.sqlite.org/fileformat2.html#intschema>`_ - in the SQLite
984 documentation.
985
986''' # noqa
987from __future__ import annotations
988
989import datetime
990import numbers
991import re
992from typing import Optional
993
994from .json import JSON
995from .json import JSONIndexType
996from .json import JSONPathType
997from ... import exc
998from ... import schema as sa_schema
999from ... import sql
1000from ... import text
1001from ... import types as sqltypes
1002from ... import util
1003from ...engine import default
1004from ...engine import processors
1005from ...engine import reflection
1006from ...engine.reflection import ReflectionDefaults
1007from ...sql import coercions
1008from ...sql import compiler
1009from ...sql import elements
1010from ...sql import roles
1011from ...sql import schema
1012from ...types import BLOB # noqa
1013from ...types import BOOLEAN # noqa
1014from ...types import CHAR # noqa
1015from ...types import DECIMAL # noqa
1016from ...types import FLOAT # noqa
1017from ...types import INTEGER # noqa
1018from ...types import NUMERIC # noqa
1019from ...types import REAL # noqa
1020from ...types import SMALLINT # noqa
1021from ...types import TEXT # noqa
1022from ...types import TIMESTAMP # noqa
1023from ...types import VARCHAR # noqa
1024
1025
class _SQliteJson(JSON):
    """JSON type for SQLite which tolerates bare numeric values.

    SQLite may return a plain number for a JSON column; the default JSON
    result processor raises ``TypeError`` on non-string input, so such
    values are passed through unchanged here.
    """

    def result_processor(self, dialect, coltype):
        base_processor = super().result_processor(dialect, coltype)

        def process(value):
            try:
                return base_processor(value)
            except TypeError:
                # a bare number stored in a JSON column; return as-is
                if not isinstance(value, numbers.Number):
                    raise
                return value

        return process
1040
1041
1042class _DateTimeMixin:
1043 _reg = None
1044 _storage_format = None
1045
1046 def __init__(self, storage_format=None, regexp=None, **kw):
1047 super().__init__(**kw)
1048 if regexp is not None:
1049 self._reg = re.compile(regexp)
1050 if storage_format is not None:
1051 self._storage_format = storage_format
1052
1053 @property
1054 def format_is_text_affinity(self):
1055 """return True if the storage format will automatically imply
1056 a TEXT affinity.
1057
1058 If the storage format contains no non-numeric characters,
1059 it will imply a NUMERIC storage format on SQLite; in this case,
1060 the type will generate its DDL as DATE_CHAR, DATETIME_CHAR,
1061 TIME_CHAR.
1062
1063 """
1064 spec = self._storage_format % {
1065 "year": 0,
1066 "month": 0,
1067 "day": 0,
1068 "hour": 0,
1069 "minute": 0,
1070 "second": 0,
1071 "microsecond": 0,
1072 }
1073 return bool(re.search(r"[^0-9]", spec))
1074
1075 def adapt(self, cls, **kw):
1076 if issubclass(cls, _DateTimeMixin):
1077 if self._storage_format:
1078 kw["storage_format"] = self._storage_format
1079 if self._reg:
1080 kw["regexp"] = self._reg
1081 return super().adapt(cls, **kw)
1082
1083 def literal_processor(self, dialect):
1084 bp = self.bind_processor(dialect)
1085
1086 def process(value):
1087 return "'%s'" % bp(value)
1088
1089 return process
1090
1091
class DATETIME(_DateTimeMixin, sqltypes.DateTime):
    r"""Represent a Python datetime object in SQLite using a string.

    The default string storage format is::

        "%(year)04d-%(month)02d-%(day)02d %(hour)02d:%(minute)02d:%(second)02d.%(microsecond)06d"

    e.g.:

    .. sourcecode:: text

        2021-03-15 12:05:57.105542

    Incoming values are parsed by default with the Python
    ``datetime.fromisoformat()`` function.

    .. versionchanged:: 2.0 ``datetime.fromisoformat()`` is used for default
       datetime string parsing.

    Both directions may be customized to some degree using the
    ``storage_format`` and ``regexp`` parameters, such as::

        import re
        from sqlalchemy.dialects.sqlite import DATETIME

        dt = DATETIME(
            storage_format=(
                "%(year)04d/%(month)02d/%(day)02d %(hour)02d:%(minute)02d:%(second)02d"
            ),
            regexp=r"(\d+)/(\d+)/(\d+) (\d+)-(\d+)-(\d+)",
        )

    :param truncate_microseconds: when ``True``, microseconds will be
     truncated from the datetime.  Mutually exclusive with
     ``storage_format`` and ``regexp``.

    :param storage_format: format string which will be applied to the dict
     with keys year, month, day, hour, minute, second, and microsecond.

    :param regexp: regular expression which will be applied to incoming
     result rows, replacing the use of ``datetime.fromisoformat()`` to parse
     incoming strings.  If the regexp contains named groups, the resulting
     match dict is applied to the Python datetime() constructor as keyword
     arguments.  Otherwise, if positional groups are used, the datetime()
     constructor is called with positional arguments via
     ``*map(int, match_obj.groups(0))``.

    """  # noqa

    _storage_format = (
        "%(year)04d-%(month)02d-%(day)02d "
        "%(hour)02d:%(minute)02d:%(second)02d.%(microsecond)06d"
    )

    def __init__(self, *args, **kwargs):
        truncate_microseconds = kwargs.pop("truncate_microseconds", False)
        super().__init__(*args, **kwargs)
        if not truncate_microseconds:
            return
        # truncate_microseconds just swaps in a microsecond-free default
        # format; it makes no sense combined with an explicit format/regexp
        assert "storage_format" not in kwargs, (
            "You can specify only one of"
            " truncate_microseconds or storage_format."
        )
        assert "regexp" not in kwargs, (
            "You can specify only one of"
            " truncate_microseconds or regexp."
        )
        self._storage_format = (
            "%(year)04d-%(month)02d-%(day)02d "
            "%(hour)02d:%(minute)02d:%(second)02d"
        )

    def bind_processor(self, dialect):
        fmt = self._storage_format
        datetime_cls = datetime.datetime
        date_cls = datetime.date

        def process(value):
            if value is None:
                return None
            # NOTE: datetime is a subclass of date, so the datetime
            # check must come first
            if isinstance(value, datetime_cls):
                fields = {
                    "year": value.year,
                    "month": value.month,
                    "day": value.day,
                    "hour": value.hour,
                    "minute": value.minute,
                    "second": value.second,
                    "microsecond": value.microsecond,
                }
            elif isinstance(value, date_cls):
                # plain dates render with a zeroed time portion
                fields = {
                    "year": value.year,
                    "month": value.month,
                    "day": value.day,
                    "hour": 0,
                    "minute": 0,
                    "second": 0,
                    "microsecond": 0,
                }
            else:
                raise TypeError(
                    "SQLite DateTime type only accepts Python "
                    "datetime and date objects as input."
                )
            return fmt % fields

        return process

    def result_processor(self, dialect, coltype):
        if not self._reg:
            return processors.str_to_datetime
        return processors.str_to_datetime_processor_factory(
            self._reg, datetime.datetime
        )
1206
1207
class DATE(_DateTimeMixin, sqltypes.Date):
    r"""Represent a Python date object in SQLite using a string.

    The default string storage format is::

        "%(year)04d-%(month)02d-%(day)02d"

    e.g.:

    .. sourcecode:: text

        2011-03-15

    Incoming values are parsed by default with the Python
    ``date.fromisoformat()`` function.

    .. versionchanged:: 2.0 ``date.fromisoformat()`` is used for default
       date string parsing.

    Both directions may be customized to some degree using the
    ``storage_format`` and ``regexp`` parameters, such as::

        import re
        from sqlalchemy.dialects.sqlite import DATE

        d = DATE(
            storage_format="%(month)02d/%(day)02d/%(year)04d",
            regexp=re.compile("(?P<month>\d+)/(?P<day>\d+)/(?P<year>\d+)"),
        )

    :param storage_format: format string which will be applied to the
     dict with keys year, month, and day.

    :param regexp: regular expression which will be applied to
     incoming result rows, replacing the use of ``date.fromisoformat()`` to
     parse incoming strings.  If the regexp contains named groups, the
     resulting match dict is applied to the Python date() constructor as
     keyword arguments.  Otherwise, if positional groups are used, the
     date() constructor is called with positional arguments via
     ``*map(int, match_obj.groups(0))``.

    """

    _storage_format = "%(year)04d-%(month)02d-%(day)02d"

    def bind_processor(self, dialect):
        fmt = self._storage_format
        date_cls = datetime.date

        def process(value):
            if value is None:
                return None
            if not isinstance(value, date_cls):
                raise TypeError(
                    "SQLite Date type only accepts Python "
                    "date objects as input."
                )
            return fmt % {
                "year": value.year,
                "month": value.month,
                "day": value.day,
            }

        return process

    def result_processor(self, dialect, coltype):
        if not self._reg:
            return processors.str_to_date
        return processors.str_to_datetime_processor_factory(
            self._reg, datetime.date
        )
1282
1283
class TIME(_DateTimeMixin, sqltypes.Time):
    r"""Represent a Python time object in SQLite using a string.

    The default string storage format is::

        "%(hour)02d:%(minute)02d:%(second)02d.%(microsecond)06d"

    e.g.:

    .. sourcecode:: text

        12:05:57.105542

    Incoming values are parsed by default with the Python
    ``time.fromisoformat()`` function.

    .. versionchanged:: 2.0 ``time.fromisoformat()`` is used for default
       time string parsing.

    Both directions may be customized to some degree using the
    ``storage_format`` and ``regexp`` parameters, such as::

        import re
        from sqlalchemy.dialects.sqlite import TIME

        t = TIME(
            storage_format="%(hour)02d-%(minute)02d-%(second)02d-%(microsecond)06d",
            regexp=re.compile("(\d+)-(\d+)-(\d+)-(?:-(\d+))?"),
        )

    :param truncate_microseconds: when ``True``, microseconds will be
     truncated from the time.  Mutually exclusive with ``storage_format``
     and ``regexp``.

    :param storage_format: format string which will be applied to the dict
     with keys hour, minute, second, and microsecond.

    :param regexp: regular expression which will be applied to incoming
     result rows, replacing the use of ``time.fromisoformat()`` to parse
     incoming strings.  If the regexp contains named groups, the resulting
     match dict is applied to the Python time() constructor as keyword
     arguments.  Otherwise, if positional groups are used, the time()
     constructor is called with positional arguments via
     ``*map(int, match_obj.groups(0))``.

    """

    _storage_format = "%(hour)02d:%(minute)02d:%(second)02d.%(microsecond)06d"

    def __init__(self, *args, **kwargs):
        truncate_microseconds = kwargs.pop("truncate_microseconds", False)
        super().__init__(*args, **kwargs)
        if not truncate_microseconds:
            return
        # truncate_microseconds just swaps in a microsecond-free default
        # format; it makes no sense combined with an explicit format/regexp
        assert "storage_format" not in kwargs, (
            "You can specify only one of"
            " truncate_microseconds or storage_format."
        )
        assert "regexp" not in kwargs, (
            "You can specify only one of"
            " truncate_microseconds or regexp."
        )
        self._storage_format = "%(hour)02d:%(minute)02d:%(second)02d"

    def bind_processor(self, dialect):
        fmt = self._storage_format
        time_cls = datetime.time

        def process(value):
            if value is None:
                return None
            if not isinstance(value, time_cls):
                raise TypeError(
                    "SQLite Time type only accepts Python "
                    "time objects as input."
                )
            return fmt % {
                "hour": value.hour,
                "minute": value.minute,
                "second": value.second,
                "microsecond": value.microsecond,
            }

        return process

    def result_processor(self, dialect, coltype):
        if not self._reg:
            return processors.str_to_time
        return processors.str_to_datetime_processor_factory(
            self._reg, datetime.time
        )
1375
1376
# generic SQLAlchemy types mapped to their SQLite-specific implementations;
# used by the dialect to swap in the string-based date/time types and the
# numeric-tolerant JSON type above
colspecs = {
    sqltypes.Date: DATE,
    sqltypes.DateTime: DATETIME,
    sqltypes.JSON: _SQliteJson,
    sqltypes.JSON.JSONIndexType: JSONIndexType,
    sqltypes.JSON.JSONPathType: JSONPathType,
    sqltypes.Time: TIME,
}

# exact reflected type-name strings mapped to SQLAlchemy types.  Names not
# present here fall back to the SQLite "type affinity" rules described in
# the module docstring.  Several names intentionally map to the same type
# (e.g. BOOL/BOOLEAN, INT/INTEGER); the *_CHAR names are the text-affinity
# DDL variants emitted by SQLiteTypeCompiler.
ischema_names = {
    "BIGINT": sqltypes.BIGINT,
    "BLOB": sqltypes.BLOB,
    "BOOL": sqltypes.BOOLEAN,
    "BOOLEAN": sqltypes.BOOLEAN,
    "CHAR": sqltypes.CHAR,
    "DATE": sqltypes.DATE,
    "DATE_CHAR": sqltypes.DATE,
    "DATETIME": sqltypes.DATETIME,
    "DATETIME_CHAR": sqltypes.DATETIME,
    "DOUBLE": sqltypes.DOUBLE,
    "DECIMAL": sqltypes.DECIMAL,
    "FLOAT": sqltypes.FLOAT,
    "INT": sqltypes.INTEGER,
    "INTEGER": sqltypes.INTEGER,
    "JSON": JSON,
    "NUMERIC": sqltypes.NUMERIC,
    "REAL": sqltypes.REAL,
    "SMALLINT": sqltypes.SMALLINT,
    "TEXT": sqltypes.TEXT,
    "TIME": sqltypes.TIME,
    "TIME_CHAR": sqltypes.TIME,
    "TIMESTAMP": sqltypes.TIMESTAMP,
    "VARCHAR": sqltypes.VARCHAR,
    "NVARCHAR": sqltypes.NVARCHAR,
    "NCHAR": sqltypes.NCHAR,
}
1413
1414
class SQLiteCompiler(compiler.SQLCompiler):
    """SQL statement compiler for the SQLite dialect.

    Overrides the generic compiler where SQLite syntax diverges:
    EXTRACT emulated via STRFTIME, LIMIT/OFFSET quirks, JSON_EXTRACT
    operators, ON CONFLICT clauses and emulated boolean/XOR operators.
    """

    # EXTRACT field names mapped to the equivalent SQLite STRFTIME
    # format codes; consumed by visit_extract below
    extract_map = util.update_copy(
        compiler.SQLCompiler.extract_map,
        {
            "month": "%m",
            "day": "%d",
            "year": "%Y",
            "second": "%S",
            "hour": "%H",
            "doy": "%j",
            "minute": "%M",
            "epoch": "%s",
            "dow": "%w",
            "week": "%W",
        },
    )

    def visit_truediv_binary(self, binary, operator, **kw):
        # add 0.0 to the divisor so SQLite performs floating-point
        # division; integer "/" in SQLite truncates
        return (
            self.process(binary.left, **kw)
            + " / "
            + "(%s + 0.0)" % self.process(binary.right, **kw)
        )

    def visit_now_func(self, fn, **kw):
        # SQLite has no now() function; use the standard keyword
        return "CURRENT_TIMESTAMP"

    def visit_localtimestamp_func(self, func, **kw):
        # render local time via the 'localtime' modifier
        return "DATETIME(CURRENT_TIMESTAMP, 'localtime')"

    def visit_true(self, expr, **kw):
        # boolean literals render as integers 1/0
        return "1"

    def visit_false(self, expr, **kw):
        return "0"

    def visit_char_length_func(self, fn, **kw):
        # SQLite spells char_length() as length()
        return "length%s" % self.function_argspec(fn)

    def visit_aggregate_strings_func(self, fn, **kw):
        # aggregate_strings() maps to SQLite's group_concat()
        return "group_concat%s" % self.function_argspec(fn)

    def visit_cast(self, cast, **kwargs):
        # when the connected SQLite version lacks CAST support, just
        # render the inner expression without the cast
        if self.dialect.supports_cast:
            return super().visit_cast(cast, **kwargs)
        else:
            return self.process(cast.clause, **kwargs)

    def visit_extract(self, extract, **kw):
        # EXTRACT(field FROM expr) is emulated with STRFTIME plus a
        # CAST to INTEGER
        try:
            return "CAST(STRFTIME('%s', %s) AS INTEGER)" % (
                self.extract_map[extract.field],
                self.process(extract.expr, **kw),
            )
        except KeyError as err:
            raise exc.CompileError(
                "%s is not a valid extract argument." % extract.field
            ) from err

    def returning_clause(
        self,
        stmt,
        returning_cols,
        *,
        populate_result_map,
        **kw,
    ):
        # RETURNING columns are rendered without table qualification
        kw["include_table"] = False
        return super().returning_clause(
            stmt, returning_cols, populate_result_map=populate_result_map, **kw
        )

    def limit_clause(self, select, **kw):
        # SQLite requires a LIMIT whenever OFFSET is present; LIMIT -1
        # means "no limit".  An OFFSET 0 is appended when a LIMIT is
        # given without OFFSET.
        text = ""
        if select._limit_clause is not None:
            text += "\n LIMIT " + self.process(select._limit_clause, **kw)
        if select._offset_clause is not None:
            if select._limit_clause is None:
                # NOTE(review): **kw is not forwarded here, unlike the
                # other self.process() calls — confirm intentional
                text += "\n LIMIT " + self.process(sql.literal(-1))
            text += " OFFSET " + self.process(select._offset_clause, **kw)
        else:
            text += " OFFSET " + self.process(sql.literal(0), **kw)
        return text

    def for_update_clause(self, select, **kw):
        # sqlite has no "FOR UPDATE" AFAICT
        return ""

    def update_from_clause(
        self, update_stmt, from_table, extra_froms, from_hints, **kw
    ):
        # render the additional FROM entries of an UPDATE..FROM
        kw["asfrom"] = True
        return "FROM " + ", ".join(
            t._compiler_dispatch(self, fromhints=from_hints, **kw)
            for t in extra_froms
        )

    def visit_is_distinct_from_binary(self, binary, operator, **kw):
        # SQLite's IS NOT operator has IS DISTINCT FROM semantics.
        # NOTE(review): **kw is not forwarded to self.process() — confirm
        # intentional
        return "%s IS NOT %s" % (
            self.process(binary.left),
            self.process(binary.right),
        )

    def visit_is_not_distinct_from_binary(self, binary, operator, **kw):
        # SQLite's IS operator has IS NOT DISTINCT FROM semantics
        return "%s IS %s" % (
            self.process(binary.left),
            self.process(binary.right),
        )

    def visit_json_getitem_op_binary(
        self, binary, operator, _cast_applied=False, **kw
    ):
        # wrap the expression in a CAST when the result type is not
        # JSON, so the extracted value gets the expected affinity;
        # _cast_applied guards against infinite recursion
        if (
            not _cast_applied
            and binary.type._type_affinity is not sqltypes.JSON
        ):
            kw["_cast_applied"] = True
            return self.process(sql.cast(binary, binary.type), **kw)

        if binary.type._type_affinity is sqltypes.JSON:
            expr = "JSON_QUOTE(JSON_EXTRACT(%s, %s))"
        else:
            expr = "JSON_EXTRACT(%s, %s)"

        return expr % (
            self.process(binary.left, **kw),
            self.process(binary.right, **kw),
        )

    def visit_json_path_getitem_op_binary(
        self, binary, operator, _cast_applied=False, **kw
    ):
        # same rendering as visit_json_getitem_op_binary; the operand
        # is a JSON path expression rather than a single index
        if (
            not _cast_applied
            and binary.type._type_affinity is not sqltypes.JSON
        ):
            kw["_cast_applied"] = True
            return self.process(sql.cast(binary, binary.type), **kw)

        if binary.type._type_affinity is sqltypes.JSON:
            expr = "JSON_QUOTE(JSON_EXTRACT(%s, %s))"
        else:
            expr = "JSON_EXTRACT(%s, %s)"

        return expr % (
            self.process(binary.left, **kw),
            self.process(binary.right, **kw),
        )

    def visit_empty_set_op_expr(self, type_, expand_op, **kw):
        # slightly old SQLite versions don't seem to be able to handle
        # the empty set impl
        return self.visit_empty_set_expr(type_)

    def visit_empty_set_expr(self, element_types, **kw):
        # render a SELECT guaranteed to return zero rows, with one
        # column per element type
        return "SELECT %s FROM (SELECT %s) WHERE 1!=1" % (
            ", ".join("1" for type_ in element_types or [INTEGER()]),
            ", ".join("1" for type_ in element_types or [INTEGER()]),
        )

    def visit_regexp_match_op_binary(self, binary, operator, **kw):
        return self._generate_generic_binary(binary, " REGEXP ", **kw)

    def visit_not_regexp_match_op_binary(self, binary, operator, **kw):
        return self._generate_generic_binary(binary, " NOT REGEXP ", **kw)

    def _on_conflict_target(self, clause, **kw):
        # render the "(col, ...) [WHERE ...]" conflict target of an
        # ON CONFLICT clause; empty string when no target is given
        if clause.inferred_target_elements is not None:
            target_text = "(%s)" % ", ".join(
                (
                    self.preparer.quote(c)
                    if isinstance(c, str)
                    else self.process(c, include_table=False, use_schema=False)
                )
                for c in clause.inferred_target_elements
            )
            if clause.inferred_target_whereclause is not None:
                target_text += " WHERE %s" % self.process(
                    clause.inferred_target_whereclause,
                    include_table=False,
                    use_schema=False,
                    literal_execute=True,
                )

        else:
            target_text = ""

        return target_text

    def visit_on_conflict_do_nothing(self, on_conflict, **kw):
        target_text = self._on_conflict_target(on_conflict, **kw)

        if target_text:
            return "ON CONFLICT %s DO NOTHING" % target_text
        else:
            return "ON CONFLICT DO NOTHING"

    def visit_on_conflict_do_update(self, on_conflict, **kw):
        clause = on_conflict

        target_text = self._on_conflict_target(on_conflict, **kw)

        action_set_ops = []

        set_parameters = dict(clause.update_values_to_set)
        # create a list of column assignment clauses as tuples

        # walk the target table's columns in order, consuming matching
        # SET parameters keyed either by column key or Column object
        insert_statement = self.stack[-1]["selectable"]
        cols = insert_statement.table.c
        for c in cols:
            col_key = c.key

            if col_key in set_parameters:
                value = set_parameters.pop(col_key)
            elif c in set_parameters:
                value = set_parameters.pop(c)
            else:
                continue

            # give typeless bound parameters the column's type so bind
            # processing applies
            if (
                isinstance(value, elements.BindParameter)
                and value.type._isnull
            ):
                value = value._with_binary_element_type(c.type)
            value_text = self.process(value.self_group(), use_schema=False)

            key_text = self.preparer.quote(c.name)
            action_set_ops.append("%s = %s" % (key_text, value_text))

        # check for names that don't match columns
        if set_parameters:
            util.warn(
                "Additional column names not matching "
                "any column keys in table '%s': %s"
                % (
                    self.current_executable.table.name,
                    (", ".join("'%s'" % c for c in set_parameters)),
                )
            )
            # render the leftover entries anyway, quoting string keys
            for k, v in set_parameters.items():
                key_text = (
                    self.preparer.quote(k)
                    if isinstance(k, str)
                    else self.process(k, use_schema=False)
                )
                value_text = self.process(
                    coercions.expect(roles.ExpressionElementRole, v),
                    use_schema=False,
                )
                action_set_ops.append("%s = %s" % (key_text, value_text))

        action_text = ", ".join(action_set_ops)
        if clause.update_whereclause is not None:
            action_text += " WHERE %s" % self.process(
                clause.update_whereclause, include_table=True, use_schema=False
            )

        return "ON CONFLICT %s DO UPDATE SET %s" % (target_text, action_text)

    def visit_bitwise_xor_op_binary(self, binary, operator, **kw):
        # sqlite has no xor. Use "a XOR b" = "(a | b) - (a & b)".
        kw["eager_grouping"] = True
        or_ = self._generate_generic_binary(binary, " | ", **kw)
        and_ = self._generate_generic_binary(binary, " & ", **kw)
        return f"({or_} - {and_})"
1680
1681
class SQLiteDDLCompiler(compiler.DDLCompiler):
    """DDL compiler for the SQLite dialect.

    Handles SQLite-specific DDL: AUTOINCREMENT placement, the various
    ``ON CONFLICT`` constraint clauses, partial indexes, and the
    WITHOUT ROWID / STRICT table options.
    """

    def get_column_specification(self, column, **kwargs):
        # render "<name> <type> [DEFAULT ...] [NOT NULL] [PRIMARY KEY
        # [AUTOINCREMENT]] [computed]" for a single column
        coltype = self.dialect.type_compiler_instance.process(
            column.type, type_expression=column
        )
        colspec = self.preparer.format_column(column) + " " + coltype
        default = self.get_column_default_string(column)
        if default is not None:

            # parenthesize default expressions that contain non-word
            # characters and aren't already quoted or parenthesized
            if not re.match(r"""^\s*[\'\"\(]""", default) and re.match(
                r".*\W.*", default
            ):
                colspec += f" DEFAULT ({default})"
            else:
                colspec += f" DEFAULT {default}"

        if not column.nullable:
            colspec += " NOT NULL"

            # per-column NOT NULL ON CONFLICT clause, if configured
            on_conflict_clause = column.dialect_options["sqlite"][
                "on_conflict_not_null"
            ]
            if on_conflict_clause is not None:
                colspec += " ON CONFLICT " + on_conflict_clause

        if column.primary_key:
            if (
                column.autoincrement is True
                and len(column.table.primary_key.columns) != 1
            ):
                raise exc.CompileError(
                    "SQLite does not support autoincrement for "
                    "composite primary keys"
                )

            # AUTOINCREMENT requires an inline, single-column INTEGER
            # primary key with no foreign keys; the table-level
            # sqlite_autoincrement option opts in
            if (
                column.table.dialect_options["sqlite"]["autoincrement"]
                and len(column.table.primary_key.columns) == 1
                and issubclass(column.type._type_affinity, sqltypes.Integer)
                and not column.foreign_keys
            ):
                colspec += " PRIMARY KEY"

                on_conflict_clause = column.dialect_options["sqlite"][
                    "on_conflict_primary_key"
                ]
                if on_conflict_clause is not None:
                    colspec += " ON CONFLICT " + on_conflict_clause

                colspec += " AUTOINCREMENT"

        if column.computed is not None:
            colspec += " " + self.process(column.computed)

        return colspec

    def visit_primary_key_constraint(self, constraint, **kw):
        # for columns with sqlite_autoincrement=True,
        # the PRIMARY KEY constraint can only be inline
        # with the column itself.
        if len(constraint.columns) == 1:
            c = list(constraint)[0]
            if (
                c.primary_key
                and c.table.dialect_options["sqlite"]["autoincrement"]
                and issubclass(c.type._type_affinity, sqltypes.Integer)
                and not c.foreign_keys
            ):
                # rendered inline by get_column_specification instead
                return None

        text = super().visit_primary_key_constraint(constraint)

        # constraint-level on_conflict wins; fall back to the single
        # column's on_conflict_primary_key option
        on_conflict_clause = constraint.dialect_options["sqlite"][
            "on_conflict"
        ]
        if on_conflict_clause is None and len(constraint.columns) == 1:
            on_conflict_clause = list(constraint)[0].dialect_options["sqlite"][
                "on_conflict_primary_key"
            ]

        if on_conflict_clause is not None:
            text += " ON CONFLICT " + on_conflict_clause

        return text

    def visit_unique_constraint(self, constraint, **kw):
        text = super().visit_unique_constraint(constraint)

        # constraint-level on_conflict wins; fall back to the single
        # column's on_conflict_unique option when the member is a real
        # schema item (not a plain string/expression)
        on_conflict_clause = constraint.dialect_options["sqlite"][
            "on_conflict"
        ]
        if on_conflict_clause is None and len(constraint.columns) == 1:
            col1 = list(constraint)[0]
            if isinstance(col1, schema.SchemaItem):
                on_conflict_clause = list(constraint)[0].dialect_options[
                    "sqlite"
                ]["on_conflict_unique"]

        if on_conflict_clause is not None:
            text += " ON CONFLICT " + on_conflict_clause

        return text

    def visit_check_constraint(self, constraint, **kw):
        text = super().visit_check_constraint(constraint)

        on_conflict_clause = constraint.dialect_options["sqlite"][
            "on_conflict"
        ]

        if on_conflict_clause is not None:
            text += " ON CONFLICT " + on_conflict_clause

        return text

    def visit_column_check_constraint(self, constraint, **kw):
        text = super().visit_column_check_constraint(constraint)

        # SQLite accepts ON CONFLICT only on table-level constraints
        if constraint.dialect_options["sqlite"]["on_conflict"] is not None:
            raise exc.CompileError(
                "SQLite does not support on conflict clause for "
                "column check constraint"
            )

        return text

    def visit_foreign_key_constraint(self, constraint, **kw):
        local_table = constraint.elements[0].parent.table
        remote_table = constraint.elements[0].column.table

        # omit the constraint entirely when the referenced table lives
        # in a different schema (returning None skips rendering)
        if local_table.schema != remote_table.schema:
            return None
        else:
            return super().visit_foreign_key_constraint(constraint)

    def define_constraint_remote_table(self, constraint, table, preparer):
        """Format the remote table clause of a CREATE CONSTRAINT clause."""

        # schema qualification is dropped; REFERENCES targets are
        # rendered unqualified
        return preparer.format_table(table, use_schema=False)

    def visit_create_index(
        self, create, include_schema=False, include_table_schema=True, **kw
    ):
        # render CREATE [UNIQUE] INDEX [IF NOT EXISTS] ... with optional
        # partial-index WHERE clause from the sqlite_where option
        index = create.element
        self._verify_index_table(index)
        preparer = self.preparer
        text = "CREATE "
        if index.unique:
            text += "UNIQUE "

        text += "INDEX "

        if create.if_not_exists:
            text += "IF NOT EXISTS "

        text += "%s ON %s (%s)" % (
            self._prepared_index_name(index, include_schema=True),
            preparer.format_table(index.table, use_schema=False),
            ", ".join(
                self.sql_compiler.process(
                    expr, include_table=False, literal_binds=True
                )
                for expr in index.expressions
            ),
        )

        whereclause = index.dialect_options["sqlite"]["where"]
        if whereclause is not None:
            where_compiled = self.sql_compiler.process(
                whereclause, include_table=False, literal_binds=True
            )
            text += " WHERE " + where_compiled

        return text

    def post_create_table(self, table):
        # append the WITHOUT ROWID / STRICT table options after the
        # closing parenthesis of CREATE TABLE
        table_options = []

        if not table.dialect_options["sqlite"]["with_rowid"]:
            table_options.append("WITHOUT ROWID")

        if table.dialect_options["sqlite"]["strict"]:
            table_options.append("STRICT")

        if table_options:
            return "\n " + ",\n ".join(table_options)
        else:
            return ""
1870
1871
class SQLiteTypeCompiler(compiler.GenericTypeCompiler):
    """Type compiler emitting SQLite-flavored type names.

    Custom date/time storage formats containing no alphabetic
    characters would otherwise lose TEXT affinity, so such types are
    rendered with a ``_CHAR`` suffix.
    """

    def _needs_text_affinity(self, type_):
        # a _DateTimeMixin whose storage format has no alpha characters
        # needs the *_CHAR rendering to retain TEXT affinity
        return (
            isinstance(type_, _DateTimeMixin)
            and not type_.format_is_text_affinity
        )

    def visit_large_binary(self, type_, **kw):
        return self.visit_BLOB(type_)

    def visit_DATETIME(self, type_, **kw):
        if self._needs_text_affinity(type_):
            return "DATETIME_CHAR"
        return super().visit_DATETIME(type_)

    def visit_DATE(self, type_, **kw):
        if self._needs_text_affinity(type_):
            return "DATE_CHAR"
        return super().visit_DATE(type_)

    def visit_TIME(self, type_, **kw):
        if self._needs_text_affinity(type_):
            return "TIME_CHAR"
        return super().visit_TIME(type_)

    def visit_JSON(self, type_, **kw):
        # note this name provides NUMERIC affinity, not TEXT.
        # should not be an issue unless the JSON value consists of a single
        # numeric value. JSONTEXT can be used if this case is required.
        return "JSON"
1908
1909
class SQLiteIdentifierPreparer(compiler.IdentifierPreparer):
    """Identifier preparer carrying SQLite's reserved-word list, used to
    decide which identifiers require quoting."""

    # https://www.sqlite.org/lang_keywords.html
    reserved_words = set(
        """
        add after all alter analyze and as asc attach autoincrement
        before begin between by cascade case cast check collate column
        commit conflict constraint create cross current_date current_time
        current_timestamp database default deferrable deferred delete
        desc detach distinct drop each else end escape except exclusive
        exists explain false fail for foreign from full glob group having
        if ignore immediate in index indexed initially inner insert
        instead intersect into is isnull join key left like limit match
        natural not notnull null of offset on or order outer plan pragma
        primary query raise references reindex rename replace restrict
        right rollback row select set table temp temporary then to
        transaction trigger true union unique update using vacuum values
        view virtual when where
        """.split()
    )
2030
2031
class SQLiteExecutionContext(default.DefaultExecutionContext):
    """Execution context working around SQLite's historical habit of
    reporting "tablename.colname" in cursor.description."""

    @util.memoized_property
    def _preserve_raw_colnames(self):
        # keep raw names when the driver isn't affected, or when the
        # user explicitly asked via the sqlite_raw_colnames option
        if not self.dialect._broken_dotted_colnames:
            return True
        return self.execution_options.get("sqlite_raw_colnames", False)

    def _translate_colname(self, colname):
        # TODO: detect SQLite version 3.10.0 or greater;
        # see [ticket:3633]

        # older SQLite may report "tablename.colname" (e.g. for UNION),
        # or "database.tablename.colname" with an attached database, in
        # cursor.description; strip to the bare column name and hand
        # back the original as the "untranslated" key
        if self._preserve_raw_colnames or "." not in colname:
            return colname, None
        return colname.split(".")[-1], colname
2052
2053
class SQLiteDialect(default.DefaultDialect):
    name = "sqlite"
    supports_alter = False

    # SQLite supports "DEFAULT VALUES"; "VALUES (DEFAULT)" is handled
    # via default_metavalue_token below.
    supports_default_values = True

    # sqlite issue:
    # https://github.com/python/cpython/issues/93421
    # note this parameter is no longer used by the ORM or default dialect
    # see #9414
    supports_sane_rowcount_returning = False

    supports_empty_insert = False
    supports_cast = True
    supports_multivalues_insert = True
    use_insertmanyvalues = True
    tuple_in_values = True
    supports_statement_cache = True
    insert_null_pk_still_autoincrements = True
    insert_returning = True
    update_returning = True
    update_returning_multifrom = True
    delete_returning = True

    supports_default_metavalue = True
    """dialect supports INSERT... VALUES (DEFAULT) syntax"""

    default_metavalue_token = "NULL"
    """for INSERT... VALUES (DEFAULT) syntax, the token to put in the
    parenthesis."""

    default_paramstyle = "qmark"
    execution_ctx_cls = SQLiteExecutionContext
    statement_compiler = SQLiteCompiler
    ddl_compiler = SQLiteDDLCompiler
    type_compiler_cls = SQLiteTypeCompiler
    preparer = SQLiteIdentifierPreparer
    ischema_names = ischema_names
    colspecs = colspecs

    construct_arguments = [
        (
            sa_schema.Table,
            {
                "autoincrement": False,
                "with_rowid": True,
                "strict": False,
            },
        ),
        (sa_schema.Index, {"where": None}),
        (
            sa_schema.Column,
            {
                "on_conflict_primary_key": None,
                "on_conflict_not_null": None,
                "on_conflict_unique": None,
            },
        ),
        (sa_schema.Constraint, {"on_conflict": None}),
    ]

    # both flags are re-derived from the DBAPI's SQLite version in
    # __init__ when a dbapi is present
    _broken_fk_pragma_quotes = False
    _broken_dotted_colnames = False
2121
2122 def __init__(
2123 self,
2124 native_datetime=False,
2125 json_serializer=None,
2126 json_deserializer=None,
2127 **kwargs,
2128 ):
2129 default.DefaultDialect.__init__(self, **kwargs)
2130
2131 self._json_serializer = json_serializer
2132 self._json_deserializer = json_deserializer
2133
2134 # this flag used by pysqlite dialect, and perhaps others in the
2135 # future, to indicate the driver is handling date/timestamp
2136 # conversions (and perhaps datetime/time as well on some hypothetical
2137 # driver ?)
2138 self.native_datetime = native_datetime
2139
2140 if self.dbapi is not None:
2141 if self.dbapi.sqlite_version_info < (3, 7, 16):
2142 util.warn(
2143 "SQLite version %s is older than 3.7.16, and will not "
2144 "support right nested joins, as are sometimes used in "
2145 "more complex ORM scenarios. SQLAlchemy 1.4 and above "
2146 "no longer tries to rewrite these joins."
2147 % (self.dbapi.sqlite_version_info,)
2148 )
2149
2150 # NOTE: python 3.7 on fedora for me has SQLite 3.34.1. These
2151 # version checks are getting very stale.
2152 self._broken_dotted_colnames = self.dbapi.sqlite_version_info < (
2153 3,
2154 10,
2155 0,
2156 )
2157 self.supports_default_values = self.dbapi.sqlite_version_info >= (
2158 3,
2159 3,
2160 8,
2161 )
2162 self.supports_cast = self.dbapi.sqlite_version_info >= (3, 2, 3)
2163 self.supports_multivalues_insert = (
2164 # https://www.sqlite.org/releaselog/3_7_11.html
2165 self.dbapi.sqlite_version_info
2166 >= (3, 7, 11)
2167 )
2168 # see https://www.sqlalchemy.org/trac/ticket/2568
2169 # as well as https://www.sqlite.org/src/info/600482d161
2170 self._broken_fk_pragma_quotes = self.dbapi.sqlite_version_info < (
2171 3,
2172 6,
2173 14,
2174 )
2175
2176 if self.dbapi.sqlite_version_info < (3, 35) or util.pypy:
2177 self.update_returning = self.delete_returning = (
2178 self.insert_returning
2179 ) = False
2180
2181 if self.dbapi.sqlite_version_info < (3, 32, 0):
2182 # https://www.sqlite.org/limits.html
2183 self.insertmanyvalues_max_parameters = 999
2184
    # Maps the isolation level names accepted by set_isolation_level()
    # to the integer handed to "PRAGMA read_uncommitted": 1 permits
    # dirty reads, 0 is SQLite's default SERIALIZABLE behavior.
    _isolation_lookup = util.immutabledict(
        {"READ UNCOMMITTED": 1, "SERIALIZABLE": 0}
    )
2188
2189 def get_isolation_level_values(self, dbapi_connection):
2190 return list(self._isolation_lookup)
2191
2192 def set_isolation_level(self, dbapi_connection, level):
2193 isolation_level = self._isolation_lookup[level]
2194
2195 cursor = dbapi_connection.cursor()
2196 cursor.execute(f"PRAGMA read_uncommitted = {isolation_level}")
2197 cursor.close()
2198
2199 def get_isolation_level(self, dbapi_connection):
2200 cursor = dbapi_connection.cursor()
2201 cursor.execute("PRAGMA read_uncommitted")
2202 res = cursor.fetchone()
2203 if res:
2204 value = res[0]
2205 else:
2206 # https://www.sqlite.org/changes.html#version_3_3_3
2207 # "Optional READ UNCOMMITTED isolation (instead of the
2208 # default isolation level of SERIALIZABLE) and
2209 # table level locking when database connections
2210 # share a common cache.""
2211 # pre-SQLite 3.3.0 default to 0
2212 value = 0
2213 cursor.close()
2214 if value == 0:
2215 return "SERIALIZABLE"
2216 elif value == 1:
2217 return "READ UNCOMMITTED"
2218 else:
2219 assert False, "Unknown isolation level %s" % value
2220
2221 @reflection.cache
2222 def get_schema_names(self, connection, **kw):
2223 s = "PRAGMA database_list"
2224 dl = connection.exec_driver_sql(s)
2225
2226 return [db[1] for db in dl if db[1] != "temp"]
2227
2228 def _format_schema(self, schema, table_name):
2229 if schema is not None:
2230 qschema = self.identifier_preparer.quote_identifier(schema)
2231 name = f"{qschema}.{table_name}"
2232 else:
2233 name = table_name
2234 return name
2235
2236 def _sqlite_main_query(
2237 self,
2238 table: str,
2239 type_: str,
2240 schema: Optional[str],
2241 sqlite_include_internal: bool,
2242 ):
2243 main = self._format_schema(schema, table)
2244 if not sqlite_include_internal:
2245 filter_table = " AND name NOT LIKE 'sqlite~_%' ESCAPE '~'"
2246 else:
2247 filter_table = ""
2248 query = (
2249 f"SELECT name FROM {main} "
2250 f"WHERE type='{type_}'{filter_table} "
2251 "ORDER BY name"
2252 )
2253 return query
2254
2255 @reflection.cache
2256 def get_table_names(
2257 self, connection, schema=None, sqlite_include_internal=False, **kw
2258 ):
2259 query = self._sqlite_main_query(
2260 "sqlite_master", "table", schema, sqlite_include_internal
2261 )
2262 names = connection.exec_driver_sql(query).scalars().all()
2263 return names
2264
2265 @reflection.cache
2266 def get_temp_table_names(
2267 self, connection, sqlite_include_internal=False, **kw
2268 ):
2269 query = self._sqlite_main_query(
2270 "sqlite_temp_master", "table", None, sqlite_include_internal
2271 )
2272 names = connection.exec_driver_sql(query).scalars().all()
2273 return names
2274
2275 @reflection.cache
2276 def get_temp_view_names(
2277 self, connection, sqlite_include_internal=False, **kw
2278 ):
2279 query = self._sqlite_main_query(
2280 "sqlite_temp_master", "view", None, sqlite_include_internal
2281 )
2282 names = connection.exec_driver_sql(query).scalars().all()
2283 return names
2284
2285 @reflection.cache
2286 def has_table(self, connection, table_name, schema=None, **kw):
2287 self._ensure_has_table_connection(connection)
2288
2289 if schema is not None and schema not in self.get_schema_names(
2290 connection, **kw
2291 ):
2292 return False
2293
2294 info = self._get_table_pragma(
2295 connection, "table_info", table_name, schema=schema
2296 )
2297 return bool(info)
2298
    def _get_default_schema_name(self, connection):
        """Return the default schema name; SQLite's primary database is
        always named "main"."""
        return "main"
2301
2302 @reflection.cache
2303 def get_view_names(
2304 self, connection, schema=None, sqlite_include_internal=False, **kw
2305 ):
2306 query = self._sqlite_main_query(
2307 "sqlite_master", "view", schema, sqlite_include_internal
2308 )
2309 names = connection.exec_driver_sql(query).scalars().all()
2310 return names
2311
2312 @reflection.cache
2313 def get_view_definition(self, connection, view_name, schema=None, **kw):
2314 if schema is not None:
2315 qschema = self.identifier_preparer.quote_identifier(schema)
2316 master = f"{qschema}.sqlite_master"
2317 s = ("SELECT sql FROM %s WHERE name = ? AND type='view'") % (
2318 master,
2319 )
2320 rs = connection.exec_driver_sql(s, (view_name,))
2321 else:
2322 try:
2323 s = (
2324 "SELECT sql FROM "
2325 " (SELECT * FROM sqlite_master UNION ALL "
2326 " SELECT * FROM sqlite_temp_master) "
2327 "WHERE name = ? "
2328 "AND type='view'"
2329 )
2330 rs = connection.exec_driver_sql(s, (view_name,))
2331 except exc.DBAPIError:
2332 s = (
2333 "SELECT sql FROM sqlite_master WHERE name = ? "
2334 "AND type='view'"
2335 )
2336 rs = connection.exec_driver_sql(s, (view_name,))
2337
2338 result = rs.fetchall()
2339 if result:
2340 return result[0].sql
2341 else:
2342 raise exc.NoSuchTableError(
2343 f"{schema}.{view_name}" if schema else view_name
2344 )
2345
    @reflection.cache
    def get_columns(self, connection, table_name, schema=None, **kw):
        """Reflect column information via PRAGMA table_info/table_xinfo.

        Returns a list of column-reflection dicts; raises
        :class:`.NoSuchTableError` when the table does not exist.
        """
        pragma = "table_info"
        # computed columns are threaded as hidden, they require table_xinfo
        if self.server_version_info >= (3, 31):
            pragma = "table_xinfo"
        info = self._get_table_pragma(
            connection, pragma, table_name, schema=schema
        )
        columns = []
        tablesql = None
        for row in info:
            name = row[1]
            type_ = row[2].upper()
            nullable = not row[3]
            default = row[4]
            primary_key = row[5]
            hidden = row[6] if pragma == "table_xinfo" else 0

            # hidden has value 0 for normal columns, 1 for hidden columns,
            # 2 for computed virtual columns and 3 for computed stored columns
            # https://www.sqlite.org/src/info/069351b85f9a706f60d3e98fbc8aaf40c374356b967c0464aede30ead3d9d18b
            if hidden == 1:
                continue

            generated = bool(hidden)
            persisted = hidden == 3

            # the generation expression of a computed column has to be
            # parsed out of the CREATE TABLE source, fetched lazily the
            # first time a generated column is encountered
            if tablesql is None and generated:
                tablesql = self._get_table_sql(
                    connection, table_name, schema, **kw
                )
                # remove create table
                match = re.match(
                    r"create table .*?\((.*)\)$",
                    tablesql.strip(),
                    re.DOTALL | re.IGNORECASE,
                )
                assert match, f"create table not found in {tablesql}"
                tablesql = match.group(1).strip()

            columns.append(
                self._get_column_info(
                    name,
                    type_,
                    nullable,
                    default,
                    primary_key,
                    generated,
                    persisted,
                    tablesql,
                )
            )
        if columns:
            return columns
        elif not self.has_table(connection, table_name, schema):
            raise exc.NoSuchTableError(
                f"{schema}.{table_name}" if schema else table_name
            )
        else:
            # table exists but reported no columns
            return ReflectionDefaults.columns()
2407
2408 def _get_column_info(
2409 self,
2410 name,
2411 type_,
2412 nullable,
2413 default,
2414 primary_key,
2415 generated,
2416 persisted,
2417 tablesql,
2418 ):
2419 if generated:
2420 # the type of a column "cc INTEGER GENERATED ALWAYS AS (1 + 42)"
2421 # somehow is "INTEGER GENERATED ALWAYS"
2422 type_ = re.sub("generated", "", type_, flags=re.IGNORECASE)
2423 type_ = re.sub("always", "", type_, flags=re.IGNORECASE).strip()
2424
2425 coltype = self._resolve_type_affinity(type_)
2426
2427 if default is not None:
2428 default = str(default)
2429
2430 colspec = {
2431 "name": name,
2432 "type": coltype,
2433 "nullable": nullable,
2434 "default": default,
2435 "primary_key": primary_key,
2436 }
2437 if generated:
2438 sqltext = ""
2439 if tablesql:
2440 pattern = (
2441 r"[^,]*\s+GENERATED\s+ALWAYS\s+AS"
2442 r"\s+\((.*)\)\s*(?:virtual|stored)?"
2443 )
2444 match = re.search(
2445 re.escape(name) + pattern, tablesql, re.IGNORECASE
2446 )
2447 if match:
2448 sqltext = match.group(1)
2449 colspec["computed"] = {"sqltext": sqltext, "persisted": persisted}
2450 return colspec
2451
2452 def _resolve_type_affinity(self, type_):
2453 """Return a data type from a reflected column, using affinity rules.
2454
2455 SQLite's goal for universal compatibility introduces some complexity
2456 during reflection, as a column's defined type might not actually be a
2457 type that SQLite understands - or indeed, my not be defined *at all*.
2458 Internally, SQLite handles this with a 'data type affinity' for each
2459 column definition, mapping to one of 'TEXT', 'NUMERIC', 'INTEGER',
2460 'REAL', or 'NONE' (raw bits). The algorithm that determines this is
2461 listed in https://www.sqlite.org/datatype3.html section 2.1.
2462
2463 This method allows SQLAlchemy to support that algorithm, while still
2464 providing access to smarter reflection utilities by recognizing
2465 column definitions that SQLite only supports through affinity (like
2466 DATE and DOUBLE).
2467
2468 """
2469 match = re.match(r"([\w ]+)(\(.*?\))?", type_)
2470 if match:
2471 coltype = match.group(1)
2472 args = match.group(2)
2473 else:
2474 coltype = ""
2475 args = ""
2476
2477 if coltype in self.ischema_names:
2478 coltype = self.ischema_names[coltype]
2479 elif "INT" in coltype:
2480 coltype = sqltypes.INTEGER
2481 elif "CHAR" in coltype or "CLOB" in coltype or "TEXT" in coltype:
2482 coltype = sqltypes.TEXT
2483 elif "BLOB" in coltype or not coltype:
2484 coltype = sqltypes.NullType
2485 elif "REAL" in coltype or "FLOA" in coltype or "DOUB" in coltype:
2486 coltype = sqltypes.REAL
2487 else:
2488 coltype = sqltypes.NUMERIC
2489
2490 if args is not None:
2491 args = re.findall(r"(\d+)", args)
2492 try:
2493 coltype = coltype(*[int(a) for a in args])
2494 except TypeError:
2495 util.warn(
2496 "Could not instantiate type %s with "
2497 "reflected arguments %s; using no arguments."
2498 % (coltype, args)
2499 )
2500 coltype = coltype()
2501 else:
2502 coltype = coltype()
2503
2504 return coltype
2505
2506 @reflection.cache
2507 def get_pk_constraint(self, connection, table_name, schema=None, **kw):
2508 constraint_name = None
2509 table_data = self._get_table_sql(connection, table_name, schema=schema)
2510 if table_data:
2511 PK_PATTERN = r"CONSTRAINT (\w+) PRIMARY KEY"
2512 result = re.search(PK_PATTERN, table_data, re.I)
2513 constraint_name = result.group(1) if result else None
2514
2515 cols = self.get_columns(connection, table_name, schema, **kw)
2516 # consider only pk columns. This also avoids sorting the cached
2517 # value returned by get_columns
2518 cols = [col for col in cols if col.get("primary_key", 0) > 0]
2519 cols.sort(key=lambda col: col.get("primary_key"))
2520 pkeys = [col["name"] for col in cols]
2521
2522 if pkeys:
2523 return {"constrained_columns": pkeys, "name": constraint_name}
2524 else:
2525 return ReflectionDefaults.pk_constraint()
2526
2527 @reflection.cache
2528 def get_foreign_keys(self, connection, table_name, schema=None, **kw):
2529 # sqlite makes this *extremely difficult*.
2530 # First, use the pragma to get the actual FKs.
2531 pragma_fks = self._get_table_pragma(
2532 connection, "foreign_key_list", table_name, schema=schema
2533 )
2534
2535 fks = {}
2536
2537 for row in pragma_fks:
2538 (numerical_id, rtbl, lcol, rcol) = (row[0], row[2], row[3], row[4])
2539
2540 if not rcol:
2541 # no referred column, which means it was not named in the
2542 # original DDL. The referred columns of the foreign key
2543 # constraint are therefore the primary key of the referred
2544 # table.
2545 try:
2546 referred_pk = self.get_pk_constraint(
2547 connection, rtbl, schema=schema, **kw
2548 )
2549 referred_columns = referred_pk["constrained_columns"]
2550 except exc.NoSuchTableError:
2551 # ignore not existing parents
2552 referred_columns = []
2553 else:
2554 # note we use this list only if this is the first column
2555 # in the constraint. for subsequent columns we ignore the
2556 # list and append "rcol" if present.
2557 referred_columns = []
2558
2559 if self._broken_fk_pragma_quotes:
2560 rtbl = re.sub(r"^[\"\[`\']|[\"\]`\']$", "", rtbl)
2561
2562 if numerical_id in fks:
2563 fk = fks[numerical_id]
2564 else:
2565 fk = fks[numerical_id] = {
2566 "name": None,
2567 "constrained_columns": [],
2568 "referred_schema": schema,
2569 "referred_table": rtbl,
2570 "referred_columns": referred_columns,
2571 "options": {},
2572 }
2573 fks[numerical_id] = fk
2574
2575 fk["constrained_columns"].append(lcol)
2576
2577 if rcol:
2578 fk["referred_columns"].append(rcol)
2579
2580 def fk_sig(constrained_columns, referred_table, referred_columns):
2581 return (
2582 tuple(constrained_columns)
2583 + (referred_table,)
2584 + tuple(referred_columns)
2585 )
2586
2587 # then, parse the actual SQL and attempt to find DDL that matches
2588 # the names as well. SQLite saves the DDL in whatever format
2589 # it was typed in as, so need to be liberal here.
2590
2591 keys_by_signature = {
2592 fk_sig(
2593 fk["constrained_columns"],
2594 fk["referred_table"],
2595 fk["referred_columns"],
2596 ): fk
2597 for fk in fks.values()
2598 }
2599
2600 table_data = self._get_table_sql(connection, table_name, schema=schema)
2601
2602 def parse_fks():
2603 if table_data is None:
2604 # system tables, etc.
2605 return
2606
2607 # note that we already have the FKs from PRAGMA above. This whole
2608 # regexp thing is trying to locate additional detail about the
2609 # FKs, namely the name of the constraint and other options.
2610 # so parsing the columns is really about matching it up to what
2611 # we already have.
2612 FK_PATTERN = (
2613 r"(?:CONSTRAINT (\w+) +)?"
2614 r"FOREIGN KEY *\( *(.+?) *\) +"
2615 r'REFERENCES +(?:(?:"(.+?)")|([a-z0-9_]+)) *\( *((?:(?:"[^"]+"|[a-z0-9_]+) *(?:, *)?)+)\) *' # noqa: E501
2616 r"((?:ON (?:DELETE|UPDATE) "
2617 r"(?:SET NULL|SET DEFAULT|CASCADE|RESTRICT|NO ACTION) *)*)"
2618 r"((?:NOT +)?DEFERRABLE)?"
2619 r"(?: +INITIALLY +(DEFERRED|IMMEDIATE))?"
2620 )
2621 for match in re.finditer(FK_PATTERN, table_data, re.I):
2622 (
2623 constraint_name,
2624 constrained_columns,
2625 referred_quoted_name,
2626 referred_name,
2627 referred_columns,
2628 onupdatedelete,
2629 deferrable,
2630 initially,
2631 ) = match.group(1, 2, 3, 4, 5, 6, 7, 8)
2632 constrained_columns = list(
2633 self._find_cols_in_sig(constrained_columns)
2634 )
2635 if not referred_columns:
2636 referred_columns = constrained_columns
2637 else:
2638 referred_columns = list(
2639 self._find_cols_in_sig(referred_columns)
2640 )
2641 referred_name = referred_quoted_name or referred_name
2642 options = {}
2643
2644 for token in re.split(r" *\bON\b *", onupdatedelete.upper()):
2645 if token.startswith("DELETE"):
2646 ondelete = token[6:].strip()
2647 if ondelete and ondelete != "NO ACTION":
2648 options["ondelete"] = ondelete
2649 elif token.startswith("UPDATE"):
2650 onupdate = token[6:].strip()
2651 if onupdate and onupdate != "NO ACTION":
2652 options["onupdate"] = onupdate
2653
2654 if deferrable:
2655 options["deferrable"] = "NOT" not in deferrable.upper()
2656 if initially:
2657 options["initially"] = initially.upper()
2658
2659 yield (
2660 constraint_name,
2661 constrained_columns,
2662 referred_name,
2663 referred_columns,
2664 options,
2665 )
2666
2667 fkeys = []
2668
2669 for (
2670 constraint_name,
2671 constrained_columns,
2672 referred_name,
2673 referred_columns,
2674 options,
2675 ) in parse_fks():
2676 sig = fk_sig(constrained_columns, referred_name, referred_columns)
2677 if sig not in keys_by_signature:
2678 util.warn(
2679 "WARNING: SQL-parsed foreign key constraint "
2680 "'%s' could not be located in PRAGMA "
2681 "foreign_keys for table %s" % (sig, table_name)
2682 )
2683 continue
2684 key = keys_by_signature.pop(sig)
2685 key["name"] = constraint_name
2686 key["options"] = options
2687 fkeys.append(key)
2688 # assume the remainders are the unnamed, inline constraints, just
2689 # use them as is as it's extremely difficult to parse inline
2690 # constraints
2691 fkeys.extend(keys_by_signature.values())
2692 if fkeys:
2693 return fkeys
2694 else:
2695 return ReflectionDefaults.foreign_keys()
2696
2697 def _find_cols_in_sig(self, sig):
2698 for match in re.finditer(r'(?:"(.+?)")|([a-z0-9_]+)', sig, re.I):
2699 yield match.group(1) or match.group(2)
2700
2701 @reflection.cache
2702 def get_unique_constraints(
2703 self, connection, table_name, schema=None, **kw
2704 ):
2705 auto_index_by_sig = {}
2706 for idx in self.get_indexes(
2707 connection,
2708 table_name,
2709 schema=schema,
2710 include_auto_indexes=True,
2711 **kw,
2712 ):
2713 if not idx["name"].startswith("sqlite_autoindex"):
2714 continue
2715 sig = tuple(idx["column_names"])
2716 auto_index_by_sig[sig] = idx
2717
2718 table_data = self._get_table_sql(
2719 connection, table_name, schema=schema, **kw
2720 )
2721 unique_constraints = []
2722
2723 def parse_uqs():
2724 if table_data is None:
2725 return
2726 UNIQUE_PATTERN = r'(?:CONSTRAINT "?(.+?)"? +)?UNIQUE *\((.+?)\)'
2727 INLINE_UNIQUE_PATTERN = (
2728 r'(?:(".+?")|(?:[\[`])?([a-z0-9_]+)(?:[\]`])?)[\t ]'
2729 r"+[a-z0-9_ ]+?[\t ]+UNIQUE"
2730 )
2731
2732 for match in re.finditer(UNIQUE_PATTERN, table_data, re.I):
2733 name, cols = match.group(1, 2)
2734 yield name, list(self._find_cols_in_sig(cols))
2735
2736 # we need to match inlines as well, as we seek to differentiate
2737 # a UNIQUE constraint from a UNIQUE INDEX, even though these
2738 # are kind of the same thing :)
2739 for match in re.finditer(INLINE_UNIQUE_PATTERN, table_data, re.I):
2740 cols = list(
2741 self._find_cols_in_sig(match.group(1) or match.group(2))
2742 )
2743 yield None, cols
2744
2745 for name, cols in parse_uqs():
2746 sig = tuple(cols)
2747 if sig in auto_index_by_sig:
2748 auto_index_by_sig.pop(sig)
2749 parsed_constraint = {"name": name, "column_names": cols}
2750 unique_constraints.append(parsed_constraint)
2751 # NOTE: auto_index_by_sig might not be empty here,
2752 # the PRIMARY KEY may have an entry.
2753 if unique_constraints:
2754 return unique_constraints
2755 else:
2756 return ReflectionDefaults.unique_constraints()
2757
2758 @reflection.cache
2759 def get_check_constraints(self, connection, table_name, schema=None, **kw):
2760 table_data = self._get_table_sql(
2761 connection, table_name, schema=schema, **kw
2762 )
2763
2764 # NOTE NOTE NOTE
2765 # DO NOT CHANGE THIS REGULAR EXPRESSION. There is no known way
2766 # to parse CHECK constraints that contain newlines themselves using
2767 # regular expressions, and the approach here relies upon each
2768 # individual
2769 # CHECK constraint being on a single line by itself. This
2770 # necessarily makes assumptions as to how the CREATE TABLE
2771 # was emitted. A more comprehensive DDL parsing solution would be
2772 # needed to improve upon the current situation. See #11840 for
2773 # background
2774 CHECK_PATTERN = r"(?:CONSTRAINT (.+) +)?CHECK *\( *(.+) *\),? *"
2775 cks = []
2776
2777 for match in re.finditer(CHECK_PATTERN, table_data or "", re.I):
2778
2779 name = match.group(1)
2780
2781 if name:
2782 name = re.sub(r'^"|"$', "", name)
2783
2784 cks.append({"sqltext": match.group(2), "name": name})
2785 cks.sort(key=lambda d: d["name"] or "~") # sort None as last
2786 if cks:
2787 return cks
2788 else:
2789 return ReflectionDefaults.check_constraints()
2790
    @reflection.cache
    def get_indexes(self, connection, table_name, schema=None, **kw):
        """Reflect indexes via PRAGMA index_list / index_info.

        Partial-index WHERE clauses are recovered from the original
        CREATE INDEX DDL; expression-based indexes are skipped with a
        warning, since their expressions cannot be reflected.
        """
        pragma_indexes = self._get_table_pragma(
            connection, "index_list", table_name, schema=schema
        )
        indexes = []

        # regular expression to extract the filter predicate of a partial
        # index. this could fail to extract the predicate correctly on
        # indexes created like
        # CREATE INDEX i ON t (col || ') where') WHERE col <> ''
        # but as this function does not support expression-based indexes
        # this case does not occur.
        partial_pred_re = re.compile(r"\)\s+where\s+(.+)", re.IGNORECASE)

        if schema:
            schema_expr = "%s." % self.identifier_preparer.quote_identifier(
                schema
            )
        else:
            schema_expr = ""

        include_auto_indexes = kw.pop("include_auto_indexes", False)
        for row in pragma_indexes:
            # ignore implicit primary key index.
            # https://www.mail-archive.com/sqlite-users@sqlite.org/msg30517.html
            if not include_auto_indexes and row[1].startswith(
                "sqlite_autoindex"
            ):
                continue
            indexes.append(
                dict(
                    name=row[1],
                    column_names=[],
                    unique=row[2],
                    dialect_options={},
                )
            )

            # check partial indexes
            # (row[4], when present, flags a partial index)
            if len(row) >= 5 and row[4]:
                s = (
                    "SELECT sql FROM %(schema)ssqlite_master "
                    "WHERE name = ? "
                    "AND type = 'index'" % {"schema": schema_expr}
                )
                rs = connection.exec_driver_sql(s, (row[1],))
                index_sql = rs.scalar()
                predicate_match = partial_pred_re.search(index_sql)
                if predicate_match is None:
                    # unless the regex is broken this case shouldn't happen
                    # because we know this is a partial index, so the
                    # definition sql should match the regex
                    util.warn(
                        "Failed to look up filter predicate of "
                        "partial index %s" % row[1]
                    )
                else:
                    predicate = predicate_match.group(1)
                    indexes[-1]["dialect_options"]["sqlite_where"] = text(
                        predicate
                    )

        # loop thru unique indexes to get the column names.
        # (a None column name indicates an expression-based index member,
        # which cannot be reflected; the whole index is then dropped)
        for idx in list(indexes):
            pragma_index = self._get_table_pragma(
                connection, "index_info", idx["name"], schema=schema
            )

            for row in pragma_index:
                if row[2] is None:
                    util.warn(
                        "Skipped unsupported reflection of "
                        "expression-based index %s" % idx["name"]
                    )
                    indexes.remove(idx)
                    break
                else:
                    idx["column_names"].append(row[2])

        indexes.sort(key=lambda d: d["name"] or "~")  # sort None as last
        if indexes:
            return indexes
        elif not self.has_table(connection, table_name, schema):
            raise exc.NoSuchTableError(
                f"{schema}.{table_name}" if schema else table_name
            )
        else:
            return ReflectionDefaults.indexes()
2880
2881 def _is_sys_table(self, table_name):
2882 return table_name in {
2883 "sqlite_schema",
2884 "sqlite_master",
2885 "sqlite_temp_schema",
2886 "sqlite_temp_master",
2887 }
2888
2889 @reflection.cache
2890 def _get_table_sql(self, connection, table_name, schema=None, **kw):
2891 if schema:
2892 schema_expr = "%s." % (
2893 self.identifier_preparer.quote_identifier(schema)
2894 )
2895 else:
2896 schema_expr = ""
2897 try:
2898 s = (
2899 "SELECT sql FROM "
2900 " (SELECT * FROM %(schema)ssqlite_master UNION ALL "
2901 " SELECT * FROM %(schema)ssqlite_temp_master) "
2902 "WHERE name = ? "
2903 "AND type in ('table', 'view')" % {"schema": schema_expr}
2904 )
2905 rs = connection.exec_driver_sql(s, (table_name,))
2906 except exc.DBAPIError:
2907 s = (
2908 "SELECT sql FROM %(schema)ssqlite_master "
2909 "WHERE name = ? "
2910 "AND type in ('table', 'view')" % {"schema": schema_expr}
2911 )
2912 rs = connection.exec_driver_sql(s, (table_name,))
2913 value = rs.scalar()
2914 if value is None and not self._is_sys_table(table_name):
2915 raise exc.NoSuchTableError(f"{schema_expr}{table_name}")
2916 return value
2917
2918 def _get_table_pragma(self, connection, pragma, table_name, schema=None):
2919 quote = self.identifier_preparer.quote_identifier
2920 if schema is not None:
2921 statements = [f"PRAGMA {quote(schema)}."]
2922 else:
2923 # because PRAGMA looks in all attached databases if no schema
2924 # given, need to specify "main" schema, however since we want
2925 # 'temp' tables in the same namespace as 'main', need to run
2926 # the PRAGMA twice
2927 statements = ["PRAGMA main.", "PRAGMA temp."]
2928
2929 qtable = quote(table_name)
2930 for statement in statements:
2931 statement = f"{statement}{pragma}({qtable})"
2932 cursor = connection.exec_driver_sql(statement)
2933 if not cursor._soft_closed:
2934 # work around SQLite issue whereby cursor.description
2935 # is blank when PRAGMA returns no rows:
2936 # https://www.sqlite.org/cvstrac/tktview?tn=1884
2937 result = cursor.fetchall()
2938 else:
2939 result = []
2940 if result:
2941 return result
2942 else:
2943 return []