# dialects/sqlite/base.py
# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors
# <see AUTHORS file>
#
# This module is part of SQLAlchemy and is released under
# the MIT License: https://www.opensource.org/licenses/mit-license.php
# mypy: ignore-errors


r"""
.. dialect:: sqlite
    :name: SQLite
    :full_support: 3.36.0
    :normal_support: 3.12+
    :best_effort: 3.7.16+

.. _sqlite_datetime:

Date and Time Types
-------------------

SQLite does not have built-in DATE, TIME, or DATETIME types, and pysqlite does
not provide out of the box functionality for translating values between Python
`datetime` objects and a SQLite-supported format. SQLAlchemy's own
:class:`~sqlalchemy.types.DateTime` and related types provide date formatting
and parsing functionality when SQLite is used. The implementation classes are
:class:`_sqlite.DATETIME`, :class:`_sqlite.DATE` and :class:`_sqlite.TIME`.
These types represent dates and times as ISO formatted strings, which also
nicely support ordering. There's no reliance on typical "libc" internals for
these functions so historical dates are fully supported.
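
For example (a brief sketch; the ``log`` table here is hypothetical), a
:class:`~sqlalchemy.types.DateTime` column on SQLite accepts and returns
``datetime`` objects, storing them as ISO strings::

    import datetime

    from sqlalchemy import (
        Column, DateTime, Integer, MetaData, Table, create_engine
    )

    engine = create_engine("sqlite://")
    metadata = MetaData()
    log = Table(
        "log", metadata,
        Column("id", Integer, primary_key=True),
        Column("created_at", DateTime),
    )
    metadata.create_all(engine)

    with engine.begin() as conn:
        conn.execute(
            log.insert().values(
                created_at=datetime.datetime(2021, 3, 15, 12, 5, 57)
            )
        )
        # stored as the string '2021-03-15 12:05:57.000000';
        # comes back as a datetime object
        print(conn.execute(log.select()).all())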

Ensuring Text affinity
^^^^^^^^^^^^^^^^^^^^^^

The DDL rendered for these types is the standard ``DATE``, ``TIME``
and ``DATETIME`` indicators. However, custom storage formats can also be
applied to these types. When the
storage format is detected as containing no characters other than digits, the
DDL for these types is rendered as ``DATE_CHAR``, ``TIME_CHAR``, and
``DATETIME_CHAR``, so that the column continues to have textual affinity.
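
As a sketch (the purely numeric ``storage_format`` below is a hypothetical
example), such a format causes the ``_CHAR`` variant of the name to be
rendered::

    from sqlalchemy import Column, MetaData, Table
    from sqlalchemy.dialects import sqlite
    from sqlalchemy.schema import CreateTable

    t = Table(
        "t",
        MetaData(),
        Column("d", sqlite.DATE(storage_format="%(year)04d%(month)02d%(day)02d")),
    )
    # renders "CREATE TABLE t (d DATE_CHAR)"
    print(CreateTable(t).compile(dialect=sqlite.dialect()))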

.. seealso::

    `Type Affinity <https://www.sqlite.org/datatype3.html#affinity>`_ -
    in the SQLite documentation

.. _sqlite_autoincrement:

SQLite Auto Incrementing Behavior
----------------------------------

Background on SQLite's autoincrement is at: https://sqlite.org/autoinc.html

Key concepts:

* SQLite has an implicit "auto increment" feature that takes place for any
  non-composite primary-key column that is specifically created using
  "INTEGER PRIMARY KEY" for the type + primary key.

* SQLite also has an explicit "AUTOINCREMENT" keyword, that is **not**
  equivalent to the implicit autoincrement feature; this keyword is not
  recommended for general use. SQLAlchemy does not render this keyword
  unless a special SQLite-specific directive is used (see below). However,
  it still requires that the column's type is named "INTEGER".

Using the AUTOINCREMENT Keyword
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

To specifically render the AUTOINCREMENT keyword on the primary key column
when rendering DDL, add the flag ``sqlite_autoincrement=True`` to the Table
construct::

    Table('sometable', metadata,
          Column('id', Integer, primary_key=True),
          sqlite_autoincrement=True)

Allowing autoincrement behavior with SQLAlchemy types other than Integer/INTEGER
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

SQLite's typing model is based on naming conventions. Among other things, this
means that any type name which contains the substring ``"INT"`` will be
determined to be of "integer affinity". A type named ``"BIGINT"``,
``"SPECIAL_INT"`` or even ``"XYZINTQPR"`` will be considered by SQLite to be
of "integer" affinity. However, **the SQLite autoincrement feature, whether
implicitly or explicitly enabled, requires that the name of the column's type
is exactly the string "INTEGER"**. Therefore, if an application uses a type
like :class:`.BigInteger` for a primary key, on SQLite this type will need to
be rendered as the name ``"INTEGER"`` when emitting the initial ``CREATE
TABLE`` statement in order for the autoincrement behavior to be available.

One approach to achieve this is to use :meth:`.TypeEngine.with_variant` so
that :class:`.Integer` is rendered on SQLite only::

    table = Table(
        "my_table", metadata,
        Column("id", BigInteger().with_variant(Integer, "sqlite"), primary_key=True)
    )

Another is to use a subclass of :class:`.BigInteger` that overrides its DDL
name to be ``INTEGER`` when compiled against SQLite::

    from sqlalchemy import BigInteger
    from sqlalchemy.ext.compiler import compiles

    class SLBigInteger(BigInteger):
        pass

    @compiles(SLBigInteger, 'sqlite')
    def bi_c(element, compiler, **kw):
        return "INTEGER"

    @compiles(SLBigInteger)
    def bi_c(element, compiler, **kw):
        return compiler.visit_BIGINT(element, **kw)


    table = Table(
        "my_table", metadata,
        Column("id", SLBigInteger(), primary_key=True)
    )

.. seealso::

    :meth:`.TypeEngine.with_variant`

    :ref:`sqlalchemy.ext.compiler_toplevel`

    `Datatypes In SQLite Version 3 <https://sqlite.org/datatype3.html>`_

.. _sqlite_concurrency:

Database Locking Behavior / Concurrency
---------------------------------------

SQLite is not designed for a high level of write concurrency. The database
itself, being a file, is locked completely during write operations within
transactions, meaning exactly one "connection" (in reality a file handle)
has exclusive access to the database during this period; all other
"connections" will be blocked during this time.

The Python DBAPI specification also calls for a connection model that is
always in a transaction; there is no ``connection.begin()`` method,
only ``connection.commit()`` and ``connection.rollback()``, upon which a
new transaction is to be begun immediately. This may seem to imply
that the SQLite driver would in theory allow only a single filehandle on a
particular database file at any time; however, there are several
factors both within SQLite itself as well as within the pysqlite driver
which loosen this restriction significantly.

However, no matter what locking modes are used, SQLite will still always
lock the database file once a transaction is started and DML (e.g. INSERT,
UPDATE, DELETE) has been emitted, and this will block
other transactions at least at the point that they also attempt to emit DML.
By default, this block lasts only a very short time before it times out
with an error.
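
The length of this wait is adjustable at the driver level; a minimal sketch,
assuming the stdlib ``sqlite3`` (pysqlite) driver, whose ``timeout`` connect
parameter sets how many seconds a connection waits on the lock::

    from sqlalchemy import create_engine

    engine = create_engine(
        "sqlite:///file.db",
        connect_args={"timeout": 30},  # wait up to 30 seconds on a lock
    )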

This behavior becomes more critical when used in conjunction with the
SQLAlchemy ORM. SQLAlchemy's :class:`.Session` object by default runs
within a transaction, and with its autoflush model, may emit DML preceding
any SELECT statement. This may lead to a SQLite database that becomes
locked more quickly than is expected. The locking mode of SQLite and the
pysqlite driver can be manipulated to some degree, however it should be noted
that achieving a high degree of write-concurrency with SQLite is a losing
battle.

For more information on SQLite's lack of write concurrency by design, please
see
`Situations Where Another RDBMS May Work Better - High Concurrency
<https://www.sqlite.org/whentouse.html>`_ near the bottom of the page.

The following subsections introduce areas that are impacted by SQLite's
file-based architecture and additionally will usually require workarounds
when using the pysqlite driver.

.. _sqlite_isolation_level:

Transaction Isolation Level / Autocommit
----------------------------------------

SQLite supports "transaction isolation" in a non-standard way, along two
axes. One is that of the
`PRAGMA read_uncommitted <https://www.sqlite.org/pragma.html#pragma_read_uncommitted>`_
instruction. This setting can essentially switch SQLite between its
default mode of ``SERIALIZABLE`` isolation, and a "dirty read" isolation
mode normally referred to as ``READ UNCOMMITTED``.

SQLAlchemy ties into this PRAGMA statement using the
:paramref:`_sa.create_engine.isolation_level` parameter of
:func:`_sa.create_engine`.
Valid values for this parameter when used with SQLite are ``"SERIALIZABLE"``
and ``"READ UNCOMMITTED"``, corresponding to PRAGMA values of 0 and 1,
respectively. SQLite defaults to ``SERIALIZABLE``, however its behavior is
impacted by the pysqlite driver's default behavior.
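
A brief sketch, selecting the isolation level at :func:`_sa.create_engine`
time::

    from sqlalchemy import create_engine

    eng = create_engine(
        "sqlite:///file.db",
        isolation_level="READ UNCOMMITTED",
    )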

When using the pysqlite driver, the ``"AUTOCOMMIT"`` isolation level is also
available, which will alter the pysqlite connection using the ``.isolation_level``
attribute on the DBAPI connection and set it to None for the duration
of the setting.

.. versionadded:: 1.3.16 added support for SQLite AUTOCOMMIT isolation level
   when using the pysqlite / sqlite3 SQLite driver.


The other axis along which SQLite's transactional locking is impacted is
via the nature of the ``BEGIN`` statement used. The three varieties
are "deferred", "immediate", and "exclusive", as described at
`BEGIN TRANSACTION <https://sqlite.org/lang_transaction.html>`_. A straight
``BEGIN`` statement uses the "deferred" mode, where the database file is
not locked until the first read or write operation, and read access remains
open to other transactions until the first write operation. But again,
it is critical to note that the pysqlite driver interferes with this behavior
by *not even emitting BEGIN* until the first write operation.
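
The essential workaround, detailed in :ref:`pysqlite_serializable`, is to
disable pysqlite's own transaction handling and emit ``BEGIN`` from
SQLAlchemy's ``begin`` event; a condensed sketch::

    from sqlalchemy import create_engine, event

    engine = create_engine("sqlite:///file.db")

    @event.listens_for(engine, "connect")
    def do_connect(dbapi_connection, connection_record):
        # disable pysqlite's emitting of the BEGIN statement entirely.
        dbapi_connection.isolation_level = None

    @event.listens_for(engine, "begin")
    def do_begin(conn):
        # emit our own BEGIN
        conn.exec_driver_sql("BEGIN")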

.. warning::

    SQLite's transactional scope is impacted by unresolved
    issues in the pysqlite driver, which defers BEGIN statements to a greater
    degree than is often feasible. See the section :ref:`pysqlite_serializable`
    or :ref:`aiosqlite_serializable` for techniques to work around this behavior.

.. seealso::

    :ref:`dbapi_autocommit`

INSERT/UPDATE/DELETE...RETURNING
---------------------------------

The SQLite dialect supports SQLite 3.35's ``INSERT|UPDATE|DELETE..RETURNING``
syntax. ``INSERT..RETURNING`` may be used
automatically in some cases in order to fetch newly generated identifiers in
place of the traditional approach of using ``cursor.lastrowid``, however
``cursor.lastrowid`` is currently still preferred for simple single-statement
cases for its better performance.

To specify an explicit ``RETURNING`` clause, use the
:meth:`._UpdateBase.returning` method on a per-statement basis::

    # INSERT..RETURNING
    result = connection.execute(
        table.insert().
        values(name='foo').
        returning(table.c.col1, table.c.col2)
    )
    print(result.all())

    # UPDATE..RETURNING
    result = connection.execute(
        table.update().
        where(table.c.name=='foo').
        values(name='bar').
        returning(table.c.col1, table.c.col2)
    )
    print(result.all())

    # DELETE..RETURNING
    result = connection.execute(
        table.delete().
        where(table.c.name=='foo').
        returning(table.c.col1, table.c.col2)
    )
    print(result.all())

.. versionadded:: 2.0 Added support for SQLite RETURNING

SAVEPOINT Support
----------------------------

SQLite supports SAVEPOINTs, which only function once a transaction is
begun. SQLAlchemy's SAVEPOINT support is available using the
:meth:`_engine.Connection.begin_nested` method at the Core level, and
:meth:`.Session.begin_nested` at the ORM level. However, SAVEPOINTs
won't work at all with pysqlite unless workarounds are taken.
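
At the Core level, usage looks like the following sketch, which assumes the
workarounds referenced in the warning below are in place (``some_table`` is
a hypothetical :class:`_schema.Table`)::

    with engine.connect() as conn:
        trans = conn.begin()
        savepoint = conn.begin_nested()  # emits SAVEPOINT
        conn.execute(some_table.insert().values(data="value"))
        savepoint.rollback()  # emits ROLLBACK TO SAVEPOINT
        trans.commit()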

.. warning::

    SQLite's SAVEPOINT feature is impacted by unresolved
    issues in the pysqlite and aiosqlite drivers, which defer BEGIN statements
    to a greater degree than is often feasible. See the sections
    :ref:`pysqlite_serializable` and :ref:`aiosqlite_serializable`
    for techniques to work around this behavior.

Transactional DDL
----------------------------

The SQLite database supports transactional :term:`DDL` as well.
In this case, the pysqlite driver not only fails to start transactions,
it also ends any existing transaction when DDL is detected, so again,
workarounds are required.

.. warning::

    SQLite's transactional DDL is impacted by unresolved issues
    in the pysqlite driver, which fails to emit BEGIN and additionally
    forces a COMMIT to cancel any transaction when DDL is encountered.
    See the section :ref:`pysqlite_serializable`
    for techniques to work around this behavior.

.. _sqlite_foreign_keys:

Foreign Key Support
-------------------

SQLite supports FOREIGN KEY syntax when emitting CREATE statements for tables,
however by default these constraints have no effect on the operation of the
table.

Constraint checking on SQLite has three prerequisites:

* At least version 3.6.19 of SQLite must be in use
* The SQLite library must *not* be compiled with the SQLITE_OMIT_FOREIGN_KEY
  or SQLITE_OMIT_TRIGGER symbols enabled.
* The ``PRAGMA foreign_keys = ON`` statement must be emitted on all
  connections before use -- including the initial call to
  :meth:`sqlalchemy.schema.MetaData.create_all`.

SQLAlchemy allows for the ``PRAGMA`` statement to be emitted automatically for
new connections through the usage of events::

    from sqlalchemy.engine import Engine
    from sqlalchemy import event

    @event.listens_for(Engine, "connect")
    def set_sqlite_pragma(dbapi_connection, connection_record):
        cursor = dbapi_connection.cursor()
        cursor.execute("PRAGMA foreign_keys=ON")
        cursor.close()

.. warning::

    When SQLite foreign keys are enabled, it is **not possible**
    to emit CREATE or DROP statements for tables that contain
    mutually-dependent foreign key constraints;
    to emit the DDL for these tables requires that ALTER TABLE be used to
    create or drop these constraints separately, for which SQLite has
    no support.

.. seealso::

    `SQLite Foreign Key Support <https://www.sqlite.org/foreignkeys.html>`_
    - on the SQLite web site.

    :ref:`event_toplevel` - SQLAlchemy event API.

    :ref:`use_alter` - more information on SQLAlchemy's facilities for handling
    mutually-dependent foreign key constraints.

.. _sqlite_on_conflict_ddl:

ON CONFLICT support for constraints
-----------------------------------

.. seealso:: This section describes the :term:`DDL` version of "ON CONFLICT" for
    SQLite, which occurs within a CREATE TABLE statement. For "ON CONFLICT" as
    applied to an INSERT statement, see :ref:`sqlite_on_conflict_insert`.

SQLite supports a non-standard DDL clause known as ON CONFLICT which can be
applied to primary key, unique, check, and not null constraints. In DDL, it is
rendered either within the "CONSTRAINT" clause or within the column definition
itself depending on the location of the target constraint. To render this
clause within DDL, the extension parameter ``sqlite_on_conflict`` can be
specified with a string conflict resolution algorithm within the
:class:`.PrimaryKeyConstraint`, :class:`.UniqueConstraint`, and
:class:`.CheckConstraint` objects. Within the :class:`_schema.Column` object,
there are individual parameters ``sqlite_on_conflict_not_null``,
``sqlite_on_conflict_primary_key``, and ``sqlite_on_conflict_unique``, each of
which corresponds to one of the three types of relevant constraint that can be
indicated from a :class:`_schema.Column` object.

.. seealso::

    `ON CONFLICT <https://www.sqlite.org/lang_conflict.html>`_ - in the SQLite
    documentation

.. versionadded:: 1.3


The ``sqlite_on_conflict`` parameters accept a string argument which names the
conflict resolution algorithm, which on SQLite can be one of ROLLBACK,
ABORT, FAIL, IGNORE, and REPLACE. For example, to add a UNIQUE constraint
that specifies the IGNORE algorithm::

    some_table = Table(
        'some_table', metadata,
        Column('id', Integer, primary_key=True),
        Column('data', Integer),
        UniqueConstraint('id', 'data', sqlite_on_conflict='IGNORE')
    )

The above renders CREATE TABLE DDL as::

    CREATE TABLE some_table (
        id INTEGER NOT NULL,
        data INTEGER,
        PRIMARY KEY (id),
        UNIQUE (id, data) ON CONFLICT IGNORE
    )


When using the :paramref:`_schema.Column.unique`
flag to add a UNIQUE constraint
to a single column, the ``sqlite_on_conflict_unique`` parameter can
be added to the :class:`_schema.Column` as well, which will be added to the
UNIQUE constraint in the DDL::

    some_table = Table(
        'some_table', metadata,
        Column('id', Integer, primary_key=True),
        Column('data', Integer, unique=True,
               sqlite_on_conflict_unique='IGNORE')
    )

rendering::

    CREATE TABLE some_table (
        id INTEGER NOT NULL,
        data INTEGER,
        PRIMARY KEY (id),
        UNIQUE (data) ON CONFLICT IGNORE
    )

To apply the FAIL algorithm for a NOT NULL constraint,
``sqlite_on_conflict_not_null`` is used::

    some_table = Table(
        'some_table', metadata,
        Column('id', Integer, primary_key=True),
        Column('data', Integer, nullable=False,
               sqlite_on_conflict_not_null='FAIL')
    )

this renders the ON CONFLICT phrase inline within the column definition::

    CREATE TABLE some_table (
        id INTEGER NOT NULL,
        data INTEGER NOT NULL ON CONFLICT FAIL,
        PRIMARY KEY (id)
    )


Similarly, for an inline primary key, use ``sqlite_on_conflict_primary_key``::

    some_table = Table(
        'some_table', metadata,
        Column('id', Integer, primary_key=True,
               sqlite_on_conflict_primary_key='FAIL')
    )

SQLAlchemy renders the PRIMARY KEY constraint separately, so the conflict
resolution algorithm is applied to the constraint itself::

    CREATE TABLE some_table (
        id INTEGER NOT NULL,
        PRIMARY KEY (id) ON CONFLICT FAIL
    )

.. _sqlite_on_conflict_insert:

INSERT...ON CONFLICT (Upsert)
-----------------------------------

.. seealso:: This section describes the :term:`DML` version of "ON CONFLICT" for
    SQLite, which occurs within an INSERT statement. For "ON CONFLICT" as
    applied to a CREATE TABLE statement, see :ref:`sqlite_on_conflict_ddl`.

From version 3.24.0 onwards, SQLite supports "upserts" (update or insert)
of rows into a table via the ``ON CONFLICT`` clause of the ``INSERT``
statement. A candidate row will only be inserted if that row does not violate
any unique or primary key constraints. In the case of a unique constraint
violation, a secondary action can occur, which can be either "DO UPDATE",
indicating that the data in the target row should be updated, or "DO NOTHING",
which indicates that the row should be silently skipped.

Conflicts are determined using columns that are part of existing unique
constraints and indexes. These constraints are identified by stating the
columns and conditions that comprise the indexes.

SQLAlchemy provides ``ON CONFLICT`` support via the SQLite-specific
:func:`_sqlite.insert()` function, which provides
the generative methods :meth:`_sqlite.Insert.on_conflict_do_update`
and :meth:`_sqlite.Insert.on_conflict_do_nothing`:

.. sourcecode:: pycon+sql

    >>> from sqlalchemy.dialects.sqlite import insert

    >>> insert_stmt = insert(my_table).values(
    ...     id='some_existing_id',
    ...     data='inserted value')

    >>> do_update_stmt = insert_stmt.on_conflict_do_update(
    ...     index_elements=['id'],
    ...     set_=dict(data='updated value')
    ... )

    >>> print(do_update_stmt)
    {printsql}INSERT INTO my_table (id, data) VALUES (?, ?)
    ON CONFLICT (id) DO UPDATE SET data = ?{stop}

    >>> do_nothing_stmt = insert_stmt.on_conflict_do_nothing(
    ...     index_elements=['id']
    ... )

    >>> print(do_nothing_stmt)
    {printsql}INSERT INTO my_table (id, data) VALUES (?, ?)
    ON CONFLICT (id) DO NOTHING

.. versionadded:: 1.4

.. seealso::

    `Upsert
    <https://sqlite.org/lang_UPSERT.html>`_
    - in the SQLite documentation.


Specifying the Target
^^^^^^^^^^^^^^^^^^^^^

Both methods supply the "target" of the conflict using column inference:

* The :paramref:`_sqlite.Insert.on_conflict_do_update.index_elements` argument
  specifies a sequence containing string column names, :class:`_schema.Column`
  objects, and/or SQL expression elements, which would identify a unique index
  or unique constraint.

* When using :paramref:`_sqlite.Insert.on_conflict_do_update.index_elements`
  to infer an index, a partial index can be inferred by also specifying the
  :paramref:`_sqlite.Insert.on_conflict_do_update.index_where` parameter:

  .. sourcecode:: pycon+sql

      >>> stmt = insert(my_table).values(user_email='a@b.com', data='inserted data')

      >>> do_update_stmt = stmt.on_conflict_do_update(
      ...     index_elements=[my_table.c.user_email],
      ...     index_where=my_table.c.user_email.like('%@gmail.com'),
      ...     set_=dict(data=stmt.excluded.data)
      ... )

      >>> print(do_update_stmt)
      {printsql}INSERT INTO my_table (data, user_email) VALUES (?, ?)
      ON CONFLICT (user_email)
      WHERE user_email LIKE '%@gmail.com'
      DO UPDATE SET data = excluded.data

The SET Clause
^^^^^^^^^^^^^^^

``ON CONFLICT...DO UPDATE`` is used to perform an update of the already
existing row, using any combination of new values as well as values
from the proposed insertion. These values are specified using the
:paramref:`_sqlite.Insert.on_conflict_do_update.set_` parameter. This
parameter accepts a dictionary which consists of direct values
for UPDATE:

.. sourcecode:: pycon+sql

    >>> stmt = insert(my_table).values(id='some_id', data='inserted value')

    >>> do_update_stmt = stmt.on_conflict_do_update(
    ...     index_elements=['id'],
    ...     set_=dict(data='updated value')
    ... )

    >>> print(do_update_stmt)
    {printsql}INSERT INTO my_table (id, data) VALUES (?, ?)
    ON CONFLICT (id) DO UPDATE SET data = ?

.. warning::

    The :meth:`_sqlite.Insert.on_conflict_do_update` method does **not** take
    into account Python-side default UPDATE values or generation functions,
    e.g. those specified using :paramref:`_schema.Column.onupdate`. These
    values will not be exercised for an ON CONFLICT style of UPDATE, unless
    they are manually specified in the
    :paramref:`_sqlite.Insert.on_conflict_do_update.set_` dictionary.

Updating using the Excluded INSERT Values
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

In order to refer to the proposed insertion row, the special alias
:attr:`~.sqlite.Insert.excluded` is available as an attribute on
the :class:`_sqlite.Insert` object; this object creates an "excluded." prefix
on a column, which informs the DO UPDATE to update the row with the value that
would have been inserted had the constraint not failed:

.. sourcecode:: pycon+sql

    >>> stmt = insert(my_table).values(
    ...     id='some_id',
    ...     data='inserted value',
    ...     author='jlh'
    ... )

    >>> do_update_stmt = stmt.on_conflict_do_update(
    ...     index_elements=['id'],
    ...     set_=dict(data='updated value', author=stmt.excluded.author)
    ... )

    >>> print(do_update_stmt)
    {printsql}INSERT INTO my_table (id, data, author) VALUES (?, ?, ?)
    ON CONFLICT (id) DO UPDATE SET data = ?, author = excluded.author

Additional WHERE Criteria
^^^^^^^^^^^^^^^^^^^^^^^^^

The :meth:`_sqlite.Insert.on_conflict_do_update` method also accepts
a WHERE clause using the :paramref:`_sqlite.Insert.on_conflict_do_update.where`
parameter, which will limit those rows which receive an UPDATE:

.. sourcecode:: pycon+sql

    >>> stmt = insert(my_table).values(
    ...     id='some_id',
    ...     data='inserted value',
    ...     author='jlh'
    ... )

    >>> on_update_stmt = stmt.on_conflict_do_update(
    ...     index_elements=['id'],
    ...     set_=dict(data='updated value', author=stmt.excluded.author),
    ...     where=(my_table.c.status == 2)
    ... )
    >>> print(on_update_stmt)
    {printsql}INSERT INTO my_table (id, data, author) VALUES (?, ?, ?)
    ON CONFLICT (id) DO UPDATE SET data = ?, author = excluded.author
    WHERE my_table.status = ?


Skipping Rows with DO NOTHING
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

``ON CONFLICT`` may be used to skip inserting a row entirely
if any conflict with a unique constraint occurs; below this is illustrated
using the :meth:`_sqlite.Insert.on_conflict_do_nothing` method:

.. sourcecode:: pycon+sql

    >>> stmt = insert(my_table).values(id='some_id', data='inserted value')
    >>> stmt = stmt.on_conflict_do_nothing(index_elements=['id'])
    >>> print(stmt)
    {printsql}INSERT INTO my_table (id, data) VALUES (?, ?) ON CONFLICT (id) DO NOTHING


If ``DO NOTHING`` is used without specifying any columns or constraint,
it has the effect of skipping the INSERT for any unique violation which
occurs:

.. sourcecode:: pycon+sql

    >>> stmt = insert(my_table).values(id='some_id', data='inserted value')
    >>> stmt = stmt.on_conflict_do_nothing()
    >>> print(stmt)
    {printsql}INSERT INTO my_table (id, data) VALUES (?, ?) ON CONFLICT DO NOTHING

.. _sqlite_type_reflection:

Type Reflection
---------------

SQLite types are unlike those of most other database backends, in that
the string name of the type usually does not correspond to a "type" in a
one-to-one fashion. Instead, SQLite links per-column typing behavior
to one of five so-called "type affinities" based on a string matching
pattern for the type.

SQLAlchemy's reflection process, when inspecting types, uses a simple
lookup table to link the keywords returned to provided SQLAlchemy types.
This lookup table is present within the SQLite dialect as it is for all
other dialects. However, the SQLite dialect has a different "fallback"
routine for when a particular type name is not located in the lookup map;
it instead implements the SQLite "type affinity" scheme located at
https://www.sqlite.org/datatype3.html section 2.1.

The provided typemap will make direct associations from an exact string
name match for the following types:

:class:`_types.BIGINT`, :class:`_types.BLOB`,
:class:`_types.BOOLEAN`, :class:`_types.CHAR`,
:class:`_types.DATE`, :class:`_types.DATETIME`,
:class:`_types.DECIMAL`, :class:`_types.FLOAT`,
:class:`_types.INTEGER`, :class:`_types.NUMERIC`,
:class:`_types.REAL`, :class:`_types.SMALLINT`,
:class:`_types.TEXT`, :class:`_types.TIME`,
:class:`_types.TIMESTAMP`, :class:`_types.VARCHAR`,
:class:`_types.NVARCHAR`, :class:`_types.NCHAR`

When a type name does not match one of the above types, the "type affinity"
lookup is used instead, as demonstrated in the sketch following this list:

* :class:`_types.INTEGER` is returned if the type name includes the
  string ``INT``
* :class:`_types.TEXT` is returned if the type name includes the
  string ``CHAR``, ``CLOB`` or ``TEXT``
* :class:`_types.NullType` is returned if the type name includes the
  string ``BLOB``
* :class:`_types.REAL` is returned if the type name includes the string
  ``REAL``, ``FLOA`` or ``DOUB``.
* Otherwise, the :class:`_types.NUMERIC` type is used.
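
A brief sketch of the affinity fallback during reflection (the table and
type names here are hypothetical)::

    from sqlalchemy import create_engine, inspect

    engine = create_engine("sqlite://")
    with engine.connect() as conn:
        conn.exec_driver_sql("CREATE TABLE t (x XYZINTQPR, y SUPERBLOB)")
        for col in inspect(conn).get_columns("t"):
            # "x" reflects as INTEGER via the "INT" substring rule;
            # "y" reflects as NullType via the "BLOB" rule
            print(col["name"], col["type"])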

.. _sqlite_partial_index:

Partial Indexes
---------------

A partial index, e.g. one which uses a WHERE clause, can be specified
with the DDL system using the argument ``sqlite_where``::

    tbl = Table('testtbl', m, Column('data', Integer))
    idx = Index('test_idx1', tbl.c.data,
                sqlite_where=and_(tbl.c.data > 5, tbl.c.data < 10))

The index will be rendered at create time as::

    CREATE INDEX test_idx1 ON testtbl (data)
    WHERE data > 5 AND data < 10

.. _sqlite_dotted_column_names:

Dotted Column Names
-------------------

Using table or column names that explicitly have periods in them is
**not recommended**. While this is generally a bad idea for relational
databases in general, as the dot is a syntactically significant character,
versions of SQLite prior to **3.10.0** additionally have a bug which
requires that SQLAlchemy filter out these dots in result sets.

The bug, entirely outside of SQLAlchemy, can be illustrated thusly::

    import sqlite3

    assert sqlite3.sqlite_version_info < (3, 10, 0), "bug is fixed in this version"

    conn = sqlite3.connect(":memory:")
    cursor = conn.cursor()

    cursor.execute("create table x (a integer, b integer)")
    cursor.execute("insert into x (a, b) values (1, 1)")
    cursor.execute("insert into x (a, b) values (2, 2)")

    cursor.execute("select x.a, x.b from x")
    assert [c[0] for c in cursor.description] == ['a', 'b']

    cursor.execute('''
        select x.a, x.b from x where a=1
        union
        select x.a, x.b from x where a=2
    ''')
    assert [c[0] for c in cursor.description] == ['a', 'b'], \
        [c[0] for c in cursor.description]

The second assertion fails::

    Traceback (most recent call last):
      File "test.py", line 19, in <module>
        [c[0] for c in cursor.description]
    AssertionError: ['x.a', 'x.b']

Where above, the driver incorrectly reports the names of the columns
including the name of the table, which is entirely inconsistent with the
case where the UNION is not present.

SQLAlchemy relies upon column names being predictable in how they match
to the original statement, so the SQLAlchemy dialect has no choice but
to filter these out::

    from sqlalchemy import create_engine

    eng = create_engine("sqlite://")
    conn = eng.connect()

    conn.exec_driver_sql("create table x (a integer, b integer)")
    conn.exec_driver_sql("insert into x (a, b) values (1, 1)")
    conn.exec_driver_sql("insert into x (a, b) values (2, 2)")

    result = conn.exec_driver_sql("select x.a, x.b from x")
    assert result.keys() == ["a", "b"]

    result = conn.exec_driver_sql('''
        select x.a, x.b from x where a=1
        union
        select x.a, x.b from x where a=2
    ''')
    assert result.keys() == ["a", "b"]

Note that above, even though SQLAlchemy filters out the dots, *both
names are still addressable*::

    >>> row = result.first()
    >>> row["a"]
    1
    >>> row["x.a"]
    1
    >>> row["b"]
    1
    >>> row["x.b"]
    1

Therefore, the workaround applied by SQLAlchemy only impacts
:meth:`_engine.CursorResult.keys` and :meth:`.Row.keys()` in the public API. In
the very specific case where an application is forced to use column names that
contain dots, and the functionality of :meth:`_engine.CursorResult.keys` and
:meth:`.Row.keys()` is required to return these dotted names unmodified,
the ``sqlite_raw_colnames`` execution option may be provided, either on a
per-:class:`_engine.Connection` basis::

    result = conn.execution_options(sqlite_raw_colnames=True).exec_driver_sql('''
        select x.a, x.b from x where a=1
        union
        select x.a, x.b from x where a=2
    ''')
    assert result.keys() == ["x.a", "x.b"]

or on a per-:class:`_engine.Engine` basis::

    engine = create_engine("sqlite://", execution_options={"sqlite_raw_colnames": True})

When using the per-:class:`_engine.Engine` execution option, note that
**Core and ORM queries that use UNION may not function properly**.

SQLite-specific table options
-----------------------------

One option for CREATE TABLE is supported directly by the SQLite
dialect in conjunction with the :class:`_schema.Table` construct:

* ``WITHOUT ROWID``::

    Table("some_table", metadata, ..., sqlite_with_rowid=False)

.. seealso::

    `SQLite CREATE TABLE options
    <https://www.sqlite.org/lang_createtable.html>`_


.. _sqlite_include_internal:

Reflecting internal schema tables
----------------------------------

Reflection methods that return lists of tables will omit so-called
"SQLite internal schema object" names, which are considered by SQLite
as any object name that is prefixed with ``sqlite_``. An example of
such an object is the ``sqlite_sequence`` table that's generated when
the ``AUTOINCREMENT`` column parameter is used. In order to return
these objects, the parameter ``sqlite_include_internal=True`` may be
passed to methods such as :meth:`_schema.MetaData.reflect` or
:meth:`.Inspector.get_table_names`.
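
For example (a brief sketch; ``file.db`` is a hypothetical database)::

    from sqlalchemy import create_engine, inspect

    engine = create_engine("sqlite:///file.db")
    insp = inspect(engine)

    # internal names such as "sqlite_sequence" are omitted here
    print(insp.get_table_names())

    # ...and included here
    print(insp.get_table_names(sqlite_include_internal=True))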

.. versionadded:: 2.0 Added the ``sqlite_include_internal=True`` parameter.
   Previously, these tables were not ignored by SQLAlchemy reflection
   methods.

.. note::

    The ``sqlite_include_internal`` parameter does not refer to the
    "system" tables that are present in schemas such as ``sqlite_master``.

.. seealso::

    `SQLite Internal Schema Objects <https://www.sqlite.org/fileformat2.html#intschema>`_ - in the SQLite
    documentation.

"""  # noqa
from __future__ import annotations

import datetime
import numbers
import re
from typing import Optional

from .json import JSON
from .json import JSONIndexType
from .json import JSONPathType
from ... import exc
from ... import schema as sa_schema
from ... import sql
from ... import text
from ... import types as sqltypes
from ... import util
from ...engine import default
from ...engine import processors
from ...engine import reflection
from ...engine.reflection import ReflectionDefaults
from ...sql import coercions
from ...sql import ColumnElement
from ...sql import compiler
from ...sql import elements
from ...sql import roles
from ...sql import schema
from ...types import BLOB  # noqa
from ...types import BOOLEAN  # noqa
from ...types import CHAR  # noqa
from ...types import DECIMAL  # noqa
from ...types import FLOAT  # noqa
from ...types import INTEGER  # noqa
from ...types import NUMERIC  # noqa
from ...types import REAL  # noqa
from ...types import SMALLINT  # noqa
from ...types import TEXT  # noqa
from ...types import TIMESTAMP  # noqa
from ...types import VARCHAR  # noqa


class _SQliteJson(JSON):
    def result_processor(self, dialect, coltype):
        default_processor = super().result_processor(dialect, coltype)

        def process(value):
            try:
                return default_processor(value)
            except TypeError:
                # SQLite's JSON_EXTRACT returns plain numeric values for
                # scalar JSON numbers; pass those through as-is rather than
                # failing JSON deserialization
                if isinstance(value, numbers.Number):
                    return value
                else:
                    raise

        return process


class _DateTimeMixin:
    _reg = None
    _storage_format = None

    def __init__(self, storage_format=None, regexp=None, **kw):
        super().__init__(**kw)
        if regexp is not None:
            self._reg = re.compile(regexp)
        if storage_format is not None:
            self._storage_format = storage_format

    @property
    def format_is_text_affinity(self):
        """return True if the storage format will automatically imply
        a TEXT affinity.

        If the storage format contains no non-numeric characters,
        it will imply a NUMERIC storage format on SQLite; in this case,
        the type will generate its DDL as DATE_CHAR, DATETIME_CHAR,
        TIME_CHAR.

        """
        spec = self._storage_format % {
            "year": 0,
            "month": 0,
            "day": 0,
            "hour": 0,
            "minute": 0,
            "second": 0,
            "microsecond": 0,
        }
        return bool(re.search(r"[^0-9]", spec))

    def adapt(self, cls, **kw):
        # carry along the custom storage format / regexp when the type
        # is adapted to another date/time implementation type
        if issubclass(cls, _DateTimeMixin):
            if self._storage_format:
                kw["storage_format"] = self._storage_format
            if self._reg:
                kw["regexp"] = self._reg
        return super().adapt(cls, **kw)

    def literal_processor(self, dialect):
        bp = self.bind_processor(dialect)

        def process(value):
            # render the formatted date/time string as a quoted SQL literal
            return "'%s'" % bp(value)

        return process


class DATETIME(_DateTimeMixin, sqltypes.DateTime):
    r"""Represent a Python datetime object in SQLite using a string.

    The default string storage format is::

        "%(year)04d-%(month)02d-%(day)02d %(hour)02d:%(minute)02d:%(second)02d.%(microsecond)06d"

    e.g.::

        2021-03-15 12:05:57.105542

    The incoming storage format is by default parsed using the
    Python ``datetime.fromisoformat()`` function.

    .. versionchanged:: 2.0 ``datetime.fromisoformat()`` is used for default
       datetime string parsing.

    The storage format can be customized to some degree using the
    ``storage_format`` and ``regexp`` parameters, such as::

        import re
        from sqlalchemy.dialects.sqlite import DATETIME

        dt = DATETIME(storage_format="%(year)04d/%(month)02d/%(day)02d "
                      "%(hour)02d:%(minute)02d:%(second)02d",
                      regexp=r"(\d+)/(\d+)/(\d+) (\d+):(\d+):(\d+)"
        )

    :param storage_format: format string which will be applied to the dict
     with keys year, month, day, hour, minute, second, and microsecond.

    :param regexp: regular expression which will be applied to incoming result
     rows, replacing the use of ``datetime.fromisoformat()`` to parse incoming
     strings. If the regexp contains named groups, the resulting match dict is
     applied to the Python datetime() constructor as keyword arguments.
     Otherwise, if positional groups are used, the datetime() constructor
     is called with positional arguments via
     ``*map(int, match_obj.groups(0))``.

    """  # noqa

    _storage_format = (
        "%(year)04d-%(month)02d-%(day)02d "
        "%(hour)02d:%(minute)02d:%(second)02d.%(microsecond)06d"
    )

    def __init__(self, *args, **kwargs):
        truncate_microseconds = kwargs.pop("truncate_microseconds", False)
        super().__init__(*args, **kwargs)
        if truncate_microseconds:
            assert "storage_format" not in kwargs, (
                "You can specify only "
                "one of truncate_microseconds or storage_format."
            )
            assert "regexp" not in kwargs, (
                "You can specify only one of "
                "truncate_microseconds or regexp."
            )
            self._storage_format = (
                "%(year)04d-%(month)02d-%(day)02d "
                "%(hour)02d:%(minute)02d:%(second)02d"
            )

    def bind_processor(self, dialect):
        datetime_datetime = datetime.datetime
        datetime_date = datetime.date
        format_ = self._storage_format

        def process(value):
            if value is None:
                return None
            elif isinstance(value, datetime_datetime):
                return format_ % {
                    "year": value.year,
                    "month": value.month,
                    "day": value.day,
                    "hour": value.hour,
                    "minute": value.minute,
                    "second": value.second,
                    "microsecond": value.microsecond,
                }
            elif isinstance(value, datetime_date):
                return format_ % {
                    "year": value.year,
                    "month": value.month,
                    "day": value.day,
                    "hour": 0,
                    "minute": 0,
                    "second": 0,
                    "microsecond": 0,
                }
            else:
                raise TypeError(
                    "SQLite DateTime type only accepts Python "
                    "datetime and date objects as input."
                )

        return process

    def result_processor(self, dialect, coltype):
        if self._reg:
            return processors.str_to_datetime_processor_factory(
                self._reg, datetime.datetime
            )
        else:
            return processors.str_to_datetime


class DATE(_DateTimeMixin, sqltypes.Date):
    r"""Represent a Python date object in SQLite using a string.

    The default string storage format is::

        "%(year)04d-%(month)02d-%(day)02d"

    e.g.::

        2011-03-15

    The incoming storage format is by default parsed using the
    Python ``date.fromisoformat()`` function.

    .. versionchanged:: 2.0 ``date.fromisoformat()`` is used for default
       date string parsing.


    The storage format can be customized to some degree using the
    ``storage_format`` and ``regexp`` parameters, such as::

        import re
        from sqlalchemy.dialects.sqlite import DATE

        d = DATE(
            storage_format="%(month)02d/%(day)02d/%(year)04d",
            regexp=re.compile(r"(?P<month>\d+)/(?P<day>\d+)/(?P<year>\d+)")
        )

    :param storage_format: format string which will be applied to the
     dict with keys year, month, and day.

    :param regexp: regular expression which will be applied to
     incoming result rows, replacing the use of ``date.fromisoformat()`` to
     parse incoming strings. If the regexp contains named groups, the resulting
     match dict is applied to the Python date() constructor as keyword
     arguments. Otherwise, if positional groups are used, the date()
     constructor is called with positional arguments via
     ``*map(int, match_obj.groups(0))``.

    """

    _storage_format = "%(year)04d-%(month)02d-%(day)02d"

    def bind_processor(self, dialect):
        datetime_date = datetime.date
        format_ = self._storage_format

        def process(value):
            if value is None:
                return None
            elif isinstance(value, datetime_date):
                return format_ % {
                    "year": value.year,
                    "month": value.month,
                    "day": value.day,
                }
            else:
                raise TypeError(
                    "SQLite Date type only accepts Python "
                    "date objects as input."
                )

        return process

    def result_processor(self, dialect, coltype):
        if self._reg:
            return processors.str_to_datetime_processor_factory(
                self._reg, datetime.date
            )
        else:
            return processors.str_to_date


class TIME(_DateTimeMixin, sqltypes.Time):
    r"""Represent a Python time object in SQLite using a string.

    The default string storage format is::

        "%(hour)02d:%(minute)02d:%(second)02d.%(microsecond)06d"

    e.g.::

        12:05:57.105542

    The incoming storage format is by default parsed using the
    Python ``time.fromisoformat()`` function.

    .. versionchanged:: 2.0 ``time.fromisoformat()`` is used for default
       time string parsing.

    The storage format can be customized to some degree using the
    ``storage_format`` and ``regexp`` parameters, such as::

        import re
        from sqlalchemy.dialects.sqlite import TIME

        t = TIME(storage_format="%(hour)02d-%(minute)02d-"
                 "%(second)02d-%(microsecond)06d",
                 regexp=re.compile(r"(\d+)-(\d+)-(\d+)(?:-(\d+))?")
        )

    :param storage_format: format string which will be applied to the dict
     with keys hour, minute, second, and microsecond.

    :param regexp: regular expression which will be applied to incoming result
     rows, replacing the use of ``time.fromisoformat()`` to parse incoming
     strings. If the regexp contains named groups, the resulting match dict is
     applied to the Python time() constructor as keyword arguments. Otherwise,
     if positional groups are used, the time() constructor is called with
     positional arguments via ``*map(int, match_obj.groups(0))``.

    """

    _storage_format = "%(hour)02d:%(minute)02d:%(second)02d.%(microsecond)06d"

    def __init__(self, *args, **kwargs):
        truncate_microseconds = kwargs.pop("truncate_microseconds", False)
        super().__init__(*args, **kwargs)
        if truncate_microseconds:
            assert "storage_format" not in kwargs, (
                "You can specify only "
                "one of truncate_microseconds or storage_format."
            )
            assert "regexp" not in kwargs, (
                "You can specify only one of "
                "truncate_microseconds or regexp."
            )
            self._storage_format = "%(hour)02d:%(minute)02d:%(second)02d"

    def bind_processor(self, dialect):
        datetime_time = datetime.time
        format_ = self._storage_format

        def process(value):
            if value is None:
                return None
            elif isinstance(value, datetime_time):
                return format_ % {
                    "hour": value.hour,
                    "minute": value.minute,
                    "second": value.second,
                    "microsecond": value.microsecond,
                }
            else:
                raise TypeError(
                    "SQLite Time type only accepts Python "
                    "time objects as input."
                )

        return process

    def result_processor(self, dialect, coltype):
        if self._reg:
            return processors.str_to_datetime_processor_factory(
                self._reg, datetime.time
            )
        else:
            return processors.str_to_time


colspecs = {
    sqltypes.Date: DATE,
    sqltypes.DateTime: DATETIME,
    sqltypes.JSON: _SQliteJson,
    sqltypes.JSON.JSONIndexType: JSONIndexType,
    sqltypes.JSON.JSONPathType: JSONPathType,
    sqltypes.Time: TIME,
}

ischema_names = {
    "BIGINT": sqltypes.BIGINT,
    "BLOB": sqltypes.BLOB,
    "BOOL": sqltypes.BOOLEAN,
    "BOOLEAN": sqltypes.BOOLEAN,
    "CHAR": sqltypes.CHAR,
    "DATE": sqltypes.DATE,
    "DATE_CHAR": sqltypes.DATE,
    "DATETIME": sqltypes.DATETIME,
    "DATETIME_CHAR": sqltypes.DATETIME,
    "DOUBLE": sqltypes.DOUBLE,
    "DECIMAL": sqltypes.DECIMAL,
    "FLOAT": sqltypes.FLOAT,
    "INT": sqltypes.INTEGER,
    "INTEGER": sqltypes.INTEGER,
    "JSON": JSON,
    "NUMERIC": sqltypes.NUMERIC,
    "REAL": sqltypes.REAL,
    "SMALLINT": sqltypes.SMALLINT,
    "TEXT": sqltypes.TEXT,
    "TIME": sqltypes.TIME,
    "TIME_CHAR": sqltypes.TIME,
    "TIMESTAMP": sqltypes.TIMESTAMP,
    "VARCHAR": sqltypes.VARCHAR,
    "NVARCHAR": sqltypes.NVARCHAR,
    "NCHAR": sqltypes.NCHAR,
}


class SQLiteCompiler(compiler.SQLCompiler):
    extract_map = util.update_copy(
        compiler.SQLCompiler.extract_map,
        {
            "month": "%m",
            "day": "%d",
            "year": "%Y",
            "second": "%S",
            "hour": "%H",
            "doy": "%j",
            "minute": "%M",
            "epoch": "%s",
            "dow": "%w",
            "week": "%W",
        },
    )

    def visit_truediv_binary(self, binary, operator, **kw):
        # adding 0.0 to the divisor coerces integer division to
        # floating point, so that / performs true division
        return (
            self.process(binary.left, **kw)
            + " / "
            + "(%s + 0.0)" % self.process(binary.right, **kw)
        )

    def visit_now_func(self, fn, **kw):
        return "CURRENT_TIMESTAMP"

    def visit_localtimestamp_func(self, func, **kw):
        return 'DATETIME(CURRENT_TIMESTAMP, "localtime")'

    def visit_true(self, expr, **kw):
        return "1"

    def visit_false(self, expr, **kw):
        return "0"

    def visit_char_length_func(self, fn, **kw):
        return "length%s" % self.function_argspec(fn)

    def visit_aggregate_strings_func(self, fn, **kw):
        return "group_concat%s" % self.function_argspec(fn)

    def visit_cast(self, cast, **kwargs):
        if self.dialect.supports_cast:
            return super().visit_cast(cast, **kwargs)
        else:
            return self.process(cast.clause, **kwargs)

    def visit_extract(self, extract, **kw):
        try:
            return "CAST(STRFTIME('%s', %s) AS INTEGER)" % (
                self.extract_map[extract.field],
                self.process(extract.expr, **kw),
            )
        except KeyError as err:
            raise exc.CompileError(
                "%s is not a valid extract argument." % extract.field
            ) from err

    def returning_clause(
        self,
        stmt,
        returning_cols,
        *,
        populate_result_map,
        **kw,
    ):
        kw["include_table"] = False
        return super().returning_clause(
            stmt, returning_cols, populate_result_map=populate_result_map, **kw
        )

    def limit_clause(self, select, **kw):
        text = ""
        if select._limit_clause is not None:
            text += "\n LIMIT " + self.process(select._limit_clause, **kw)
        if select._offset_clause is not None:
            if select._limit_clause is None:
                # SQLite doesn't accept OFFSET without LIMIT; a LIMIT
                # of -1 means "no limit"
                text += "\n LIMIT " + self.process(sql.literal(-1))
            text += " OFFSET " + self.process(select._offset_clause, **kw)
        else:
            text += " OFFSET " + self.process(sql.literal(0), **kw)
        return text

    def for_update_clause(self, select, **kw):
        # sqlite has no "FOR UPDATE" AFAICT
        return ""

    def update_from_clause(
        self, update_stmt, from_table, extra_froms, from_hints, **kw
    ):
        kw["asfrom"] = True
        return "FROM " + ", ".join(
            t._compiler_dispatch(self, fromhints=from_hints, **kw)
            for t in extra_froms
        )

    def visit_is_distinct_from_binary(self, binary, operator, **kw):
        return "%s IS NOT %s" % (
            self.process(binary.left),
            self.process(binary.right),
        )

    def visit_is_not_distinct_from_binary(self, binary, operator, **kw):
        return "%s IS %s" % (
            self.process(binary.left),
            self.process(binary.right),
        )

    def visit_json_getitem_op_binary(self, binary, operator, **kw):
        if binary.type._type_affinity is sqltypes.JSON:
            expr = "JSON_QUOTE(JSON_EXTRACT(%s, %s))"
        else:
            expr = "JSON_EXTRACT(%s, %s)"

        return expr % (
            self.process(binary.left, **kw),
            self.process(binary.right, **kw),
        )

    def visit_json_path_getitem_op_binary(self, binary, operator, **kw):
        if binary.type._type_affinity is sqltypes.JSON:
            expr = "JSON_QUOTE(JSON_EXTRACT(%s, %s))"
        else:
            expr = "JSON_EXTRACT(%s, %s)"

        return expr % (
            self.process(binary.left, **kw),
            self.process(binary.right, **kw),
        )

    def visit_empty_set_op_expr(self, type_, expand_op, **kw):
        # slightly old SQLite versions don't seem to be able to handle
        # the empty set impl
        return self.visit_empty_set_expr(type_)

    def visit_empty_set_expr(self, element_types, **kw):
        return "SELECT %s FROM (SELECT %s) WHERE 1!=1" % (
            ", ".join("1" for type_ in element_types or [INTEGER()]),
            ", ".join("1" for type_ in element_types or [INTEGER()]),
        )

    def visit_regexp_match_op_binary(self, binary, operator, **kw):
        return self._generate_generic_binary(binary, " REGEXP ", **kw)

    def visit_not_regexp_match_op_binary(self, binary, operator, **kw):
        return self._generate_generic_binary(binary, " NOT REGEXP ", **kw)

    def _on_conflict_target(self, clause, **kw):
        if clause.constraint_target is not None:
            target_text = "(%s)" % clause.constraint_target
        elif clause.inferred_target_elements is not None:
            target_text = "(%s)" % ", ".join(
                (
                    self.preparer.quote(c)
                    if isinstance(c, str)
                    else self.process(c, include_table=False, use_schema=False)
                )
                for c in clause.inferred_target_elements
            )
            if clause.inferred_target_whereclause is not None:
                target_text += " WHERE %s" % self.process(
                    clause.inferred_target_whereclause,
                    include_table=False,
                    use_schema=False,
                    literal_binds=True,
                )

        else:
            target_text = ""

        return target_text

    def visit_on_conflict_do_nothing(self, on_conflict, **kw):
        target_text = self._on_conflict_target(on_conflict, **kw)

        if target_text:
            return "ON CONFLICT %s DO NOTHING" % target_text
        else:
            return "ON CONFLICT DO NOTHING"

    def visit_on_conflict_do_update(self, on_conflict, **kw):
        clause = on_conflict

        target_text = self._on_conflict_target(on_conflict, **kw)

        action_set_ops = []

        set_parameters = dict(clause.update_values_to_set)
        # create a list of column assignment clauses as tuples

        insert_statement = self.stack[-1]["selectable"]
        cols = insert_statement.table.c
        for c in cols:
            col_key = c.key

            if col_key in set_parameters:
                value = set_parameters.pop(col_key)
            elif c in set_parameters:
                value = set_parameters.pop(c)
            else:
                continue

            if coercions._is_literal(value):
                value = elements.BindParameter(None, value, type_=c.type)

            else:
                if (
                    isinstance(value, elements.BindParameter)
                    and value.type._isnull
                ):
                    value = value._clone()
                    value.type = c.type
            value_text = self.process(value.self_group(), use_schema=False)

            key_text = self.preparer.quote(c.name)
            action_set_ops.append("%s = %s" % (key_text, value_text))

        # check for names that don't match columns
        if set_parameters:
            util.warn(
                "Additional column names not matching "
                "any column keys in table '%s': %s"
                % (
                    self.current_executable.table.name,
                    (", ".join("'%s'" % c for c in set_parameters)),
                )
            )
            for k, v in set_parameters.items():
                key_text = (
                    self.preparer.quote(k)
                    if isinstance(k, str)
                    else self.process(k, use_schema=False)
                )
                value_text = self.process(
                    coercions.expect(roles.ExpressionElementRole, v),
                    use_schema=False,
                )
                action_set_ops.append("%s = %s" % (key_text, value_text))

        action_text = ", ".join(action_set_ops)
        if clause.update_whereclause is not None:
            action_text += " WHERE %s" % self.process(
                clause.update_whereclause, include_table=True, use_schema=False
            )

        return "ON CONFLICT %s DO UPDATE SET %s" % (target_text, action_text)

    def visit_bitwise_xor_op_binary(self, binary, operator, **kw):
        # sqlite has no xor. Use "a XOR b" = "(a | b) - (a & b)".
        kw["eager_grouping"] = True
        or_ = self._generate_generic_binary(binary, " | ", **kw)
        and_ = self._generate_generic_binary(binary, " & ", **kw)
        return f"({or_} - {and_})"


class SQLiteDDLCompiler(compiler.DDLCompiler):
    def get_column_specification(self, column, **kwargs):
        coltype = self.dialect.type_compiler_instance.process(
            column.type, type_expression=column
        )
        colspec = self.preparer.format_column(column) + " " + coltype
        default = self.get_column_default_string(column)
        if default is not None:
            if isinstance(column.server_default.arg, ColumnElement):
                default = "(" + default + ")"
            colspec += " DEFAULT " + default

        if not column.nullable:
            colspec += " NOT NULL"

            on_conflict_clause = column.dialect_options["sqlite"][
                "on_conflict_not_null"
            ]
            if on_conflict_clause is not None:
                colspec += " ON CONFLICT " + on_conflict_clause

        if column.primary_key:
            if (
                column.autoincrement is True
                and len(column.table.primary_key.columns) != 1
            ):
                raise exc.CompileError(
                    "SQLite does not support autoincrement for "
                    "composite primary keys"
                )

            if (
                column.table.dialect_options["sqlite"]["autoincrement"]
                and len(column.table.primary_key.columns) == 1
                and issubclass(column.type._type_affinity, sqltypes.Integer)
                and not column.foreign_keys
            ):
                colspec += " PRIMARY KEY"

                on_conflict_clause = column.dialect_options["sqlite"][
                    "on_conflict_primary_key"
                ]
                if on_conflict_clause is not None:
                    colspec += " ON CONFLICT " + on_conflict_clause

                colspec += " AUTOINCREMENT"

        if column.computed is not None:
            colspec += " " + self.process(column.computed)

        return colspec

    def visit_primary_key_constraint(self, constraint, **kw):
        # for columns with sqlite_autoincrement=True,
        # the PRIMARY KEY constraint can only be inline
        # with the column itself.
        if len(constraint.columns) == 1:
            c = list(constraint)[0]
            if (
                c.primary_key
                and c.table.dialect_options["sqlite"]["autoincrement"]
                and issubclass(c.type._type_affinity, sqltypes.Integer)
                and not c.foreign_keys
            ):
                return None

        text = super().visit_primary_key_constraint(constraint)

        on_conflict_clause = constraint.dialect_options["sqlite"][
            "on_conflict"
        ]
        if on_conflict_clause is None and len(constraint.columns) == 1:
            on_conflict_clause = list(constraint)[0].dialect_options["sqlite"][
                "on_conflict_primary_key"
            ]

        if on_conflict_clause is not None:
            text += " ON CONFLICT " + on_conflict_clause

        return text

    def visit_unique_constraint(self, constraint, **kw):
        text = super().visit_unique_constraint(constraint)

        on_conflict_clause = constraint.dialect_options["sqlite"][
            "on_conflict"
        ]
        if on_conflict_clause is None and len(constraint.columns) == 1:
            col1 = list(constraint)[0]
            if isinstance(col1, schema.SchemaItem):
                on_conflict_clause = list(constraint)[0].dialect_options[
                    "sqlite"
                ]["on_conflict_unique"]

        if on_conflict_clause is not None:
            text += " ON CONFLICT " + on_conflict_clause

        return text

    def visit_check_constraint(self, constraint, **kw):
        text = super().visit_check_constraint(constraint)

        on_conflict_clause = constraint.dialect_options["sqlite"][
            "on_conflict"
        ]

        if on_conflict_clause is not None:
            text += " ON CONFLICT " + on_conflict_clause

        return text

    def visit_column_check_constraint(self, constraint, **kw):
        text = super().visit_column_check_constraint(constraint)

        if constraint.dialect_options["sqlite"]["on_conflict"] is not None:
            raise exc.CompileError(
                "SQLite does not support on conflict clause for "
                "column check constraint"
            )

        return text

    def visit_foreign_key_constraint(self, constraint, **kw):
        local_table = constraint.elements[0].parent.table
        remote_table = constraint.elements[0].column.table

        if local_table.schema != remote_table.schema:
            return None
        else:
            return super().visit_foreign_key_constraint(constraint)

    def define_constraint_remote_table(self, constraint, table, preparer):
        """Format the remote table clause of a CREATE CONSTRAINT clause."""

        return preparer.format_table(table, use_schema=False)
1674
1675 def visit_create_index(
1676 self, create, include_schema=False, include_table_schema=True, **kw
1677 ):
1678 index = create.element
1679 self._verify_index_table(index)
1680 preparer = self.preparer
1681 text = "CREATE "
1682 if index.unique:
1683 text += "UNIQUE "
1684
1685 text += "INDEX "
1686
1687 if create.if_not_exists:
1688 text += "IF NOT EXISTS "
1689
1690 text += "%s ON %s (%s)" % (
1691 self._prepared_index_name(index, include_schema=True),
1692 preparer.format_table(index.table, use_schema=False),
1693 ", ".join(
1694 self.sql_compiler.process(
1695 expr, include_table=False, literal_binds=True
1696 )
1697 for expr in index.expressions
1698 ),
1699 )
1700
1701 whereclause = index.dialect_options["sqlite"]["where"]
1702 if whereclause is not None:
1703 where_compiled = self.sql_compiler.process(
1704 whereclause, include_table=False, literal_binds=True
1705 )
1706 text += " WHERE " + where_compiled
1707
1708 return text
1709
1710 def post_create_table(self, table):
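        # e.g. (illustrative) Table("t", m, ..., sqlite_with_rowid=False)
        # appends "WITHOUT ROWID" to the generated CREATE TABLE statement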
1711 if table.dialect_options["sqlite"]["with_rowid"] is False:
1712 return "\n WITHOUT ROWID"
1713 return ""
1714
1715
1716class SQLiteTypeCompiler(compiler.GenericTypeCompiler):
1717 def visit_large_binary(self, type_, **kw):
1718 return self.visit_BLOB(type_)
1719
1720 def visit_DATETIME(self, type_, **kw):
1721 if (
1722 not isinstance(type_, _DateTimeMixin)
1723 or type_.format_is_text_affinity
1724 ):
1725 return super().visit_DATETIME(type_)
1726 else:
1727 return "DATETIME_CHAR"
1728
1729 def visit_DATE(self, type_, **kw):
1730 if (
1731 not isinstance(type_, _DateTimeMixin)
1732 or type_.format_is_text_affinity
1733 ):
1734 return super().visit_DATE(type_)
1735 else:
1736 return "DATE_CHAR"
1737
1738 def visit_TIME(self, type_, **kw):
1739 if (
1740 not isinstance(type_, _DateTimeMixin)
1741 or type_.format_is_text_affinity
1742 ):
1743 return super().visit_TIME(type_)
1744 else:
1745 return "TIME_CHAR"
1746
1747 def visit_JSON(self, type_, **kw):
1748 # note this name provides NUMERIC affinity, not TEXT.
1749 # should not be an issue unless the JSON value consists of a single
1750 # numeric value. JSONTEXT can be used if this case is required.
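        # e.g. (illustrative) a stored value such as '{"a": 1}' is not a
        # valid numeric literal and so remains TEXT under NUMERIC affinity;
        # only a bare numeric document such as '1' risks coercion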
1751 return "JSON"
1752
1753
1754class SQLiteIdentifierPreparer(compiler.IdentifierPreparer):
1755 reserved_words = {
1756 "add",
1757 "after",
1758 "all",
1759 "alter",
1760 "analyze",
1761 "and",
1762 "as",
1763 "asc",
1764 "attach",
1765 "autoincrement",
1766 "before",
1767 "begin",
1768 "between",
1769 "by",
1770 "cascade",
1771 "case",
1772 "cast",
1773 "check",
1774 "collate",
1775 "column",
1776 "commit",
1777 "conflict",
1778 "constraint",
1779 "create",
1780 "cross",
1781 "current_date",
1782 "current_time",
1783 "current_timestamp",
1784 "database",
1785 "default",
1786 "deferrable",
1787 "deferred",
1788 "delete",
1789 "desc",
1790 "detach",
1791 "distinct",
1792 "drop",
1793 "each",
1794 "else",
1795 "end",
1796 "escape",
1797 "except",
1798 "exclusive",
1799 "exists",
1800 "explain",
1801 "false",
1802 "fail",
1803 "for",
1804 "foreign",
1805 "from",
1806 "full",
1807 "glob",
1808 "group",
1809 "having",
1810 "if",
1811 "ignore",
1812 "immediate",
1813 "in",
1814 "index",
1815 "indexed",
1816 "initially",
1817 "inner",
1818 "insert",
1819 "instead",
1820 "intersect",
1821 "into",
1822 "is",
1823 "isnull",
1824 "join",
1825 "key",
1826 "left",
1827 "like",
1828 "limit",
1829 "match",
1830 "natural",
1831 "not",
1832 "notnull",
1833 "null",
1834 "of",
1835 "offset",
1836 "on",
1837 "or",
1838 "order",
1839 "outer",
1840 "plan",
1841 "pragma",
1842 "primary",
1843 "query",
1844 "raise",
1845 "references",
1846 "reindex",
1847 "rename",
1848 "replace",
1849 "restrict",
1850 "right",
1851 "rollback",
1852 "row",
1853 "select",
1854 "set",
1855 "table",
1856 "temp",
1857 "temporary",
1858 "then",
1859 "to",
1860 "transaction",
1861 "trigger",
1862 "true",
1863 "union",
1864 "unique",
1865 "update",
1866 "using",
1867 "vacuum",
1868 "values",
1869 "view",
1870 "virtual",
1871 "when",
1872 "where",
1873 }
1874
1875
1876class SQLiteExecutionContext(default.DefaultExecutionContext):
1877 @util.memoized_property
1878 def _preserve_raw_colnames(self):
1879 return (
1880 not self.dialect._broken_dotted_colnames
1881 or self.execution_options.get("sqlite_raw_colnames", False)
1882 )
1883
1884 def _translate_colname(self, colname):
1885 # TODO: detect SQLite version 3.10.0 or greater;
1886 # see [ticket:3633]
1887
1888 # adjust for dotted column names. SQLite
1889 # in the case of UNION may store col names as
1890 # "tablename.colname", or if using an attached database,
1891 # "database.tablename.colname", in cursor.description
1892 if not self._preserve_raw_colnames and "." in colname:
1893 return colname.split(".")[-1], colname
1894 else:
1895 return colname, None
1896
1897
1898class SQLiteDialect(default.DefaultDialect):
1899 name = "sqlite"
1900 supports_alter = False
1901
    # SQLite supports "DEFAULT VALUES" but *does not* support
    # "VALUES (DEFAULT)"; see supports_default_metavalue below, which
    # renders the default_metavalue_token in place of the unsupported
    # DEFAULT keyword
    supports_default_values = True
1906
1907 # sqlite issue:
1908 # https://github.com/python/cpython/issues/93421
1909 # note this parameter is no longer used by the ORM or default dialect
1910 # see #9414
1911 supports_sane_rowcount_returning = False
1912
1913 supports_empty_insert = False
1914 supports_cast = True
1915 supports_multivalues_insert = True
1916 use_insertmanyvalues = True
1917 tuple_in_values = True
1918 supports_statement_cache = True
1919 insert_null_pk_still_autoincrements = True
1920 insert_returning = True
1921 update_returning = True
1922 update_returning_multifrom = True
1923 delete_returning = True
1925
1926 supports_default_metavalue = True
1927 """dialect supports INSERT... VALUES (DEFAULT) syntax"""
1928
1929 default_metavalue_token = "NULL"
1930 """for INSERT... VALUES (DEFAULT) syntax, the token to put in the
1931 parenthesis."""
1932
1933 default_paramstyle = "qmark"
1934 execution_ctx_cls = SQLiteExecutionContext
1935 statement_compiler = SQLiteCompiler
1936 ddl_compiler = SQLiteDDLCompiler
1937 type_compiler_cls = SQLiteTypeCompiler
1938 preparer = SQLiteIdentifierPreparer
1939 ischema_names = ischema_names
1940 colspecs = colspecs
1941
1942 construct_arguments = [
1943 (
1944 sa_schema.Table,
1945 {
1946 "autoincrement": False,
1947 "with_rowid": True,
1948 },
1949 ),
1950 (sa_schema.Index, {"where": None}),
1951 (
1952 sa_schema.Column,
1953 {
1954 "on_conflict_primary_key": None,
1955 "on_conflict_not_null": None,
1956 "on_conflict_unique": None,
1957 },
1958 ),
1959 (sa_schema.Constraint, {"on_conflict": None}),
1960 ]
1961
1962 _broken_fk_pragma_quotes = False
1963 _broken_dotted_colnames = False
1964
1965 @util.deprecated_params(
1966 _json_serializer=(
1967 "1.3.7",
1968 "The _json_serializer argument to the SQLite dialect has "
1969 "been renamed to the correct name of json_serializer. The old "
1970 "argument name will be removed in a future release.",
1971 ),
1972 _json_deserializer=(
1973 "1.3.7",
1974 "The _json_deserializer argument to the SQLite dialect has "
1975 "been renamed to the correct name of json_deserializer. The old "
1976 "argument name will be removed in a future release.",
1977 ),
1978 )
1979 def __init__(
1980 self,
1981 native_datetime=False,
1982 json_serializer=None,
1983 json_deserializer=None,
1984 _json_serializer=None,
1985 _json_deserializer=None,
1986 **kwargs,
1987 ):
1988 default.DefaultDialect.__init__(self, **kwargs)
1989
1990 if _json_serializer:
1991 json_serializer = _json_serializer
1992 if _json_deserializer:
1993 json_deserializer = _json_deserializer
1994 self._json_serializer = json_serializer
1995 self._json_deserializer = json_deserializer
1996
        # this flag is used by the pysqlite dialect, and perhaps others in
        # the future, to indicate the driver is handling date/timestamp
        # conversions (and perhaps datetime/time as well on some
        # hypothetical driver?)
2001 self.native_datetime = native_datetime
2002
2003 if self.dbapi is not None:
2004 if self.dbapi.sqlite_version_info < (3, 7, 16):
2005 util.warn(
2006 "SQLite version %s is older than 3.7.16, and will not "
2007 "support right nested joins, as are sometimes used in "
2008 "more complex ORM scenarios. SQLAlchemy 1.4 and above "
2009 "no longer tries to rewrite these joins."
2010 % (self.dbapi.sqlite_version_info,)
2011 )
2012
            # NOTE: Python 3.7 on Fedora already ships SQLite 3.34.1; these
            # version checks are getting very stale.
2015 self._broken_dotted_colnames = self.dbapi.sqlite_version_info < (
2016 3,
2017 10,
2018 0,
2019 )
2020 self.supports_default_values = self.dbapi.sqlite_version_info >= (
2021 3,
2022 3,
2023 8,
2024 )
2025 self.supports_cast = self.dbapi.sqlite_version_info >= (3, 2, 3)
2026 self.supports_multivalues_insert = (
2027 # https://www.sqlite.org/releaselog/3_7_11.html
2028 self.dbapi.sqlite_version_info
2029 >= (3, 7, 11)
2030 )
2031 # see https://www.sqlalchemy.org/trac/ticket/2568
2032 # as well as https://www.sqlite.org/src/info/600482d161
2033 self._broken_fk_pragma_quotes = self.dbapi.sqlite_version_info < (
2034 3,
2035 6,
2036 14,
2037 )
2038
2039 if self.dbapi.sqlite_version_info < (3, 35) or util.pypy:
2040 self.update_returning = self.delete_returning = (
2041 self.insert_returning
2042 ) = False
2043
2044 if self.dbapi.sqlite_version_info < (3, 32, 0):
2045 # https://www.sqlite.org/limits.html
2046 self.insertmanyvalues_max_parameters = 999
2047
2048 _isolation_lookup = util.immutabledict(
2049 {"READ UNCOMMITTED": 1, "SERIALIZABLE": 0}
2050 )
2051
2052 def get_isolation_level_values(self, dbapi_connection):
2053 return list(self._isolation_lookup)
2054
2055 def set_isolation_level(self, dbapi_connection, level):
2056 isolation_level = self._isolation_lookup[level]
2057
2058 cursor = dbapi_connection.cursor()
2059 cursor.execute(f"PRAGMA read_uncommitted = {isolation_level}")
2060 cursor.close()
2061
2062 def get_isolation_level(self, dbapi_connection):
2063 cursor = dbapi_connection.cursor()
2064 cursor.execute("PRAGMA read_uncommitted")
2065 res = cursor.fetchone()
2066 if res:
2067 value = res[0]
2068 else:
2069 # https://www.sqlite.org/changes.html#version_3_3_3
2070 # "Optional READ UNCOMMITTED isolation (instead of the
2071 # default isolation level of SERIALIZABLE) and
2072 # table level locking when database connections
            # share a common cache."
            # pre-3.3.0 versions of SQLite default to 0
2075 value = 0
2076 cursor.close()
2077 if value == 0:
2078 return "SERIALIZABLE"
2079 elif value == 1:
2080 return "READ UNCOMMITTED"
2081 else:
2082 assert False, "Unknown isolation level %s" % value
2083
2084 @reflection.cache
2085 def get_schema_names(self, connection, **kw):
2086 s = "PRAGMA database_list"
2087 dl = connection.exec_driver_sql(s)
2088
2089 return [db[1] for db in dl if db[1] != "temp"]
2090
2091 def _format_schema(self, schema, table_name):
2092 if schema is not None:
2093 qschema = self.identifier_preparer.quote_identifier(schema)
2094 name = f"{qschema}.{table_name}"
2095 else:
2096 name = table_name
2097 return name
2098
2099 def _sqlite_main_query(
2100 self,
2101 table: str,
2102 type_: str,
2103 schema: Optional[str],
2104 sqlite_include_internal: bool,
2105 ):
2106 main = self._format_schema(schema, table)
2107 if not sqlite_include_internal:
2108 filter_table = " AND name NOT LIKE 'sqlite~_%' ESCAPE '~'"
2109 else:
2110 filter_table = ""
2111 query = (
2112 f"SELECT name FROM {main} "
2113 f"WHERE type='{type_}'{filter_table} "
2114 "ORDER BY name"
2115 )
2116 return query
2117
2118 @reflection.cache
2119 def get_table_names(
2120 self, connection, schema=None, sqlite_include_internal=False, **kw
2121 ):
2122 query = self._sqlite_main_query(
2123 "sqlite_master", "table", schema, sqlite_include_internal
2124 )
2125 names = connection.exec_driver_sql(query).scalars().all()
2126 return names
2127
2128 @reflection.cache
2129 def get_temp_table_names(
2130 self, connection, sqlite_include_internal=False, **kw
2131 ):
2132 query = self._sqlite_main_query(
2133 "sqlite_temp_master", "table", None, sqlite_include_internal
2134 )
2135 names = connection.exec_driver_sql(query).scalars().all()
2136 return names
2137
2138 @reflection.cache
2139 def get_temp_view_names(
2140 self, connection, sqlite_include_internal=False, **kw
2141 ):
2142 query = self._sqlite_main_query(
2143 "sqlite_temp_master", "view", None, sqlite_include_internal
2144 )
2145 names = connection.exec_driver_sql(query).scalars().all()
2146 return names
2147
2148 @reflection.cache
2149 def has_table(self, connection, table_name, schema=None, **kw):
2150 self._ensure_has_table_connection(connection)
2151
2152 if schema is not None and schema not in self.get_schema_names(
2153 connection, **kw
2154 ):
2155 return False
2156
2157 info = self._get_table_pragma(
2158 connection, "table_info", table_name, schema=schema
2159 )
2160 return bool(info)
2161
2162 def _get_default_schema_name(self, connection):
2163 return "main"
2164
2165 @reflection.cache
2166 def get_view_names(
2167 self, connection, schema=None, sqlite_include_internal=False, **kw
2168 ):
2169 query = self._sqlite_main_query(
2170 "sqlite_master", "view", schema, sqlite_include_internal
2171 )
2172 names = connection.exec_driver_sql(query).scalars().all()
2173 return names
2174
2175 @reflection.cache
2176 def get_view_definition(self, connection, view_name, schema=None, **kw):
2177 if schema is not None:
2178 qschema = self.identifier_preparer.quote_identifier(schema)
2179 master = f"{qschema}.sqlite_master"
2180 s = ("SELECT sql FROM %s WHERE name = ? AND type='view'") % (
2181 master,
2182 )
2183 rs = connection.exec_driver_sql(s, (view_name,))
2184 else:
2185 try:
2186 s = (
2187 "SELECT sql FROM "
2188 " (SELECT * FROM sqlite_master UNION ALL "
2189 " SELECT * FROM sqlite_temp_master) "
2190 "WHERE name = ? "
2191 "AND type='view'"
2192 )
2193 rs = connection.exec_driver_sql(s, (view_name,))
2194 except exc.DBAPIError:
2195 s = (
2196 "SELECT sql FROM sqlite_master WHERE name = ? "
2197 "AND type='view'"
2198 )
2199 rs = connection.exec_driver_sql(s, (view_name,))
2200
2201 result = rs.fetchall()
2202 if result:
2203 return result[0].sql
2204 else:
2205 raise exc.NoSuchTableError(
2206 f"{schema}.{view_name}" if schema else view_name
2207 )
2208
2209 @reflection.cache
2210 def get_columns(self, connection, table_name, schema=None, **kw):
2211 pragma = "table_info"
        # computed columns are treated as hidden; they require table_xinfo
2213 if self.server_version_info >= (3, 31):
2214 pragma = "table_xinfo"
2215 info = self._get_table_pragma(
2216 connection, pragma, table_name, schema=schema
2217 )
2218 columns = []
2219 tablesql = None
2220 for row in info:
2221 name = row[1]
2222 type_ = row[2].upper()
2223 nullable = not row[3]
2224 default = row[4]
2225 primary_key = row[5]
2226 hidden = row[6] if pragma == "table_xinfo" else 0
2227
2228 # hidden has value 0 for normal columns, 1 for hidden columns,
2229 # 2 for computed virtual columns and 3 for computed stored columns
2230 # https://www.sqlite.org/src/info/069351b85f9a706f60d3e98fbc8aaf40c374356b967c0464aede30ead3d9d18b
2231 if hidden == 1:
2232 continue
2233
2234 generated = bool(hidden)
2235 persisted = hidden == 3
2236
2237 if tablesql is None and generated:
2238 tablesql = self._get_table_sql(
2239 connection, table_name, schema, **kw
2240 )
2241 # remove create table
2242 match = re.match(
2243 r"create table .*?\((.*)\)$",
2244 tablesql.strip(),
2245 re.DOTALL | re.IGNORECASE,
2246 )
2247 assert match, f"create table not found in {tablesql}"
2248 tablesql = match.group(1).strip()
2249
2250 columns.append(
2251 self._get_column_info(
2252 name,
2253 type_,
2254 nullable,
2255 default,
2256 primary_key,
2257 generated,
2258 persisted,
2259 tablesql,
2260 )
2261 )
2262 if columns:
2263 return columns
2264 elif not self.has_table(connection, table_name, schema):
2265 raise exc.NoSuchTableError(
2266 f"{schema}.{table_name}" if schema else table_name
2267 )
2268 else:
2269 return ReflectionDefaults.columns()
2270
2271 def _get_column_info(
2272 self,
2273 name,
2274 type_,
2275 nullable,
2276 default,
2277 primary_key,
2278 generated,
2279 persisted,
2280 tablesql,
2281 ):
2282 if generated:
2283 # the type of a column "cc INTEGER GENERATED ALWAYS AS (1 + 42)"
2284 # somehow is "INTEGER GENERATED ALWAYS"
2285 type_ = re.sub("generated", "", type_, flags=re.IGNORECASE)
2286 type_ = re.sub("always", "", type_, flags=re.IGNORECASE).strip()
2287
2288 coltype = self._resolve_type_affinity(type_)
2289
2290 if default is not None:
2291 default = str(default)
2292
2293 colspec = {
2294 "name": name,
2295 "type": coltype,
2296 "nullable": nullable,
2297 "default": default,
2298 "primary_key": primary_key,
2299 }
2300 if generated:
2301 sqltext = ""
2302 if tablesql:
2303 pattern = (
2304 r"[^,]*\s+GENERATED\s+ALWAYS\s+AS"
2305 r"\s+\((.*)\)\s*(?:virtual|stored)?"
2306 )
2307 match = re.search(
2308 re.escape(name) + pattern, tablesql, re.IGNORECASE
2309 )
2310 if match:
2311 sqltext = match.group(1)
2312 colspec["computed"] = {"sqltext": sqltext, "persisted": persisted}
2313 return colspec
2314
2315 def _resolve_type_affinity(self, type_):
2316 """Return a data type from a reflected column, using affinity rules.
2317
2318 SQLite's goal for universal compatibility introduces some complexity
2319 during reflection, as a column's defined type might not actually be a
        type that SQLite understands - or indeed, may not be defined *at all*.
2321 Internally, SQLite handles this with a 'data type affinity' for each
2322 column definition, mapping to one of 'TEXT', 'NUMERIC', 'INTEGER',
2323 'REAL', or 'NONE' (raw bits). The algorithm that determines this is
2324 listed in https://www.sqlite.org/datatype3.html section 2.1.
2325
2326 This method allows SQLAlchemy to support that algorithm, while still
2327 providing access to smarter reflection utilities by recognizing
2328 column definitions that SQLite only supports through affinity (like
2329 DATE and DOUBLE).
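
        For example (illustrative):

        * ``"VARCHAR(10)"`` resolves directly to ``VARCHAR(10)``
        * ``"SPECIAL_INT"`` resolves to ``INTEGER`` via integer affinity
        * ``"BLOBBER"`` resolves to ``NullType`` via "none" affinity
        * ``"MONEY"`` falls through to the ``NUMERIC`` catch-all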
2330
2331 """
2332 match = re.match(r"([\w ]+)(\(.*?\))?", type_)
2333 if match:
2334 coltype = match.group(1)
2335 args = match.group(2)
2336 else:
2337 coltype = ""
2338 args = ""
2339
2340 if coltype in self.ischema_names:
2341 coltype = self.ischema_names[coltype]
2342 elif "INT" in coltype:
2343 coltype = sqltypes.INTEGER
2344 elif "CHAR" in coltype or "CLOB" in coltype or "TEXT" in coltype:
2345 coltype = sqltypes.TEXT
2346 elif "BLOB" in coltype or not coltype:
2347 coltype = sqltypes.NullType
2348 elif "REAL" in coltype or "FLOA" in coltype or "DOUB" in coltype:
2349 coltype = sqltypes.REAL
2350 else:
2351 coltype = sqltypes.NUMERIC
2352
2353 if args is not None:
2354 args = re.findall(r"(\d+)", args)
2355 try:
2356 coltype = coltype(*[int(a) for a in args])
2357 except TypeError:
2358 util.warn(
2359 "Could not instantiate type %s with "
2360 "reflected arguments %s; using no arguments."
2361 % (coltype, args)
2362 )
2363 coltype = coltype()
2364 else:
2365 coltype = coltype()
2366
2367 return coltype
2368
2369 @reflection.cache
2370 def get_pk_constraint(self, connection, table_name, schema=None, **kw):
2371 constraint_name = None
2372 table_data = self._get_table_sql(connection, table_name, schema=schema)
2373 if table_data:
2374 PK_PATTERN = r"CONSTRAINT (\w+) PRIMARY KEY"
2375 result = re.search(PK_PATTERN, table_data, re.I)
2376 constraint_name = result.group(1) if result else None
2377
2378 cols = self.get_columns(connection, table_name, schema, **kw)
2379 # consider only pk columns. This also avoids sorting the cached
2380 # value returned by get_columns
2381 cols = [col for col in cols if col.get("primary_key", 0) > 0]
2382 cols.sort(key=lambda col: col.get("primary_key"))
2383 pkeys = [col["name"] for col in cols]
2384
2385 if pkeys:
2386 return {"constrained_columns": pkeys, "name": constraint_name}
2387 else:
2388 return ReflectionDefaults.pk_constraint()
2389
2390 @reflection.cache
2391 def get_foreign_keys(self, connection, table_name, schema=None, **kw):
2392 # sqlite makes this *extremely difficult*.
2393 # First, use the pragma to get the actual FKs.
2394 pragma_fks = self._get_table_pragma(
2395 connection, "foreign_key_list", table_name, schema=schema
2396 )
2397
2398 fks = {}
2399
2400 for row in pragma_fks:
2401 (numerical_id, rtbl, lcol, rcol) = (row[0], row[2], row[3], row[4])
2402
2403 if not rcol:
2404 # no referred column, which means it was not named in the
2405 # original DDL. The referred columns of the foreign key
2406 # constraint are therefore the primary key of the referred
2407 # table.
2408 try:
2409 referred_pk = self.get_pk_constraint(
2410 connection, rtbl, schema=schema, **kw
2411 )
2412 referred_columns = referred_pk["constrained_columns"]
2413 except exc.NoSuchTableError:
2414 # ignore not existing parents
2415 referred_columns = []
2416 else:
2417 # note we use this list only if this is the first column
2418 # in the constraint. for subsequent columns we ignore the
2419 # list and append "rcol" if present.
2420 referred_columns = []
2421
2422 if self._broken_fk_pragma_quotes:
2423 rtbl = re.sub(r"^[\"\[`\']|[\"\]`\']$", "", rtbl)
2424
2425 if numerical_id in fks:
2426 fk = fks[numerical_id]
2427 else:
2428 fk = fks[numerical_id] = {
2429 "name": None,
2430 "constrained_columns": [],
2431 "referred_schema": schema,
2432 "referred_table": rtbl,
2433 "referred_columns": referred_columns,
2434 "options": {},
2435 }
2437
2438 fk["constrained_columns"].append(lcol)
2439
2440 if rcol:
2441 fk["referred_columns"].append(rcol)
2442
2443 def fk_sig(constrained_columns, referred_table, referred_columns):
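            # e.g. (illustrative) fk_sig(["uid"], "users", ["id"])
            # returns ("uid", "users", "id")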
2444 return (
2445 tuple(constrained_columns)
2446 + (referred_table,)
2447 + tuple(referred_columns)
2448 )
2449
        # then, parse the actual SQL and attempt to find DDL that matches
        # the names as well.  SQLite saves the DDL in whatever form it was
        # originally typed, so we need to be liberal here.
2453
2454 keys_by_signature = {
2455 fk_sig(
2456 fk["constrained_columns"],
2457 fk["referred_table"],
2458 fk["referred_columns"],
2459 ): fk
2460 for fk in fks.values()
2461 }
2462
2463 table_data = self._get_table_sql(connection, table_name, schema=schema)
2464
2465 def parse_fks():
2466 if table_data is None:
2467 # system tables, etc.
2468 return
2469
2470 # note that we already have the FKs from PRAGMA above. This whole
2471 # regexp thing is trying to locate additional detail about the
2472 # FKs, namely the name of the constraint and other options.
2473 # so parsing the columns is really about matching it up to what
2474 # we already have.
2475 FK_PATTERN = (
2476 r"(?:CONSTRAINT (\w+) +)?"
2477 r"FOREIGN KEY *\( *(.+?) *\) +"
2478 r'REFERENCES +(?:(?:"(.+?)")|([a-z0-9_]+)) *\( *((?:(?:"[^"]+"|[a-z0-9_]+) *(?:, *)?)+)\) *' # noqa: E501
2479 r"((?:ON (?:DELETE|UPDATE) "
2480 r"(?:SET NULL|SET DEFAULT|CASCADE|RESTRICT|NO ACTION) *)*)"
2481 r"((?:NOT +)?DEFERRABLE)?"
2482 r"(?: +INITIALLY +(DEFERRED|IMMEDIATE))?"
2483 )
2484 for match in re.finditer(FK_PATTERN, table_data, re.I):
2485 (
2486 constraint_name,
2487 constrained_columns,
2488 referred_quoted_name,
2489 referred_name,
2490 referred_columns,
2491 onupdatedelete,
2492 deferrable,
2493 initially,
2494 ) = match.group(1, 2, 3, 4, 5, 6, 7, 8)
2495 constrained_columns = list(
2496 self._find_cols_in_sig(constrained_columns)
2497 )
2498 if not referred_columns:
2499 referred_columns = constrained_columns
2500 else:
2501 referred_columns = list(
2502 self._find_cols_in_sig(referred_columns)
2503 )
2504 referred_name = referred_quoted_name or referred_name
2505 options = {}
2506
2507 for token in re.split(r" *\bON\b *", onupdatedelete.upper()):
2508 if token.startswith("DELETE"):
2509 ondelete = token[6:].strip()
2510 if ondelete and ondelete != "NO ACTION":
2511 options["ondelete"] = ondelete
2512 elif token.startswith("UPDATE"):
2513 onupdate = token[6:].strip()
2514 if onupdate and onupdate != "NO ACTION":
2515 options["onupdate"] = onupdate
2516
2517 if deferrable:
2518 options["deferrable"] = "NOT" not in deferrable.upper()
2519 if initially:
2520 options["initially"] = initially.upper()
2521
2522 yield (
2523 constraint_name,
2524 constrained_columns,
2525 referred_name,
2526 referred_columns,
2527 options,
2528 )
2529
2530 fkeys = []
2531
2532 for (
2533 constraint_name,
2534 constrained_columns,
2535 referred_name,
2536 referred_columns,
2537 options,
2538 ) in parse_fks():
2539 sig = fk_sig(constrained_columns, referred_name, referred_columns)
2540 if sig not in keys_by_signature:
2541 util.warn(
2542 "WARNING: SQL-parsed foreign key constraint "
2543 "'%s' could not be located in PRAGMA "
2544 "foreign_keys for table %s" % (sig, table_name)
2545 )
2546 continue
2547 key = keys_by_signature.pop(sig)
2548 key["name"] = constraint_name
2549 key["options"] = options
2550 fkeys.append(key)
        # assume the remainder are the unnamed, inline constraints; just
        # use them as-is, as it's extremely difficult to parse inline
        # constraints
2554 fkeys.extend(keys_by_signature.values())
2555 if fkeys:
2556 return fkeys
2557 else:
2558 return ReflectionDefaults.foreign_keys()
2559
2560 def _find_cols_in_sig(self, sig):
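        # e.g. (illustrative) '"user id", email' yields "user id" and
        # then "email", unquoting double-quoted identifiers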
2561 for match in re.finditer(r'(?:"(.+?)")|([a-z0-9_]+)', sig, re.I):
2562 yield match.group(1) or match.group(2)
2563
2564 @reflection.cache
2565 def get_unique_constraints(
2566 self, connection, table_name, schema=None, **kw
2567 ):
2568 auto_index_by_sig = {}
2569 for idx in self.get_indexes(
2570 connection,
2571 table_name,
2572 schema=schema,
2573 include_auto_indexes=True,
2574 **kw,
2575 ):
2576 if not idx["name"].startswith("sqlite_autoindex"):
2577 continue
2578 sig = tuple(idx["column_names"])
2579 auto_index_by_sig[sig] = idx
2580
2581 table_data = self._get_table_sql(
2582 connection, table_name, schema=schema, **kw
2583 )
2584 unique_constraints = []
2585
2586 def parse_uqs():
2587 if table_data is None:
2588 return
2589 UNIQUE_PATTERN = r'(?:CONSTRAINT "?(.+?)"? +)?UNIQUE *\((.+?)\)'
2590 INLINE_UNIQUE_PATTERN = (
2591 r'(?:(".+?")|(?:[\[`])?([a-z0-9_]+)(?:[\]`])?)[\t ]'
2592 r"+[a-z0-9_ ]+?[\t ]+UNIQUE"
2593 )
2594
2595 for match in re.finditer(UNIQUE_PATTERN, table_data, re.I):
2596 name, cols = match.group(1, 2)
2597 yield name, list(self._find_cols_in_sig(cols))
2598
2599 # we need to match inlines as well, as we seek to differentiate
2600 # a UNIQUE constraint from a UNIQUE INDEX, even though these
2601 # are kind of the same thing :)
2602 for match in re.finditer(INLINE_UNIQUE_PATTERN, table_data, re.I):
2603 cols = list(
2604 self._find_cols_in_sig(match.group(1) or match.group(2))
2605 )
2606 yield None, cols
2607
2608 for name, cols in parse_uqs():
2609 sig = tuple(cols)
2610 if sig in auto_index_by_sig:
2611 auto_index_by_sig.pop(sig)
2612 parsed_constraint = {"name": name, "column_names": cols}
2613 unique_constraints.append(parsed_constraint)
2614 # NOTE: auto_index_by_sig might not be empty here,
2615 # the PRIMARY KEY may have an entry.
2616 if unique_constraints:
2617 return unique_constraints
2618 else:
2619 return ReflectionDefaults.unique_constraints()
2620
2621 @reflection.cache
2622 def get_check_constraints(self, connection, table_name, schema=None, **kw):
2623 table_data = self._get_table_sql(
2624 connection, table_name, schema=schema, **kw
2625 )
2626
        # NOTE NOTE NOTE
        # DO NOT CHANGE THIS REGULAR EXPRESSION.  There is no known way
        # to parse CHECK constraints that contain newlines themselves using
        # regular expressions, and the approach here relies upon each
        # individual CHECK constraint being on a single line by itself.
        # This necessarily makes assumptions as to how the CREATE TABLE
        # was emitted.  A more comprehensive DDL parsing solution would be
        # needed to improve upon the current situation.  See #11840 for
        # background.
2637 CHECK_PATTERN = r"(?:CONSTRAINT (.+) +)?CHECK *\( *(.+) *\),? *"
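        # e.g. (illustrative) matches a single-line constraint such as
        #   CONSTRAINT ck_qty CHECK (qty > 0),
        # capturing the optional name and the expression "qty > 0"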
2638 cks = []
2639
2640 for match in re.finditer(CHECK_PATTERN, table_data or "", re.I):
2641
2642 name = match.group(1)
2643
2644 if name:
2645 name = re.sub(r'^"|"$', "", name)
2646
2647 cks.append({"sqltext": match.group(2), "name": name})
2648 cks.sort(key=lambda d: d["name"] or "~") # sort None as last
2649 if cks:
2650 return cks
2651 else:
2652 return ReflectionDefaults.check_constraints()
2653
2654 @reflection.cache
2655 def get_indexes(self, connection, table_name, schema=None, **kw):
2656 pragma_indexes = self._get_table_pragma(
2657 connection, "index_list", table_name, schema=schema
2658 )
2659 indexes = []
2660
2661 # regular expression to extract the filter predicate of a partial
2662 # index. this could fail to extract the predicate correctly on
2663 # indexes created like
2664 # CREATE INDEX i ON t (col || ') where') WHERE col <> ''
2665 # but as this function does not support expression-based indexes
2666 # this case does not occur.
2667 partial_pred_re = re.compile(r"\)\s+where\s+(.+)", re.IGNORECASE)
2668
2669 if schema:
2670 schema_expr = "%s." % self.identifier_preparer.quote_identifier(
2671 schema
2672 )
2673 else:
2674 schema_expr = ""
2675
2676 include_auto_indexes = kw.pop("include_auto_indexes", False)
2677 for row in pragma_indexes:
2678 # ignore implicit primary key index.
2679 # https://www.mail-archive.com/sqlite-users@sqlite.org/msg30517.html
2680 if not include_auto_indexes and row[1].startswith(
2681 "sqlite_autoindex"
2682 ):
2683 continue
2684 indexes.append(
2685 dict(
2686 name=row[1],
2687 column_names=[],
2688 unique=row[2],
2689 dialect_options={},
2690 )
2691 )
2692
2693 # check partial indexes
2694 if len(row) >= 5 and row[4]:
2695 s = (
2696 "SELECT sql FROM %(schema)ssqlite_master "
2697 "WHERE name = ? "
2698 "AND type = 'index'" % {"schema": schema_expr}
2699 )
2700 rs = connection.exec_driver_sql(s, (row[1],))
2701 index_sql = rs.scalar()
2702 predicate_match = partial_pred_re.search(index_sql)
2703 if predicate_match is None:
2704 # unless the regex is broken this case shouldn't happen
2705 # because we know this is a partial index, so the
2706 # definition sql should match the regex
2707 util.warn(
2708 "Failed to look up filter predicate of "
2709 "partial index %s" % row[1]
2710 )
2711 else:
2712 predicate = predicate_match.group(1)
2713 indexes[-1]["dialect_options"]["sqlite_where"] = text(
2714 predicate
2715 )
2716
        # loop through the indexes to get the column names.
2718 for idx in list(indexes):
2719 pragma_index = self._get_table_pragma(
2720 connection, "index_info", idx["name"], schema=schema
2721 )
2722
2723 for row in pragma_index:
2724 if row[2] is None:
2725 util.warn(
2726 "Skipped unsupported reflection of "
2727 "expression-based index %s" % idx["name"]
2728 )
2729 indexes.remove(idx)
2730 break
2731 else:
2732 idx["column_names"].append(row[2])
2733
2734 indexes.sort(key=lambda d: d["name"] or "~") # sort None as last
2735 if indexes:
2736 return indexes
2737 elif not self.has_table(connection, table_name, schema):
2738 raise exc.NoSuchTableError(
2739 f"{schema}.{table_name}" if schema else table_name
2740 )
2741 else:
2742 return ReflectionDefaults.indexes()
2743
2744 def _is_sys_table(self, table_name):
2745 return table_name in {
2746 "sqlite_schema",
2747 "sqlite_master",
2748 "sqlite_temp_schema",
2749 "sqlite_temp_master",
2750 }
2751
2752 @reflection.cache
2753 def _get_table_sql(self, connection, table_name, schema=None, **kw):
2754 if schema:
2755 schema_expr = "%s." % (
2756 self.identifier_preparer.quote_identifier(schema)
2757 )
2758 else:
2759 schema_expr = ""
2760 try:
2761 s = (
2762 "SELECT sql FROM "
2763 " (SELECT * FROM %(schema)ssqlite_master UNION ALL "
2764 " SELECT * FROM %(schema)ssqlite_temp_master) "
2765 "WHERE name = ? "
2766 "AND type in ('table', 'view')" % {"schema": schema_expr}
2767 )
2768 rs = connection.exec_driver_sql(s, (table_name,))
2769 except exc.DBAPIError:
2770 s = (
2771 "SELECT sql FROM %(schema)ssqlite_master "
2772 "WHERE name = ? "
2773 "AND type in ('table', 'view')" % {"schema": schema_expr}
2774 )
2775 rs = connection.exec_driver_sql(s, (table_name,))
2776 value = rs.scalar()
2777 if value is None and not self._is_sys_table(table_name):
2778 raise exc.NoSuchTableError(f"{schema_expr}{table_name}")
2779 return value
2780
2781 def _get_table_pragma(self, connection, pragma, table_name, schema=None):
2782 quote = self.identifier_preparer.quote_identifier
2783 if schema is not None:
2784 statements = [f"PRAGMA {quote(schema)}."]
2785 else:
            # because PRAGMA looks in all attached databases if no schema
            # is given, we need to specify the "main" schema; however,
            # since we want 'temp' tables in the same namespace as 'main',
            # we need to run the PRAGMA twice
2790 statements = ["PRAGMA main.", "PRAGMA temp."]
2791
2792 qtable = quote(table_name)
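        # e.g. (illustrative) for table_name "users" with no schema, this
        # emits PRAGMA main.table_info("users") and then, if no rows are
        # returned, PRAGMA temp.table_info("users")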
2793 for statement in statements:
2794 statement = f"{statement}{pragma}({qtable})"
2795 cursor = connection.exec_driver_sql(statement)
2796 if not cursor._soft_closed:
2797 # work around SQLite issue whereby cursor.description
2798 # is blank when PRAGMA returns no rows:
2799 # https://www.sqlite.org/cvstrac/tktview?tn=1884
2800 result = cursor.fetchall()
2801 else:
2802 result = []
2803 if result:
2804 return result
2805 else:
2806 return []