1# dialects/sqlite/base.py 
    2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors 
    3# <see AUTHORS file> 
    4# 
    5# This module is part of SQLAlchemy and is released under 
    6# the MIT License: https://www.opensource.org/licenses/mit-license.php 
    7# mypy: ignore-errors 
    8 
    9 
    10r''' 
    11.. dialect:: sqlite 
    12    :name: SQLite 
    13    :normal_support: 3.12+ 
    14    :best_effort: 3.7.16+ 
    15 
    16.. _sqlite_datetime: 
    17 
    18Date and Time Types 
    19------------------- 
    20 
    21SQLite does not have built-in DATE, TIME, or DATETIME types, and pysqlite does 
    22not provide out of the box functionality for translating values between Python 
    23`datetime` objects and a SQLite-supported format. SQLAlchemy's own 
    24:class:`~sqlalchemy.types.DateTime` and related types provide date formatting 
    25and parsing functionality when SQLite is used. The implementation classes are 
    26:class:`_sqlite.DATETIME`, :class:`_sqlite.DATE` and :class:`_sqlite.TIME`. 
    27These types represent dates and times as ISO formatted strings, which also 
    28nicely support ordering. There's no reliance on typical "libc" internals for 
    29these functions so historical dates are fully supported. 
    30 
    31Ensuring Text affinity 
    32^^^^^^^^^^^^^^^^^^^^^^ 
    33 
    34The DDL rendered for these types is the standard ``DATE``, ``TIME`` 
    35and ``DATETIME`` indicators.    However, custom storage formats can also be 
    36applied to these types.   When the 
    37storage format is detected as containing no alpha characters, the DDL for 
    38these types is rendered as ``DATE_CHAR``, ``TIME_CHAR``, and ``DATETIME_CHAR``, 
    39so that the column continues to have textual affinity. 
    40 
    41.. seealso:: 
    42 
    43    `Type Affinity <https://www.sqlite.org/datatype3.html#affinity>`_ - 
    44    in the SQLite documentation 
    45 
    46.. _sqlite_autoincrement: 
    47 
    48SQLite Auto Incrementing Behavior 
    49---------------------------------- 
    50 
    51Background on SQLite's autoincrement is at: https://sqlite.org/autoinc.html 
    52 
    53Key concepts: 
    54 
    55* SQLite has an implicit "auto increment" feature that takes place for any 
    56  non-composite primary-key column that is specifically created using 
    57  "INTEGER PRIMARY KEY" for the type + primary key. 
    58 
    59* SQLite also has an explicit "AUTOINCREMENT" keyword, that is **not** 
    60  equivalent to the implicit autoincrement feature; this keyword is not 
    61  recommended for general use.  SQLAlchemy does not render this keyword 
    62  unless a special SQLite-specific directive is used (see below).  However, 
    63  it still requires that the column's type is named "INTEGER". 
    64 
    65Using the AUTOINCREMENT Keyword 
    66^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 
    67 
    68To specifically render the AUTOINCREMENT keyword on the primary key column 
    69when rendering DDL, add the flag ``sqlite_autoincrement=True`` to the Table 
    70construct:: 
    71 
    72    Table( 
    73        "sometable", 
    74        metadata, 
    75        Column("id", Integer, primary_key=True), 
    76        sqlite_autoincrement=True, 
    77    ) 
    78 
    79Allowing autoincrement behavior SQLAlchemy types other than Integer/INTEGER 
    80^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 
    81 
    82SQLite's typing model is based on naming conventions.  Among other things, this 
    83means that any type name which contains the substring ``"INT"`` will be 
    84determined to be of "integer affinity".  A type named ``"BIGINT"``, 
    85``"SPECIAL_INT"`` or even ``"XYZINTQPR"``, will be considered by SQLite to be 
    86of "integer" affinity.  However, **the SQLite autoincrement feature, whether 
    87implicitly or explicitly enabled, requires that the name of the column's type 
    88is exactly the string "INTEGER"**.  Therefore, if an application uses a type 
    89like :class:`.BigInteger` for a primary key, on SQLite this type will need to 
    90be rendered as the name ``"INTEGER"`` when emitting the initial ``CREATE 
    91TABLE`` statement in order for the autoincrement behavior to be available. 
    92 
    93One approach to achieve this is to use :class:`.Integer` on SQLite 
    94only using :meth:`.TypeEngine.with_variant`:: 
    95 
    96    table = Table( 
    97        "my_table", 
    98        metadata, 
    99        Column( 
    100            "id", 
    101            BigInteger().with_variant(Integer, "sqlite"), 
    102            primary_key=True, 
    103        ), 
    104    ) 
    105 
    106Another is to use a subclass of :class:`.BigInteger` that overrides its DDL 
    107name to be ``INTEGER`` when compiled against SQLite:: 
    108 
    109    from sqlalchemy import BigInteger 
    110    from sqlalchemy.ext.compiler import compiles 
    111 
    112 
    113    class SLBigInteger(BigInteger): 
    114        pass 
    115 
    116 
    117    @compiles(SLBigInteger, "sqlite") 
    118    def bi_c(element, compiler, **kw): 
    119        return "INTEGER" 
    120 
    121 
    122    @compiles(SLBigInteger) 
    123    def bi_c(element, compiler, **kw): 
    124        return compiler.visit_BIGINT(element, **kw) 
    125 
    126 
    127    table = Table( 
    128        "my_table", metadata, Column("id", SLBigInteger(), primary_key=True) 
    129    ) 
    130 
    131.. seealso:: 
    132 
    133    :meth:`.TypeEngine.with_variant` 
    134 
    135    :ref:`sqlalchemy.ext.compiler_toplevel` 
    136 
    137    `Datatypes In SQLite Version 3 <https://sqlite.org/datatype3.html>`_ 
    138 
    139.. _sqlite_transactions: 
    140 
    141Transactions with SQLite and the sqlite3 driver 
    142----------------------------------------------- 
    143 
    144As a file-based database, SQLite's approach to transactions differs from 
    145traditional databases in many ways.  Additionally, the ``sqlite3`` driver 
    146standard with Python (as well as the async version ``aiosqlite`` which builds 
    147on top of it) has several quirks, workarounds, and API features in the 
    148area of transaction control, all of which generally need to be addressed when 
    149constructing a SQLAlchemy application that uses SQLite. 
    150 
    151Legacy Transaction Mode with the sqlite3 driver 
    152^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 
    153 
    154The most important aspect of transaction handling with the sqlite3 driver is 
    155that it defaults (which will continue through Python 3.15 before being 
    156removed in Python 3.16) to legacy transactional behavior which does 
    157not strictly follow :pep:`249`.  The way in which the driver diverges from the 
    158PEP is that it does not "begin" a transaction automatically as dictated by 
    159:pep:`249` except in the case of DML statements, e.g. INSERT, UPDATE, and 
    160DELETE.   Normally, :pep:`249` dictates that a BEGIN must be emitted upon 
    161the first SQL statement of any kind, so that all subsequent operations will 
    162be established within a transaction until ``connection.commit()`` has been 
    163called.   The ``sqlite3`` driver, in an effort to be easier to use in 
    164highly concurrent environments, skips this step for DQL (e.g. SELECT) statements, 
    165and also skips it for DDL (e.g. CREATE TABLE etc.) statements for more legacy 
    166reasons.  Statements such as SAVEPOINT are also skipped. 
    167 
    168In modern versions of the ``sqlite3`` driver as of Python 3.12, this legacy 
    169mode of operation is referred to as 
    170`"legacy transaction control" <https://docs.python.org/3/library/sqlite3.html#sqlite3-transaction-control-isolation-level>`_, and is in 
    171effect by default due to the ``Connection.autocommit`` parameter being set to 
    172the constant ``sqlite3.LEGACY_TRANSACTION_CONTROL``.  Prior to Python 3.12, 
    173the ``Connection.autocommit`` attribute did not exist. 
    174 
    175The implications of legacy transaction mode include: 
    176 
    177* **Incorrect support for transactional DDL** - statements like CREATE TABLE, ALTER TABLE, 
    178  CREATE INDEX etc. will not automatically BEGIN a transaction if one were not 
    179  started already, leading to the changes by each statement being 
    180  "autocommitted" immediately unless BEGIN were otherwise emitted first.   Very 
    181  old (pre Python 3.6) versions of the ``sqlite3`` driver would also force a
    182  COMMIT for these operations even if a transaction were present, however this
    183  is no longer the case.
    184* **SERIALIZABLE behavior not fully functional** - SQLite's transaction isolation 
    185  behavior is normally consistent with SERIALIZABLE isolation, as it is a file- 
    186  based system that locks the database file entirely for write operations, 
    187  preventing COMMIT until all reader transactions (and associated file locks) 
    188  have completed.  However, sqlite3's legacy transaction mode fails to emit BEGIN for SELECT 
    189  statements, which causes these SELECT statements to no longer be "repeatable", 
    190  failing one of the consistency guarantees of SERIALIZABLE. 
    191* **Incorrect behavior for SAVEPOINT** - as the SAVEPOINT statement does not 
    192  imply a BEGIN, a new SAVEPOINT emitted before a BEGIN will function on its 
    193  own but fails to participate in the enclosing transaction, meaning a ROLLBACK 
    194  of the transaction will not roll back elements that were part of a released
    195  savepoint. 
    196 
    197Legacy transaction mode first existed in order to facilitate working around
    198SQLite's file locks.  Because SQLite relies upon whole-file locks, it is easy to 
    199get "database is locked" errors, particularly when newer features like "write 
    200ahead logging" are disabled.   This is a key reason why ``sqlite3``'s legacy 
    201transaction mode is still the default mode of operation; disabling it will 
    202produce behavior that is more susceptible to locked database errors.  However 
    203note that **legacy transaction mode will no longer be the default** in a future 
    204Python version (3.16 as of this writing). 
    205 
    206.. _sqlite_enabling_transactions: 
    207 
    208Enabling Non-Legacy SQLite Transactional Modes with the sqlite3 or aiosqlite driver 
    209^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 
    210 
    211Current SQLAlchemy support allows either for setting the 
    212``Connection.autocommit`` attribute, most directly by using a
    213:func:`._sa.create_engine` parameter, or if on an older version of Python where 
    214the attribute is not available, using event hooks to control the behavior of 
    215BEGIN. 
    216 
    217* **Enabling modern sqlite3 transaction control via the autocommit connect parameter** (Python 3.12 and above) 
    218 
    219  To use SQLite in the mode described at `Transaction control via the autocommit attribute <https://docs.python.org/3/library/sqlite3.html#transaction-control-via-the-autocommit-attribute>`_, 
    220  the most straightforward approach is to set the attribute to its recommended value 
    221  of ``False`` at the connect level using :paramref:`_sa.create_engine.connect_args`::
    222 
    223    from sqlalchemy import create_engine 
    224 
    225    engine = create_engine( 
    226        "sqlite:///myfile.db", connect_args={"autocommit": False} 
    227    ) 
    228 
    229  This parameter is also passed through when using the aiosqlite driver:: 
    230 
    231    from sqlalchemy.ext.asyncio import create_async_engine 
    232 
    233    engine = create_async_engine( 
    234        "sqlite+aiosqlite:///myfile.db", connect_args={"autocommit": False} 
    235    ) 
    236 
    237  The parameter can also be set at the attribute level using the :meth:`.PoolEvents.connect` 
    238  event hook, however this will only work for sqlite3, as aiosqlite does not yet expose this 
    239  attribute on its ``Connection`` object:: 
    240 
    241    from sqlalchemy import create_engine, event 
    242 
    243    engine = create_engine("sqlite:///myfile.db") 
    244 
    245 
    246    @event.listens_for(engine, "connect") 
    247    def do_connect(dbapi_connection, connection_record): 
    248        # enable autocommit=False mode 
    249        dbapi_connection.autocommit = False 
    250 
    251* **Using SQLAlchemy to emit BEGIN in lieu of SQLite's transaction control** (all Python versions, sqlite3 and aiosqlite) 
    252 
    253  For older versions of ``sqlite3`` or for cross-compatibility with older and
    254  newer versions, SQLAlchemy can also take over the job of transaction control. 
    255  This is achieved by using the :meth:`.ConnectionEvents.begin` hook 
    256  to emit the "BEGIN" command directly, while also disabling SQLite's control 
    257  of this command using the :meth:`.PoolEvents.connect` event hook to set the 
    258  ``Connection.isolation_level`` attribute to ``None``:: 
    259 
    260 
    261    from sqlalchemy import create_engine, event 
    262 
    263    engine = create_engine("sqlite:///myfile.db") 
    264 
    265 
    266    @event.listens_for(engine, "connect") 
    267    def do_connect(dbapi_connection, connection_record): 
    268        # disable sqlite3's emitting of the BEGIN statement entirely. 
    269        dbapi_connection.isolation_level = None 
    270 
    271 
    272    @event.listens_for(engine, "begin") 
    273    def do_begin(conn): 
    274        # emit our own BEGIN.   sqlite3 still emits COMMIT/ROLLBACK correctly 
    275        conn.exec_driver_sql("BEGIN") 
    276 
    277  When using the asyncio variant ``aiosqlite``, refer to ``engine.sync_engine`` 
    278  as in the example below:: 
    279 
    280    from sqlalchemy import create_engine, event 
    281    from sqlalchemy.ext.asyncio import create_async_engine 
    282 
    283    engine = create_async_engine("sqlite+aiosqlite:///myfile.db") 
    284 
    285 
    286    @event.listens_for(engine.sync_engine, "connect") 
    287    def do_connect(dbapi_connection, connection_record): 
    288        # disable aiosqlite's emitting of the BEGIN statement entirely. 
    289        dbapi_connection.isolation_level = None 
    290 
    291 
    292    @event.listens_for(engine.sync_engine, "begin") 
    293    def do_begin(conn): 
    294        # emit our own BEGIN.  aiosqlite still emits COMMIT/ROLLBACK correctly 
    295        conn.exec_driver_sql("BEGIN") 
    296 
    297.. _sqlite_isolation_level: 
    298 
    299Using SQLAlchemy's Driver Level AUTOCOMMIT Feature with SQLite 
    300^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 
    301 
    302SQLAlchemy has a comprehensive database isolation feature with optional 
    303autocommit support that is introduced in the section :ref:`dbapi_autocommit`. 
    304 
    305For the ``sqlite3`` and ``aiosqlite`` drivers, SQLAlchemy only includes 
    306built-in support for "AUTOCOMMIT".    Note that this mode is currently incompatible 
    307with the non-legacy isolation mode hooks documented in the previous 
    308section at :ref:`sqlite_enabling_transactions`. 
    309 
    310To use the ``sqlite3`` driver with SQLAlchemy driver-level autocommit, 
    311create an engine setting the :paramref:`_sa.create_engine.isolation_level` 
    312parameter to "AUTOCOMMIT":: 
    313 
    314    eng = create_engine("sqlite:///myfile.db", isolation_level="AUTOCOMMIT") 
    315 
    316When using the above mode, any event hooks that set the sqlite3 ``Connection.autocommit`` 
    317parameter away from its default of ``sqlite3.LEGACY_TRANSACTION_CONTROL`` 
    318as well as hooks that emit ``BEGIN`` should be disabled. 
    319 
    320Additional Reading for SQLite / sqlite3 transaction control 
    321^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 
    322 
    323Links with important information on SQLite, the sqlite3 driver, 
    324as well as long historical conversations on how things got to their current state: 
    325 
    326* `Isolation in SQLite <https://www.sqlite.org/isolation.html>`_ - on the SQLite website 
    327* `Transaction control <https://docs.python.org/3/library/sqlite3.html#transaction-control>`_ - describes the sqlite3 autocommit attribute as well 
    328  as the legacy isolation_level attribute. 
    329* `sqlite3 SELECT does not BEGIN a transaction, but should according to spec <https://github.com/python/cpython/issues/54133>`_ - imported Python standard library issue on github 
    330* `sqlite3 module breaks transactions and potentially corrupts data <https://github.com/python/cpython/issues/54949>`_ - imported Python standard library issue on github 
    331 
    332 
    333INSERT/UPDATE/DELETE...RETURNING 
    334--------------------------------- 
    335 
    336The SQLite dialect supports SQLite 3.35's  ``INSERT|UPDATE|DELETE..RETURNING`` 
    337syntax.   ``INSERT..RETURNING`` may be used 
    338automatically in some cases in order to fetch newly generated identifiers in 
    339place of the traditional approach of using ``cursor.lastrowid``, however 
    340``cursor.lastrowid`` is currently still preferred for simple single-statement 
    341cases for its better performance. 
    342 
    343To specify an explicit ``RETURNING`` clause, use the 
    344:meth:`._UpdateBase.returning` method on a per-statement basis:: 
    345 
    346    # INSERT..RETURNING 
    347    result = connection.execute( 
    348        table.insert().values(name="foo").returning(table.c.col1, table.c.col2) 
    349    ) 
    350    print(result.all()) 
    351 
    352    # UPDATE..RETURNING 
    353    result = connection.execute( 
    354        table.update() 
    355        .where(table.c.name == "foo") 
    356        .values(name="bar") 
    357        .returning(table.c.col1, table.c.col2) 
    358    ) 
    359    print(result.all()) 
    360 
    361    # DELETE..RETURNING 
    362    result = connection.execute( 
    363        table.delete() 
    364        .where(table.c.name == "foo") 
    365        .returning(table.c.col1, table.c.col2) 
    366    ) 
    367    print(result.all()) 
    368 
    369.. versionadded:: 2.0  Added support for SQLite RETURNING 
    370 
    371 
    372.. _sqlite_foreign_keys: 
    373 
    374Foreign Key Support 
    375------------------- 
    376 
    377SQLite supports FOREIGN KEY syntax when emitting CREATE statements for tables, 
    378however by default these constraints have no effect on the operation of the 
    379table. 
    380 
    381Constraint checking on SQLite has three prerequisites: 
    382 
    383* At least version 3.6.19 of SQLite must be in use 
    384* The SQLite library must be compiled *without* the SQLITE_OMIT_FOREIGN_KEY 
    385  or SQLITE_OMIT_TRIGGER symbols enabled. 
    386* The ``PRAGMA foreign_keys = ON`` statement must be emitted on all 
    387  connections before use -- including the initial call to 
    388  :meth:`sqlalchemy.schema.MetaData.create_all`. 
    389 
    390SQLAlchemy allows for the ``PRAGMA`` statement to be emitted automatically for 
    391new connections through the usage of events:: 
    392 
    393    from sqlalchemy.engine import Engine 
    394    from sqlalchemy import event 
    395 
    396 
    397    @event.listens_for(Engine, "connect") 
    398    def set_sqlite_pragma(dbapi_connection, connection_record): 
    399        # the sqlite3 driver will not set PRAGMA foreign_keys 
    400        # if autocommit=False; set to True temporarily 
    401        ac = dbapi_connection.autocommit 
    402        dbapi_connection.autocommit = True 
    403 
    404        cursor = dbapi_connection.cursor() 
    405        cursor.execute("PRAGMA foreign_keys=ON") 
    406        cursor.close() 
    407 
    408        # restore previous autocommit setting 
    409        dbapi_connection.autocommit = ac 
    410 
    411.. warning:: 
    412 
    413    When SQLite foreign keys are enabled, it is **not possible** 
    414    to emit CREATE or DROP statements for tables that contain 
    415    mutually-dependent foreign key constraints; 
    416    to emit the DDL for these tables requires that ALTER TABLE be used to 
    417    create or drop these constraints separately, for which SQLite has 
    418    no support. 
    419 
    420.. seealso:: 
    421 
    422    `SQLite Foreign Key Support <https://www.sqlite.org/foreignkeys.html>`_ 
    423    - on the SQLite web site. 
    424 
    425    :ref:`event_toplevel` - SQLAlchemy event API. 
    426 
    427    :ref:`use_alter` - more information on SQLAlchemy's facilities for handling
    428    mutually-dependent foreign key constraints.
    429 
    430.. _sqlite_on_conflict_ddl: 
    431 
    432ON CONFLICT support for constraints 
    433----------------------------------- 
    434 
    435.. seealso:: This section describes the :term:`DDL` version of "ON CONFLICT" for 
    436   SQLite, which occurs within a CREATE TABLE statement.  For "ON CONFLICT" as 
    437   applied to an INSERT statement, see :ref:`sqlite_on_conflict_insert`. 
    438 
    439SQLite supports a non-standard DDL clause known as ON CONFLICT which can be applied 
    440to primary key, unique, check, and not null constraints.   In DDL, it is 
    441rendered either within the "CONSTRAINT" clause or within the column definition 
    442itself depending on the location of the target constraint.    To render this 
    443clause within DDL, the extension parameter ``sqlite_on_conflict`` can be 
    444specified with a string conflict resolution algorithm within the 
    445:class:`.PrimaryKeyConstraint`, :class:`.UniqueConstraint`, 
    446:class:`.CheckConstraint` objects.  Within the :class:`_schema.Column` object, 
    447there 
    448are individual parameters ``sqlite_on_conflict_not_null``, 
    449``sqlite_on_conflict_primary_key``, ``sqlite_on_conflict_unique`` which each 
    450correspond to one of the three relevant constraint types that can be
    451indicated from a :class:`_schema.Column` object. 
    452 
    453.. seealso:: 
    454 
    455    `ON CONFLICT <https://www.sqlite.org/lang_conflict.html>`_ - in the SQLite 
    456    documentation 
    457 
    458The ``sqlite_on_conflict`` parameters accept a  string argument which is just 
    459the resolution name to be chosen, which on SQLite can be one of ROLLBACK, 
    460ABORT, FAIL, IGNORE, and REPLACE.   For example, to add a UNIQUE constraint 
    461that specifies the IGNORE algorithm:: 
    462 
    463    some_table = Table( 
    464        "some_table", 
    465        metadata, 
    466        Column("id", Integer, primary_key=True), 
    467        Column("data", Integer), 
    468        UniqueConstraint("id", "data", sqlite_on_conflict="IGNORE"), 
    469    ) 
    470 
    471The above renders CREATE TABLE DDL as: 
    472 
    473.. sourcecode:: sql 
    474 
    475    CREATE TABLE some_table ( 
    476        id INTEGER NOT NULL, 
    477        data INTEGER, 
    478        PRIMARY KEY (id), 
    479        UNIQUE (id, data) ON CONFLICT IGNORE 
    480    ) 
    481 
    482 
    483When using the :paramref:`_schema.Column.unique` 
    484flag to add a UNIQUE constraint 
    485to a single column, the ``sqlite_on_conflict_unique`` parameter can 
    486be added to the :class:`_schema.Column` as well, which will be added to the 
    487UNIQUE constraint in the DDL:: 
    488 
    489    some_table = Table( 
    490        "some_table", 
    491        metadata, 
    492        Column("id", Integer, primary_key=True), 
    493        Column( 
    494            "data", Integer, unique=True, sqlite_on_conflict_unique="IGNORE" 
    495        ), 
    496    ) 
    497 
    498rendering: 
    499 
    500.. sourcecode:: sql 
    501 
    502    CREATE TABLE some_table ( 
    503        id INTEGER NOT NULL, 
    504        data INTEGER, 
    505        PRIMARY KEY (id), 
    506        UNIQUE (data) ON CONFLICT IGNORE 
    507    ) 
    508 
    509To apply the FAIL algorithm for a NOT NULL constraint, 
    510``sqlite_on_conflict_not_null`` is used:: 
    511 
    512    some_table = Table( 
    513        "some_table", 
    514        metadata, 
    515        Column("id", Integer, primary_key=True), 
    516        Column( 
    517            "data", Integer, nullable=False, sqlite_on_conflict_not_null="FAIL" 
    518        ), 
    519    ) 
    520 
    521this renders the column inline ON CONFLICT phrase: 
    522 
    523.. sourcecode:: sql 
    524 
    525    CREATE TABLE some_table ( 
    526        id INTEGER NOT NULL, 
    527        data INTEGER NOT NULL ON CONFLICT FAIL, 
    528        PRIMARY KEY (id) 
    529    ) 
    530 
    531 
    532Similarly, for an inline primary key, use ``sqlite_on_conflict_primary_key``:: 
    533 
    534    some_table = Table( 
    535        "some_table", 
    536        metadata, 
    537        Column( 
    538            "id", 
    539            Integer, 
    540            primary_key=True, 
    541            sqlite_on_conflict_primary_key="FAIL", 
    542        ), 
    543    ) 
    544 
    545SQLAlchemy renders the PRIMARY KEY constraint separately, so the conflict 
    546resolution algorithm is applied to the constraint itself: 
    547 
    548.. sourcecode:: sql 
    549 
    550    CREATE TABLE some_table ( 
    551        id INTEGER NOT NULL, 
    552        PRIMARY KEY (id) ON CONFLICT FAIL 
    553    ) 
    554 
    555.. _sqlite_on_conflict_insert: 
    556 
    557INSERT...ON CONFLICT (Upsert) 
    558----------------------------- 
    559 
    560.. seealso:: This section describes the :term:`DML` version of "ON CONFLICT" for 
    561   SQLite, which occurs within an INSERT statement.  For "ON CONFLICT" as 
    562   applied to a CREATE TABLE statement, see :ref:`sqlite_on_conflict_ddl`. 
    563 
    564From version 3.24.0 onwards, SQLite supports "upserts" (update or insert) 
    565of rows into a table via the ``ON CONFLICT`` clause of the ``INSERT`` 
    566statement. A candidate row will only be inserted if that row does not violate 
    567any unique or primary key constraints. In the case of a unique constraint violation, a 
    568secondary action can occur which can be either "DO UPDATE", indicating that 
    569the data in the target row should be updated, or "DO NOTHING", which indicates 
    570to silently skip this row. 
    571 
    572Conflicts are determined using columns that are part of existing unique 
    573constraints and indexes.  These constraints are identified by stating the 
    574columns and conditions that comprise the indexes. 
    575 
    576SQLAlchemy provides ``ON CONFLICT`` support via the SQLite-specific 
    577:func:`_sqlite.insert()` function, which provides 
    578the generative methods :meth:`_sqlite.Insert.on_conflict_do_update` 
    579and :meth:`_sqlite.Insert.on_conflict_do_nothing`: 
    580 
    581.. sourcecode:: pycon+sql 
    582 
    583    >>> from sqlalchemy.dialects.sqlite import insert 
    584 
    585    >>> insert_stmt = insert(my_table).values( 
    586    ...     id="some_existing_id", data="inserted value" 
    587    ... ) 
    588 
    589    >>> do_update_stmt = insert_stmt.on_conflict_do_update( 
    590    ...     index_elements=["id"], set_=dict(data="updated value") 
    591    ... ) 
    592 
    593    >>> print(do_update_stmt) 
    594    {printsql}INSERT INTO my_table (id, data) VALUES (?, ?) 
    595    ON CONFLICT (id) DO UPDATE SET data = ?{stop} 
    596 
    597    >>> do_nothing_stmt = insert_stmt.on_conflict_do_nothing(index_elements=["id"]) 
    598 
    599    >>> print(do_nothing_stmt) 
    600    {printsql}INSERT INTO my_table (id, data) VALUES (?, ?) 
    601    ON CONFLICT (id) DO NOTHING 
    602 
    603.. versionadded:: 1.4 
    604 
    605.. seealso:: 
    606 
    607    `Upsert 
    608    <https://sqlite.org/lang_UPSERT.html>`_ 
    609    - in the SQLite documentation. 
    610 
    611 
    612Specifying the Target 
    613^^^^^^^^^^^^^^^^^^^^^ 
    614 
    615Both methods supply the "target" of the conflict using column inference: 
    616 
    617* The :paramref:`_sqlite.Insert.on_conflict_do_update.index_elements` argument 
    618  specifies a sequence containing string column names, :class:`_schema.Column` 
    619  objects, and/or SQL expression elements, which would identify a unique index 
    620  or unique constraint. 
    621 
    622* When using :paramref:`_sqlite.Insert.on_conflict_do_update.index_elements` 
    623  to infer an index, a partial index can be inferred by also specifying the 
    624  :paramref:`_sqlite.Insert.on_conflict_do_update.index_where` parameter: 
    625 
    626  .. sourcecode:: pycon+sql 
    627 
    628        >>> stmt = insert(my_table).values(user_email="a@b.com", data="inserted data") 
    629 
    630        >>> do_update_stmt = stmt.on_conflict_do_update( 
    631        ...     index_elements=[my_table.c.user_email], 
    632        ...     index_where=my_table.c.user_email.like("%@gmail.com"), 
    633        ...     set_=dict(data=stmt.excluded.data), 
    634        ... ) 
    635 
    636        >>> print(do_update_stmt) 
    637        {printsql}INSERT INTO my_table (data, user_email) VALUES (?, ?) 
    638        ON CONFLICT (user_email) 
    639        WHERE user_email LIKE '%@gmail.com' 
    640        DO UPDATE SET data = excluded.data 
    641 
    642The SET Clause 
    643^^^^^^^^^^^^^^^ 
    644 
    645``ON CONFLICT...DO UPDATE`` is used to perform an update of the already 
    646existing row, using any combination of new values as well as values 
    647from the proposed insertion. These values are specified using the 
    648:paramref:`_sqlite.Insert.on_conflict_do_update.set_` parameter.  This 
    649parameter accepts a dictionary which consists of direct values 
    650for UPDATE: 
    651 
    652.. sourcecode:: pycon+sql 
    653 
    654    >>> stmt = insert(my_table).values(id="some_id", data="inserted value") 
    655 
    656    >>> do_update_stmt = stmt.on_conflict_do_update( 
    657    ...     index_elements=["id"], set_=dict(data="updated value") 
    658    ... ) 
    659 
    660    >>> print(do_update_stmt) 
    661    {printsql}INSERT INTO my_table (id, data) VALUES (?, ?) 
    662    ON CONFLICT (id) DO UPDATE SET data = ? 
    663 
    664.. warning:: 
    665 
    666    The :meth:`_sqlite.Insert.on_conflict_do_update` method does **not** take 
    667    into account Python-side default UPDATE values or generation functions, 
    668    e.g. those specified using :paramref:`_schema.Column.onupdate`. These 
    669    values will not be exercised for an ON CONFLICT style of UPDATE, unless 
    670    they are manually specified in the 
    671    :paramref:`_sqlite.Insert.on_conflict_do_update.set_` dictionary. 
    672 
    673Updating using the Excluded INSERT Values 
    674^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 
    675 
    676In order to refer to the proposed insertion row, the special alias 
    677:attr:`~.sqlite.Insert.excluded` is available as an attribute on 
    678the :class:`_sqlite.Insert` object; this object creates an "excluded." prefix 
    679on a column, that informs the DO UPDATE to update the row with the value that 
    680would have been inserted had the constraint not failed: 
    681 
    682.. sourcecode:: pycon+sql 
    683 
    684    >>> stmt = insert(my_table).values( 
    685    ...     id="some_id", data="inserted value", author="jlh" 
    686    ... ) 
    687 
    688    >>> do_update_stmt = stmt.on_conflict_do_update( 
    689    ...     index_elements=["id"], 
    690    ...     set_=dict(data="updated value", author=stmt.excluded.author), 
    691    ... ) 
    692 
    693    >>> print(do_update_stmt) 
    694    {printsql}INSERT INTO my_table (id, data, author) VALUES (?, ?, ?) 
    695    ON CONFLICT (id) DO UPDATE SET data = ?, author = excluded.author 
    696 
    697Additional WHERE Criteria 
    698^^^^^^^^^^^^^^^^^^^^^^^^^ 
    699 
    700The :meth:`_sqlite.Insert.on_conflict_do_update` method also accepts 
    701a WHERE clause using the :paramref:`_sqlite.Insert.on_conflict_do_update.where` 
    702parameter, which will limit those rows which receive an UPDATE: 
    703 
    704.. sourcecode:: pycon+sql 
    705 
    706    >>> stmt = insert(my_table).values( 
    707    ...     id="some_id", data="inserted value", author="jlh" 
    708    ... ) 
    709 
    710    >>> on_update_stmt = stmt.on_conflict_do_update( 
    711    ...     index_elements=["id"], 
    712    ...     set_=dict(data="updated value", author=stmt.excluded.author), 
    713    ...     where=(my_table.c.status == 2), 
    714    ... ) 
    715    >>> print(on_update_stmt) 
    716    {printsql}INSERT INTO my_table (id, data, author) VALUES (?, ?, ?) 
    717    ON CONFLICT (id) DO UPDATE SET data = ?, author = excluded.author 
    718    WHERE my_table.status = ? 
    719 
    720 
    721Skipping Rows with DO NOTHING 
    722^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 
    723 
    724``ON CONFLICT`` may be used to skip inserting a row entirely 
    725if any conflict with a unique constraint occurs; below this is illustrated 
    726using the :meth:`_sqlite.Insert.on_conflict_do_nothing` method: 
    727 
    728.. sourcecode:: pycon+sql 
    729 
    730    >>> stmt = insert(my_table).values(id="some_id", data="inserted value") 
    731    >>> stmt = stmt.on_conflict_do_nothing(index_elements=["id"]) 
    732    >>> print(stmt) 
    733    {printsql}INSERT INTO my_table (id, data) VALUES (?, ?) ON CONFLICT (id) DO NOTHING 
    734 
    735 
    736If ``DO NOTHING`` is used without specifying any columns or constraint, 
    737it has the effect of skipping the INSERT for any unique violation which 
    738occurs: 
    739 
    740.. sourcecode:: pycon+sql 
    741 
    742    >>> stmt = insert(my_table).values(id="some_id", data="inserted value") 
    743    >>> stmt = stmt.on_conflict_do_nothing() 
    744    >>> print(stmt) 
    745    {printsql}INSERT INTO my_table (id, data) VALUES (?, ?) ON CONFLICT DO NOTHING 
    746 
    747.. _sqlite_type_reflection: 
    748 
    749Type Reflection 
    750--------------- 
    751 
    752SQLite types are unlike those of most other database backends, in that 
    753the string name of the type usually does not correspond to a "type" in a 
    754one-to-one fashion.  Instead, SQLite links per-column typing behavior 
    755to one of five so-called "type affinities" based on a string matching 
    756pattern for the type. 
    757 
    758SQLAlchemy's reflection process, when inspecting types, uses a simple 
    759lookup table to link the keywords returned to provided SQLAlchemy types. 
    760This lookup table is present within the SQLite dialect as it is for all 
    761other dialects.  However, the SQLite dialect has a different "fallback" 
    762routine for when a particular type name is not located in the lookup map; 
    763it instead implements the SQLite "type affinity" scheme located at 
    764https://www.sqlite.org/datatype3.html section 2.1. 
    765 
    766The provided typemap will make direct associations from an exact string 
    767name match for the following types: 
    768 
:class:`_types.BIGINT`, :class:`_types.BLOB`,
:class:`_types.BOOLEAN`, :class:`_types.CHAR`,
:class:`_types.DATE`, :class:`_types.DATETIME`,
:class:`_types.DOUBLE`, :class:`_types.DECIMAL`,
:class:`_types.FLOAT`, :class:`_types.INTEGER`,
:class:`_sqlite.JSON`, :class:`_types.NUMERIC`,
:class:`_types.REAL`, :class:`_types.SMALLINT`,
:class:`_types.TEXT`, :class:`_types.TIME`,
:class:`_types.TIMESTAMP`, :class:`_types.VARCHAR`,
:class:`_types.NVARCHAR`, :class:`_types.NCHAR`
    780 
    781When a type name does not match one of the above types, the "type affinity" 
    782lookup is used instead: 
    783 
    784* :class:`_types.INTEGER` is returned if the type name includes the 
    785  string ``INT`` 
    786* :class:`_types.TEXT` is returned if the type name includes the 
    787  string ``CHAR``, ``CLOB`` or ``TEXT`` 
    788* :class:`_types.NullType` is returned if the type name includes the 
    789  string ``BLOB`` 
    790* :class:`_types.REAL` is returned if the type name includes the string 
    791  ``REAL``, ``FLOA`` or ``DOUB``. 
    792* Otherwise, the :class:`_types.NUMERIC` type is used. 
    793 
    794.. _sqlite_partial_index: 
    795 
    796Partial Indexes 
    797--------------- 
    798 
A partial index, i.e. one which uses a WHERE clause, can be specified
    800with the DDL system using the argument ``sqlite_where``:: 
    801 
    802    tbl = Table("testtbl", m, Column("data", Integer)) 
    803    idx = Index( 
    804        "test_idx1", 
    805        tbl.c.data, 
    806        sqlite_where=and_(tbl.c.data > 5, tbl.c.data < 10), 
    807    ) 
    808 
    809The index will be rendered at create time as: 
    810 
    811.. sourcecode:: sql 
    812 
    813    CREATE INDEX test_idx1 ON testtbl (data) 
    814    WHERE data > 5 AND data < 10 
    815 
    816.. _sqlite_dotted_column_names: 
    817 
    818Dotted Column Names 
    819------------------- 
    820 
    821Using table or column names that explicitly have periods in them is 
**not recommended**.   While this is a bad idea for relational
databases in general, as the dot is a syntactically significant character,
    824the SQLite driver up until version **3.10.0** of SQLite has a bug which 
    825requires that SQLAlchemy filter out these dots in result sets. 
    826 
    827The bug, entirely outside of SQLAlchemy, can be illustrated thusly:: 
    828 
    829    import sqlite3 
    830 
    831    assert sqlite3.sqlite_version_info < ( 
    832        3, 
    833        10, 
    834        0, 
    835    ), "bug is fixed in this version" 
    836 
    837    conn = sqlite3.connect(":memory:") 
    838    cursor = conn.cursor() 
    839 
    840    cursor.execute("create table x (a integer, b integer)") 
    841    cursor.execute("insert into x (a, b) values (1, 1)") 
    842    cursor.execute("insert into x (a, b) values (2, 2)") 
    843 
    844    cursor.execute("select x.a, x.b from x") 
    845    assert [c[0] for c in cursor.description] == ["a", "b"] 
    846 
    847    cursor.execute( 
    848        """ 
    849        select x.a, x.b from x where a=1 
    850        union 
    851        select x.a, x.b from x where a=2 
    852        """ 
    853    ) 
    854    assert [c[0] for c in cursor.description] == ["a", "b"], [ 
    855        c[0] for c in cursor.description 
    856    ] 
    857 
    858The second assertion fails: 
    859 
    860.. sourcecode:: text 
    861 
    862    Traceback (most recent call last): 
    863      File "test.py", line 19, in <module> 
    864        [c[0] for c in cursor.description] 
    865    AssertionError: ['x.a', 'x.b'] 
    866 
    867Where above, the driver incorrectly reports the names of the columns 
    868including the name of the table, which is entirely inconsistent vs. 
    869when the UNION is not present. 
    870 
    871SQLAlchemy relies upon column names being predictable in how they match 
    872to the original statement, so the SQLAlchemy dialect has no choice but 
    873to filter these out:: 
    874 
    875 
    876    from sqlalchemy import create_engine 
    877 
    878    eng = create_engine("sqlite://") 
    879    conn = eng.connect() 
    880 
    881    conn.exec_driver_sql("create table x (a integer, b integer)") 
    882    conn.exec_driver_sql("insert into x (a, b) values (1, 1)") 
    883    conn.exec_driver_sql("insert into x (a, b) values (2, 2)") 
    884 
    885    result = conn.exec_driver_sql("select x.a, x.b from x") 
    886    assert result.keys() == ["a", "b"] 
    887 
    888    result = conn.exec_driver_sql( 
    889        """ 
    890        select x.a, x.b from x where a=1 
    891        union 
    892        select x.a, x.b from x where a=2 
    893        """ 
    894    ) 
    895    assert result.keys() == ["a", "b"] 
    896 
    897Note that above, even though SQLAlchemy filters out the dots, *both 
    898names are still addressable*:: 
    899 
    900    >>> row = result.first() 
    901    >>> row["a"] 
    902    1 
    903    >>> row["x.a"] 
    904    1 
    905    >>> row["b"] 
    906    1 
    907    >>> row["x.b"] 
    908    1 
    909 
    910Therefore, the workaround applied by SQLAlchemy only impacts 
    911:meth:`_engine.CursorResult.keys` and :meth:`.Row.keys()` in the public API. In 
    912the very specific case where an application is forced to use column names that 
    913contain dots, and the functionality of :meth:`_engine.CursorResult.keys` and 
    914:meth:`.Row.keys()` is required to return these dotted names unmodified, 
    915the ``sqlite_raw_colnames`` execution option may be provided, either on a 
    916per-:class:`_engine.Connection` basis:: 
    917 
    918    result = conn.execution_options(sqlite_raw_colnames=True).exec_driver_sql( 
    919        """ 
    920        select x.a, x.b from x where a=1 
    921        union 
    922        select x.a, x.b from x where a=2 
    923        """ 
    924    ) 
    925    assert result.keys() == ["x.a", "x.b"] 
    926 
    927or on a per-:class:`_engine.Engine` basis:: 
    928 
    929    engine = create_engine( 
    930        "sqlite://", execution_options={"sqlite_raw_colnames": True} 
    931    ) 
    932 
    933When using the per-:class:`_engine.Engine` execution option, note that 
    934**Core and ORM queries that use UNION may not function properly**. 
    935 
    936SQLite-specific table options 
    937----------------------------- 
    938 
Two options for CREATE TABLE are supported directly by the SQLite
dialect in conjunction with the :class:`_schema.Table` construct:
    941 
    942* ``WITHOUT ROWID``:: 
    943 
    944    Table("some_table", metadata, ..., sqlite_with_rowid=False) 
    945 
    946* 
    947  ``STRICT``:: 
    948 
    949    Table("some_table", metadata, ..., sqlite_strict=True) 
    950 
    951  .. versionadded:: 2.0.37 
    952 
    953.. seealso:: 
    954 
    955    `SQLite CREATE TABLE options 
    956    <https://www.sqlite.org/lang_createtable.html>`_ 
    957 
    958.. _sqlite_include_internal: 
    959 
    960Reflecting internal schema tables 
    961---------------------------------- 
    962 
    963Reflection methods that return lists of tables will omit so-called 
    964"SQLite internal schema object" names, which are considered by SQLite 
    965as any object name that is prefixed with ``sqlite_``.  An example of 
    966such an object is the ``sqlite_sequence`` table that's generated when 
    967the ``AUTOINCREMENT`` column parameter is used.   In order to return 
    968these objects, the parameter ``sqlite_include_internal=True`` may be 
    969passed to methods such as :meth:`_schema.MetaData.reflect` or 
    970:meth:`.Inspector.get_table_names`. 
    971 
    972.. versionadded:: 2.0  Added the ``sqlite_include_internal=True`` parameter. 
    973   Previously, these tables were not ignored by SQLAlchemy reflection 
    974   methods. 
    975 
    976.. note:: 
    977 
    The ``sqlite_include_internal`` parameter does not refer to the
    "system" tables that are present in schemas, such as ``sqlite_master``.
    980 
    981.. seealso:: 
    982 
    983    `SQLite Internal Schema Objects <https://www.sqlite.org/fileformat2.html#intschema>`_ - in the SQLite 
    984    documentation. 
    985 
    986'''  # noqa 
    987from __future__ import annotations 
    988 
    989import datetime 
    990import numbers 
    991import re 
    992from typing import Any 
    993from typing import Callable 
    994from typing import Optional 
    995from typing import TYPE_CHECKING 
    996 
    997from .json import JSON 
    998from .json import JSONIndexType 
    999from .json import JSONPathType 
    1000from ... import exc 
    1001from ... import schema as sa_schema 
    1002from ... import sql 
    1003from ... import text 
    1004from ... import types as sqltypes 
    1005from ... import util 
    1006from ...engine import default 
    1007from ...engine import processors 
    1008from ...engine import reflection 
    1009from ...engine.reflection import ReflectionDefaults 
    1010from ...sql import coercions 
    1011from ...sql import compiler 
    1012from ...sql import elements 
    1013from ...sql import roles 
    1014from ...sql import schema 
    1015from ...types import BLOB  # noqa 
    1016from ...types import BOOLEAN  # noqa 
    1017from ...types import CHAR  # noqa 
    1018from ...types import DECIMAL  # noqa 
    1019from ...types import FLOAT  # noqa 
    1020from ...types import INTEGER  # noqa 
    1021from ...types import NUMERIC  # noqa 
    1022from ...types import REAL  # noqa 
    1023from ...types import SMALLINT  # noqa 
    1024from ...types import TEXT  # noqa 
    1025from ...types import TIMESTAMP  # noqa 
    1026from ...types import VARCHAR  # noqa 
    1027 
    1028if TYPE_CHECKING: 
    1029    from ...engine.interfaces import DBAPIConnection 
    1030    from ...engine.interfaces import Dialect 
    1031    from ...engine.interfaces import IsolationLevel 
    1032    from ...sql.type_api import _BindProcessorType 
    1033    from ...sql.type_api import _ResultProcessorType 
    1034 
    1035 
    1036class _SQliteJson(JSON): 
    1037    def result_processor(self, dialect, coltype): 
    1038        default_processor = super().result_processor(dialect, coltype) 
    1039 
    1040        def process(value): 
    1041            try: 
    1042                return default_processor(value) 
    1043            except TypeError: 
    1044                if isinstance(value, numbers.Number): 
    1045                    return value 
    1046                else: 
    1047                    raise 
    1048 
    1049        return process 
    1050 
    1051 
    1052class _DateTimeMixin: 
    1053    _reg = None 
    1054    _storage_format = None 
    1055 
    1056    def __init__(self, storage_format=None, regexp=None, **kw): 
    1057        super().__init__(**kw) 
    1058        if regexp is not None: 
    1059            self._reg = re.compile(regexp) 
    1060        if storage_format is not None: 
    1061            self._storage_format = storage_format 
    1062 
    1063    @property 
    1064    def format_is_text_affinity(self): 
    1065        """return True if the storage format will automatically imply 
    1066        a TEXT affinity. 
    1067 
    1068        If the storage format contains no non-numeric characters, 
    1069        it will imply a NUMERIC storage format on SQLite; in this case, 
    1070        the type will generate its DDL as DATE_CHAR, DATETIME_CHAR, 
    1071        TIME_CHAR. 
    1072 
    1073        """ 
    1074        spec = self._storage_format % { 
    1075            "year": 0, 
    1076            "month": 0, 
    1077            "day": 0, 
    1078            "hour": 0, 
    1079            "minute": 0, 
    1080            "second": 0, 
    1081            "microsecond": 0, 
    1082        } 
    1083        return bool(re.search(r"[^0-9]", spec)) 
    1084 
    1085    def adapt(self, cls, **kw): 
    1086        if issubclass(cls, _DateTimeMixin): 
    1087            if self._storage_format: 
    1088                kw["storage_format"] = self._storage_format 
    1089            if self._reg: 
    1090                kw["regexp"] = self._reg 
    1091        return super().adapt(cls, **kw) 
    1092 
    1093    def literal_processor(self, dialect): 
    1094        bp = self.bind_processor(dialect) 
    1095 
    1096        def process(value): 
    1097            return "'%s'" % bp(value) 
    1098 
    1099        return process 
    1100 
    1101 
    1102class DATETIME(_DateTimeMixin, sqltypes.DateTime): 
    1103    r"""Represent a Python datetime object in SQLite using a string. 
    1104 
    1105    The default string storage format is:: 
    1106 
    1107        "%(year)04d-%(month)02d-%(day)02d %(hour)02d:%(minute)02d:%(second)02d.%(microsecond)06d" 
    1108 
    1109    e.g.: 
    1110 
    1111    .. sourcecode:: text 
    1112 
    1113        2021-03-15 12:05:57.105542 
    1114 
    1115    The incoming storage format is by default parsed using the 
    1116    Python ``datetime.fromisoformat()`` function. 
    1117 
    1118    .. versionchanged:: 2.0  ``datetime.fromisoformat()`` is used for default 
    1119       datetime string parsing. 
    1120 
    1121    The storage format can be customized to some degree using the 
    1122    ``storage_format`` and ``regexp`` parameters, such as:: 
    1123 
    1124        import re 
    1125        from sqlalchemy.dialects.sqlite import DATETIME 
    1126 
    1127        dt = DATETIME( 
    1128            storage_format=( 
    1129                "%(year)04d/%(month)02d/%(day)02d %(hour)02d:%(minute)02d:%(second)02d" 
    1130            ), 
    1131            regexp=r"(\d+)/(\d+)/(\d+) (\d+)-(\d+)-(\d+)", 
    1132        ) 
    1133 
    1134    :param truncate_microseconds: when ``True`` microseconds will be truncated 
    1135     from the datetime. Can't be specified together with ``storage_format`` 
    1136     or ``regexp``. 
    1137 
    1138    :param storage_format: format string which will be applied to the dict 
    1139     with keys year, month, day, hour, minute, second, and microsecond. 
    1140 
    1141    :param regexp: regular expression which will be applied to incoming result 
    1142     rows, replacing the use of ``datetime.fromisoformat()`` to parse incoming 
    1143     strings. If the regexp contains named groups, the resulting match dict is 
    1144     applied to the Python datetime() constructor as keyword arguments. 
    1145     Otherwise, if positional groups are used, the datetime() constructor 
    1146     is called with positional arguments via 
    1147     ``*map(int, match_obj.groups(0))``. 
    1148 
    1149    """  # noqa 
    1150 
    1151    _storage_format = ( 
    1152        "%(year)04d-%(month)02d-%(day)02d " 
    1153        "%(hour)02d:%(minute)02d:%(second)02d.%(microsecond)06d" 
    1154    ) 
    1155 
    1156    def __init__(self, *args, **kwargs): 
    1157        truncate_microseconds = kwargs.pop("truncate_microseconds", False) 
    1158        super().__init__(*args, **kwargs) 
    1159        if truncate_microseconds: 
    1160            assert "storage_format" not in kwargs, ( 
    1161                "You can specify only " 
    1162                "one of truncate_microseconds or storage_format." 
    1163            ) 
    1164            assert "regexp" not in kwargs, ( 
    1165                "You can specify only one of " 
    1166                "truncate_microseconds or regexp." 
    1167            ) 
    1168            self._storage_format = ( 
    1169                "%(year)04d-%(month)02d-%(day)02d " 
    1170                "%(hour)02d:%(minute)02d:%(second)02d" 
    1171            ) 
    1172 
    1173    def bind_processor( 
    1174        self, dialect: Dialect 
    1175    ) -> Optional[_BindProcessorType[Any]]: 
    1176        datetime_datetime = datetime.datetime 
    1177        datetime_date = datetime.date 
    1178        format_ = self._storage_format 
    1179 
    1180        def process(value): 
    1181            if value is None: 
    1182                return None 
    1183            elif isinstance(value, datetime_datetime): 
    1184                return format_ % { 
    1185                    "year": value.year, 
    1186                    "month": value.month, 
    1187                    "day": value.day, 
    1188                    "hour": value.hour, 
    1189                    "minute": value.minute, 
    1190                    "second": value.second, 
    1191                    "microsecond": value.microsecond, 
    1192                } 
    1193            elif isinstance(value, datetime_date): 
    1194                return format_ % { 
    1195                    "year": value.year, 
    1196                    "month": value.month, 
    1197                    "day": value.day, 
    1198                    "hour": 0, 
    1199                    "minute": 0, 
    1200                    "second": 0, 
    1201                    "microsecond": 0, 
    1202                } 
    1203            else: 
    1204                raise TypeError( 
    1205                    "SQLite DateTime type only accepts Python " 
    1206                    "datetime and date objects as input." 
    1207                ) 
    1208 
    1209        return process 
    1210 
    1211    def result_processor( 
    1212        self, dialect: Dialect, coltype: object 
    1213    ) -> Optional[_ResultProcessorType[Any]]: 
    1214        if self._reg: 
    1215            return processors.str_to_datetime_processor_factory( 
    1216                self._reg, datetime.datetime 
    1217            ) 
    1218        else: 
    1219            return processors.str_to_datetime 
    1220 
    1221 
    1222class DATE(_DateTimeMixin, sqltypes.Date): 
    1223    r"""Represent a Python date object in SQLite using a string. 
    1224 
    1225    The default string storage format is:: 
    1226 
    1227        "%(year)04d-%(month)02d-%(day)02d" 
    1228 
    1229    e.g.: 
    1230 
    1231    .. sourcecode:: text 
    1232 
    1233        2011-03-15 
    1234 
    1235    The incoming storage format is by default parsed using the 
    1236    Python ``date.fromisoformat()`` function. 
    1237 
    1238    .. versionchanged:: 2.0  ``date.fromisoformat()`` is used for default 
    1239       date string parsing. 
    1240 
    1241 
    1242    The storage format can be customized to some degree using the 
    1243    ``storage_format`` and ``regexp`` parameters, such as:: 
    1244 
    1245        import re 
    1246        from sqlalchemy.dialects.sqlite import DATE 
    1247 
    1248        d = DATE( 
    1249            storage_format="%(month)02d/%(day)02d/%(year)04d", 
    1250            regexp=re.compile("(?P<month>\d+)/(?P<day>\d+)/(?P<year>\d+)"), 
    1251        ) 
    1252 
    1253    :param storage_format: format string which will be applied to the 
    1254     dict with keys year, month, and day. 
    1255 
    1256    :param regexp: regular expression which will be applied to 
    1257     incoming result rows, replacing the use of ``date.fromisoformat()`` to 
    1258     parse incoming strings. If the regexp contains named groups, the resulting 
    1259     match dict is applied to the Python date() constructor as keyword 
    1260     arguments. Otherwise, if positional groups are used, the date() 
    1261     constructor is called with positional arguments via 
    1262     ``*map(int, match_obj.groups(0))``. 
    1263 
    1264    """ 
    1265 
    1266    _storage_format = "%(year)04d-%(month)02d-%(day)02d" 
    1267 
    1268    def bind_processor( 
    1269        self, dialect: Dialect 
    1270    ) -> Optional[_BindProcessorType[Any]]: 
    1271        datetime_date = datetime.date 
    1272        format_ = self._storage_format 
    1273 
    1274        def process(value): 
    1275            if value is None: 
    1276                return None 
    1277            elif isinstance(value, datetime_date): 
    1278                return format_ % { 
    1279                    "year": value.year, 
    1280                    "month": value.month, 
    1281                    "day": value.day, 
    1282                } 
    1283            else: 
    1284                raise TypeError( 
    1285                    "SQLite Date type only accepts Python " 
    1286                    "date objects as input." 
    1287                ) 
    1288 
    1289        return process 
    1290 
    1291    def result_processor( 
    1292        self, dialect: Dialect, coltype: object 
    1293    ) -> Optional[_ResultProcessorType[Any]]: 
    1294        if self._reg: 
    1295            return processors.str_to_datetime_processor_factory( 
    1296                self._reg, datetime.date 
    1297            ) 
    1298        else: 
    1299            return processors.str_to_date 
    1300 
    1301 
    1302class TIME(_DateTimeMixin, sqltypes.Time): 
    1303    r"""Represent a Python time object in SQLite using a string. 
    1304 
    1305    The default string storage format is:: 
    1306 
    1307        "%(hour)02d:%(minute)02d:%(second)02d.%(microsecond)06d" 
    1308 
    1309    e.g.: 
    1310 
    1311    .. sourcecode:: text 
    1312 
    1313        12:05:57.10558 
    1314 
    1315    The incoming storage format is by default parsed using the 
    1316    Python ``time.fromisoformat()`` function. 
    1317 
    1318    .. versionchanged:: 2.0  ``time.fromisoformat()`` is used for default 
    1319       time string parsing. 
    1320 
    1321    The storage format can be customized to some degree using the 
    1322    ``storage_format`` and ``regexp`` parameters, such as:: 
    1323 
    1324        import re 
    1325        from sqlalchemy.dialects.sqlite import TIME 
    1326 
    1327        t = TIME( 
    1328            storage_format="%(hour)02d-%(minute)02d-%(second)02d-%(microsecond)06d", 
    1329            regexp=re.compile("(\d+)-(\d+)-(\d+)-(?:-(\d+))?"), 
    1330        ) 
    1331 
    1332    :param truncate_microseconds: when ``True`` microseconds will be truncated 
    1333     from the time. Can't be specified together with ``storage_format`` 
    1334     or ``regexp``. 
    1335 
    1336    :param storage_format: format string which will be applied to the dict 
    1337     with keys hour, minute, second, and microsecond. 
    1338 
    1339    :param regexp: regular expression which will be applied to incoming result 
    1340     rows, replacing the use of ``datetime.fromisoformat()`` to parse incoming 
    1341     strings. If the regexp contains named groups, the resulting match dict is 
    1342     applied to the Python time() constructor as keyword arguments. Otherwise, 
    1343     if positional groups are used, the time() constructor is called with 
    1344     positional arguments via ``*map(int, match_obj.groups(0))``. 
    1345 
    1346    """ 
    1347 
    1348    _storage_format = "%(hour)02d:%(minute)02d:%(second)02d.%(microsecond)06d" 
    1349 
    1350    def __init__(self, *args, **kwargs): 
    1351        truncate_microseconds = kwargs.pop("truncate_microseconds", False) 
    1352        super().__init__(*args, **kwargs) 
    1353        if truncate_microseconds: 
    1354            assert "storage_format" not in kwargs, ( 
    1355                "You can specify only " 
    1356                "one of truncate_microseconds or storage_format." 
    1357            ) 
    1358            assert "regexp" not in kwargs, ( 
    1359                "You can specify only one of " 
    1360                "truncate_microseconds or regexp." 
    1361            ) 
    1362            self._storage_format = "%(hour)02d:%(minute)02d:%(second)02d" 
    1363 
    1364    def bind_processor(self, dialect): 
    1365        datetime_time = datetime.time 
    1366        format_ = self._storage_format 
    1367 
    1368        def process(value): 
    1369            if value is None: 
    1370                return None 
    1371            elif isinstance(value, datetime_time): 
    1372                return format_ % { 
    1373                    "hour": value.hour, 
    1374                    "minute": value.minute, 
    1375                    "second": value.second, 
    1376                    "microsecond": value.microsecond, 
    1377                } 
    1378            else: 
    1379                raise TypeError( 
    1380                    "SQLite Time type only accepts Python " 
    1381                    "time objects as input." 
    1382                ) 
    1383 
    1384        return process 
    1385 
    1386    def result_processor(self, dialect, coltype): 
    1387        if self._reg: 
    1388            return processors.str_to_datetime_processor_factory( 
    1389                self._reg, datetime.time 
    1390            ) 
    1391        else: 
    1392            return processors.str_to_time 
    1393 
    1394 
# Maps generic SQLAlchemy types to the SQLite-specific implementation
# classes defined or imported in this module.
# NOTE(review): presumably installed on the dialect class as its
# ``colspecs`` -- the consumer is outside this section; confirm there.
colspecs = {
    sqltypes.Date: DATE,
    sqltypes.DateTime: DATETIME,
    sqltypes.JSON: _SQliteJson,
    sqltypes.JSON.JSONIndexType: JSONIndexType,
    sqltypes.JSON.JSONPathType: JSONPathType,
    sqltypes.Time: TIME,
}
    1403 
# Exact-match lookup from a type name string to a SQLAlchemy type.
# Several names are aliases for the same type (BOOL/BOOLEAN, INT/INTEGER),
# and the ``*_CHAR`` names map to the same types as their plain
# DATE / DATETIME / TIME counterparts.
# NOTE(review): this appears to be the reflection lookup table described
# under "Type Reflection" in the module docstring, with unmatched names
# falling back to the type-affinity rules -- confirm in the reflection code.
ischema_names = {
    "BIGINT": sqltypes.BIGINT,
    "BLOB": sqltypes.BLOB,
    "BOOL": sqltypes.BOOLEAN,
    "BOOLEAN": sqltypes.BOOLEAN,
    "CHAR": sqltypes.CHAR,
    "DATE": sqltypes.DATE,
    "DATE_CHAR": sqltypes.DATE,
    "DATETIME": sqltypes.DATETIME,
    "DATETIME_CHAR": sqltypes.DATETIME,
    "DOUBLE": sqltypes.DOUBLE,
    "DECIMAL": sqltypes.DECIMAL,
    "FLOAT": sqltypes.FLOAT,
    "INT": sqltypes.INTEGER,
    "INTEGER": sqltypes.INTEGER,
    "JSON": JSON,
    "NUMERIC": sqltypes.NUMERIC,
    "REAL": sqltypes.REAL,
    "SMALLINT": sqltypes.SMALLINT,
    "TEXT": sqltypes.TEXT,
    "TIME": sqltypes.TIME,
    "TIME_CHAR": sqltypes.TIME,
    "TIMESTAMP": sqltypes.TIMESTAMP,
    "VARCHAR": sqltypes.VARCHAR,
    "NVARCHAR": sqltypes.NVARCHAR,
    "NCHAR": sqltypes.NCHAR,
}
    1431 
    1432 
    1433class SQLiteCompiler(compiler.SQLCompiler): 
    # Copy of the base compiler's extract_map with EXTRACT field names
    # overridden by the format codes that visit_extract() passes to
    # STRFTIME().
    extract_map = util.update_copy(
        compiler.SQLCompiler.extract_map,
        {
            "month": "%m",
            "day": "%d",
            "year": "%Y",
            "second": "%S",
            "hour": "%H",
            "doy": "%j",
            "minute": "%M",
            "epoch": "%s",
            "dow": "%w",
            "week": "%W",
        },
    )
    1449 
    def visit_truediv_binary(self, binary, operator, **kw):
        """Render true division as ``<left> / (<right> + 0.0)``.

        NOTE(review): the ``+ 0.0`` presumably forces floating-point
        division for integer operands -- confirm against SQLite docs.
        """
        numerator = self.process(binary.left, **kw)
        denominator = "(%s + 0.0)" % self.process(binary.right, **kw)
        return numerator + " / " + denominator
    1456 
    def visit_now_func(self, fn, **kw):
        """Render the ``now()`` function as ``CURRENT_TIMESTAMP``."""
        return "CURRENT_TIMESTAMP"
    1459 
    def visit_localtimestamp_func(self, func, **kw):
        """Render ``localtimestamp`` as ``CURRENT_TIMESTAMP`` passed through
        ``DATETIME()`` with the ``'localtime'`` modifier."""
        return "DATETIME(CURRENT_TIMESTAMP, 'localtime')"
    1462 
    def visit_true(self, expr, **kw):
        """Render the boolean true constant as the integer literal ``1``."""
        return "1"
    1465 
    def visit_false(self, expr, **kw):
        """Render the boolean false constant as the integer literal ``0``."""
        return "0"
    1468 
    def visit_char_length_func(self, fn, **kw):
        """Render ``char_length`` as ``length`` followed by the function's
        rendered argument list."""
        argspec = self.function_argspec(fn)
        return "length%s" % argspec
    1471 
    def visit_aggregate_strings_func(self, fn, **kw):
        """Delegate to the base compiler, asking it to render the
        aggregate under the function name ``group_concat``."""
        render = super().visit_aggregate_strings_func
        return render(fn, use_function_name="group_concat", **kw)
    1476 
    def visit_cast(self, cast, **kwargs):
        """Render CAST via the base compiler when the dialect reports
        ``supports_cast``; otherwise render only the inner expression."""
        if not self.dialect.supports_cast:
            return self.process(cast.clause, **kwargs)
        return super().visit_cast(cast, **kwargs)
    1482 
    1483    def visit_extract(self, extract, **kw): 
    1484        try: 
    1485            return "CAST(STRFTIME('%s', %s) AS INTEGER)" % ( 
    1486                self.extract_map[extract.field], 
    1487                self.process(extract.expr, **kw), 
    1488            ) 
    1489        except KeyError as err: 
    1490            raise exc.CompileError( 
    1491                "%s is not a valid extract argument." % extract.field 
    1492            ) from err 
    1493 
    1494    def returning_clause( 
    1495        self, 
    1496        stmt, 
    1497        returning_cols, 
    1498        *, 
    1499        populate_result_map, 
    1500        **kw, 
    1501    ): 
    1502        kw["include_table"] = False 
    1503        return super().returning_clause( 
    1504            stmt, returning_cols, populate_result_map=populate_result_map, **kw 
    1505        ) 
    1506 
    1507    def limit_clause(self, select, **kw): 
    1508        text = "" 
    1509        if select._limit_clause is not None: 
    1510            text += "\n LIMIT " + self.process(select._limit_clause, **kw) 
    1511        if select._offset_clause is not None: 
    1512            if select._limit_clause is None: 
    1513                text += "\n LIMIT " + self.process(sql.literal(-1)) 
    1514            text += " OFFSET " + self.process(select._offset_clause, **kw) 
    1515        else: 
    1516            text += " OFFSET " + self.process(sql.literal(0), **kw) 
    1517        return text 
    1518 
    1519    def for_update_clause(self, select, **kw): 
    1520        # sqlite has no "FOR UPDATE" AFAICT 
    1521        return "" 
    1522 
    1523    def update_from_clause( 
    1524        self, update_stmt, from_table, extra_froms, from_hints, **kw 
    1525    ): 
    1526        kw["asfrom"] = True 
    1527        return "FROM " + ", ".join( 
    1528            t._compiler_dispatch(self, fromhints=from_hints, **kw) 
    1529            for t in extra_froms 
    1530        ) 
    1531 
    1532    def visit_is_distinct_from_binary(self, binary, operator, **kw): 
    1533        return "%s IS NOT %s" % ( 
    1534            self.process(binary.left), 
    1535            self.process(binary.right), 
    1536        ) 
    1537 
    1538    def visit_is_not_distinct_from_binary(self, binary, operator, **kw): 
    1539        return "%s IS %s" % ( 
    1540            self.process(binary.left), 
    1541            self.process(binary.right), 
    1542        ) 
    1543 
    1544    def visit_json_getitem_op_binary( 
    1545        self, binary, operator, _cast_applied=False, **kw 
    1546    ): 
    1547        if ( 
    1548            not _cast_applied 
    1549            and binary.type._type_affinity is not sqltypes.JSON 
    1550        ): 
    1551            kw["_cast_applied"] = True 
    1552            return self.process(sql.cast(binary, binary.type), **kw) 
    1553 
    1554        if binary.type._type_affinity is sqltypes.JSON: 
    1555            expr = "JSON_QUOTE(JSON_EXTRACT(%s, %s))" 
    1556        else: 
    1557            expr = "JSON_EXTRACT(%s, %s)" 
    1558 
    1559        return expr % ( 
    1560            self.process(binary.left, **kw), 
    1561            self.process(binary.right, **kw), 
    1562        ) 
    1563 
    1564    def visit_json_path_getitem_op_binary( 
    1565        self, binary, operator, _cast_applied=False, **kw 
    1566    ): 
    1567        if ( 
    1568            not _cast_applied 
    1569            and binary.type._type_affinity is not sqltypes.JSON 
    1570        ): 
    1571            kw["_cast_applied"] = True 
    1572            return self.process(sql.cast(binary, binary.type), **kw) 
    1573 
    1574        if binary.type._type_affinity is sqltypes.JSON: 
    1575            expr = "JSON_QUOTE(JSON_EXTRACT(%s, %s))" 
    1576        else: 
    1577            expr = "JSON_EXTRACT(%s, %s)" 
    1578 
    1579        return expr % ( 
    1580            self.process(binary.left, **kw), 
    1581            self.process(binary.right, **kw), 
    1582        ) 
    1583 
    1584    def visit_empty_set_op_expr(self, type_, expand_op, **kw): 
    1585        # slightly old SQLite versions don't seem to be able to handle 
    1586        # the empty set impl 
    1587        return self.visit_empty_set_expr(type_) 
    1588 
    1589    def visit_empty_set_expr(self, element_types, **kw): 
    1590        return "SELECT %s FROM (SELECT %s) WHERE 1!=1" % ( 
    1591            ", ".join("1" for type_ in element_types or [INTEGER()]), 
    1592            ", ".join("1" for type_ in element_types or [INTEGER()]), 
    1593        ) 
    1594 
    1595    def visit_regexp_match_op_binary(self, binary, operator, **kw): 
    1596        return self._generate_generic_binary(binary, " REGEXP ", **kw) 
    1597 
    1598    def visit_not_regexp_match_op_binary(self, binary, operator, **kw): 
    1599        return self._generate_generic_binary(binary, " NOT REGEXP ", **kw) 
    1600 
    1601    def _on_conflict_target(self, clause, **kw): 
    1602        if clause.inferred_target_elements is not None: 
    1603            target_text = "(%s)" % ", ".join( 
    1604                ( 
    1605                    self.preparer.quote(c) 
    1606                    if isinstance(c, str) 
    1607                    else self.process(c, include_table=False, use_schema=False) 
    1608                ) 
    1609                for c in clause.inferred_target_elements 
    1610            ) 
    1611            if clause.inferred_target_whereclause is not None: 
    1612                target_text += " WHERE %s" % self.process( 
    1613                    clause.inferred_target_whereclause, 
    1614                    include_table=False, 
    1615                    use_schema=False, 
    1616                    literal_execute=True, 
    1617                ) 
    1618 
    1619        else: 
    1620            target_text = "" 
    1621 
    1622        return target_text 
    1623 
    1624    def visit_on_conflict_do_nothing(self, on_conflict, **kw): 
    1625        target_text = self._on_conflict_target(on_conflict, **kw) 
    1626 
    1627        if target_text: 
    1628            return "ON CONFLICT %s DO NOTHING" % target_text 
    1629        else: 
    1630            return "ON CONFLICT DO NOTHING" 
    1631 
    1632    def visit_on_conflict_do_update(self, on_conflict, **kw): 
    1633        clause = on_conflict 
    1634 
    1635        target_text = self._on_conflict_target(on_conflict, **kw) 
    1636 
    1637        action_set_ops = [] 
    1638 
    1639        set_parameters = dict(clause.update_values_to_set) 
    1640        # create a list of column assignment clauses as tuples 
    1641 
    1642        insert_statement = self.stack[-1]["selectable"] 
    1643        cols = insert_statement.table.c 
    1644        for c in cols: 
    1645            col_key = c.key 
    1646 
    1647            if col_key in set_parameters: 
    1648                value = set_parameters.pop(col_key) 
    1649            elif c in set_parameters: 
    1650                value = set_parameters.pop(c) 
    1651            else: 
    1652                continue 
    1653 
    1654            if ( 
    1655                isinstance(value, elements.BindParameter) 
    1656                and value.type._isnull 
    1657            ): 
    1658                value = value._with_binary_element_type(c.type) 
    1659            value_text = self.process(value.self_group(), use_schema=False) 
    1660 
    1661            key_text = self.preparer.quote(c.name) 
    1662            action_set_ops.append("%s = %s" % (key_text, value_text)) 
    1663 
    1664        # check for names that don't match columns 
    1665        if set_parameters: 
    1666            util.warn( 
    1667                "Additional column names not matching " 
    1668                "any column keys in table '%s': %s" 
    1669                % ( 
    1670                    self.current_executable.table.name, 
    1671                    (", ".join("'%s'" % c for c in set_parameters)), 
    1672                ) 
    1673            ) 
    1674            for k, v in set_parameters.items(): 
    1675                key_text = ( 
    1676                    self.preparer.quote(k) 
    1677                    if isinstance(k, str) 
    1678                    else self.process(k, use_schema=False) 
    1679                ) 
    1680                value_text = self.process( 
    1681                    coercions.expect(roles.ExpressionElementRole, v), 
    1682                    use_schema=False, 
    1683                ) 
    1684                action_set_ops.append("%s = %s" % (key_text, value_text)) 
    1685 
    1686        action_text = ", ".join(action_set_ops) 
    1687        if clause.update_whereclause is not None: 
    1688            action_text += " WHERE %s" % self.process( 
    1689                clause.update_whereclause, include_table=True, use_schema=False 
    1690            ) 
    1691 
    1692        return "ON CONFLICT %s DO UPDATE SET %s" % (target_text, action_text) 
    1693 
    1694    def visit_bitwise_xor_op_binary(self, binary, operator, **kw): 
    1695        # sqlite has no xor. Use "a XOR b" = "(a | b) - (a & b)". 
    1696        kw["eager_grouping"] = True 
    1697        or_ = self._generate_generic_binary(binary, " | ", **kw) 
    1698        and_ = self._generate_generic_binary(binary, " & ", **kw) 
    1699        return f"({or_} - {and_})" 
    1700 
    1701 
    1702class SQLiteDDLCompiler(compiler.DDLCompiler): 
    1703    def get_column_specification(self, column, **kwargs): 
    1704        coltype = self.dialect.type_compiler_instance.process( 
    1705            column.type, type_expression=column 
    1706        ) 
    1707        colspec = self.preparer.format_column(column) + " " + coltype 
    1708        default = self.get_column_default_string(column) 
    1709        if default is not None: 
    1710 
    1711            if not re.match(r"""^\s*[\'\"\(]""", default) and re.match( 
    1712                r".*\W.*", default 
    1713            ): 
    1714                colspec += f" DEFAULT ({default})" 
    1715            else: 
    1716                colspec += f" DEFAULT {default}" 
    1717 
    1718        if not column.nullable: 
    1719            colspec += " NOT NULL" 
    1720 
    1721            on_conflict_clause = column.dialect_options["sqlite"][ 
    1722                "on_conflict_not_null" 
    1723            ] 
    1724            if on_conflict_clause is not None: 
    1725                colspec += " ON CONFLICT " + on_conflict_clause 
    1726 
    1727        if column.primary_key: 
    1728            if ( 
    1729                column.autoincrement is True 
    1730                and len(column.table.primary_key.columns) != 1 
    1731            ): 
    1732                raise exc.CompileError( 
    1733                    "SQLite does not support autoincrement for " 
    1734                    "composite primary keys" 
    1735                ) 
    1736 
    1737            if ( 
    1738                column.table.dialect_options["sqlite"]["autoincrement"] 
    1739                and len(column.table.primary_key.columns) == 1 
    1740                and issubclass(column.type._type_affinity, sqltypes.Integer) 
    1741                and not column.foreign_keys 
    1742            ): 
    1743                colspec += " PRIMARY KEY" 
    1744 
    1745                on_conflict_clause = column.dialect_options["sqlite"][ 
    1746                    "on_conflict_primary_key" 
    1747                ] 
    1748                if on_conflict_clause is not None: 
    1749                    colspec += " ON CONFLICT " + on_conflict_clause 
    1750 
    1751                colspec += " AUTOINCREMENT" 
    1752 
    1753        if column.computed is not None: 
    1754            colspec += " " + self.process(column.computed) 
    1755 
    1756        return colspec 
    1757 
    1758    def visit_primary_key_constraint(self, constraint, **kw): 
    1759        # for columns with sqlite_autoincrement=True, 
    1760        # the PRIMARY KEY constraint can only be inline 
    1761        # with the column itself. 
    1762        if len(constraint.columns) == 1: 
    1763            c = list(constraint)[0] 
    1764            if ( 
    1765                c.primary_key 
    1766                and c.table.dialect_options["sqlite"]["autoincrement"] 
    1767                and issubclass(c.type._type_affinity, sqltypes.Integer) 
    1768                and not c.foreign_keys 
    1769            ): 
    1770                return None 
    1771 
    1772        text = super().visit_primary_key_constraint(constraint) 
    1773 
    1774        on_conflict_clause = constraint.dialect_options["sqlite"][ 
    1775            "on_conflict" 
    1776        ] 
    1777        if on_conflict_clause is None and len(constraint.columns) == 1: 
    1778            on_conflict_clause = list(constraint)[0].dialect_options["sqlite"][ 
    1779                "on_conflict_primary_key" 
    1780            ] 
    1781 
    1782        if on_conflict_clause is not None: 
    1783            text += " ON CONFLICT " + on_conflict_clause 
    1784 
    1785        return text 
    1786 
    1787    def visit_unique_constraint(self, constraint, **kw): 
    1788        text = super().visit_unique_constraint(constraint) 
    1789 
    1790        on_conflict_clause = constraint.dialect_options["sqlite"][ 
    1791            "on_conflict" 
    1792        ] 
    1793        if on_conflict_clause is None and len(constraint.columns) == 1: 
    1794            col1 = list(constraint)[0] 
    1795            if isinstance(col1, schema.SchemaItem): 
    1796                on_conflict_clause = list(constraint)[0].dialect_options[ 
    1797                    "sqlite" 
    1798                ]["on_conflict_unique"] 
    1799 
    1800        if on_conflict_clause is not None: 
    1801            text += " ON CONFLICT " + on_conflict_clause 
    1802 
    1803        return text 
    1804 
    1805    def visit_check_constraint(self, constraint, **kw): 
    1806        text = super().visit_check_constraint(constraint) 
    1807 
    1808        on_conflict_clause = constraint.dialect_options["sqlite"][ 
    1809            "on_conflict" 
    1810        ] 
    1811 
    1812        if on_conflict_clause is not None: 
    1813            text += " ON CONFLICT " + on_conflict_clause 
    1814 
    1815        return text 
    1816 
    1817    def visit_column_check_constraint(self, constraint, **kw): 
    1818        text = super().visit_column_check_constraint(constraint) 
    1819 
    1820        if constraint.dialect_options["sqlite"]["on_conflict"] is not None: 
    1821            raise exc.CompileError( 
    1822                "SQLite does not support on conflict clause for " 
    1823                "column check constraint" 
    1824            ) 
    1825 
    1826        return text 
    1827 
    1828    def visit_foreign_key_constraint(self, constraint, **kw): 
    1829        local_table = constraint.elements[0].parent.table 
    1830        remote_table = constraint.elements[0].column.table 
    1831 
    1832        if local_table.schema != remote_table.schema: 
    1833            return None 
    1834        else: 
    1835            return super().visit_foreign_key_constraint(constraint) 
    1836 
    1837    def define_constraint_remote_table(self, constraint, table, preparer): 
    1838        """Format the remote table clause of a CREATE CONSTRAINT clause.""" 
    1839 
    1840        return preparer.format_table(table, use_schema=False) 
    1841 
    1842    def visit_create_index( 
    1843        self, create, include_schema=False, include_table_schema=True, **kw 
    1844    ): 
    1845        index = create.element 
    1846        self._verify_index_table(index) 
    1847        preparer = self.preparer 
    1848        text = "CREATE " 
    1849        if index.unique: 
    1850            text += "UNIQUE " 
    1851 
    1852        text += "INDEX " 
    1853 
    1854        if create.if_not_exists: 
    1855            text += "IF NOT EXISTS " 
    1856 
    1857        text += "%s ON %s (%s)" % ( 
    1858            self._prepared_index_name(index, include_schema=True), 
    1859            preparer.format_table(index.table, use_schema=False), 
    1860            ", ".join( 
    1861                self.sql_compiler.process( 
    1862                    expr, include_table=False, literal_binds=True 
    1863                ) 
    1864                for expr in index.expressions 
    1865            ), 
    1866        ) 
    1867 
    1868        whereclause = index.dialect_options["sqlite"]["where"] 
    1869        if whereclause is not None: 
    1870            where_compiled = self.sql_compiler.process( 
    1871                whereclause, include_table=False, literal_binds=True 
    1872            ) 
    1873            text += " WHERE " + where_compiled 
    1874 
    1875        return text 
    1876 
    1877    def post_create_table(self, table): 
    1878        table_options = [] 
    1879 
    1880        if not table.dialect_options["sqlite"]["with_rowid"]: 
    1881            table_options.append("WITHOUT ROWID") 
    1882 
    1883        if table.dialect_options["sqlite"]["strict"]: 
    1884            table_options.append("STRICT") 
    1885 
    1886        if table_options: 
    1887            return "\n " + ",\n ".join(table_options) 
    1888        else: 
    1889            return "" 
    1890 
    1891 
    1892class SQLiteTypeCompiler(compiler.GenericTypeCompiler): 
    1893    def visit_large_binary(self, type_, **kw): 
    1894        return self.visit_BLOB(type_) 
    1895 
    1896    def visit_DATETIME(self, type_, **kw): 
    1897        if ( 
    1898            not isinstance(type_, _DateTimeMixin) 
    1899            or type_.format_is_text_affinity 
    1900        ): 
    1901            return super().visit_DATETIME(type_) 
    1902        else: 
    1903            return "DATETIME_CHAR" 
    1904 
    1905    def visit_DATE(self, type_, **kw): 
    1906        if ( 
    1907            not isinstance(type_, _DateTimeMixin) 
    1908            or type_.format_is_text_affinity 
    1909        ): 
    1910            return super().visit_DATE(type_) 
    1911        else: 
    1912            return "DATE_CHAR" 
    1913 
    1914    def visit_TIME(self, type_, **kw): 
    1915        if ( 
    1916            not isinstance(type_, _DateTimeMixin) 
    1917            or type_.format_is_text_affinity 
    1918        ): 
    1919            return super().visit_TIME(type_) 
    1920        else: 
    1921            return "TIME_CHAR" 
    1922 
    1923    def visit_JSON(self, type_, **kw): 
    1924        # note this name provides NUMERIC affinity, not TEXT. 
    1925        # should not be an issue unless the JSON value consists of a single 
    1926        # numeric value.   JSONTEXT can be used if this case is required. 
    1927        return "JSON" 
    1928 
    1929 
    1930class SQLiteIdentifierPreparer(compiler.IdentifierPreparer): 
    1931    reserved_words = { 
    1932        "add", 
    1933        "after", 
    1934        "all", 
    1935        "alter", 
    1936        "analyze", 
    1937        "and", 
    1938        "as", 
    1939        "asc", 
    1940        "attach", 
    1941        "autoincrement", 
    1942        "before", 
    1943        "begin", 
    1944        "between", 
    1945        "by", 
    1946        "cascade", 
    1947        "case", 
    1948        "cast", 
    1949        "check", 
    1950        "collate", 
    1951        "column", 
    1952        "commit", 
    1953        "conflict", 
    1954        "constraint", 
    1955        "create", 
    1956        "cross", 
    1957        "current_date", 
    1958        "current_time", 
    1959        "current_timestamp", 
    1960        "database", 
    1961        "default", 
    1962        "deferrable", 
    1963        "deferred", 
    1964        "delete", 
    1965        "desc", 
    1966        "detach", 
    1967        "distinct", 
    1968        "drop", 
    1969        "each", 
    1970        "else", 
    1971        "end", 
    1972        "escape", 
    1973        "except", 
    1974        "exclusive", 
    1975        "exists", 
    1976        "explain", 
    1977        "false", 
    1978        "fail", 
    1979        "for", 
    1980        "foreign", 
    1981        "from", 
    1982        "full", 
    1983        "glob", 
    1984        "group", 
    1985        "having", 
    1986        "if", 
    1987        "ignore", 
    1988        "immediate", 
    1989        "in", 
    1990        "index", 
    1991        "indexed", 
    1992        "initially", 
    1993        "inner", 
    1994        "insert", 
    1995        "instead", 
    1996        "intersect", 
    1997        "into", 
    1998        "is", 
    1999        "isnull", 
    2000        "join", 
    2001        "key", 
    2002        "left", 
    2003        "like", 
    2004        "limit", 
    2005        "match", 
    2006        "natural", 
    2007        "not", 
    2008        "notnull", 
    2009        "null", 
    2010        "of", 
    2011        "offset", 
    2012        "on", 
    2013        "or", 
    2014        "order", 
    2015        "outer", 
    2016        "plan", 
    2017        "pragma", 
    2018        "primary", 
    2019        "query", 
    2020        "raise", 
    2021        "references", 
    2022        "reindex", 
    2023        "rename", 
    2024        "replace", 
    2025        "restrict", 
    2026        "right", 
    2027        "rollback", 
    2028        "row", 
    2029        "select", 
    2030        "set", 
    2031        "table", 
    2032        "temp", 
    2033        "temporary", 
    2034        "then", 
    2035        "to", 
    2036        "transaction", 
    2037        "trigger", 
    2038        "true", 
    2039        "union", 
    2040        "unique", 
    2041        "update", 
    2042        "using", 
    2043        "vacuum", 
    2044        "values", 
    2045        "view", 
    2046        "virtual", 
    2047        "when", 
    2048        "where", 
    2049    } 
    2050 
    2051 
    2052class SQLiteExecutionContext(default.DefaultExecutionContext): 
    2053    @util.memoized_property 
    2054    def _preserve_raw_colnames(self): 
    2055        return ( 
    2056            not self.dialect._broken_dotted_colnames 
    2057            or self.execution_options.get("sqlite_raw_colnames", False) 
    2058        ) 
    2059 
    2060    def _translate_colname(self, colname): 
    2061        # TODO: detect SQLite version 3.10.0 or greater; 
    2062        # see [ticket:3633] 
    2063 
    2064        # adjust for dotted column names.  SQLite 
    2065        # in the case of UNION may store col names as 
    2066        # "tablename.colname", or if using an attached database, 
    2067        # "database.tablename.colname", in cursor.description 
    2068        if not self._preserve_raw_colnames and "." in colname: 
    2069            return colname.split(".")[-1], colname 
    2070        else: 
    2071            return colname, None 
    2072 
    2073 
    2074class SQLiteDialect(default.DefaultDialect): 
    name = "sqlite"
    supports_alter = False

    # SQLite supports "DEFAULT VALUES" but *does not* support the DEFAULT
    # keyword inside of "VALUES (...)".  supports_default_metavalue is
    # nevertheless enabled further below, in combination with
    # default_metavalue_token, which names the token rendered in place of
    # DEFAULT.
    supports_default_values = True

    # sqlite issue:
    # https://github.com/python/cpython/issues/93421
    # note this parameter is no longer used by the ORM or default dialect
    # see #9414
    supports_sane_rowcount_returning = False

    supports_empty_insert = False
    supports_cast = True
    supports_multivalues_insert = True
    use_insertmanyvalues = True
    tuple_in_values = True
    supports_statement_cache = True
    insert_null_pk_still_autoincrements = True

    # RETURNING support; __init__() turns the insert / update / delete
    # flags off again based on the SQLite version of the DBAPI in use
    insert_returning = True
    update_returning = True
    update_returning_multifrom = True
    delete_returning = True

    supports_default_metavalue = True
    """dialect supports INSERT... VALUES (DEFAULT) syntax"""

    default_metavalue_token = "NULL"
    """for INSERT... VALUES (DEFAULT) syntax, the token to put in the
    parenthesis."""

    default_paramstyle = "qmark"
    execution_ctx_cls = SQLiteExecutionContext
    statement_compiler = SQLiteCompiler
    ddl_compiler = SQLiteDDLCompiler
    type_compiler_cls = SQLiteTypeCompiler
    preparer = SQLiteIdentifierPreparer
    ischema_names = ischema_names
    colspecs = colspecs

    # dialect-specific keyword arguments accepted by each schema construct,
    # along with their default values
    construct_arguments = [
        (
            sa_schema.Table,
            {
                "autoincrement": False,
                "with_rowid": True,
                "strict": False,
            },
        ),
        (sa_schema.Index, {"where": None}),
        (
            sa_schema.Column,
            {
                "on_conflict_primary_key": None,
                "on_conflict_not_null": None,
                "on_conflict_unique": None,
            },
        ),
        (sa_schema.Constraint, {"on_conflict": None}),
    ]

    # workaround flags; recomputed in __init__() from the SQLite version
    # of the DBAPI in use
    _broken_fk_pragma_quotes = False
    _broken_dotted_colnames = False
    2141 
    def __init__(
        self,
        native_datetime: bool = False,
        json_serializer: Optional[Callable[..., Any]] = None,
        json_deserializer: Optional[Callable[..., Any]] = None,
        **kwargs: Any,
    ) -> None:
        """Construct the dialect and derive feature flags from the SQLite
        version reported by the DBAPI, when a DBAPI is present.

        :param native_datetime: stored as ``self.native_datetime``; see the
         comment in the body.
        :param json_serializer: stored as ``self._json_serializer``.
        :param json_deserializer: stored as ``self._json_deserializer``.
        :param \\**kwargs: passed along to ``DefaultDialect.__init__()``.
        """
        default.DefaultDialect.__init__(self, **kwargs)

        self._json_serializer = json_serializer
        self._json_deserializer = json_deserializer

        # this flag used by pysqlite dialect, and perhaps others in the
        # future, to indicate the driver is handling date/timestamp
        # conversions (and perhaps datetime/time as well on some hypothetical
        # driver ?)
        self.native_datetime = native_datetime

        dbapi = self.dbapi
        if dbapi is None:
            # no driver to consult; class-level defaults stay in effect
            return

        version = dbapi.sqlite_version_info

        if version < (3, 7, 16):
            util.warn(
                "SQLite version %s is older than 3.7.16, and will not "
                "support right nested joins, as are sometimes used in "
                "more complex ORM scenarios.  SQLAlchemy 1.4 and above "
                "no longer tries to rewrite these joins."
                % (version,)
            )

        # NOTE: python 3.7 on fedora for me has SQLite 3.34.1.  These
        # version checks are getting very stale.
        self._broken_dotted_colnames = version < (3, 10, 0)
        self.supports_default_values = version >= (3, 3, 8)
        self.supports_cast = version >= (3, 2, 3)
        # https://www.sqlite.org/releaselog/3_7_11.html
        self.supports_multivalues_insert = version >= (3, 7, 11)
        # see https://www.sqlalchemy.org/trac/ticket/2568
        # as well as https://www.sqlite.org/src/info/600482d161
        self._broken_fk_pragma_quotes = version < (3, 6, 14)

        if version < (3, 35) or util.pypy:
            # RETURNING is switched off for all three statement types
            self.update_returning = False
            self.delete_returning = False
            self.insert_returning = False

        if version < (3, 32, 0):
            # https://www.sqlite.org/limits.html
            self.insertmanyvalues_max_parameters = 999
    2204 
    # isolation level name -> integer value written to
    # "PRAGMA read_uncommitted" by set_isolation_level()
    _isolation_lookup = util.immutabledict(
        {"READ UNCOMMITTED": 1, "SERIALIZABLE": 0}
    )
    2208 
    def get_isolation_level_values(self, dbapi_connection):
        """Return the isolation level names present in
        ``_isolation_lookup``, as a new list."""
        return [*self._isolation_lookup]
    2211 
    def set_isolation_level(
        self, dbapi_connection: DBAPIConnection, level: IsolationLevel
    ) -> None:
        """Set the isolation level through ``PRAGMA read_uncommitted``.

        :param dbapi_connection: DBAPI connection to run the PRAGMA on.
        :param level: a key of ``_isolation_lookup``.
        :raises KeyError: if ``level`` is not in ``_isolation_lookup``.
        """
        # looked up before a cursor is opened, so that an unknown level
        # does not leave a cursor behind
        isolation_level = self._isolation_lookup[level]

        cursor = dbapi_connection.cursor()
        try:
            cursor.execute(f"PRAGMA read_uncommitted = {isolation_level}")
        finally:
            # close the cursor even if the PRAGMA fails
            cursor.close()
    2220 
    def get_isolation_level(self, dbapi_connection):
        """Return the isolation level name read from
        ``PRAGMA read_uncommitted``.

        :param dbapi_connection: DBAPI connection to run the PRAGMA on.
        :return: ``"SERIALIZABLE"`` for a value of 0 or when the PRAGMA
         returns no row, ``"READ UNCOMMITTED"`` for a value of 1.
        :raises AssertionError: for any other value.
        """
        cursor = dbapi_connection.cursor()
        try:
            cursor.execute("PRAGMA read_uncommitted")
            res = cursor.fetchone()
        finally:
            # close the cursor even if the PRAGMA fails
            cursor.close()

        if res:
            value = res[0]
        else:
            # https://www.sqlite.org/changes.html#version_3_3_3
            # "Optional READ UNCOMMITTED isolation (instead of the
            # default isolation level of SERIALIZABLE) and
            # table level locking when database connections
            # share a common cache.""
            # pre-SQLite 3.3.0 default to 0
            value = 0

        if value == 0:
            return "SERIALIZABLE"
        elif value == 1:
            return "READ UNCOMMITTED"
        else:
            # raised explicitly rather than via "assert False", which is
            # removed when Python runs with -O
            raise AssertionError("Unknown isolation level %s" % value)
    2242 
    @reflection.cache
    def get_schema_names(self, connection, **kw):
        """Return the second column of each ``PRAGMA database_list`` row,
        leaving out the ``temp`` entry."""
        schema_names = []
        for row in connection.exec_driver_sql("PRAGMA database_list"):
            schema_name = row[1]
            if schema_name != "temp":
                schema_names.append(schema_name)
        return schema_names
    2249 
    2250    def _format_schema(self, schema, table_name): 
    2251        if schema is not None: 
    2252            qschema = self.identifier_preparer.quote_identifier(schema) 
    2253            name = f"{qschema}.{table_name}" 
    2254        else: 
    2255            name = table_name 
    2256        return name 
    2257 
    2258    def _sqlite_main_query( 
    2259        self, 
    2260        table: str, 
    2261        type_: str, 
    2262        schema: Optional[str], 
    2263        sqlite_include_internal: bool, 
    2264    ): 
    2265        main = self._format_schema(schema, table) 
    2266        if not sqlite_include_internal: 
    2267            filter_table = " AND name NOT LIKE 'sqlite~_%' ESCAPE '~'" 
    2268        else: 
    2269            filter_table = "" 
    2270        query = ( 
    2271            f"SELECT name FROM {main} " 
    2272            f"WHERE type='{type_}'{filter_table} " 
    2273            "ORDER BY name" 
    2274        ) 
    2275        return query 
    2276 
    @reflection.cache
    def get_table_names(
        self, connection, schema=None, sqlite_include_internal=False, **kw
    ):
        """Return the names of ``table`` entries in ``sqlite_master``.

        ``schema`` and ``sqlite_include_internal`` are forwarded to
        ``_sqlite_main_query()``, which builds the statement.
        """
        stmt = self._sqlite_main_query(
            "sqlite_master", "table", schema, sqlite_include_internal
        )
        result = connection.exec_driver_sql(stmt)
        return result.scalars().all()
    2286 
    @reflection.cache
    def get_temp_table_names(
        self, connection, sqlite_include_internal=False, **kw
    ):
        """Return the names of ``table`` entries in ``sqlite_temp_master``.

        No schema qualifier is applied; ``sqlite_include_internal`` is
        forwarded to ``_sqlite_main_query()``.
        """
        stmt = self._sqlite_main_query(
            "sqlite_temp_master", "table", None, sqlite_include_internal
        )
        result = connection.exec_driver_sql(stmt)
        return result.scalars().all()
    2296 
    @reflection.cache
    def get_temp_view_names(
        self, connection, sqlite_include_internal=False, **kw
    ):
        """Return the names of ``view`` entries in ``sqlite_temp_master``.

        No schema qualifier is applied; ``sqlite_include_internal`` is
        forwarded to ``_sqlite_main_query()``.
        """
        stmt = self._sqlite_main_query(
            "sqlite_temp_master", "view", None, sqlite_include_internal
        )
        result = connection.exec_driver_sql(stmt)
        return result.scalars().all()
    2306 
    @reflection.cache
    def has_table(self, connection, table_name, schema=None, **kw):
        """Return whether the ``table_info`` pragma finds ``table_name``.

        When ``schema`` is given but is not among the names returned by
        ``get_schema_names()``, ``False`` is returned without running the
        pragma.
        """
        self._ensure_has_table_connection(connection)

        if schema is not None:
            known_schemas = self.get_schema_names(connection, **kw)
            if schema not in known_schemas:
                return False

        pragma_rows = self._get_table_pragma(
            connection, "table_info", table_name, schema=schema
        )
        return bool(pragma_rows)
    2320 
    2321    def _get_default_schema_name(self, connection): 
    2322        return "main" 
    2323 
    @reflection.cache
    def get_view_names(
        self, connection, schema=None, sqlite_include_internal=False, **kw
    ):
        """Return the names of ``view`` entries in ``sqlite_master``.

        ``schema`` and ``sqlite_include_internal`` are forwarded to
        ``_sqlite_main_query()``, which builds the statement.
        """
        stmt = self._sqlite_main_query(
            "sqlite_master", "view", schema, sqlite_include_internal
        )
        result = connection.exec_driver_sql(stmt)
        return result.scalars().all()
    2333 
    @reflection.cache
    def get_view_definition(self, connection, view_name, schema=None, **kw):
        """Return the stored SQL of the view named ``view_name``.

        With a schema, only that schema's ``sqlite_master`` is searched.
        Without one, ``sqlite_master`` and ``sqlite_temp_master`` are
        searched together, falling back to ``sqlite_master`` alone if the
        combined query raises ``DBAPIError``.

        :raises NoSuchTableError: if no matching view row is found.
        """
        params = (view_name,)

        if schema is None:
            try:
                stmt = (
                    "SELECT sql FROM "
                    " (SELECT * FROM sqlite_master UNION ALL "
                    "  SELECT * FROM sqlite_temp_master) "
                    "WHERE name = ? "
                    "AND type='view'"
                )
                rs = connection.exec_driver_sql(stmt, params)
            except exc.DBAPIError:
                stmt = (
                    "SELECT sql FROM sqlite_master WHERE name = ? "
                    "AND type='view'"
                )
                rs = connection.exec_driver_sql(stmt, params)
        else:
            qschema = self.identifier_preparer.quote_identifier(schema)
            master = f"{qschema}.sqlite_master"
            stmt = ("SELECT sql FROM %s WHERE name = ? AND type='view'") % (
                master,
            )
            rs = connection.exec_driver_sql(stmt, params)

        rows = rs.fetchall()
        if not rows:
            raise exc.NoSuchTableError(
                f"{schema}.{view_name}" if schema else view_name
            )
        return rows[0].sql
    2367 
    @reflection.cache
    def get_columns(self, connection, table_name, schema=None, **kw):
        """Return the reflected column dictionaries for ``table_name``.

        Rows come from ``PRAGMA table_xinfo`` when the server version is
        at least 3.31, otherwise from ``PRAGMA table_info``.  Hidden
        columns (``hidden == 1``) are skipped; generated columns have
        their expression looked up in the CREATE TABLE text.

        :raises NoSuchTableError: if no columns were found and
         ``has_table()`` reports that the table does not exist.
        """
        # computed columns are treated as hidden, they require table_xinfo
        use_xinfo = self.server_version_info >= (3, 31)
        pragma = "table_xinfo" if use_xinfo else "table_info"
        info = self._get_table_pragma(
            connection, pragma, table_name, schema=schema
        )

        columns = []
        # body of the CREATE TABLE statement; fetched lazily, only once a
        # generated column is encountered
        tablesql = None

        for row in info:
            name, type_, default, primary_key = (
                row[1],
                row[2].upper(),
                row[4],
                row[5],
            )
            nullable = not row[3]
            hidden = row[6] if use_xinfo else 0

            # hidden has value 0 for normal columns, 1 for hidden columns,
            # 2 for computed virtual columns and 3 for computed stored columns
            # https://www.sqlite.org/src/info/069351b85f9a706f60d3e98fbc8aaf40c374356b967c0464aede30ead3d9d18b
            if hidden == 1:
                continue

            generated = bool(hidden)
            persisted = hidden == 3

            if generated and tablesql is None:
                create_sql = self._get_table_sql(
                    connection, table_name, schema, **kw
                )
                # keep only what is between the outermost parentheses,
                # dropping "CREATE TABLE ..." and trailing table options
                match = re.match(
                    (
                        r"create table .*?\((.*)\)"
                        r"(?:\s*,?\s*(?:WITHOUT\s+ROWID|STRICT))*$"
                    ),
                    create_sql.strip(),
                    re.DOTALL | re.IGNORECASE,
                )
                assert match, f"create table not found in {create_sql}"
                tablesql = match.group(1).strip()

            column_info = self._get_column_info(
                name,
                type_,
                nullable,
                default,
                primary_key,
                generated,
                persisted,
                tablesql,
            )
            columns.append(column_info)

        if columns:
            return columns
        if not self.has_table(connection, table_name, schema):
            raise exc.NoSuchTableError(
                f"{schema}.{table_name}" if schema else table_name
            )
        return ReflectionDefaults.columns()
    2432 
    2433    def _get_column_info( 
    2434        self, 
    2435        name, 
    2436        type_, 
    2437        nullable, 
    2438        default, 
    2439        primary_key, 
    2440        generated, 
    2441        persisted, 
    2442        tablesql, 
    2443    ): 
    2444        if generated: 
    2445            # the type of a column "cc INTEGER GENERATED ALWAYS AS (1 + 42)" 
    2446            # somehow is "INTEGER GENERATED ALWAYS" 
    2447            type_ = re.sub("generated", "", type_, flags=re.IGNORECASE) 
    2448            type_ = re.sub("always", "", type_, flags=re.IGNORECASE).strip() 
    2449 
    2450        coltype = self._resolve_type_affinity(type_) 
    2451 
    2452        if default is not None: 
    2453            default = str(default) 
    2454 
    2455        colspec = { 
    2456            "name": name, 
    2457            "type": coltype, 
    2458            "nullable": nullable, 
    2459            "default": default, 
    2460            "primary_key": primary_key, 
    2461        } 
    2462        if generated: 
    2463            sqltext = "" 
    2464            if tablesql: 
    2465                pattern = ( 
    2466                    r"[^,]*\s+GENERATED\s+ALWAYS\s+AS" 
    2467                    r"\s+\((.*)\)\s*(?:virtual|stored)?" 
    2468                ) 
    2469                match = re.search( 
    2470                    re.escape(name) + pattern, tablesql, re.IGNORECASE 
    2471                ) 
    2472                if match: 
    2473                    sqltext = match.group(1) 
    2474            colspec["computed"] = {"sqltext": sqltext, "persisted": persisted} 
    2475        return colspec 
    2476 
    2477    def _resolve_type_affinity(self, type_): 
    2478        """Return a data type from a reflected column, using affinity rules. 
    2479 
    2480        SQLite's goal for universal compatibility introduces some complexity 
    2481        during reflection, as a column's defined type might not actually be a 
    2482        type that SQLite understands - or indeed, my not be defined *at all*. 
    2483        Internally, SQLite handles this with a 'data type affinity' for each 
    2484        column definition, mapping to one of 'TEXT', 'NUMERIC', 'INTEGER', 
    2485        'REAL', or 'NONE' (raw bits). The algorithm that determines this is 
    2486        listed in https://www.sqlite.org/datatype3.html section 2.1. 
    2487 
    2488        This method allows SQLAlchemy to support that algorithm, while still 
    2489        providing access to smarter reflection utilities by recognizing 
    2490        column definitions that SQLite only supports through affinity (like 
    2491        DATE and DOUBLE). 
    2492 
    2493        """ 
    2494        match = re.match(r"([\w ]+)(\(.*?\))?", type_) 
    2495        if match: 
    2496            coltype = match.group(1) 
    2497            args = match.group(2) 
    2498        else: 
    2499            coltype = "" 
    2500            args = "" 
    2501 
    2502        if coltype in self.ischema_names: 
    2503            coltype = self.ischema_names[coltype] 
    2504        elif "INT" in coltype: 
    2505            coltype = sqltypes.INTEGER 
    2506        elif "CHAR" in coltype or "CLOB" in coltype or "TEXT" in coltype: 
    2507            coltype = sqltypes.TEXT 
    2508        elif "BLOB" in coltype or not coltype: 
    2509            coltype = sqltypes.NullType 
    2510        elif "REAL" in coltype or "FLOA" in coltype or "DOUB" in coltype: 
    2511            coltype = sqltypes.REAL 
    2512        else: 
    2513            coltype = sqltypes.NUMERIC 
    2514 
    2515        if args is not None: 
    2516            args = re.findall(r"(\d+)", args) 
    2517            try: 
    2518                coltype = coltype(*[int(a) for a in args]) 
    2519            except TypeError: 
    2520                util.warn( 
    2521                    "Could not instantiate type %s with " 
    2522                    "reflected arguments %s; using no arguments." 
    2523                    % (coltype, args) 
    2524                ) 
    2525                coltype = coltype() 
    2526        else: 
    2527            coltype = coltype() 
    2528 
    2529        return coltype 
    2530 
    @reflection.cache
    def get_pk_constraint(self, connection, table_name, schema=None, **kw):
        """Return the primary key constraint of ``table_name``.

        The constraint name is taken from a ``CONSTRAINT <name> PRIMARY
        KEY`` clause in the table's SQL when one is found; the columns are
        those reported by ``get_columns()`` with a positive
        ``primary_key`` value, ordered by that value.
        """
        constraint_name = None
        table_data = self._get_table_sql(connection, table_name, schema=schema)
        if table_data:
            named_pk = re.search(
                r"CONSTRAINT (\w+) PRIMARY KEY", table_data, re.I
            )
            if named_pk:
                constraint_name = named_pk.group(1)

        all_columns = self.get_columns(connection, table_name, schema, **kw)
        # sorted() builds a new list, so the cached list returned by
        # get_columns is left untouched
        pk_columns = sorted(
            (col for col in all_columns if col.get("primary_key", 0) > 0),
            key=lambda col: col.get("primary_key"),
        )
        pkeys = [col["name"] for col in pk_columns]

        if not pkeys:
            return ReflectionDefaults.pk_constraint()
        return {"constrained_columns": pkeys, "name": constraint_name}
    2551 
    @reflection.cache
    def get_foreign_keys(self, connection, table_name, schema=None, **kw):
        """Return the foreign key constraints of ``table_name``.

        The constraints themselves come from ``PRAGMA foreign_key_list``.
        The table's SQL is then parsed with a regular expression to
        recover each constraint's name and options (``ondelete``,
        ``onupdate``, ``deferrable``, ``initially``); a parsed constraint
        is matched to a pragma entry by its signature of constrained
        columns, referred table and referred columns.

        Parsed constraints with no pragma counterpart produce a warning
        and are skipped.  Pragma entries with no parsed counterpart are
        returned with ``name`` set to ``None`` and empty ``options``.

        :return: list of dicts with keys ``name``, ``constrained_columns``,
         ``referred_schema``, ``referred_table``, ``referred_columns`` and
         ``options``, or ``ReflectionDefaults.foreign_keys()`` if there
         are none.
        """
        # sqlite makes this *extremely difficult*.
        # First, use the pragma to get the actual FKs.
        pragma_fks = self._get_table_pragma(
            connection, "foreign_key_list", table_name, schema=schema
        )

        # pragma id of the constraint -> reflected foreign key dict
        fks = {}

        for row in pragma_fks:
            (numerical_id, rtbl, lcol, rcol) = (row[0], row[2], row[3], row[4])

            if not rcol:
                # no referred column, which means it was not named in the
                # original DDL.  The referred columns of the foreign key
                # constraint are therefore the primary key of the referred
                # table.
                try:
                    referred_pk = self.get_pk_constraint(
                        connection, rtbl, schema=schema, **kw
                    )
                    referred_columns = referred_pk["constrained_columns"]
                except exc.NoSuchTableError:
                    # ignore not existing parents
                    referred_columns = []
            else:
                # note we use this list only if this is the first column
                # in the constraint.  for subsequent columns we ignore the
                # list and append "rcol" if present.
                referred_columns = []

            if self._broken_fk_pragma_quotes:
                # strip one leading and one trailing quote character
                rtbl = re.sub(r"^[\"\[`\']|[\"\]`\']$", "", rtbl)

            if numerical_id in fks:
                fk = fks[numerical_id]
            else:
                fk = fks[numerical_id] = {
                    "name": None,
                    "constrained_columns": [],
                    "referred_schema": schema,
                    "referred_table": rtbl,
                    "referred_columns": referred_columns,
                    "options": {},
                }
                # NOTE(review): redundant; fk was already stored under this
                # key by the assignment just above
                fks[numerical_id] = fk

            fk["constrained_columns"].append(lcol)

            if rcol:
                fk["referred_columns"].append(rcol)

        def fk_sig(constrained_columns, referred_table, referred_columns):
            # flat tuple used as the key for matching pragma entries
            # against constraints parsed from the SQL
            return (
                tuple(constrained_columns)
                + (referred_table,)
                + tuple(referred_columns)
            )

        # then, parse the actual SQL and attempt to find DDL that matches
        # the names as well.   SQLite saves the DDL in whatever format
        # it was typed in as, so need to be liberal here.

        keys_by_signature = {
            fk_sig(
                fk["constrained_columns"],
                fk["referred_table"],
                fk["referred_columns"],
            ): fk
            for fk in fks.values()
        }

        table_data = self._get_table_sql(connection, table_name, schema=schema)

        def parse_fks():
            # generator of (name, constrained columns, referred table,
            # referred columns, options) for each FOREIGN KEY clause found
            if table_data is None:
                # system tables, etc.
                return

            # note that we already have the FKs from PRAGMA above.  This whole
            # regexp thing is trying to locate additional detail about the
            # FKs, namely the name of the constraint and other options.
            # so parsing the columns is really about matching it up to what
            # we already have.
            FK_PATTERN = (
                r"(?:CONSTRAINT (\w+) +)?"
                r"FOREIGN KEY *\( *(.+?) *\) +"
                r'REFERENCES +(?:(?:"(.+?)")|([a-z0-9_]+)) *\( *((?:(?:"[^"]+"|[a-z0-9_]+) *(?:, *)?)+)\) *'  # noqa: E501
                r"((?:ON (?:DELETE|UPDATE) "
                r"(?:SET NULL|SET DEFAULT|CASCADE|RESTRICT|NO ACTION) *)*)"
                r"((?:NOT +)?DEFERRABLE)?"
                r"(?: +INITIALLY +(DEFERRED|IMMEDIATE))?"
            )
            for match in re.finditer(FK_PATTERN, table_data, re.I):
                (
                    constraint_name,
                    constrained_columns,
                    referred_quoted_name,
                    referred_name,
                    referred_columns,
                    onupdatedelete,
                    deferrable,
                    initially,
                ) = match.group(1, 2, 3, 4, 5, 6, 7, 8)
                constrained_columns = list(
                    self._find_cols_in_sig(constrained_columns)
                )
                if not referred_columns:
                    referred_columns = constrained_columns
                else:
                    referred_columns = list(
                        self._find_cols_in_sig(referred_columns)
                    )
                referred_name = referred_quoted_name or referred_name
                options = {}

                # split "ON DELETE x ON UPDATE y" into "DELETE x", "UPDATE y";
                # NO ACTION is not recorded as an option
                for token in re.split(r" *\bON\b *", onupdatedelete.upper()):
                    if token.startswith("DELETE"):
                        ondelete = token[6:].strip()
                        if ondelete and ondelete != "NO ACTION":
                            options["ondelete"] = ondelete
                    elif token.startswith("UPDATE"):
                        onupdate = token[6:].strip()
                        if onupdate and onupdate != "NO ACTION":
                            options["onupdate"] = onupdate

                if deferrable:
                    options["deferrable"] = "NOT" not in deferrable.upper()
                if initially:
                    options["initially"] = initially.upper()

                yield (
                    constraint_name,
                    constrained_columns,
                    referred_name,
                    referred_columns,
                    options,
                )

        fkeys = []

        for (
            constraint_name,
            constrained_columns,
            referred_name,
            referred_columns,
            options,
        ) in parse_fks():
            sig = fk_sig(constrained_columns, referred_name, referred_columns)
            if sig not in keys_by_signature:
                util.warn(
                    "WARNING: SQL-parsed foreign key constraint "
                    "'%s' could not be located in PRAGMA "
                    "foreign_keys for table %s" % (sig, table_name)
                )
                continue
            # pop so that each pragma entry is matched at most once
            key = keys_by_signature.pop(sig)
            key["name"] = constraint_name
            key["options"] = options
            fkeys.append(key)
        # assume the remainders are the unnamed, inline constraints, just
        # use them as is as it's extremely difficult to parse inline
        # constraints
        fkeys.extend(keys_by_signature.values())
        if fkeys:
            return fkeys
        else:
            return ReflectionDefaults.foreign_keys()
    2721 
    2722    def _find_cols_in_sig(self, sig): 
    2723        for match in re.finditer(r'(?:"(.+?)")|([a-z0-9_]+)', sig, re.I): 
    2724            yield match.group(1) or match.group(2) 
    2725 
    @reflection.cache
    def get_unique_constraints(
        self, connection, table_name, schema=None, **kw
    ):
        """Return the UNIQUE constraints of ``table_name``.

        UNIQUE clauses are parsed out of the table's SQL, both table-level
        ``UNIQUE (...)`` and inline column ``UNIQUE``.  A parsed clause is
        reported only if an index whose name starts with
        ``sqlite_autoindex`` covers the same columns.

        :return: list of dicts with ``name`` and ``column_names``, or
         ``ReflectionDefaults.unique_constraints()`` if there are none.
        """
        all_indexes = self.get_indexes(
            connection,
            table_name,
            schema=schema,
            include_auto_indexes=True,
            **kw,
        )
        # column tuple -> auto index covering exactly those columns
        auto_index_by_sig = {
            tuple(idx["column_names"]): idx
            for idx in all_indexes
            if idx["name"].startswith("sqlite_autoindex")
        }

        table_data = self._get_table_sql(
            connection, table_name, schema=schema, **kw
        )

        def parse_uqs():
            if table_data is None:
                return
            UNIQUE_PATTERN = r'(?:CONSTRAINT "?(.+?)"? +)?UNIQUE *\((.+?)\)'
            INLINE_UNIQUE_PATTERN = (
                r'(?:(".+?")|(?:[\[`])?([a-z0-9_]+)(?:[\]`])?)[\t ]'
                r"+[a-z0-9_ ]+?[\t ]+UNIQUE"
            )

            for table_level in re.finditer(UNIQUE_PATTERN, table_data, re.I):
                uq_name, col_list = table_level.group(1, 2)
                yield uq_name, list(self._find_cols_in_sig(col_list))

            # we need to match inlines as well, as we seek to differentiate
            # a UNIQUE constraint from a UNIQUE INDEX, even though these
            # are kind of the same thing :)
            for inline in re.finditer(INLINE_UNIQUE_PATTERN, table_data, re.I):
                column = inline.group(1) or inline.group(2)
                yield None, list(self._find_cols_in_sig(column))

        unique_constraints = []
        for name, cols in parse_uqs():
            # pop so that one auto index backs at most one constraint
            if auto_index_by_sig.pop(tuple(cols), None) is not None:
                unique_constraints.append(
                    {"name": name, "column_names": cols}
                )

        # NOTE: auto_index_by_sig might not be empty here,
        # the PRIMARY KEY may have an entry.
        if not unique_constraints:
            return ReflectionDefaults.unique_constraints()
        return unique_constraints
    2782 
    @reflection.cache
    def get_check_constraints(self, connection, table_name, schema=None, **kw):
        """Return the CHECK constraints found in the SQL of ``table_name``.

        Each ``CHECK (`` occurrence, optionally preceded by
        ``CONSTRAINT <name>``, is located with a regular expression; the
        expression text is then taken up to the matching closing
        parenthesis.  An occurrence with no matching closing parenthesis
        is dropped.

        :return: list of dicts with ``sqltext`` and ``name``, sorted by
         name with unnamed constraints last, or
         ``ReflectionDefaults.check_constraints()`` if there are none.
        """
        table_data = self._get_table_sql(
            connection, table_name, schema=schema, **kw
        )

        # Extract CHECK constraints by properly handling balanced parentheses
        # and avoiding false matches when CHECK/CONSTRAINT appear in table
        # names. See #12924 for context.
        #
        # SQLite supports 4 identifier quote styles (see
        # sqlite.org/lang_keywords.html):
        # - Double quotes "..." (standard SQL)
        # - Brackets [...] (MS Access/SQL Server compatibility)
        # - Backticks `...` (MySQL compatibility)
        # - Single quotes '...' (SQLite extension)
        #
        # NOTE: there is not currently a way to parse CHECK constraints that
        # contain newlines as the approach here relies upon each individual
        # CHECK constraint being on a single line by itself.   This necessarily
        # makes assumptions as to how the CREATE TABLE was emitted.
        CHECK_PATTERN = re.compile(
            r"""
            (?<![A-Za-z0-9_])   # Negative lookbehind: ensure CHECK is not
                                # part of an identifier (e.g., table name
                                # like "tableCHECK")

            (?:                 # Optional CONSTRAINT clause
                CONSTRAINT\s+
                (               # Group 1: Constraint name (quoted or unquoted)
                    "(?:[^"]|"")+"        # Double-quoted: "name" or "na""me"
                    |'(?:[^']|'')+'  # Single-quoted: 'name' or 'na''me'
                    |\[(?:[^\]]|\]\])+\]  # Bracket-quoted: [name] or [na]]me]
                    |`(?:[^`]|``)+`       # Backtick-quoted: `name` or `na``me`
                    |\S+                  # Unquoted: simple_name
                )
                \s+
            )?

            CHECK\s*\(          # CHECK keyword followed by opening paren
            """,
            re.VERBOSE | re.IGNORECASE,
        )
        cks = []

        # "table_data or ''" yields no matches when there is no table SQL,
        # so table_data is only sliced below when it is a string
        for match in re.finditer(CHECK_PATTERN, table_data or ""):
            constraint_name = match.group(1)

            if constraint_name:
                # Remove surrounding quotes if present
                # Double quotes: "name" -> name
                # Single quotes: 'name' -> name
                # Brackets: [name] -> name
                # Backticks: `name` -> name
                constraint_name = re.sub(
                    r'^(["\'`])(.+)\1$|^\[(.+)\]$',
                    lambda m: m.group(2) or m.group(3),
                    constraint_name,
                    flags=re.DOTALL,
                )

            # Find the matching closing parenthesis by counting balanced parens
            # Must track string context to ignore parens inside string literals
            start = match.end()  # Position after 'CHECK ('
            paren_count = 1
            in_single_quote = False
            in_double_quote = False

            for pos, char in enumerate(table_data[start:], start):
                # Track string literal context
                if char == "'" and not in_double_quote:
                    in_single_quote = not in_single_quote
                elif char == '"' and not in_single_quote:
                    in_double_quote = not in_double_quote
                # Only count parens when not inside a string literal
                elif not in_single_quote and not in_double_quote:
                    if char == "(":
                        paren_count += 1
                    elif char == ")":
                        paren_count -= 1
                        if paren_count == 0:
                            # Successfully found matching closing parenthesis
                            sqltext = table_data[start:pos].strip()
                            cks.append(
                                {"sqltext": sqltext, "name": constraint_name}
                            )
                            break

        cks.sort(key=lambda d: d["name"] or "~")  # sort None as last
        if cks:
            return cks
        else:
            return ReflectionDefaults.check_constraints()
    2876 
    @reflection.cache
    def get_indexes(self, connection, table_name, schema=None, **kw):
        """Return the indexes of ``table_name``.

        Indexes are listed with ``PRAGMA index_list`` and their columns
        with ``PRAGMA index_info``.  For a partial index, the WHERE
        predicate is extracted from the index's stored SQL and placed in
        ``dialect_options["sqlite_where"]``.  Expression-based indexes are
        skipped with a warning.

        :param include_auto_indexes: optional keyword, popped from ``kw``;
         when false (the default), indexes whose name starts with
         ``sqlite_autoindex`` are omitted.
        :return: list of dicts with ``name``, ``column_names``, ``unique``
         and ``dialect_options``, sorted by name, or
         ``ReflectionDefaults.indexes()`` if the table has none.
        :raises NoSuchTableError: if no indexes were found and
         ``has_table()`` reports that the table does not exist.
        """
        pragma_indexes = self._get_table_pragma(
            connection, "index_list", table_name, schema=schema
        )
        indexes = []

        # regular expression to extract the filter predicate of a partial
        # index. this could fail to extract the predicate correctly on
        # indexes created like
        #   CREATE INDEX i ON t (col || ') where') WHERE col <> ''
        # but as this function does not support expression-based indexes
        # this case does not occur.
        partial_pred_re = re.compile(r"\)\s+where\s+(.+)", re.IGNORECASE)

        if schema:
            schema_expr = "%s." % self.identifier_preparer.quote_identifier(
                schema
            )
        else:
            schema_expr = ""

        include_auto_indexes = kw.pop("include_auto_indexes", False)
        for row in pragma_indexes:
            # ignore implicit primary key index.
            # https://www.mail-archive.com/sqlite-users@sqlite.org/msg30517.html
            if not include_auto_indexes and row[1].startswith(
                "sqlite_autoindex"
            ):
                continue
            indexes.append(
                dict(
                    name=row[1],
                    column_names=[],
                    unique=row[2],
                    dialect_options={},
                )
            )

            # check partial indexes
            if len(row) >= 5 and row[4]:
                s = (
                    "SELECT sql FROM %(schema)ssqlite_master "
                    "WHERE name = ? "
                    "AND type = 'index'" % {"schema": schema_expr}
                )
                rs = connection.exec_driver_sql(s, (row[1],))
                index_sql = rs.scalar()
                # scalar() gives None when the lookup found no row; treat
                # that like an unparseable definition and warn below,
                # rather than handing None to the regex
                predicate_match = (
                    partial_pred_re.search(index_sql)
                    if index_sql is not None
                    else None
                )
                if predicate_match is None:
                    # unless the regex is broken this case shouldn't happen
                    # because we know this is a partial index, so the
                    # definition sql should match the regex
                    util.warn(
                        "Failed to look up filter predicate of "
                        "partial index %s" % row[1]
                    )
                else:
                    predicate = predicate_match.group(1)
                    indexes[-1]["dialect_options"]["sqlite_where"] = text(
                        predicate
                    )

        # loop thru the collected indexes to get the column names; iterate
        # over a copy because unsupported indexes are removed on the way
        for idx in list(indexes):
            pragma_index = self._get_table_pragma(
                connection, "index_info", idx["name"], schema=schema
            )

            for row in pragma_index:
                if row[2] is None:
                    util.warn(
                        "Skipped unsupported reflection of "
                        "expression-based index %s" % idx["name"]
                    )
                    indexes.remove(idx)
                    break
                else:
                    idx["column_names"].append(row[2])

        indexes.sort(key=lambda d: d["name"] or "~")  # sort None as last
        if indexes:
            return indexes
        elif not self.has_table(connection, table_name, schema):
            raise exc.NoSuchTableError(
                f"{schema}.{table_name}" if schema else table_name
            )
        else:
            return ReflectionDefaults.indexes()
    2966 
    2967    def _is_sys_table(self, table_name): 
    2968        return table_name in { 
    2969            "sqlite_schema", 
    2970            "sqlite_master", 
    2971            "sqlite_temp_schema", 
    2972            "sqlite_temp_master", 
    2973        } 
    2974 
    @reflection.cache
    def _get_table_sql(self, connection, table_name, schema=None, **kw):
        """Return the ``sql`` text SQLite stores for a table or view.

        The lookup first queries ``sqlite_master`` combined with
        ``sqlite_temp_master`` via ``UNION ALL``.  If that statement raises
        :class:`.DBAPIError`, the lookup is retried against
        ``sqlite_master`` alone.

        :param connection: connection used to run the lookup through
         ``exec_driver_sql()``.
        :param table_name: name of the table or view; passed as a bound
         parameter.
        :param schema: optional schema name; when truthy it is quoted and
         prefixed to the catalog table names.
        :return: the first column of the first row returned, which may be
         ``None`` when ``table_name`` is one of the names accepted by
         ``_is_sys_table()``.
        :raises NoSuchTableError: when no value is found and ``table_name``
         is not accepted by ``_is_sys_table()``.
        """
        schema_expr = (
            "%s." % (self.identifier_preparer.quote_identifier(schema))
            if schema
            else ""
        )
        fmt_args = {"schema": schema_expr}
        params = (table_name,)

        try:
            # preferred form: also sees objects recorded in the temp catalog
            rs = connection.exec_driver_sql(
                "SELECT sql FROM "
                " (SELECT * FROM %(schema)ssqlite_master UNION ALL "
                "  SELECT * FROM %(schema)ssqlite_temp_master) "
                "WHERE name = ? "
                "AND type in ('table', 'view')" % fmt_args,
                params,
            )
        except exc.DBAPIError:
            # NOTE(review): presumably covers databases where the temp
            # catalog cannot be queried under this schema -- confirm.
            rs = connection.exec_driver_sql(
                "SELECT sql FROM %(schema)ssqlite_master "
                "WHERE name = ? "
                "AND type in ('table', 'view')" % fmt_args,
                params,
            )

        value = rs.scalar()
        # the schema tables are allowed to come back with no stored SQL
        if value is not None or self._is_sys_table(table_name):
            return value
        raise exc.NoSuchTableError(f"{schema_expr}{table_name}")
    3003 
    3004    def _get_table_pragma(self, connection, pragma, table_name, schema=None): 
    3005        quote = self.identifier_preparer.quote_identifier 
    3006        if schema is not None: 
    3007            statements = [f"PRAGMA {quote(schema)}."] 
    3008        else: 
    3009            # because PRAGMA looks in all attached databases if no schema 
    3010            # given, need to specify "main" schema, however since we want 
    3011            # 'temp' tables in the same namespace as 'main', need to run 
    3012            # the PRAGMA twice 
    3013            statements = ["PRAGMA main.", "PRAGMA temp."] 
    3014 
    3015        qtable = quote(table_name) 
    3016        for statement in statements: 
    3017            statement = f"{statement}{pragma}({qtable})" 
    3018            cursor = connection.exec_driver_sql(statement) 
    3019            if not cursor._soft_closed: 
    3020                # work around SQLite issue whereby cursor.description 
    3021                # is blank when PRAGMA returns no rows: 
    3022                # https://www.sqlite.org/cvstrac/tktview?tn=1884 
    3023                result = cursor.fetchall() 
    3024            else: 
    3025                result = [] 
    3026            if result: 
    3027                return result 
    3028        else: 
    3029            return []