1# dialects/sqlite/base.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: ignore-errors
8
9
10r'''
11.. dialect:: sqlite
12 :name: SQLite
13 :normal_support: 3.12+
14 :best_effort: 3.7.16+
15
16.. _sqlite_datetime:
17
18Date and Time Types
19-------------------
20
21SQLite does not have built-in DATE, TIME, or DATETIME types, and pysqlite does
22not provide out of the box functionality for translating values between Python
23`datetime` objects and a SQLite-supported format. SQLAlchemy's own
24:class:`~sqlalchemy.types.DateTime` and related types provide date formatting
25and parsing functionality when SQLite is used. The implementation classes are
26:class:`_sqlite.DATETIME`, :class:`_sqlite.DATE` and :class:`_sqlite.TIME`.
27These types represent dates and times as ISO formatted strings, which also
28nicely support ordering. There's no reliance on typical "libc" internals for
29these functions so historical dates are fully supported.
30
31Ensuring Text affinity
32^^^^^^^^^^^^^^^^^^^^^^
33
34The DDL rendered for these types is the standard ``DATE``, ``TIME``
35and ``DATETIME`` indicators. However, custom storage formats can also be
36applied to these types. When the
37storage format is detected as containing no alpha characters, the DDL for
38these types is rendered as ``DATE_CHAR``, ``TIME_CHAR``, and ``DATETIME_CHAR``,
39so that the column continues to have textual affinity.
40
41.. seealso::
42
43 `Type Affinity <https://www.sqlite.org/datatype3.html#affinity>`_ -
44 in the SQLite documentation
45
46.. _sqlite_autoincrement:
47
48SQLite Auto Incrementing Behavior
49----------------------------------
50
51Background on SQLite's autoincrement is at: https://sqlite.org/autoinc.html
52
53Key concepts:
54
55* SQLite has an implicit "auto increment" feature that takes place for any
56 non-composite primary-key column that is specifically created using
57 "INTEGER PRIMARY KEY" for the type + primary key.
58
59* SQLite also has an explicit "AUTOINCREMENT" keyword, that is **not**
60 equivalent to the implicit autoincrement feature; this keyword is not
61 recommended for general use. SQLAlchemy does not render this keyword
62 unless a special SQLite-specific directive is used (see below). However,
63 it still requires that the column's type is named "INTEGER".
64
65Using the AUTOINCREMENT Keyword
66^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
67
68To specifically render the AUTOINCREMENT keyword on the primary key column
69when rendering DDL, add the flag ``sqlite_autoincrement=True`` to the Table
70construct::
71
72 Table(
73 "sometable",
74 metadata,
75 Column("id", Integer, primary_key=True),
76 sqlite_autoincrement=True,
77 )
78
79Allowing autoincrement behavior SQLAlchemy types other than Integer/INTEGER
80^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
81
82SQLite's typing model is based on naming conventions. Among other things, this
83means that any type name which contains the substring ``"INT"`` will be
84determined to be of "integer affinity". A type named ``"BIGINT"``,
85``"SPECIAL_INT"`` or even ``"XYZINTQPR"``, will be considered by SQLite to be
86of "integer" affinity. However, **the SQLite autoincrement feature, whether
87implicitly or explicitly enabled, requires that the name of the column's type
88is exactly the string "INTEGER"**. Therefore, if an application uses a type
89like :class:`.BigInteger` for a primary key, on SQLite this type will need to
90be rendered as the name ``"INTEGER"`` when emitting the initial ``CREATE
91TABLE`` statement in order for the autoincrement behavior to be available.
92
93One approach to achieve this is to use :class:`.Integer` on SQLite
94only using :meth:`.TypeEngine.with_variant`::
95
96 table = Table(
97 "my_table",
98 metadata,
99 Column(
100 "id",
101 BigInteger().with_variant(Integer, "sqlite"),
102 primary_key=True,
103 ),
104 )
105
106Another is to use a subclass of :class:`.BigInteger` that overrides its DDL
107name to be ``INTEGER`` when compiled against SQLite::
108
109 from sqlalchemy import BigInteger
110 from sqlalchemy.ext.compiler import compiles
111
112
113 class SLBigInteger(BigInteger):
114 pass
115
116
117 @compiles(SLBigInteger, "sqlite")
118 def bi_c(element, compiler, **kw):
119 return "INTEGER"
120
121
122 @compiles(SLBigInteger)
123 def bi_c(element, compiler, **kw):
124 return compiler.visit_BIGINT(element, **kw)
125
126
127 table = Table(
128 "my_table", metadata, Column("id", SLBigInteger(), primary_key=True)
129 )
130
131.. seealso::
132
133 :meth:`.TypeEngine.with_variant`
134
135 :ref:`sqlalchemy.ext.compiler_toplevel`
136
137 `Datatypes In SQLite Version 3 <https://sqlite.org/datatype3.html>`_
138
139.. _sqlite_transactions:
140
141Transactions with SQLite and the sqlite3 driver
142-----------------------------------------------
143
144As a file-based database, SQLite's approach to transactions differs from
145traditional databases in many ways. Additionally, the ``sqlite3`` driver
146standard with Python (as well as the async version ``aiosqlite`` which builds
147on top of it) has several quirks, workarounds, and API features in the
148area of transaction control, all of which generally need to be addressed when
149constructing a SQLAlchemy application that uses SQLite.
150
151Legacy Transaction Mode with the sqlite3 driver
152^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
153
154The most important aspect of transaction handling with the sqlite3 driver is
155that it defaults (which will continue through Python 3.15 before being
156removed in Python 3.16) to legacy transactional behavior which does
157not strictly follow :pep:`249`. The way in which the driver diverges from the
158PEP is that it does not "begin" a transaction automatically as dictated by
159:pep:`249` except in the case of DML statements, e.g. INSERT, UPDATE, and
160DELETE. Normally, :pep:`249` dictates that a BEGIN must be emitted upon
161the first SQL statement of any kind, so that all subsequent operations will
162be established within a transaction until ``connection.commit()`` has been
163called. The ``sqlite3`` driver, in an effort to be easier to use in
164highly concurrent environments, skips this step for DQL (e.g. SELECT) statements,
165and also skips it for DDL (e.g. CREATE TABLE etc.) statements for more legacy
166reasons. Statements such as SAVEPOINT are also skipped.
167
168In modern versions of the ``sqlite3`` driver as of Python 3.12, this legacy
169mode of operation is referred to as
170`"legacy transaction control" <https://docs.python.org/3/library/sqlite3.html#sqlite3-transaction-control-isolation-level>`_, and is in
171effect by default due to the ``Connection.autocommit`` parameter being set to
172the constant ``sqlite3.LEGACY_TRANSACTION_CONTROL``. Prior to Python 3.12,
173the ``Connection.autocommit`` attribute did not exist.
174
175The implications of legacy transaction mode include:
176
177* **Incorrect support for transactional DDL** - statements like CREATE TABLE, ALTER TABLE,
178 CREATE INDEX etc. will not automatically BEGIN a transaction if one were not
179 started already, leading to the changes by each statement being
180 "autocommitted" immediately unless BEGIN were otherwise emitted first. Very
181 old (pre Python 3.6) versions of SQLite would also force a COMMIT for these
182 operations even if a transaction were present, however this is no longer the
183 case.
184* **SERIALIZABLE behavior not fully functional** - SQLite's transaction isolation
185 behavior is normally consistent with SERIALIZABLE isolation, as it is a file-
186 based system that locks the database file entirely for write operations,
187 preventing COMMIT until all reader transactions (and associated file locks)
188 have completed. However, sqlite3's legacy transaction mode fails to emit BEGIN for SELECT
189 statements, which causes these SELECT statements to no longer be "repeatable",
190 failing one of the consistency guarantees of SERIALIZABLE.
191* **Incorrect behavior for SAVEPOINT** - as the SAVEPOINT statement does not
192 imply a BEGIN, a new SAVEPOINT emitted before a BEGIN will function on its
193 own but fails to participate in the enclosing transaction, meaning a ROLLBACK
194 of the transaction will not rollback elements that were part of a released
195 savepoint.
196
197Legacy transaction mode first existed in order to faciliate working around
198SQLite's file locks. Because SQLite relies upon whole-file locks, it is easy to
199get "database is locked" errors, particularly when newer features like "write
200ahead logging" are disabled. This is a key reason why ``sqlite3``'s legacy
201transaction mode is still the default mode of operation; disabling it will
202produce behavior that is more susceptible to locked database errors. However
203note that **legacy transaction mode will no longer be the default** in a future
204Python version (3.16 as of this writing).
205
206.. _sqlite_enabling_transactions:
207
208Enabling Non-Legacy SQLite Transactional Modes with the sqlite3 or aiosqlite driver
209^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
210
211Current SQLAlchemy support allows either for setting the
212``.Connection.autocommit`` attribute, most directly by using a
213:func:`._sa.create_engine` parameter, or if on an older version of Python where
214the attribute is not available, using event hooks to control the behavior of
215BEGIN.
216
217* **Enabling modern sqlite3 transaction control via the autocommit connect parameter** (Python 3.12 and above)
218
219 To use SQLite in the mode described at `Transaction control via the autocommit attribute <https://docs.python.org/3/library/sqlite3.html#transaction-control-via-the-autocommit-attribute>`_,
220 the most straightforward approach is to set the attribute to its recommended value
221 of ``False`` at the connect level using :paramref:`_sa.create_engine.connect_args``::
222
223 from sqlalchemy import create_engine
224
225 engine = create_engine(
226 "sqlite:///myfile.db", connect_args={"autocommit": False}
227 )
228
229 This parameter is also passed through when using the aiosqlite driver::
230
231 from sqlalchemy.ext.asyncio import create_async_engine
232
233 engine = create_async_engine(
234 "sqlite+aiosqlite:///myfile.db", connect_args={"autocommit": False}
235 )
236
237 The parameter can also be set at the attribute level using the :meth:`.PoolEvents.connect`
238 event hook, however this will only work for sqlite3, as aiosqlite does not yet expose this
239 attribute on its ``Connection`` object::
240
241 from sqlalchemy import create_engine, event
242
243 engine = create_engine("sqlite:///myfile.db")
244
245
246 @event.listens_for(engine, "connect")
247 def do_connect(dbapi_connection, connection_record):
248 # enable autocommit=False mode
249 dbapi_connection.autocommit = False
250
251* **Using SQLAlchemy to emit BEGIN in lieu of SQLite's transaction control** (all Python versions, sqlite3 and aiosqlite)
252
253 For older versions of ``sqlite3`` or for cross-compatiblity with older and
254 newer versions, SQLAlchemy can also take over the job of transaction control.
255 This is achieved by using the :meth:`.ConnectionEvents.begin` hook
256 to emit the "BEGIN" command directly, while also disabling SQLite's control
257 of this command using the :meth:`.PoolEvents.connect` event hook to set the
258 ``Connection.isolation_level`` attribute to ``None``::
259
260
261 from sqlalchemy import create_engine, event
262
263 engine = create_engine("sqlite:///myfile.db")
264
265
266 @event.listens_for(engine, "connect")
267 def do_connect(dbapi_connection, connection_record):
268 # disable sqlite3's emitting of the BEGIN statement entirely.
269 dbapi_connection.isolation_level = None
270
271
272 @event.listens_for(engine, "begin")
273 def do_begin(conn):
274 # emit our own BEGIN. sqlite3 still emits COMMIT/ROLLBACK correctly
275 conn.exec_driver_sql("BEGIN")
276
277 When using the asyncio variant ``aiosqlite``, refer to ``engine.sync_engine``
278 as in the example below::
279
280 from sqlalchemy import create_engine, event
281 from sqlalchemy.ext.asyncio import create_async_engine
282
283 engine = create_async_engine("sqlite+aiosqlite:///myfile.db")
284
285
286 @event.listens_for(engine.sync_engine, "connect")
287 def do_connect(dbapi_connection, connection_record):
288 # disable aiosqlite's emitting of the BEGIN statement entirely.
289 dbapi_connection.isolation_level = None
290
291
292 @event.listens_for(engine.sync_engine, "begin")
293 def do_begin(conn):
294 # emit our own BEGIN. aiosqlite still emits COMMIT/ROLLBACK correctly
295 conn.exec_driver_sql("BEGIN")
296
297.. _sqlite_isolation_level:
298
299Using SQLAlchemy's Driver Level AUTOCOMMIT Feature with SQLite
300^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
301
302SQLAlchemy has a comprehensive database isolation feature with optional
303autocommit support that is introduced in the section :ref:`dbapi_autocommit`.
304
305For the ``sqlite3`` and ``aiosqlite`` drivers, SQLAlchemy only includes
306built-in support for "AUTOCOMMIT". Note that this mode is currently incompatible
307with the non-legacy isolation mode hooks documented in the previous
308section at :ref:`sqlite_enabling_transactions`.
309
310To use the ``sqlite3`` driver with SQLAlchemy driver-level autocommit,
311create an engine setting the :paramref:`_sa.create_engine.isolation_level`
312parameter to "AUTOCOMMIT"::
313
314 eng = create_engine("sqlite:///myfile.db", isolation_level="AUTOCOMMIT")
315
316When using the above mode, any event hooks that set the sqlite3 ``Connection.autocommit``
317parameter away from its default of ``sqlite3.LEGACY_TRANSACTION_CONTROL``
318as well as hooks that emit ``BEGIN`` should be disabled.
319
320Additional Reading for SQLite / sqlite3 transaction control
321^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
322
323Links with important information on SQLite, the sqlite3 driver,
324as well as long historical conversations on how things got to their current state:
325
326* `Isolation in SQLite <https://www.sqlite.org/isolation.html>`_ - on the SQLite website
327* `Transaction control <https://docs.python.org/3/library/sqlite3.html#transaction-control>`_ - describes the sqlite3 autocommit attribute as well
328 as the legacy isolation_level attribute.
329* `sqlite3 SELECT does not BEGIN a transaction, but should according to spec <https://github.com/python/cpython/issues/54133>`_ - imported Python standard library issue on github
330* `sqlite3 module breaks transactions and potentially corrupts data <https://github.com/python/cpython/issues/54949>`_ - imported Python standard library issue on github
331
332
333INSERT/UPDATE/DELETE...RETURNING
334---------------------------------
335
336The SQLite dialect supports SQLite 3.35's ``INSERT|UPDATE|DELETE..RETURNING``
337syntax. ``INSERT..RETURNING`` may be used
338automatically in some cases in order to fetch newly generated identifiers in
339place of the traditional approach of using ``cursor.lastrowid``, however
340``cursor.lastrowid`` is currently still preferred for simple single-statement
341cases for its better performance.
342
343To specify an explicit ``RETURNING`` clause, use the
344:meth:`._UpdateBase.returning` method on a per-statement basis::
345
346 # INSERT..RETURNING
347 result = connection.execute(
348 table.insert().values(name="foo").returning(table.c.col1, table.c.col2)
349 )
350 print(result.all())
351
352 # UPDATE..RETURNING
353 result = connection.execute(
354 table.update()
355 .where(table.c.name == "foo")
356 .values(name="bar")
357 .returning(table.c.col1, table.c.col2)
358 )
359 print(result.all())
360
361 # DELETE..RETURNING
362 result = connection.execute(
363 table.delete()
364 .where(table.c.name == "foo")
365 .returning(table.c.col1, table.c.col2)
366 )
367 print(result.all())
368
369.. versionadded:: 2.0 Added support for SQLite RETURNING
370
371
372.. _sqlite_foreign_keys:
373
374Foreign Key Support
375-------------------
376
377SQLite supports FOREIGN KEY syntax when emitting CREATE statements for tables,
378however by default these constraints have no effect on the operation of the
379table.
380
381Constraint checking on SQLite has three prerequisites:
382
383* At least version 3.6.19 of SQLite must be in use
384* The SQLite library must be compiled *without* the SQLITE_OMIT_FOREIGN_KEY
385 or SQLITE_OMIT_TRIGGER symbols enabled.
386* The ``PRAGMA foreign_keys = ON`` statement must be emitted on all
387 connections before use -- including the initial call to
388 :meth:`sqlalchemy.schema.MetaData.create_all`.
389
390SQLAlchemy allows for the ``PRAGMA`` statement to be emitted automatically for
391new connections through the usage of events::
392
393 from sqlalchemy.engine import Engine
394 from sqlalchemy import event
395
396
397 @event.listens_for(Engine, "connect")
398 def set_sqlite_pragma(dbapi_connection, connection_record):
399 cursor = dbapi_connection.cursor()
400 cursor.execute("PRAGMA foreign_keys=ON")
401 cursor.close()
402
403.. warning::
404
405 When SQLite foreign keys are enabled, it is **not possible**
406 to emit CREATE or DROP statements for tables that contain
407 mutually-dependent foreign key constraints;
408 to emit the DDL for these tables requires that ALTER TABLE be used to
409 create or drop these constraints separately, for which SQLite has
410 no support.
411
412.. seealso::
413
414 `SQLite Foreign Key Support <https://www.sqlite.org/foreignkeys.html>`_
415 - on the SQLite web site.
416
417 :ref:`event_toplevel` - SQLAlchemy event API.
418
419 :ref:`use_alter` - more information on SQLAlchemy's facilities for handling
420 mutually-dependent foreign key constraints.
421
422.. _sqlite_on_conflict_ddl:
423
424ON CONFLICT support for constraints
425-----------------------------------
426
427.. seealso:: This section describes the :term:`DDL` version of "ON CONFLICT" for
428 SQLite, which occurs within a CREATE TABLE statement. For "ON CONFLICT" as
429 applied to an INSERT statement, see :ref:`sqlite_on_conflict_insert`.
430
431SQLite supports a non-standard DDL clause known as ON CONFLICT which can be applied
432to primary key, unique, check, and not null constraints. In DDL, it is
433rendered either within the "CONSTRAINT" clause or within the column definition
434itself depending on the location of the target constraint. To render this
435clause within DDL, the extension parameter ``sqlite_on_conflict`` can be
436specified with a string conflict resolution algorithm within the
437:class:`.PrimaryKeyConstraint`, :class:`.UniqueConstraint`,
438:class:`.CheckConstraint` objects. Within the :class:`_schema.Column` object,
439there
440are individual parameters ``sqlite_on_conflict_not_null``,
441``sqlite_on_conflict_primary_key``, ``sqlite_on_conflict_unique`` which each
442correspond to the three types of relevant constraint types that can be
443indicated from a :class:`_schema.Column` object.
444
445.. seealso::
446
447 `ON CONFLICT <https://www.sqlite.org/lang_conflict.html>`_ - in the SQLite
448 documentation
449
450.. versionadded:: 1.3
451
452
453The ``sqlite_on_conflict`` parameters accept a string argument which is just
454the resolution name to be chosen, which on SQLite can be one of ROLLBACK,
455ABORT, FAIL, IGNORE, and REPLACE. For example, to add a UNIQUE constraint
456that specifies the IGNORE algorithm::
457
458 some_table = Table(
459 "some_table",
460 metadata,
461 Column("id", Integer, primary_key=True),
462 Column("data", Integer),
463 UniqueConstraint("id", "data", sqlite_on_conflict="IGNORE"),
464 )
465
466The above renders CREATE TABLE DDL as:
467
468.. sourcecode:: sql
469
470 CREATE TABLE some_table (
471 id INTEGER NOT NULL,
472 data INTEGER,
473 PRIMARY KEY (id),
474 UNIQUE (id, data) ON CONFLICT IGNORE
475 )
476
477
478When using the :paramref:`_schema.Column.unique`
479flag to add a UNIQUE constraint
480to a single column, the ``sqlite_on_conflict_unique`` parameter can
481be added to the :class:`_schema.Column` as well, which will be added to the
482UNIQUE constraint in the DDL::
483
484 some_table = Table(
485 "some_table",
486 metadata,
487 Column("id", Integer, primary_key=True),
488 Column(
489 "data", Integer, unique=True, sqlite_on_conflict_unique="IGNORE"
490 ),
491 )
492
493rendering:
494
495.. sourcecode:: sql
496
497 CREATE TABLE some_table (
498 id INTEGER NOT NULL,
499 data INTEGER,
500 PRIMARY KEY (id),
501 UNIQUE (data) ON CONFLICT IGNORE
502 )
503
504To apply the FAIL algorithm for a NOT NULL constraint,
505``sqlite_on_conflict_not_null`` is used::
506
507 some_table = Table(
508 "some_table",
509 metadata,
510 Column("id", Integer, primary_key=True),
511 Column(
512 "data", Integer, nullable=False, sqlite_on_conflict_not_null="FAIL"
513 ),
514 )
515
516this renders the column inline ON CONFLICT phrase:
517
518.. sourcecode:: sql
519
520 CREATE TABLE some_table (
521 id INTEGER NOT NULL,
522 data INTEGER NOT NULL ON CONFLICT FAIL,
523 PRIMARY KEY (id)
524 )
525
526
527Similarly, for an inline primary key, use ``sqlite_on_conflict_primary_key``::
528
529 some_table = Table(
530 "some_table",
531 metadata,
532 Column(
533 "id",
534 Integer,
535 primary_key=True,
536 sqlite_on_conflict_primary_key="FAIL",
537 ),
538 )
539
540SQLAlchemy renders the PRIMARY KEY constraint separately, so the conflict
541resolution algorithm is applied to the constraint itself:
542
543.. sourcecode:: sql
544
545 CREATE TABLE some_table (
546 id INTEGER NOT NULL,
547 PRIMARY KEY (id) ON CONFLICT FAIL
548 )
549
550.. _sqlite_on_conflict_insert:
551
552INSERT...ON CONFLICT (Upsert)
553-----------------------------
554
555.. seealso:: This section describes the :term:`DML` version of "ON CONFLICT" for
556 SQLite, which occurs within an INSERT statement. For "ON CONFLICT" as
557 applied to a CREATE TABLE statement, see :ref:`sqlite_on_conflict_ddl`.
558
559From version 3.24.0 onwards, SQLite supports "upserts" (update or insert)
560of rows into a table via the ``ON CONFLICT`` clause of the ``INSERT``
561statement. A candidate row will only be inserted if that row does not violate
562any unique or primary key constraints. In the case of a unique constraint violation, a
563secondary action can occur which can be either "DO UPDATE", indicating that
564the data in the target row should be updated, or "DO NOTHING", which indicates
565to silently skip this row.
566
567Conflicts are determined using columns that are part of existing unique
568constraints and indexes. These constraints are identified by stating the
569columns and conditions that comprise the indexes.
570
571SQLAlchemy provides ``ON CONFLICT`` support via the SQLite-specific
572:func:`_sqlite.insert()` function, which provides
573the generative methods :meth:`_sqlite.Insert.on_conflict_do_update`
574and :meth:`_sqlite.Insert.on_conflict_do_nothing`:
575
576.. sourcecode:: pycon+sql
577
578 >>> from sqlalchemy.dialects.sqlite import insert
579
580 >>> insert_stmt = insert(my_table).values(
581 ... id="some_existing_id", data="inserted value"
582 ... )
583
584 >>> do_update_stmt = insert_stmt.on_conflict_do_update(
585 ... index_elements=["id"], set_=dict(data="updated value")
586 ... )
587
588 >>> print(do_update_stmt)
589 {printsql}INSERT INTO my_table (id, data) VALUES (?, ?)
590 ON CONFLICT (id) DO UPDATE SET data = ?{stop}
591
592 >>> do_nothing_stmt = insert_stmt.on_conflict_do_nothing(index_elements=["id"])
593
594 >>> print(do_nothing_stmt)
595 {printsql}INSERT INTO my_table (id, data) VALUES (?, ?)
596 ON CONFLICT (id) DO NOTHING
597
598.. versionadded:: 1.4
599
600.. seealso::
601
602 `Upsert
603 <https://sqlite.org/lang_UPSERT.html>`_
604 - in the SQLite documentation.
605
606
607Specifying the Target
608^^^^^^^^^^^^^^^^^^^^^
609
610Both methods supply the "target" of the conflict using column inference:
611
612* The :paramref:`_sqlite.Insert.on_conflict_do_update.index_elements` argument
613 specifies a sequence containing string column names, :class:`_schema.Column`
614 objects, and/or SQL expression elements, which would identify a unique index
615 or unique constraint.
616
617* When using :paramref:`_sqlite.Insert.on_conflict_do_update.index_elements`
618 to infer an index, a partial index can be inferred by also specifying the
619 :paramref:`_sqlite.Insert.on_conflict_do_update.index_where` parameter:
620
621 .. sourcecode:: pycon+sql
622
623 >>> stmt = insert(my_table).values(user_email="a@b.com", data="inserted data")
624
625 >>> do_update_stmt = stmt.on_conflict_do_update(
626 ... index_elements=[my_table.c.user_email],
627 ... index_where=my_table.c.user_email.like("%@gmail.com"),
628 ... set_=dict(data=stmt.excluded.data),
629 ... )
630
631 >>> print(do_update_stmt)
632 {printsql}INSERT INTO my_table (data, user_email) VALUES (?, ?)
633 ON CONFLICT (user_email)
634 WHERE user_email LIKE '%@gmail.com'
635 DO UPDATE SET data = excluded.data
636
637The SET Clause
638^^^^^^^^^^^^^^^
639
640``ON CONFLICT...DO UPDATE`` is used to perform an update of the already
641existing row, using any combination of new values as well as values
642from the proposed insertion. These values are specified using the
643:paramref:`_sqlite.Insert.on_conflict_do_update.set_` parameter. This
644parameter accepts a dictionary which consists of direct values
645for UPDATE:
646
647.. sourcecode:: pycon+sql
648
649 >>> stmt = insert(my_table).values(id="some_id", data="inserted value")
650
651 >>> do_update_stmt = stmt.on_conflict_do_update(
652 ... index_elements=["id"], set_=dict(data="updated value")
653 ... )
654
655 >>> print(do_update_stmt)
656 {printsql}INSERT INTO my_table (id, data) VALUES (?, ?)
657 ON CONFLICT (id) DO UPDATE SET data = ?
658
659.. warning::
660
661 The :meth:`_sqlite.Insert.on_conflict_do_update` method does **not** take
662 into account Python-side default UPDATE values or generation functions,
663 e.g. those specified using :paramref:`_schema.Column.onupdate`. These
664 values will not be exercised for an ON CONFLICT style of UPDATE, unless
665 they are manually specified in the
666 :paramref:`_sqlite.Insert.on_conflict_do_update.set_` dictionary.
667
668Updating using the Excluded INSERT Values
669^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
670
671In order to refer to the proposed insertion row, the special alias
672:attr:`~.sqlite.Insert.excluded` is available as an attribute on
673the :class:`_sqlite.Insert` object; this object creates an "excluded." prefix
674on a column, that informs the DO UPDATE to update the row with the value that
675would have been inserted had the constraint not failed:
676
677.. sourcecode:: pycon+sql
678
679 >>> stmt = insert(my_table).values(
680 ... id="some_id", data="inserted value", author="jlh"
681 ... )
682
683 >>> do_update_stmt = stmt.on_conflict_do_update(
684 ... index_elements=["id"],
685 ... set_=dict(data="updated value", author=stmt.excluded.author),
686 ... )
687
688 >>> print(do_update_stmt)
689 {printsql}INSERT INTO my_table (id, data, author) VALUES (?, ?, ?)
690 ON CONFLICT (id) DO UPDATE SET data = ?, author = excluded.author
691
692Additional WHERE Criteria
693^^^^^^^^^^^^^^^^^^^^^^^^^
694
695The :meth:`_sqlite.Insert.on_conflict_do_update` method also accepts
696a WHERE clause using the :paramref:`_sqlite.Insert.on_conflict_do_update.where`
697parameter, which will limit those rows which receive an UPDATE:
698
699.. sourcecode:: pycon+sql
700
701 >>> stmt = insert(my_table).values(
702 ... id="some_id", data="inserted value", author="jlh"
703 ... )
704
705 >>> on_update_stmt = stmt.on_conflict_do_update(
706 ... index_elements=["id"],
707 ... set_=dict(data="updated value", author=stmt.excluded.author),
708 ... where=(my_table.c.status == 2),
709 ... )
710 >>> print(on_update_stmt)
711 {printsql}INSERT INTO my_table (id, data, author) VALUES (?, ?, ?)
712 ON CONFLICT (id) DO UPDATE SET data = ?, author = excluded.author
713 WHERE my_table.status = ?
714
715
716Skipping Rows with DO NOTHING
717^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
718
719``ON CONFLICT`` may be used to skip inserting a row entirely
720if any conflict with a unique constraint occurs; below this is illustrated
721using the :meth:`_sqlite.Insert.on_conflict_do_nothing` method:
722
723.. sourcecode:: pycon+sql
724
725 >>> stmt = insert(my_table).values(id="some_id", data="inserted value")
726 >>> stmt = stmt.on_conflict_do_nothing(index_elements=["id"])
727 >>> print(stmt)
728 {printsql}INSERT INTO my_table (id, data) VALUES (?, ?) ON CONFLICT (id) DO NOTHING
729
730
731If ``DO NOTHING`` is used without specifying any columns or constraint,
732it has the effect of skipping the INSERT for any unique violation which
733occurs:
734
735.. sourcecode:: pycon+sql
736
737 >>> stmt = insert(my_table).values(id="some_id", data="inserted value")
738 >>> stmt = stmt.on_conflict_do_nothing()
739 >>> print(stmt)
740 {printsql}INSERT INTO my_table (id, data) VALUES (?, ?) ON CONFLICT DO NOTHING
741
742.. _sqlite_type_reflection:
743
744Type Reflection
745---------------
746
747SQLite types are unlike those of most other database backends, in that
748the string name of the type usually does not correspond to a "type" in a
749one-to-one fashion. Instead, SQLite links per-column typing behavior
750to one of five so-called "type affinities" based on a string matching
751pattern for the type.
752
753SQLAlchemy's reflection process, when inspecting types, uses a simple
754lookup table to link the keywords returned to provided SQLAlchemy types.
755This lookup table is present within the SQLite dialect as it is for all
756other dialects. However, the SQLite dialect has a different "fallback"
757routine for when a particular type name is not located in the lookup map;
758it instead implements the SQLite "type affinity" scheme located at
759https://www.sqlite.org/datatype3.html section 2.1.
760
761The provided typemap will make direct associations from an exact string
762name match for the following types:
763
764:class:`_types.BIGINT`, :class:`_types.BLOB`,
765:class:`_types.BOOLEAN`, :class:`_types.BOOLEAN`,
766:class:`_types.CHAR`, :class:`_types.DATE`,
767:class:`_types.DATETIME`, :class:`_types.FLOAT`,
768:class:`_types.DECIMAL`, :class:`_types.FLOAT`,
769:class:`_types.INTEGER`, :class:`_types.INTEGER`,
770:class:`_types.NUMERIC`, :class:`_types.REAL`,
771:class:`_types.SMALLINT`, :class:`_types.TEXT`,
772:class:`_types.TIME`, :class:`_types.TIMESTAMP`,
773:class:`_types.VARCHAR`, :class:`_types.NVARCHAR`,
774:class:`_types.NCHAR`
775
776When a type name does not match one of the above types, the "type affinity"
777lookup is used instead:
778
779* :class:`_types.INTEGER` is returned if the type name includes the
780 string ``INT``
781* :class:`_types.TEXT` is returned if the type name includes the
782 string ``CHAR``, ``CLOB`` or ``TEXT``
783* :class:`_types.NullType` is returned if the type name includes the
784 string ``BLOB``
785* :class:`_types.REAL` is returned if the type name includes the string
786 ``REAL``, ``FLOA`` or ``DOUB``.
787* Otherwise, the :class:`_types.NUMERIC` type is used.
788
789.. _sqlite_partial_index:
790
791Partial Indexes
792---------------
793
794A partial index, e.g. one which uses a WHERE clause, can be specified
795with the DDL system using the argument ``sqlite_where``::
796
797 tbl = Table("testtbl", m, Column("data", Integer))
798 idx = Index(
799 "test_idx1",
800 tbl.c.data,
801 sqlite_where=and_(tbl.c.data > 5, tbl.c.data < 10),
802 )
803
804The index will be rendered at create time as:
805
806.. sourcecode:: sql
807
808 CREATE INDEX test_idx1 ON testtbl (data)
809 WHERE data > 5 AND data < 10
810
811.. _sqlite_dotted_column_names:
812
813Dotted Column Names
814-------------------
815
816Using table or column names that explicitly have periods in them is
817**not recommended**. While this is generally a bad idea for relational
818databases in general, as the dot is a syntactically significant character,
819the SQLite driver up until version **3.10.0** of SQLite has a bug which
820requires that SQLAlchemy filter out these dots in result sets.
821
822The bug, entirely outside of SQLAlchemy, can be illustrated thusly::
823
824 import sqlite3
825
826 assert sqlite3.sqlite_version_info < (
827 3,
828 10,
829 0,
830 ), "bug is fixed in this version"
831
832 conn = sqlite3.connect(":memory:")
833 cursor = conn.cursor()
834
835 cursor.execute("create table x (a integer, b integer)")
836 cursor.execute("insert into x (a, b) values (1, 1)")
837 cursor.execute("insert into x (a, b) values (2, 2)")
838
839 cursor.execute("select x.a, x.b from x")
840 assert [c[0] for c in cursor.description] == ["a", "b"]
841
842 cursor.execute(
843 """
844 select x.a, x.b from x where a=1
845 union
846 select x.a, x.b from x where a=2
847 """
848 )
849 assert [c[0] for c in cursor.description] == ["a", "b"], [
850 c[0] for c in cursor.description
851 ]
852
853The second assertion fails:
854
855.. sourcecode:: text
856
857 Traceback (most recent call last):
858 File "test.py", line 19, in <module>
859 [c[0] for c in cursor.description]
860 AssertionError: ['x.a', 'x.b']
861
862Where above, the driver incorrectly reports the names of the columns
863including the name of the table, which is entirely inconsistent vs.
864when the UNION is not present.
865
866SQLAlchemy relies upon column names being predictable in how they match
867to the original statement, so the SQLAlchemy dialect has no choice but
868to filter these out::
869
870
871 from sqlalchemy import create_engine
872
873 eng = create_engine("sqlite://")
874 conn = eng.connect()
875
876 conn.exec_driver_sql("create table x (a integer, b integer)")
877 conn.exec_driver_sql("insert into x (a, b) values (1, 1)")
878 conn.exec_driver_sql("insert into x (a, b) values (2, 2)")
879
880 result = conn.exec_driver_sql("select x.a, x.b from x")
881 assert result.keys() == ["a", "b"]
882
883 result = conn.exec_driver_sql(
884 """
885 select x.a, x.b from x where a=1
886 union
887 select x.a, x.b from x where a=2
888 """
889 )
890 assert result.keys() == ["a", "b"]
891
892Note that above, even though SQLAlchemy filters out the dots, *both
893names are still addressable*::
894
895 >>> row = result.first()
896 >>> row["a"]
897 1
898 >>> row["x.a"]
899 1
900 >>> row["b"]
901 1
902 >>> row["x.b"]
903 1
904
905Therefore, the workaround applied by SQLAlchemy only impacts
906:meth:`_engine.CursorResult.keys` and :meth:`.Row.keys()` in the public API. In
907the very specific case where an application is forced to use column names that
908contain dots, and the functionality of :meth:`_engine.CursorResult.keys` and
909:meth:`.Row.keys()` is required to return these dotted names unmodified,
910the ``sqlite_raw_colnames`` execution option may be provided, either on a
911per-:class:`_engine.Connection` basis::
912
913 result = conn.execution_options(sqlite_raw_colnames=True).exec_driver_sql(
914 """
915 select x.a, x.b from x where a=1
916 union
917 select x.a, x.b from x where a=2
918 """
919 )
920 assert result.keys() == ["x.a", "x.b"]
921
922or on a per-:class:`_engine.Engine` basis::
923
924 engine = create_engine(
925 "sqlite://", execution_options={"sqlite_raw_colnames": True}
926 )
927
928When using the per-:class:`_engine.Engine` execution option, note that
929**Core and ORM queries that use UNION may not function properly**.
930
931SQLite-specific table options
932-----------------------------
933
934One option for CREATE TABLE is supported directly by the SQLite
935dialect in conjunction with the :class:`_schema.Table` construct:
936
937* ``WITHOUT ROWID``::
938
939 Table("some_table", metadata, ..., sqlite_with_rowid=False)
940
941*
942 ``STRICT``::
943
944 Table("some_table", metadata, ..., sqlite_strict=True)
945
946 .. versionadded:: 2.0.37
947
948.. seealso::
949
950 `SQLite CREATE TABLE options
951 <https://www.sqlite.org/lang_createtable.html>`_
952
953.. _sqlite_include_internal:
954
955Reflecting internal schema tables
956----------------------------------
957
958Reflection methods that return lists of tables will omit so-called
959"SQLite internal schema object" names, which are considered by SQLite
960as any object name that is prefixed with ``sqlite_``. An example of
961such an object is the ``sqlite_sequence`` table that's generated when
962the ``AUTOINCREMENT`` column parameter is used. In order to return
963these objects, the parameter ``sqlite_include_internal=True`` may be
964passed to methods such as :meth:`_schema.MetaData.reflect` or
965:meth:`.Inspector.get_table_names`.
966
967.. versionadded:: 2.0 Added the ``sqlite_include_internal=True`` parameter.
968 Previously, these tables were not ignored by SQLAlchemy reflection
969 methods.
970
971.. note::
972
973 The ``sqlite_include_internal`` parameter does not refer to the
974 "system" tables that are present in schemas such as ``sqlite_master``.
975
976.. seealso::
977
978 `SQLite Internal Schema Objects <https://www.sqlite.org/fileformat2.html#intschema>`_ - in the SQLite
979 documentation.
980
981''' # noqa
982from __future__ import annotations
983
984import datetime
985import numbers
986import re
987from typing import Optional
988
989from .json import JSON
990from .json import JSONIndexType
991from .json import JSONPathType
992from ... import exc
993from ... import schema as sa_schema
994from ... import sql
995from ... import text
996from ... import types as sqltypes
997from ... import util
998from ...engine import default
999from ...engine import processors
1000from ...engine import reflection
1001from ...engine.reflection import ReflectionDefaults
1002from ...sql import coercions
1003from ...sql import compiler
1004from ...sql import elements
1005from ...sql import roles
1006from ...sql import schema
1007from ...types import BLOB # noqa
1008from ...types import BOOLEAN # noqa
1009from ...types import CHAR # noqa
1010from ...types import DECIMAL # noqa
1011from ...types import FLOAT # noqa
1012from ...types import INTEGER # noqa
1013from ...types import NUMERIC # noqa
1014from ...types import REAL # noqa
1015from ...types import SMALLINT # noqa
1016from ...types import TEXT # noqa
1017from ...types import TIMESTAMP # noqa
1018from ...types import VARCHAR # noqa
1019
1020
1021class _SQliteJson(JSON):
1022 def result_processor(self, dialect, coltype):
1023 default_processor = super().result_processor(dialect, coltype)
1024
1025 def process(value):
1026 try:
1027 return default_processor(value)
1028 except TypeError:
1029 if isinstance(value, numbers.Number):
1030 return value
1031 else:
1032 raise
1033
1034 return process
1035
1036
1037class _DateTimeMixin:
1038 _reg = None
1039 _storage_format = None
1040
1041 def __init__(self, storage_format=None, regexp=None, **kw):
1042 super().__init__(**kw)
1043 if regexp is not None:
1044 self._reg = re.compile(regexp)
1045 if storage_format is not None:
1046 self._storage_format = storage_format
1047
1048 @property
1049 def format_is_text_affinity(self):
1050 """return True if the storage format will automatically imply
1051 a TEXT affinity.
1052
1053 If the storage format contains no non-numeric characters,
1054 it will imply a NUMERIC storage format on SQLite; in this case,
1055 the type will generate its DDL as DATE_CHAR, DATETIME_CHAR,
1056 TIME_CHAR.
1057
1058 """
1059 spec = self._storage_format % {
1060 "year": 0,
1061 "month": 0,
1062 "day": 0,
1063 "hour": 0,
1064 "minute": 0,
1065 "second": 0,
1066 "microsecond": 0,
1067 }
1068 return bool(re.search(r"[^0-9]", spec))
1069
1070 def adapt(self, cls, **kw):
1071 if issubclass(cls, _DateTimeMixin):
1072 if self._storage_format:
1073 kw["storage_format"] = self._storage_format
1074 if self._reg:
1075 kw["regexp"] = self._reg
1076 return super().adapt(cls, **kw)
1077
1078 def literal_processor(self, dialect):
1079 bp = self.bind_processor(dialect)
1080
1081 def process(value):
1082 return "'%s'" % bp(value)
1083
1084 return process
1085
1086
1087class DATETIME(_DateTimeMixin, sqltypes.DateTime):
1088 r"""Represent a Python datetime object in SQLite using a string.
1089
1090 The default string storage format is::
1091
1092 "%(year)04d-%(month)02d-%(day)02d %(hour)02d:%(minute)02d:%(second)02d.%(microsecond)06d"
1093
1094 e.g.:
1095
1096 .. sourcecode:: text
1097
1098 2021-03-15 12:05:57.105542
1099
1100 The incoming storage format is by default parsed using the
1101 Python ``datetime.fromisoformat()`` function.
1102
1103 .. versionchanged:: 2.0 ``datetime.fromisoformat()`` is used for default
1104 datetime string parsing.
1105
1106 The storage format can be customized to some degree using the
1107 ``storage_format`` and ``regexp`` parameters, such as::
1108
1109 import re
1110 from sqlalchemy.dialects.sqlite import DATETIME
1111
1112 dt = DATETIME(
1113 storage_format=(
1114 "%(year)04d/%(month)02d/%(day)02d %(hour)02d:%(minute)02d:%(second)02d"
1115 ),
1116 regexp=r"(\d+)/(\d+)/(\d+) (\d+)-(\d+)-(\d+)",
1117 )
1118
1119 :param truncate_microseconds: when ``True`` microseconds will be truncated
1120 from the datetime. Can't be specified together with ``storage_format``
1121 or ``regexp``.
1122
1123 :param storage_format: format string which will be applied to the dict
1124 with keys year, month, day, hour, minute, second, and microsecond.
1125
1126 :param regexp: regular expression which will be applied to incoming result
1127 rows, replacing the use of ``datetime.fromisoformat()`` to parse incoming
1128 strings. If the regexp contains named groups, the resulting match dict is
1129 applied to the Python datetime() constructor as keyword arguments.
1130 Otherwise, if positional groups are used, the datetime() constructor
1131 is called with positional arguments via
1132 ``*map(int, match_obj.groups(0))``.
1133
1134 """ # noqa
1135
1136 _storage_format = (
1137 "%(year)04d-%(month)02d-%(day)02d "
1138 "%(hour)02d:%(minute)02d:%(second)02d.%(microsecond)06d"
1139 )
1140
1141 def __init__(self, *args, **kwargs):
1142 truncate_microseconds = kwargs.pop("truncate_microseconds", False)
1143 super().__init__(*args, **kwargs)
1144 if truncate_microseconds:
1145 assert "storage_format" not in kwargs, (
1146 "You can specify only "
1147 "one of truncate_microseconds or storage_format."
1148 )
1149 assert "regexp" not in kwargs, (
1150 "You can specify only one of "
1151 "truncate_microseconds or regexp."
1152 )
1153 self._storage_format = (
1154 "%(year)04d-%(month)02d-%(day)02d "
1155 "%(hour)02d:%(minute)02d:%(second)02d"
1156 )
1157
1158 def bind_processor(self, dialect):
1159 datetime_datetime = datetime.datetime
1160 datetime_date = datetime.date
1161 format_ = self._storage_format
1162
1163 def process(value):
1164 if value is None:
1165 return None
1166 elif isinstance(value, datetime_datetime):
1167 return format_ % {
1168 "year": value.year,
1169 "month": value.month,
1170 "day": value.day,
1171 "hour": value.hour,
1172 "minute": value.minute,
1173 "second": value.second,
1174 "microsecond": value.microsecond,
1175 }
1176 elif isinstance(value, datetime_date):
1177 return format_ % {
1178 "year": value.year,
1179 "month": value.month,
1180 "day": value.day,
1181 "hour": 0,
1182 "minute": 0,
1183 "second": 0,
1184 "microsecond": 0,
1185 }
1186 else:
1187 raise TypeError(
1188 "SQLite DateTime type only accepts Python "
1189 "datetime and date objects as input."
1190 )
1191
1192 return process
1193
1194 def result_processor(self, dialect, coltype):
1195 if self._reg:
1196 return processors.str_to_datetime_processor_factory(
1197 self._reg, datetime.datetime
1198 )
1199 else:
1200 return processors.str_to_datetime
1201
1202
1203class DATE(_DateTimeMixin, sqltypes.Date):
1204 r"""Represent a Python date object in SQLite using a string.
1205
1206 The default string storage format is::
1207
1208 "%(year)04d-%(month)02d-%(day)02d"
1209
1210 e.g.:
1211
1212 .. sourcecode:: text
1213
1214 2011-03-15
1215
1216 The incoming storage format is by default parsed using the
1217 Python ``date.fromisoformat()`` function.
1218
1219 .. versionchanged:: 2.0 ``date.fromisoformat()`` is used for default
1220 date string parsing.
1221
1222
1223 The storage format can be customized to some degree using the
1224 ``storage_format`` and ``regexp`` parameters, such as::
1225
1226 import re
1227 from sqlalchemy.dialects.sqlite import DATE
1228
1229 d = DATE(
1230 storage_format="%(month)02d/%(day)02d/%(year)04d",
1231 regexp=re.compile("(?P<month>\d+)/(?P<day>\d+)/(?P<year>\d+)"),
1232 )
1233
1234 :param storage_format: format string which will be applied to the
1235 dict with keys year, month, and day.
1236
1237 :param regexp: regular expression which will be applied to
1238 incoming result rows, replacing the use of ``date.fromisoformat()`` to
1239 parse incoming strings. If the regexp contains named groups, the resulting
1240 match dict is applied to the Python date() constructor as keyword
1241 arguments. Otherwise, if positional groups are used, the date()
1242 constructor is called with positional arguments via
1243 ``*map(int, match_obj.groups(0))``.
1244
1245 """
1246
1247 _storage_format = "%(year)04d-%(month)02d-%(day)02d"
1248
1249 def bind_processor(self, dialect):
1250 datetime_date = datetime.date
1251 format_ = self._storage_format
1252
1253 def process(value):
1254 if value is None:
1255 return None
1256 elif isinstance(value, datetime_date):
1257 return format_ % {
1258 "year": value.year,
1259 "month": value.month,
1260 "day": value.day,
1261 }
1262 else:
1263 raise TypeError(
1264 "SQLite Date type only accepts Python "
1265 "date objects as input."
1266 )
1267
1268 return process
1269
1270 def result_processor(self, dialect, coltype):
1271 if self._reg:
1272 return processors.str_to_datetime_processor_factory(
1273 self._reg, datetime.date
1274 )
1275 else:
1276 return processors.str_to_date
1277
1278
1279class TIME(_DateTimeMixin, sqltypes.Time):
1280 r"""Represent a Python time object in SQLite using a string.
1281
1282 The default string storage format is::
1283
1284 "%(hour)02d:%(minute)02d:%(second)02d.%(microsecond)06d"
1285
1286 e.g.:
1287
1288 .. sourcecode:: text
1289
1290 12:05:57.10558
1291
1292 The incoming storage format is by default parsed using the
1293 Python ``time.fromisoformat()`` function.
1294
1295 .. versionchanged:: 2.0 ``time.fromisoformat()`` is used for default
1296 time string parsing.
1297
1298 The storage format can be customized to some degree using the
1299 ``storage_format`` and ``regexp`` parameters, such as::
1300
1301 import re
1302 from sqlalchemy.dialects.sqlite import TIME
1303
1304 t = TIME(
1305 storage_format="%(hour)02d-%(minute)02d-%(second)02d-%(microsecond)06d",
1306 regexp=re.compile("(\d+)-(\d+)-(\d+)-(?:-(\d+))?"),
1307 )
1308
1309 :param truncate_microseconds: when ``True`` microseconds will be truncated
1310 from the time. Can't be specified together with ``storage_format``
1311 or ``regexp``.
1312
1313 :param storage_format: format string which will be applied to the dict
1314 with keys hour, minute, second, and microsecond.
1315
1316 :param regexp: regular expression which will be applied to incoming result
1317 rows, replacing the use of ``datetime.fromisoformat()`` to parse incoming
1318 strings. If the regexp contains named groups, the resulting match dict is
1319 applied to the Python time() constructor as keyword arguments. Otherwise,
1320 if positional groups are used, the time() constructor is called with
1321 positional arguments via ``*map(int, match_obj.groups(0))``.
1322
1323 """
1324
1325 _storage_format = "%(hour)02d:%(minute)02d:%(second)02d.%(microsecond)06d"
1326
1327 def __init__(self, *args, **kwargs):
1328 truncate_microseconds = kwargs.pop("truncate_microseconds", False)
1329 super().__init__(*args, **kwargs)
1330 if truncate_microseconds:
1331 assert "storage_format" not in kwargs, (
1332 "You can specify only "
1333 "one of truncate_microseconds or storage_format."
1334 )
1335 assert "regexp" not in kwargs, (
1336 "You can specify only one of "
1337 "truncate_microseconds or regexp."
1338 )
1339 self._storage_format = "%(hour)02d:%(minute)02d:%(second)02d"
1340
1341 def bind_processor(self, dialect):
1342 datetime_time = datetime.time
1343 format_ = self._storage_format
1344
1345 def process(value):
1346 if value is None:
1347 return None
1348 elif isinstance(value, datetime_time):
1349 return format_ % {
1350 "hour": value.hour,
1351 "minute": value.minute,
1352 "second": value.second,
1353 "microsecond": value.microsecond,
1354 }
1355 else:
1356 raise TypeError(
1357 "SQLite Time type only accepts Python "
1358 "time objects as input."
1359 )
1360
1361 return process
1362
1363 def result_processor(self, dialect, coltype):
1364 if self._reg:
1365 return processors.str_to_datetime_processor_factory(
1366 self._reg, datetime.time
1367 )
1368 else:
1369 return processors.str_to_time
1370
1371
1372colspecs = {
1373 sqltypes.Date: DATE,
1374 sqltypes.DateTime: DATETIME,
1375 sqltypes.JSON: _SQliteJson,
1376 sqltypes.JSON.JSONIndexType: JSONIndexType,
1377 sqltypes.JSON.JSONPathType: JSONPathType,
1378 sqltypes.Time: TIME,
1379}
1380
1381ischema_names = {
1382 "BIGINT": sqltypes.BIGINT,
1383 "BLOB": sqltypes.BLOB,
1384 "BOOL": sqltypes.BOOLEAN,
1385 "BOOLEAN": sqltypes.BOOLEAN,
1386 "CHAR": sqltypes.CHAR,
1387 "DATE": sqltypes.DATE,
1388 "DATE_CHAR": sqltypes.DATE,
1389 "DATETIME": sqltypes.DATETIME,
1390 "DATETIME_CHAR": sqltypes.DATETIME,
1391 "DOUBLE": sqltypes.DOUBLE,
1392 "DECIMAL": sqltypes.DECIMAL,
1393 "FLOAT": sqltypes.FLOAT,
1394 "INT": sqltypes.INTEGER,
1395 "INTEGER": sqltypes.INTEGER,
1396 "JSON": JSON,
1397 "NUMERIC": sqltypes.NUMERIC,
1398 "REAL": sqltypes.REAL,
1399 "SMALLINT": sqltypes.SMALLINT,
1400 "TEXT": sqltypes.TEXT,
1401 "TIME": sqltypes.TIME,
1402 "TIME_CHAR": sqltypes.TIME,
1403 "TIMESTAMP": sqltypes.TIMESTAMP,
1404 "VARCHAR": sqltypes.VARCHAR,
1405 "NVARCHAR": sqltypes.NVARCHAR,
1406 "NCHAR": sqltypes.NCHAR,
1407}
1408
1409
1410class SQLiteCompiler(compiler.SQLCompiler):
1411 extract_map = util.update_copy(
1412 compiler.SQLCompiler.extract_map,
1413 {
1414 "month": "%m",
1415 "day": "%d",
1416 "year": "%Y",
1417 "second": "%S",
1418 "hour": "%H",
1419 "doy": "%j",
1420 "minute": "%M",
1421 "epoch": "%s",
1422 "dow": "%w",
1423 "week": "%W",
1424 },
1425 )
1426
1427 def visit_truediv_binary(self, binary, operator, **kw):
1428 return (
1429 self.process(binary.left, **kw)
1430 + " / "
1431 + "(%s + 0.0)" % self.process(binary.right, **kw)
1432 )
1433
1434 def visit_now_func(self, fn, **kw):
1435 return "CURRENT_TIMESTAMP"
1436
1437 def visit_localtimestamp_func(self, func, **kw):
1438 return "DATETIME(CURRENT_TIMESTAMP, 'localtime')"
1439
1440 def visit_true(self, expr, **kw):
1441 return "1"
1442
1443 def visit_false(self, expr, **kw):
1444 return "0"
1445
1446 def visit_char_length_func(self, fn, **kw):
1447 return "length%s" % self.function_argspec(fn)
1448
1449 def visit_aggregate_strings_func(self, fn, **kw):
1450 return "group_concat%s" % self.function_argspec(fn)
1451
1452 def visit_cast(self, cast, **kwargs):
1453 if self.dialect.supports_cast:
1454 return super().visit_cast(cast, **kwargs)
1455 else:
1456 return self.process(cast.clause, **kwargs)
1457
1458 def visit_extract(self, extract, **kw):
1459 try:
1460 return "CAST(STRFTIME('%s', %s) AS INTEGER)" % (
1461 self.extract_map[extract.field],
1462 self.process(extract.expr, **kw),
1463 )
1464 except KeyError as err:
1465 raise exc.CompileError(
1466 "%s is not a valid extract argument." % extract.field
1467 ) from err
1468
1469 def returning_clause(
1470 self,
1471 stmt,
1472 returning_cols,
1473 *,
1474 populate_result_map,
1475 **kw,
1476 ):
1477 kw["include_table"] = False
1478 return super().returning_clause(
1479 stmt, returning_cols, populate_result_map=populate_result_map, **kw
1480 )
1481
1482 def limit_clause(self, select, **kw):
1483 text = ""
1484 if select._limit_clause is not None:
1485 text += "\n LIMIT " + self.process(select._limit_clause, **kw)
1486 if select._offset_clause is not None:
1487 if select._limit_clause is None:
1488 text += "\n LIMIT " + self.process(sql.literal(-1))
1489 text += " OFFSET " + self.process(select._offset_clause, **kw)
1490 else:
1491 text += " OFFSET " + self.process(sql.literal(0), **kw)
1492 return text
1493
1494 def for_update_clause(self, select, **kw):
1495 # sqlite has no "FOR UPDATE" AFAICT
1496 return ""
1497
1498 def update_from_clause(
1499 self, update_stmt, from_table, extra_froms, from_hints, **kw
1500 ):
1501 kw["asfrom"] = True
1502 return "FROM " + ", ".join(
1503 t._compiler_dispatch(self, fromhints=from_hints, **kw)
1504 for t in extra_froms
1505 )
1506
1507 def visit_is_distinct_from_binary(self, binary, operator, **kw):
1508 return "%s IS NOT %s" % (
1509 self.process(binary.left),
1510 self.process(binary.right),
1511 )
1512
1513 def visit_is_not_distinct_from_binary(self, binary, operator, **kw):
1514 return "%s IS %s" % (
1515 self.process(binary.left),
1516 self.process(binary.right),
1517 )
1518
1519 def visit_json_getitem_op_binary(self, binary, operator, **kw):
1520 if binary.type._type_affinity is sqltypes.JSON:
1521 expr = "JSON_QUOTE(JSON_EXTRACT(%s, %s))"
1522 else:
1523 expr = "JSON_EXTRACT(%s, %s)"
1524
1525 return expr % (
1526 self.process(binary.left, **kw),
1527 self.process(binary.right, **kw),
1528 )
1529
1530 def visit_json_path_getitem_op_binary(self, binary, operator, **kw):
1531 if binary.type._type_affinity is sqltypes.JSON:
1532 expr = "JSON_QUOTE(JSON_EXTRACT(%s, %s))"
1533 else:
1534 expr = "JSON_EXTRACT(%s, %s)"
1535
1536 return expr % (
1537 self.process(binary.left, **kw),
1538 self.process(binary.right, **kw),
1539 )
1540
1541 def visit_empty_set_op_expr(self, type_, expand_op, **kw):
1542 # slightly old SQLite versions don't seem to be able to handle
1543 # the empty set impl
1544 return self.visit_empty_set_expr(type_)
1545
1546 def visit_empty_set_expr(self, element_types, **kw):
1547 return "SELECT %s FROM (SELECT %s) WHERE 1!=1" % (
1548 ", ".join("1" for type_ in element_types or [INTEGER()]),
1549 ", ".join("1" for type_ in element_types or [INTEGER()]),
1550 )
1551
1552 def visit_regexp_match_op_binary(self, binary, operator, **kw):
1553 return self._generate_generic_binary(binary, " REGEXP ", **kw)
1554
1555 def visit_not_regexp_match_op_binary(self, binary, operator, **kw):
1556 return self._generate_generic_binary(binary, " NOT REGEXP ", **kw)
1557
1558 def _on_conflict_target(self, clause, **kw):
1559 if clause.inferred_target_elements is not None:
1560 target_text = "(%s)" % ", ".join(
1561 (
1562 self.preparer.quote(c)
1563 if isinstance(c, str)
1564 else self.process(c, include_table=False, use_schema=False)
1565 )
1566 for c in clause.inferred_target_elements
1567 )
1568 if clause.inferred_target_whereclause is not None:
1569 target_text += " WHERE %s" % self.process(
1570 clause.inferred_target_whereclause,
1571 include_table=False,
1572 use_schema=False,
1573 literal_execute=True,
1574 )
1575
1576 else:
1577 target_text = ""
1578
1579 return target_text
1580
1581 def visit_on_conflict_do_nothing(self, on_conflict, **kw):
1582 target_text = self._on_conflict_target(on_conflict, **kw)
1583
1584 if target_text:
1585 return "ON CONFLICT %s DO NOTHING" % target_text
1586 else:
1587 return "ON CONFLICT DO NOTHING"
1588
1589 def visit_on_conflict_do_update(self, on_conflict, **kw):
1590 clause = on_conflict
1591
1592 target_text = self._on_conflict_target(on_conflict, **kw)
1593
1594 action_set_ops = []
1595
1596 set_parameters = dict(clause.update_values_to_set)
1597 # create a list of column assignment clauses as tuples
1598
1599 insert_statement = self.stack[-1]["selectable"]
1600 cols = insert_statement.table.c
1601 for c in cols:
1602 col_key = c.key
1603
1604 if col_key in set_parameters:
1605 value = set_parameters.pop(col_key)
1606 elif c in set_parameters:
1607 value = set_parameters.pop(c)
1608 else:
1609 continue
1610
1611 if coercions._is_literal(value):
1612 value = elements.BindParameter(None, value, type_=c.type)
1613
1614 else:
1615 if (
1616 isinstance(value, elements.BindParameter)
1617 and value.type._isnull
1618 ):
1619 value = value._clone()
1620 value.type = c.type
1621 value_text = self.process(value.self_group(), use_schema=False)
1622
1623 key_text = self.preparer.quote(c.name)
1624 action_set_ops.append("%s = %s" % (key_text, value_text))
1625
1626 # check for names that don't match columns
1627 if set_parameters:
1628 util.warn(
1629 "Additional column names not matching "
1630 "any column keys in table '%s': %s"
1631 % (
1632 self.current_executable.table.name,
1633 (", ".join("'%s'" % c for c in set_parameters)),
1634 )
1635 )
1636 for k, v in set_parameters.items():
1637 key_text = (
1638 self.preparer.quote(k)
1639 if isinstance(k, str)
1640 else self.process(k, use_schema=False)
1641 )
1642 value_text = self.process(
1643 coercions.expect(roles.ExpressionElementRole, v),
1644 use_schema=False,
1645 )
1646 action_set_ops.append("%s = %s" % (key_text, value_text))
1647
1648 action_text = ", ".join(action_set_ops)
1649 if clause.update_whereclause is not None:
1650 action_text += " WHERE %s" % self.process(
1651 clause.update_whereclause, include_table=True, use_schema=False
1652 )
1653
1654 return "ON CONFLICT %s DO UPDATE SET %s" % (target_text, action_text)
1655
1656 def visit_bitwise_xor_op_binary(self, binary, operator, **kw):
1657 # sqlite has no xor. Use "a XOR b" = "(a | b) - (a & b)".
1658 kw["eager_grouping"] = True
1659 or_ = self._generate_generic_binary(binary, " | ", **kw)
1660 and_ = self._generate_generic_binary(binary, " & ", **kw)
1661 return f"({or_} - {and_})"
1662
1663
1664class SQLiteDDLCompiler(compiler.DDLCompiler):
1665 def get_column_specification(self, column, **kwargs):
1666 coltype = self.dialect.type_compiler_instance.process(
1667 column.type, type_expression=column
1668 )
1669 colspec = self.preparer.format_column(column) + " " + coltype
1670 default = self.get_column_default_string(column)
1671 if default is not None:
1672
1673 if not re.match(r"""^\s*[\'\"\(]""", default) and re.match(
1674 r".*\W.*", default
1675 ):
1676 colspec += f" DEFAULT ({default})"
1677 else:
1678 colspec += f" DEFAULT {default}"
1679
1680 if not column.nullable:
1681 colspec += " NOT NULL"
1682
1683 on_conflict_clause = column.dialect_options["sqlite"][
1684 "on_conflict_not_null"
1685 ]
1686 if on_conflict_clause is not None:
1687 colspec += " ON CONFLICT " + on_conflict_clause
1688
1689 if column.primary_key:
1690 if (
1691 column.autoincrement is True
1692 and len(column.table.primary_key.columns) != 1
1693 ):
1694 raise exc.CompileError(
1695 "SQLite does not support autoincrement for "
1696 "composite primary keys"
1697 )
1698
1699 if (
1700 column.table.dialect_options["sqlite"]["autoincrement"]
1701 and len(column.table.primary_key.columns) == 1
1702 and issubclass(column.type._type_affinity, sqltypes.Integer)
1703 and not column.foreign_keys
1704 ):
1705 colspec += " PRIMARY KEY"
1706
1707 on_conflict_clause = column.dialect_options["sqlite"][
1708 "on_conflict_primary_key"
1709 ]
1710 if on_conflict_clause is not None:
1711 colspec += " ON CONFLICT " + on_conflict_clause
1712
1713 colspec += " AUTOINCREMENT"
1714
1715 if column.computed is not None:
1716 colspec += " " + self.process(column.computed)
1717
1718 return colspec
1719
1720 def visit_primary_key_constraint(self, constraint, **kw):
1721 # for columns with sqlite_autoincrement=True,
1722 # the PRIMARY KEY constraint can only be inline
1723 # with the column itself.
1724 if len(constraint.columns) == 1:
1725 c = list(constraint)[0]
1726 if (
1727 c.primary_key
1728 and c.table.dialect_options["sqlite"]["autoincrement"]
1729 and issubclass(c.type._type_affinity, sqltypes.Integer)
1730 and not c.foreign_keys
1731 ):
1732 return None
1733
1734 text = super().visit_primary_key_constraint(constraint)
1735
1736 on_conflict_clause = constraint.dialect_options["sqlite"][
1737 "on_conflict"
1738 ]
1739 if on_conflict_clause is None and len(constraint.columns) == 1:
1740 on_conflict_clause = list(constraint)[0].dialect_options["sqlite"][
1741 "on_conflict_primary_key"
1742 ]
1743
1744 if on_conflict_clause is not None:
1745 text += " ON CONFLICT " + on_conflict_clause
1746
1747 return text
1748
1749 def visit_unique_constraint(self, constraint, **kw):
1750 text = super().visit_unique_constraint(constraint)
1751
1752 on_conflict_clause = constraint.dialect_options["sqlite"][
1753 "on_conflict"
1754 ]
1755 if on_conflict_clause is None and len(constraint.columns) == 1:
1756 col1 = list(constraint)[0]
1757 if isinstance(col1, schema.SchemaItem):
1758 on_conflict_clause = list(constraint)[0].dialect_options[
1759 "sqlite"
1760 ]["on_conflict_unique"]
1761
1762 if on_conflict_clause is not None:
1763 text += " ON CONFLICT " + on_conflict_clause
1764
1765 return text
1766
1767 def visit_check_constraint(self, constraint, **kw):
1768 text = super().visit_check_constraint(constraint)
1769
1770 on_conflict_clause = constraint.dialect_options["sqlite"][
1771 "on_conflict"
1772 ]
1773
1774 if on_conflict_clause is not None:
1775 text += " ON CONFLICT " + on_conflict_clause
1776
1777 return text
1778
1779 def visit_column_check_constraint(self, constraint, **kw):
1780 text = super().visit_column_check_constraint(constraint)
1781
1782 if constraint.dialect_options["sqlite"]["on_conflict"] is not None:
1783 raise exc.CompileError(
1784 "SQLite does not support on conflict clause for "
1785 "column check constraint"
1786 )
1787
1788 return text
1789
1790 def visit_foreign_key_constraint(self, constraint, **kw):
1791 local_table = constraint.elements[0].parent.table
1792 remote_table = constraint.elements[0].column.table
1793
1794 if local_table.schema != remote_table.schema:
1795 return None
1796 else:
1797 return super().visit_foreign_key_constraint(constraint)
1798
1799 def define_constraint_remote_table(self, constraint, table, preparer):
1800 """Format the remote table clause of a CREATE CONSTRAINT clause."""
1801
1802 return preparer.format_table(table, use_schema=False)
1803
1804 def visit_create_index(
1805 self, create, include_schema=False, include_table_schema=True, **kw
1806 ):
1807 index = create.element
1808 self._verify_index_table(index)
1809 preparer = self.preparer
1810 text = "CREATE "
1811 if index.unique:
1812 text += "UNIQUE "
1813
1814 text += "INDEX "
1815
1816 if create.if_not_exists:
1817 text += "IF NOT EXISTS "
1818
1819 text += "%s ON %s (%s)" % (
1820 self._prepared_index_name(index, include_schema=True),
1821 preparer.format_table(index.table, use_schema=False),
1822 ", ".join(
1823 self.sql_compiler.process(
1824 expr, include_table=False, literal_binds=True
1825 )
1826 for expr in index.expressions
1827 ),
1828 )
1829
1830 whereclause = index.dialect_options["sqlite"]["where"]
1831 if whereclause is not None:
1832 where_compiled = self.sql_compiler.process(
1833 whereclause, include_table=False, literal_binds=True
1834 )
1835 text += " WHERE " + where_compiled
1836
1837 return text
1838
1839 def post_create_table(self, table):
1840 table_options = []
1841
1842 if not table.dialect_options["sqlite"]["with_rowid"]:
1843 table_options.append("WITHOUT ROWID")
1844
1845 if table.dialect_options["sqlite"]["strict"]:
1846 table_options.append("STRICT")
1847
1848 if table_options:
1849 return "\n " + ",\n ".join(table_options)
1850 else:
1851 return ""
1852
1853
1854class SQLiteTypeCompiler(compiler.GenericTypeCompiler):
1855 def visit_large_binary(self, type_, **kw):
1856 return self.visit_BLOB(type_)
1857
1858 def visit_DATETIME(self, type_, **kw):
1859 if (
1860 not isinstance(type_, _DateTimeMixin)
1861 or type_.format_is_text_affinity
1862 ):
1863 return super().visit_DATETIME(type_)
1864 else:
1865 return "DATETIME_CHAR"
1866
1867 def visit_DATE(self, type_, **kw):
1868 if (
1869 not isinstance(type_, _DateTimeMixin)
1870 or type_.format_is_text_affinity
1871 ):
1872 return super().visit_DATE(type_)
1873 else:
1874 return "DATE_CHAR"
1875
1876 def visit_TIME(self, type_, **kw):
1877 if (
1878 not isinstance(type_, _DateTimeMixin)
1879 or type_.format_is_text_affinity
1880 ):
1881 return super().visit_TIME(type_)
1882 else:
1883 return "TIME_CHAR"
1884
1885 def visit_JSON(self, type_, **kw):
1886 # note this name provides NUMERIC affinity, not TEXT.
1887 # should not be an issue unless the JSON value consists of a single
1888 # numeric value. JSONTEXT can be used if this case is required.
1889 return "JSON"
1890
1891
1892class SQLiteIdentifierPreparer(compiler.IdentifierPreparer):
1893 reserved_words = {
1894 "add",
1895 "after",
1896 "all",
1897 "alter",
1898 "analyze",
1899 "and",
1900 "as",
1901 "asc",
1902 "attach",
1903 "autoincrement",
1904 "before",
1905 "begin",
1906 "between",
1907 "by",
1908 "cascade",
1909 "case",
1910 "cast",
1911 "check",
1912 "collate",
1913 "column",
1914 "commit",
1915 "conflict",
1916 "constraint",
1917 "create",
1918 "cross",
1919 "current_date",
1920 "current_time",
1921 "current_timestamp",
1922 "database",
1923 "default",
1924 "deferrable",
1925 "deferred",
1926 "delete",
1927 "desc",
1928 "detach",
1929 "distinct",
1930 "drop",
1931 "each",
1932 "else",
1933 "end",
1934 "escape",
1935 "except",
1936 "exclusive",
1937 "exists",
1938 "explain",
1939 "false",
1940 "fail",
1941 "for",
1942 "foreign",
1943 "from",
1944 "full",
1945 "glob",
1946 "group",
1947 "having",
1948 "if",
1949 "ignore",
1950 "immediate",
1951 "in",
1952 "index",
1953 "indexed",
1954 "initially",
1955 "inner",
1956 "insert",
1957 "instead",
1958 "intersect",
1959 "into",
1960 "is",
1961 "isnull",
1962 "join",
1963 "key",
1964 "left",
1965 "like",
1966 "limit",
1967 "match",
1968 "natural",
1969 "not",
1970 "notnull",
1971 "null",
1972 "of",
1973 "offset",
1974 "on",
1975 "or",
1976 "order",
1977 "outer",
1978 "plan",
1979 "pragma",
1980 "primary",
1981 "query",
1982 "raise",
1983 "references",
1984 "reindex",
1985 "rename",
1986 "replace",
1987 "restrict",
1988 "right",
1989 "rollback",
1990 "row",
1991 "select",
1992 "set",
1993 "table",
1994 "temp",
1995 "temporary",
1996 "then",
1997 "to",
1998 "transaction",
1999 "trigger",
2000 "true",
2001 "union",
2002 "unique",
2003 "update",
2004 "using",
2005 "vacuum",
2006 "values",
2007 "view",
2008 "virtual",
2009 "when",
2010 "where",
2011 }
2012
2013
2014class SQLiteExecutionContext(default.DefaultExecutionContext):
2015 @util.memoized_property
2016 def _preserve_raw_colnames(self):
2017 return (
2018 not self.dialect._broken_dotted_colnames
2019 or self.execution_options.get("sqlite_raw_colnames", False)
2020 )
2021
2022 def _translate_colname(self, colname):
2023 # TODO: detect SQLite version 3.10.0 or greater;
2024 # see [ticket:3633]
2025
2026 # adjust for dotted column names. SQLite
2027 # in the case of UNION may store col names as
2028 # "tablename.colname", or if using an attached database,
2029 # "database.tablename.colname", in cursor.description
2030 if not self._preserve_raw_colnames and "." in colname:
2031 return colname.split(".")[-1], colname
2032 else:
2033 return colname, None
2034
2035
2036class SQLiteDialect(default.DefaultDialect):
2037 name = "sqlite"
2038 supports_alter = False
2039
2040 # SQlite supports "DEFAULT VALUES" but *does not* support
2041 # "VALUES (DEFAULT)"
2042 supports_default_values = True
2043 supports_default_metavalue = False
2044
2045 # sqlite issue:
2046 # https://github.com/python/cpython/issues/93421
2047 # note this parameter is no longer used by the ORM or default dialect
2048 # see #9414
2049 supports_sane_rowcount_returning = False
2050
2051 supports_empty_insert = False
2052 supports_cast = True
2053 supports_multivalues_insert = True
2054 use_insertmanyvalues = True
2055 tuple_in_values = True
2056 supports_statement_cache = True
2057 insert_null_pk_still_autoincrements = True
2058 insert_returning = True
2059 update_returning = True
2060 update_returning_multifrom = True
2061 delete_returning = True
2062 update_returning_multifrom = True
2063
2064 supports_default_metavalue = True
2065 """dialect supports INSERT... VALUES (DEFAULT) syntax"""
2066
2067 default_metavalue_token = "NULL"
2068 """for INSERT... VALUES (DEFAULT) syntax, the token to put in the
2069 parenthesis."""
2070
2071 default_paramstyle = "qmark"
2072 execution_ctx_cls = SQLiteExecutionContext
2073 statement_compiler = SQLiteCompiler
2074 ddl_compiler = SQLiteDDLCompiler
2075 type_compiler_cls = SQLiteTypeCompiler
2076 preparer = SQLiteIdentifierPreparer
2077 ischema_names = ischema_names
2078 colspecs = colspecs
2079
2080 construct_arguments = [
2081 (
2082 sa_schema.Table,
2083 {
2084 "autoincrement": False,
2085 "with_rowid": True,
2086 "strict": False,
2087 },
2088 ),
2089 (sa_schema.Index, {"where": None}),
2090 (
2091 sa_schema.Column,
2092 {
2093 "on_conflict_primary_key": None,
2094 "on_conflict_not_null": None,
2095 "on_conflict_unique": None,
2096 },
2097 ),
2098 (sa_schema.Constraint, {"on_conflict": None}),
2099 ]
2100
2101 _broken_fk_pragma_quotes = False
2102 _broken_dotted_colnames = False
2103
2104 @util.deprecated_params(
2105 _json_serializer=(
2106 "1.3.7",
2107 "The _json_serializer argument to the SQLite dialect has "
2108 "been renamed to the correct name of json_serializer. The old "
2109 "argument name will be removed in a future release.",
2110 ),
2111 _json_deserializer=(
2112 "1.3.7",
2113 "The _json_deserializer argument to the SQLite dialect has "
2114 "been renamed to the correct name of json_deserializer. The old "
2115 "argument name will be removed in a future release.",
2116 ),
2117 )
2118 def __init__(
2119 self,
2120 native_datetime=False,
2121 json_serializer=None,
2122 json_deserializer=None,
2123 _json_serializer=None,
2124 _json_deserializer=None,
2125 **kwargs,
2126 ):
2127 default.DefaultDialect.__init__(self, **kwargs)
2128
2129 if _json_serializer:
2130 json_serializer = _json_serializer
2131 if _json_deserializer:
2132 json_deserializer = _json_deserializer
2133 self._json_serializer = json_serializer
2134 self._json_deserializer = json_deserializer
2135
2136 # this flag used by pysqlite dialect, and perhaps others in the
2137 # future, to indicate the driver is handling date/timestamp
2138 # conversions (and perhaps datetime/time as well on some hypothetical
2139 # driver ?)
2140 self.native_datetime = native_datetime
2141
2142 if self.dbapi is not None:
2143 if self.dbapi.sqlite_version_info < (3, 7, 16):
2144 util.warn(
2145 "SQLite version %s is older than 3.7.16, and will not "
2146 "support right nested joins, as are sometimes used in "
2147 "more complex ORM scenarios. SQLAlchemy 1.4 and above "
2148 "no longer tries to rewrite these joins."
2149 % (self.dbapi.sqlite_version_info,)
2150 )
2151
2152 # NOTE: python 3.7 on fedora for me has SQLite 3.34.1. These
2153 # version checks are getting very stale.
2154 self._broken_dotted_colnames = self.dbapi.sqlite_version_info < (
2155 3,
2156 10,
2157 0,
2158 )
2159 self.supports_default_values = self.dbapi.sqlite_version_info >= (
2160 3,
2161 3,
2162 8,
2163 )
2164 self.supports_cast = self.dbapi.sqlite_version_info >= (3, 2, 3)
2165 self.supports_multivalues_insert = (
2166 # https://www.sqlite.org/releaselog/3_7_11.html
2167 self.dbapi.sqlite_version_info
2168 >= (3, 7, 11)
2169 )
2170 # see https://www.sqlalchemy.org/trac/ticket/2568
2171 # as well as https://www.sqlite.org/src/info/600482d161
2172 self._broken_fk_pragma_quotes = self.dbapi.sqlite_version_info < (
2173 3,
2174 6,
2175 14,
2176 )
2177
2178 if self.dbapi.sqlite_version_info < (3, 35) or util.pypy:
2179 self.update_returning = self.delete_returning = (
2180 self.insert_returning
2181 ) = False
2182
2183 if self.dbapi.sqlite_version_info < (3, 32, 0):
2184 # https://www.sqlite.org/limits.html
2185 self.insertmanyvalues_max_parameters = 999
2186
2187 _isolation_lookup = util.immutabledict(
2188 {"READ UNCOMMITTED": 1, "SERIALIZABLE": 0}
2189 )
2190
2191 def get_isolation_level_values(self, dbapi_connection):
2192 return list(self._isolation_lookup)
2193
2194 def set_isolation_level(self, dbapi_connection, level):
2195 isolation_level = self._isolation_lookup[level]
2196
2197 cursor = dbapi_connection.cursor()
2198 cursor.execute(f"PRAGMA read_uncommitted = {isolation_level}")
2199 cursor.close()
2200
2201 def get_isolation_level(self, dbapi_connection):
2202 cursor = dbapi_connection.cursor()
2203 cursor.execute("PRAGMA read_uncommitted")
2204 res = cursor.fetchone()
2205 if res:
2206 value = res[0]
2207 else:
2208 # https://www.sqlite.org/changes.html#version_3_3_3
2209 # "Optional READ UNCOMMITTED isolation (instead of the
2210 # default isolation level of SERIALIZABLE) and
2211 # table level locking when database connections
2212 # share a common cache.""
2213 # pre-SQLite 3.3.0 default to 0
2214 value = 0
2215 cursor.close()
2216 if value == 0:
2217 return "SERIALIZABLE"
2218 elif value == 1:
2219 return "READ UNCOMMITTED"
2220 else:
2221 assert False, "Unknown isolation level %s" % value
2222
2223 @reflection.cache
2224 def get_schema_names(self, connection, **kw):
2225 s = "PRAGMA database_list"
2226 dl = connection.exec_driver_sql(s)
2227
2228 return [db[1] for db in dl if db[1] != "temp"]
2229
2230 def _format_schema(self, schema, table_name):
2231 if schema is not None:
2232 qschema = self.identifier_preparer.quote_identifier(schema)
2233 name = f"{qschema}.{table_name}"
2234 else:
2235 name = table_name
2236 return name
2237
2238 def _sqlite_main_query(
2239 self,
2240 table: str,
2241 type_: str,
2242 schema: Optional[str],
2243 sqlite_include_internal: bool,
2244 ):
2245 main = self._format_schema(schema, table)
2246 if not sqlite_include_internal:
2247 filter_table = " AND name NOT LIKE 'sqlite~_%' ESCAPE '~'"
2248 else:
2249 filter_table = ""
2250 query = (
2251 f"SELECT name FROM {main} "
2252 f"WHERE type='{type_}'{filter_table} "
2253 "ORDER BY name"
2254 )
2255 return query
2256
2257 @reflection.cache
2258 def get_table_names(
2259 self, connection, schema=None, sqlite_include_internal=False, **kw
2260 ):
2261 query = self._sqlite_main_query(
2262 "sqlite_master", "table", schema, sqlite_include_internal
2263 )
2264 names = connection.exec_driver_sql(query).scalars().all()
2265 return names
2266
2267 @reflection.cache
2268 def get_temp_table_names(
2269 self, connection, sqlite_include_internal=False, **kw
2270 ):
2271 query = self._sqlite_main_query(
2272 "sqlite_temp_master", "table", None, sqlite_include_internal
2273 )
2274 names = connection.exec_driver_sql(query).scalars().all()
2275 return names
2276
2277 @reflection.cache
2278 def get_temp_view_names(
2279 self, connection, sqlite_include_internal=False, **kw
2280 ):
2281 query = self._sqlite_main_query(
2282 "sqlite_temp_master", "view", None, sqlite_include_internal
2283 )
2284 names = connection.exec_driver_sql(query).scalars().all()
2285 return names
2286
2287 @reflection.cache
2288 def has_table(self, connection, table_name, schema=None, **kw):
2289 self._ensure_has_table_connection(connection)
2290
2291 if schema is not None and schema not in self.get_schema_names(
2292 connection, **kw
2293 ):
2294 return False
2295
2296 info = self._get_table_pragma(
2297 connection, "table_info", table_name, schema=schema
2298 )
2299 return bool(info)
2300
2301 def _get_default_schema_name(self, connection):
2302 return "main"
2303
2304 @reflection.cache
2305 def get_view_names(
2306 self, connection, schema=None, sqlite_include_internal=False, **kw
2307 ):
2308 query = self._sqlite_main_query(
2309 "sqlite_master", "view", schema, sqlite_include_internal
2310 )
2311 names = connection.exec_driver_sql(query).scalars().all()
2312 return names
2313
2314 @reflection.cache
2315 def get_view_definition(self, connection, view_name, schema=None, **kw):
2316 if schema is not None:
2317 qschema = self.identifier_preparer.quote_identifier(schema)
2318 master = f"{qschema}.sqlite_master"
2319 s = ("SELECT sql FROM %s WHERE name = ? AND type='view'") % (
2320 master,
2321 )
2322 rs = connection.exec_driver_sql(s, (view_name,))
2323 else:
2324 try:
2325 s = (
2326 "SELECT sql FROM "
2327 " (SELECT * FROM sqlite_master UNION ALL "
2328 " SELECT * FROM sqlite_temp_master) "
2329 "WHERE name = ? "
2330 "AND type='view'"
2331 )
2332 rs = connection.exec_driver_sql(s, (view_name,))
2333 except exc.DBAPIError:
2334 s = (
2335 "SELECT sql FROM sqlite_master WHERE name = ? "
2336 "AND type='view'"
2337 )
2338 rs = connection.exec_driver_sql(s, (view_name,))
2339
2340 result = rs.fetchall()
2341 if result:
2342 return result[0].sql
2343 else:
2344 raise exc.NoSuchTableError(
2345 f"{schema}.{view_name}" if schema else view_name
2346 )
2347
2348 @reflection.cache
2349 def get_columns(self, connection, table_name, schema=None, **kw):
2350 pragma = "table_info"
2351 # computed columns are threaded as hidden, they require table_xinfo
2352 if self.server_version_info >= (3, 31):
2353 pragma = "table_xinfo"
2354 info = self._get_table_pragma(
2355 connection, pragma, table_name, schema=schema
2356 )
2357 columns = []
2358 tablesql = None
2359 for row in info:
2360 name = row[1]
2361 type_ = row[2].upper()
2362 nullable = not row[3]
2363 default = row[4]
2364 primary_key = row[5]
2365 hidden = row[6] if pragma == "table_xinfo" else 0
2366
2367 # hidden has value 0 for normal columns, 1 for hidden columns,
2368 # 2 for computed virtual columns and 3 for computed stored columns
2369 # https://www.sqlite.org/src/info/069351b85f9a706f60d3e98fbc8aaf40c374356b967c0464aede30ead3d9d18b
2370 if hidden == 1:
2371 continue
2372
2373 generated = bool(hidden)
2374 persisted = hidden == 3
2375
2376 if tablesql is None and generated:
2377 tablesql = self._get_table_sql(
2378 connection, table_name, schema, **kw
2379 )
2380 # remove create table
2381 match = re.match(
2382 r"create table .*?\((.*)\)$",
2383 tablesql.strip(),
2384 re.DOTALL | re.IGNORECASE,
2385 )
2386 assert match, f"create table not found in {tablesql}"
2387 tablesql = match.group(1).strip()
2388
2389 columns.append(
2390 self._get_column_info(
2391 name,
2392 type_,
2393 nullable,
2394 default,
2395 primary_key,
2396 generated,
2397 persisted,
2398 tablesql,
2399 )
2400 )
2401 if columns:
2402 return columns
2403 elif not self.has_table(connection, table_name, schema):
2404 raise exc.NoSuchTableError(
2405 f"{schema}.{table_name}" if schema else table_name
2406 )
2407 else:
2408 return ReflectionDefaults.columns()
2409
2410 def _get_column_info(
2411 self,
2412 name,
2413 type_,
2414 nullable,
2415 default,
2416 primary_key,
2417 generated,
2418 persisted,
2419 tablesql,
2420 ):
2421 if generated:
2422 # the type of a column "cc INTEGER GENERATED ALWAYS AS (1 + 42)"
2423 # somehow is "INTEGER GENERATED ALWAYS"
2424 type_ = re.sub("generated", "", type_, flags=re.IGNORECASE)
2425 type_ = re.sub("always", "", type_, flags=re.IGNORECASE).strip()
2426
2427 coltype = self._resolve_type_affinity(type_)
2428
2429 if default is not None:
2430 default = str(default)
2431
2432 colspec = {
2433 "name": name,
2434 "type": coltype,
2435 "nullable": nullable,
2436 "default": default,
2437 "primary_key": primary_key,
2438 }
2439 if generated:
2440 sqltext = ""
2441 if tablesql:
2442 pattern = (
2443 r"[^,]*\s+GENERATED\s+ALWAYS\s+AS"
2444 r"\s+\((.*)\)\s*(?:virtual|stored)?"
2445 )
2446 match = re.search(
2447 re.escape(name) + pattern, tablesql, re.IGNORECASE
2448 )
2449 if match:
2450 sqltext = match.group(1)
2451 colspec["computed"] = {"sqltext": sqltext, "persisted": persisted}
2452 return colspec
2453
2454 def _resolve_type_affinity(self, type_):
2455 """Return a data type from a reflected column, using affinity rules.
2456
2457 SQLite's goal for universal compatibility introduces some complexity
2458 during reflection, as a column's defined type might not actually be a
2459 type that SQLite understands - or indeed, my not be defined *at all*.
2460 Internally, SQLite handles this with a 'data type affinity' for each
2461 column definition, mapping to one of 'TEXT', 'NUMERIC', 'INTEGER',
2462 'REAL', or 'NONE' (raw bits). The algorithm that determines this is
2463 listed in https://www.sqlite.org/datatype3.html section 2.1.
2464
2465 This method allows SQLAlchemy to support that algorithm, while still
2466 providing access to smarter reflection utilities by recognizing
2467 column definitions that SQLite only supports through affinity (like
2468 DATE and DOUBLE).
2469
2470 """
2471 match = re.match(r"([\w ]+)(\(.*?\))?", type_)
2472 if match:
2473 coltype = match.group(1)
2474 args = match.group(2)
2475 else:
2476 coltype = ""
2477 args = ""
2478
2479 if coltype in self.ischema_names:
2480 coltype = self.ischema_names[coltype]
2481 elif "INT" in coltype:
2482 coltype = sqltypes.INTEGER
2483 elif "CHAR" in coltype or "CLOB" in coltype or "TEXT" in coltype:
2484 coltype = sqltypes.TEXT
2485 elif "BLOB" in coltype or not coltype:
2486 coltype = sqltypes.NullType
2487 elif "REAL" in coltype or "FLOA" in coltype or "DOUB" in coltype:
2488 coltype = sqltypes.REAL
2489 else:
2490 coltype = sqltypes.NUMERIC
2491
2492 if args is not None:
2493 args = re.findall(r"(\d+)", args)
2494 try:
2495 coltype = coltype(*[int(a) for a in args])
2496 except TypeError:
2497 util.warn(
2498 "Could not instantiate type %s with "
2499 "reflected arguments %s; using no arguments."
2500 % (coltype, args)
2501 )
2502 coltype = coltype()
2503 else:
2504 coltype = coltype()
2505
2506 return coltype
2507
2508 @reflection.cache
2509 def get_pk_constraint(self, connection, table_name, schema=None, **kw):
2510 constraint_name = None
2511 table_data = self._get_table_sql(connection, table_name, schema=schema)
2512 if table_data:
2513 PK_PATTERN = r"CONSTRAINT (\w+) PRIMARY KEY"
2514 result = re.search(PK_PATTERN, table_data, re.I)
2515 constraint_name = result.group(1) if result else None
2516
2517 cols = self.get_columns(connection, table_name, schema, **kw)
2518 # consider only pk columns. This also avoids sorting the cached
2519 # value returned by get_columns
2520 cols = [col for col in cols if col.get("primary_key", 0) > 0]
2521 cols.sort(key=lambda col: col.get("primary_key"))
2522 pkeys = [col["name"] for col in cols]
2523
2524 if pkeys:
2525 return {"constrained_columns": pkeys, "name": constraint_name}
2526 else:
2527 return ReflectionDefaults.pk_constraint()
2528
2529 @reflection.cache
2530 def get_foreign_keys(self, connection, table_name, schema=None, **kw):
2531 # sqlite makes this *extremely difficult*.
2532 # First, use the pragma to get the actual FKs.
2533 pragma_fks = self._get_table_pragma(
2534 connection, "foreign_key_list", table_name, schema=schema
2535 )
2536
2537 fks = {}
2538
2539 for row in pragma_fks:
2540 (numerical_id, rtbl, lcol, rcol) = (row[0], row[2], row[3], row[4])
2541
2542 if not rcol:
2543 # no referred column, which means it was not named in the
2544 # original DDL. The referred columns of the foreign key
2545 # constraint are therefore the primary key of the referred
2546 # table.
2547 try:
2548 referred_pk = self.get_pk_constraint(
2549 connection, rtbl, schema=schema, **kw
2550 )
2551 referred_columns = referred_pk["constrained_columns"]
2552 except exc.NoSuchTableError:
2553 # ignore not existing parents
2554 referred_columns = []
2555 else:
2556 # note we use this list only if this is the first column
2557 # in the constraint. for subsequent columns we ignore the
2558 # list and append "rcol" if present.
2559 referred_columns = []
2560
2561 if self._broken_fk_pragma_quotes:
2562 rtbl = re.sub(r"^[\"\[`\']|[\"\]`\']$", "", rtbl)
2563
2564 if numerical_id in fks:
2565 fk = fks[numerical_id]
2566 else:
2567 fk = fks[numerical_id] = {
2568 "name": None,
2569 "constrained_columns": [],
2570 "referred_schema": schema,
2571 "referred_table": rtbl,
2572 "referred_columns": referred_columns,
2573 "options": {},
2574 }
2575 fks[numerical_id] = fk
2576
2577 fk["constrained_columns"].append(lcol)
2578
2579 if rcol:
2580 fk["referred_columns"].append(rcol)
2581
2582 def fk_sig(constrained_columns, referred_table, referred_columns):
2583 return (
2584 tuple(constrained_columns)
2585 + (referred_table,)
2586 + tuple(referred_columns)
2587 )
2588
2589 # then, parse the actual SQL and attempt to find DDL that matches
2590 # the names as well. SQLite saves the DDL in whatever format
2591 # it was typed in as, so need to be liberal here.
2592
2593 keys_by_signature = {
2594 fk_sig(
2595 fk["constrained_columns"],
2596 fk["referred_table"],
2597 fk["referred_columns"],
2598 ): fk
2599 for fk in fks.values()
2600 }
2601
2602 table_data = self._get_table_sql(connection, table_name, schema=schema)
2603
2604 def parse_fks():
2605 if table_data is None:
2606 # system tables, etc.
2607 return
2608
2609 # note that we already have the FKs from PRAGMA above. This whole
2610 # regexp thing is trying to locate additional detail about the
2611 # FKs, namely the name of the constraint and other options.
2612 # so parsing the columns is really about matching it up to what
2613 # we already have.
2614 FK_PATTERN = (
2615 r"(?:CONSTRAINT (\w+) +)?"
2616 r"FOREIGN KEY *\( *(.+?) *\) +"
2617 r'REFERENCES +(?:(?:"(.+?)")|([a-z0-9_]+)) *\( *((?:(?:"[^"]+"|[a-z0-9_]+) *(?:, *)?)+)\) *' # noqa: E501
2618 r"((?:ON (?:DELETE|UPDATE) "
2619 r"(?:SET NULL|SET DEFAULT|CASCADE|RESTRICT|NO ACTION) *)*)"
2620 r"((?:NOT +)?DEFERRABLE)?"
2621 r"(?: +INITIALLY +(DEFERRED|IMMEDIATE))?"
2622 )
2623 for match in re.finditer(FK_PATTERN, table_data, re.I):
2624 (
2625 constraint_name,
2626 constrained_columns,
2627 referred_quoted_name,
2628 referred_name,
2629 referred_columns,
2630 onupdatedelete,
2631 deferrable,
2632 initially,
2633 ) = match.group(1, 2, 3, 4, 5, 6, 7, 8)
2634 constrained_columns = list(
2635 self._find_cols_in_sig(constrained_columns)
2636 )
2637 if not referred_columns:
2638 referred_columns = constrained_columns
2639 else:
2640 referred_columns = list(
2641 self._find_cols_in_sig(referred_columns)
2642 )
2643 referred_name = referred_quoted_name or referred_name
2644 options = {}
2645
2646 for token in re.split(r" *\bON\b *", onupdatedelete.upper()):
2647 if token.startswith("DELETE"):
2648 ondelete = token[6:].strip()
2649 if ondelete and ondelete != "NO ACTION":
2650 options["ondelete"] = ondelete
2651 elif token.startswith("UPDATE"):
2652 onupdate = token[6:].strip()
2653 if onupdate and onupdate != "NO ACTION":
2654 options["onupdate"] = onupdate
2655
2656 if deferrable:
2657 options["deferrable"] = "NOT" not in deferrable.upper()
2658 if initially:
2659 options["initially"] = initially.upper()
2660
2661 yield (
2662 constraint_name,
2663 constrained_columns,
2664 referred_name,
2665 referred_columns,
2666 options,
2667 )
2668
2669 fkeys = []
2670
2671 for (
2672 constraint_name,
2673 constrained_columns,
2674 referred_name,
2675 referred_columns,
2676 options,
2677 ) in parse_fks():
2678 sig = fk_sig(constrained_columns, referred_name, referred_columns)
2679 if sig not in keys_by_signature:
2680 util.warn(
2681 "WARNING: SQL-parsed foreign key constraint "
2682 "'%s' could not be located in PRAGMA "
2683 "foreign_keys for table %s" % (sig, table_name)
2684 )
2685 continue
2686 key = keys_by_signature.pop(sig)
2687 key["name"] = constraint_name
2688 key["options"] = options
2689 fkeys.append(key)
2690 # assume the remainders are the unnamed, inline constraints, just
2691 # use them as is as it's extremely difficult to parse inline
2692 # constraints
2693 fkeys.extend(keys_by_signature.values())
2694 if fkeys:
2695 return fkeys
2696 else:
2697 return ReflectionDefaults.foreign_keys()
2698
2699 def _find_cols_in_sig(self, sig):
2700 for match in re.finditer(r'(?:"(.+?)")|([a-z0-9_]+)', sig, re.I):
2701 yield match.group(1) or match.group(2)
2702
2703 @reflection.cache
2704 def get_unique_constraints(
2705 self, connection, table_name, schema=None, **kw
2706 ):
2707 auto_index_by_sig = {}
2708 for idx in self.get_indexes(
2709 connection,
2710 table_name,
2711 schema=schema,
2712 include_auto_indexes=True,
2713 **kw,
2714 ):
2715 if not idx["name"].startswith("sqlite_autoindex"):
2716 continue
2717 sig = tuple(idx["column_names"])
2718 auto_index_by_sig[sig] = idx
2719
2720 table_data = self._get_table_sql(
2721 connection, table_name, schema=schema, **kw
2722 )
2723 unique_constraints = []
2724
2725 def parse_uqs():
2726 if table_data is None:
2727 return
2728 UNIQUE_PATTERN = r'(?:CONSTRAINT "?(.+?)"? +)?UNIQUE *\((.+?)\)'
2729 INLINE_UNIQUE_PATTERN = (
2730 r'(?:(".+?")|(?:[\[`])?([a-z0-9_]+)(?:[\]`])?)[\t ]'
2731 r"+[a-z0-9_ ]+?[\t ]+UNIQUE"
2732 )
2733
2734 for match in re.finditer(UNIQUE_PATTERN, table_data, re.I):
2735 name, cols = match.group(1, 2)
2736 yield name, list(self._find_cols_in_sig(cols))
2737
2738 # we need to match inlines as well, as we seek to differentiate
2739 # a UNIQUE constraint from a UNIQUE INDEX, even though these
2740 # are kind of the same thing :)
2741 for match in re.finditer(INLINE_UNIQUE_PATTERN, table_data, re.I):
2742 cols = list(
2743 self._find_cols_in_sig(match.group(1) or match.group(2))
2744 )
2745 yield None, cols
2746
2747 for name, cols in parse_uqs():
2748 sig = tuple(cols)
2749 if sig in auto_index_by_sig:
2750 auto_index_by_sig.pop(sig)
2751 parsed_constraint = {"name": name, "column_names": cols}
2752 unique_constraints.append(parsed_constraint)
2753 # NOTE: auto_index_by_sig might not be empty here,
2754 # the PRIMARY KEY may have an entry.
2755 if unique_constraints:
2756 return unique_constraints
2757 else:
2758 return ReflectionDefaults.unique_constraints()
2759
2760 @reflection.cache
2761 def get_check_constraints(self, connection, table_name, schema=None, **kw):
2762 table_data = self._get_table_sql(
2763 connection, table_name, schema=schema, **kw
2764 )
2765
2766 # NOTE NOTE NOTE
2767 # DO NOT CHANGE THIS REGULAR EXPRESSION. There is no known way
2768 # to parse CHECK constraints that contain newlines themselves using
2769 # regular expressions, and the approach here relies upon each
2770 # individual
2771 # CHECK constraint being on a single line by itself. This
2772 # necessarily makes assumptions as to how the CREATE TABLE
2773 # was emitted. A more comprehensive DDL parsing solution would be
2774 # needed to improve upon the current situation. See #11840 for
2775 # background
2776 CHECK_PATTERN = r"(?:CONSTRAINT (.+) +)?CHECK *\( *(.+) *\),? *"
2777 cks = []
2778
2779 for match in re.finditer(CHECK_PATTERN, table_data or "", re.I):
2780
2781 name = match.group(1)
2782
2783 if name:
2784 name = re.sub(r'^"|"$', "", name)
2785
2786 cks.append({"sqltext": match.group(2), "name": name})
2787 cks.sort(key=lambda d: d["name"] or "~") # sort None as last
2788 if cks:
2789 return cks
2790 else:
2791 return ReflectionDefaults.check_constraints()
2792
2793 @reflection.cache
2794 def get_indexes(self, connection, table_name, schema=None, **kw):
2795 pragma_indexes = self._get_table_pragma(
2796 connection, "index_list", table_name, schema=schema
2797 )
2798 indexes = []
2799
2800 # regular expression to extract the filter predicate of a partial
2801 # index. this could fail to extract the predicate correctly on
2802 # indexes created like
2803 # CREATE INDEX i ON t (col || ') where') WHERE col <> ''
2804 # but as this function does not support expression-based indexes
2805 # this case does not occur.
2806 partial_pred_re = re.compile(r"\)\s+where\s+(.+)", re.IGNORECASE)
2807
2808 if schema:
2809 schema_expr = "%s." % self.identifier_preparer.quote_identifier(
2810 schema
2811 )
2812 else:
2813 schema_expr = ""
2814
2815 include_auto_indexes = kw.pop("include_auto_indexes", False)
2816 for row in pragma_indexes:
2817 # ignore implicit primary key index.
2818 # https://www.mail-archive.com/sqlite-users@sqlite.org/msg30517.html
2819 if not include_auto_indexes and row[1].startswith(
2820 "sqlite_autoindex"
2821 ):
2822 continue
2823 indexes.append(
2824 dict(
2825 name=row[1],
2826 column_names=[],
2827 unique=row[2],
2828 dialect_options={},
2829 )
2830 )
2831
2832 # check partial indexes
2833 if len(row) >= 5 and row[4]:
2834 s = (
2835 "SELECT sql FROM %(schema)ssqlite_master "
2836 "WHERE name = ? "
2837 "AND type = 'index'" % {"schema": schema_expr}
2838 )
2839 rs = connection.exec_driver_sql(s, (row[1],))
2840 index_sql = rs.scalar()
2841 predicate_match = partial_pred_re.search(index_sql)
2842 if predicate_match is None:
2843 # unless the regex is broken this case shouldn't happen
2844 # because we know this is a partial index, so the
2845 # definition sql should match the regex
2846 util.warn(
2847 "Failed to look up filter predicate of "
2848 "partial index %s" % row[1]
2849 )
2850 else:
2851 predicate = predicate_match.group(1)
2852 indexes[-1]["dialect_options"]["sqlite_where"] = text(
2853 predicate
2854 )
2855
2856 # loop thru unique indexes to get the column names.
2857 for idx in list(indexes):
2858 pragma_index = self._get_table_pragma(
2859 connection, "index_info", idx["name"], schema=schema
2860 )
2861
2862 for row in pragma_index:
2863 if row[2] is None:
2864 util.warn(
2865 "Skipped unsupported reflection of "
2866 "expression-based index %s" % idx["name"]
2867 )
2868 indexes.remove(idx)
2869 break
2870 else:
2871 idx["column_names"].append(row[2])
2872
2873 indexes.sort(key=lambda d: d["name"] or "~") # sort None as last
2874 if indexes:
2875 return indexes
2876 elif not self.has_table(connection, table_name, schema):
2877 raise exc.NoSuchTableError(
2878 f"{schema}.{table_name}" if schema else table_name
2879 )
2880 else:
2881 return ReflectionDefaults.indexes()
2882
2883 def _is_sys_table(self, table_name):
2884 return table_name in {
2885 "sqlite_schema",
2886 "sqlite_master",
2887 "sqlite_temp_schema",
2888 "sqlite_temp_master",
2889 }
2890
2891 @reflection.cache
2892 def _get_table_sql(self, connection, table_name, schema=None, **kw):
2893 if schema:
2894 schema_expr = "%s." % (
2895 self.identifier_preparer.quote_identifier(schema)
2896 )
2897 else:
2898 schema_expr = ""
2899 try:
2900 s = (
2901 "SELECT sql FROM "
2902 " (SELECT * FROM %(schema)ssqlite_master UNION ALL "
2903 " SELECT * FROM %(schema)ssqlite_temp_master) "
2904 "WHERE name = ? "
2905 "AND type in ('table', 'view')" % {"schema": schema_expr}
2906 )
2907 rs = connection.exec_driver_sql(s, (table_name,))
2908 except exc.DBAPIError:
2909 s = (
2910 "SELECT sql FROM %(schema)ssqlite_master "
2911 "WHERE name = ? "
2912 "AND type in ('table', 'view')" % {"schema": schema_expr}
2913 )
2914 rs = connection.exec_driver_sql(s, (table_name,))
2915 value = rs.scalar()
2916 if value is None and not self._is_sys_table(table_name):
2917 raise exc.NoSuchTableError(f"{schema_expr}{table_name}")
2918 return value
2919
2920 def _get_table_pragma(self, connection, pragma, table_name, schema=None):
2921 quote = self.identifier_preparer.quote_identifier
2922 if schema is not None:
2923 statements = [f"PRAGMA {quote(schema)}."]
2924 else:
2925 # because PRAGMA looks in all attached databases if no schema
2926 # given, need to specify "main" schema, however since we want
2927 # 'temp' tables in the same namespace as 'main', need to run
2928 # the PRAGMA twice
2929 statements = ["PRAGMA main.", "PRAGMA temp."]
2930
2931 qtable = quote(table_name)
2932 for statement in statements:
2933 statement = f"{statement}{pragma}({qtable})"
2934 cursor = connection.exec_driver_sql(statement)
2935 if not cursor._soft_closed:
2936 # work around SQLite issue whereby cursor.description
2937 # is blank when PRAGMA returns no rows:
2938 # https://www.sqlite.org/cvstrac/tktview?tn=1884
2939 result = cursor.fetchall()
2940 else:
2941 result = []
2942 if result:
2943 return result
2944 else:
2945 return []