Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/sqlalchemy/dialects/sqlite/base.py: 25%
724 statements
coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
# sqlite/base.py
# Copyright (C) 2005-2023 the SQLAlchemy authors and contributors
# <see AUTHORS file>
#
# This module is part of SQLAlchemy and is released under
# the MIT License: https://www.opensource.org/licenses/mit-license.php
r"""
.. dialect:: sqlite
    :name: SQLite
    :full_support: 3.21, 3.28+
    :normal_support: 3.12+
    :best_effort: 3.7.16+

.. _sqlite_datetime:

Date and Time Types
-------------------

SQLite does not have built-in DATE, TIME, or DATETIME types, and pysqlite does
not provide out of the box functionality for translating values between Python
`datetime` objects and a SQLite-supported format. SQLAlchemy's own
:class:`~sqlalchemy.types.DateTime` and related types provide date formatting
and parsing functionality when SQLite is used. The implementation classes are
:class:`_sqlite.DATETIME`, :class:`_sqlite.DATE` and :class:`_sqlite.TIME`.
These types represent dates and times as ISO formatted strings, which also
nicely support ordering. There's no reliance on typical "libc" internals for
these functions so historical dates are fully supported.
Ensuring Text affinity
^^^^^^^^^^^^^^^^^^^^^^

The DDL rendered for these types is the standard ``DATE``, ``TIME``
and ``DATETIME`` indicators. However, custom storage formats can also be
applied to these types. When the storage format is detected as containing
no alpha characters, the DDL for these types is rendered as ``DATE_CHAR``,
``TIME_CHAR``, and ``DATETIME_CHAR``, so that the column continues to have
textual affinity.

.. seealso::

    `Type Affinity <https://www.sqlite.org/datatype3.html#affinity>`_ -
    in the SQLite documentation
.. _sqlite_autoincrement:

SQLite Auto Incrementing Behavior
----------------------------------

Background on SQLite's autoincrement is at: https://sqlite.org/autoinc.html

Key concepts:

* SQLite has an implicit "auto increment" feature that takes place for any
  non-composite primary-key column that is specifically created using
  "INTEGER PRIMARY KEY" for the type + primary key.

* SQLite also has an explicit "AUTOINCREMENT" keyword, that is **not**
  equivalent to the implicit autoincrement feature; this keyword is not
  recommended for general use. SQLAlchemy does not render this keyword
  unless a special SQLite-specific directive is used (see below). However,
  it still requires that the column's type is named "INTEGER".
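The implicit behavior can be observed directly with the stdlib ``sqlite3``
driver, independently of SQLAlchemy; a minimal sketch (the table and column
names are illustrative):

```python
import sqlite3

# An "INTEGER PRIMARY KEY" column aliases SQLite's rowid, so omitting the
# value on INSERT auto-assigns the next integer.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, data TEXT)")
conn.execute("INSERT INTO t (data) VALUES ('a')")
conn.execute("INSERT INTO t (data) VALUES ('b')")
ids = [row[0] for row in conn.execute("SELECT id FROM t ORDER BY id")]
conn.close()
```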
Using the AUTOINCREMENT Keyword
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

To specifically render the AUTOINCREMENT keyword on the primary key column
when rendering DDL, add the flag ``sqlite_autoincrement=True`` to the Table
construct::

    Table('sometable', metadata,
            Column('id', Integer, primary_key=True),
            sqlite_autoincrement=True)
Allowing autoincrement behavior for SQLAlchemy types other than Integer/INTEGER
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

SQLite's typing model is based on naming conventions. Among other things, this
means that any type name which contains the substring ``"INT"`` will be
determined to be of "integer affinity". A type named ``"BIGINT"``,
``"SPECIAL_INT"`` or even ``"XYZINTQPR"`` will be considered by SQLite to be
of "integer" affinity. However, **the SQLite autoincrement feature, whether
implicitly or explicitly enabled, requires that the name of the column's type
is exactly the string "INTEGER"**. Therefore, if an application uses a type
like :class:`.BigInteger` for a primary key, on SQLite this type will need to
be rendered as the name ``"INTEGER"`` when emitting the initial ``CREATE
TABLE`` statement in order for the autoincrement behavior to be available.

One approach to achieve this is to use :class:`.Integer` on SQLite
only using :meth:`.TypeEngine.with_variant`::

    table = Table(
        "my_table", metadata,
        Column("id", BigInteger().with_variant(Integer, "sqlite"), primary_key=True)
    )
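Why the exact name matters can be demonstrated without SQLAlchemy at all,
using the stdlib ``sqlite3`` driver; a ``BIGINT`` primary key is not a rowid
alias, so a NULL is stored as-is rather than auto-assigned (a sketch; the
table names are illustrative):

```python
import sqlite3

conn = sqlite3.connect(":memory:")

# "BIGINT" has integer affinity but is NOT the rowid alias; a long-standing
# SQLite quirk even allows NULL in such a primary key column.
conn.execute("CREATE TABLE big (id BIGINT PRIMARY KEY, data TEXT)")
conn.execute("INSERT INTO big (id, data) VALUES (NULL, 'x')")
big_id = conn.execute("SELECT id FROM big").fetchone()[0]

# The exact name "INTEGER" makes the column an auto-assigning rowid alias.
conn.execute("CREATE TABLE small (id INTEGER PRIMARY KEY, data TEXT)")
conn.execute("INSERT INTO small (id, data) VALUES (NULL, 'x')")
small_id = conn.execute("SELECT id FROM small").fetchone()[0]
conn.close()
```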
Another is to use a subclass of :class:`.BigInteger` that overrides its DDL
name to be ``INTEGER`` when compiled against SQLite::

    from sqlalchemy import BigInteger
    from sqlalchemy.ext.compiler import compiles

    class SLBigInteger(BigInteger):
        pass

    @compiles(SLBigInteger, 'sqlite')
    def bi_c_sqlite(element, compiler, **kw):
        return "INTEGER"

    @compiles(SLBigInteger)
    def bi_c(element, compiler, **kw):
        return compiler.visit_BIGINT(element, **kw)

    table = Table(
        "my_table", metadata,
        Column("id", SLBigInteger(), primary_key=True)
    )

.. seealso::

    :meth:`.TypeEngine.with_variant`

    :ref:`sqlalchemy.ext.compiler_toplevel`

    `Datatypes In SQLite Version 3 <https://sqlite.org/datatype3.html>`_
.. _sqlite_concurrency:

Database Locking Behavior / Concurrency
---------------------------------------

SQLite is not designed for a high level of write concurrency. The database
itself, being a file, is locked completely during write operations within
transactions, meaning exactly one "connection" (in reality a file handle)
has exclusive access to the database during this period - all other
"connections" will be blocked during this time.

The Python DBAPI specification also calls for a connection model that is
always in a transaction; there is no ``connection.begin()`` method,
only ``connection.commit()`` and ``connection.rollback()``, upon which a
new transaction is to be begun immediately. This may seem to imply
that the SQLite driver would in theory allow only a single filehandle on a
particular database file at any time; however, there are several
factors both within SQLite itself as well as within the pysqlite driver
which loosen this restriction significantly.

However, no matter what locking modes are used, SQLite will still always
lock the database file once a transaction is started and DML (e.g. INSERT,
UPDATE, DELETE) has at least been emitted, and this will block
other transactions at least at the point that they also attempt to emit DML.
By default, the length of time on this block is very short before it times out
with an error.
This behavior becomes more critical when used in conjunction with the
SQLAlchemy ORM. SQLAlchemy's :class:`.Session` object by default runs
within a transaction, and with its autoflush model, may emit DML preceding
any SELECT statement. This may lead to a SQLite database that locks
more quickly than is expected. The locking mode of SQLite and the pysqlite
driver can be manipulated to some degree; however, it should be noted that
achieving a high degree of write-concurrency with SQLite is a losing battle.

For more information on SQLite's lack of write concurrency by design, please
see
`Situations Where Another RDBMS May Work Better - High Concurrency
<https://www.sqlite.org/whentouse.html>`_ near the bottom of the page.

The following subsections introduce areas that are impacted by SQLite's
file-based architecture and additionally will usually require workarounds to
work when using the pysqlite driver.
.. _sqlite_isolation_level:

Transaction Isolation Level / Autocommit
----------------------------------------

SQLite supports "transaction isolation" in a non-standard way, along two
axes. One is that of the
`PRAGMA read_uncommitted <https://www.sqlite.org/pragma.html#pragma_read_uncommitted>`_
instruction. This setting can essentially switch SQLite between its
default mode of ``SERIALIZABLE`` isolation, and a "dirty read" isolation
mode normally referred to as ``READ UNCOMMITTED``.

SQLAlchemy ties into this PRAGMA statement using the
:paramref:`_sa.create_engine.isolation_level` parameter of
:func:`_sa.create_engine`.
Valid values for this parameter when used with SQLite are ``"SERIALIZABLE"``
and ``"READ UNCOMMITTED"``, corresponding to a value of 0 and 1, respectively.
SQLite defaults to ``SERIALIZABLE``; however, its behavior is impacted by
the pysqlite driver's default behavior.

When using the pysqlite driver, the ``"AUTOCOMMIT"`` isolation level is also
available, which will alter the pysqlite connection using the ``.isolation_level``
attribute on the DBAPI connection and set it to None for the duration
of the setting.

.. versionadded:: 1.3.16 added support for SQLite AUTOCOMMIT isolation level
   when using the pysqlite / sqlite3 SQLite driver.
The other axis along which SQLite's transactional locking is impacted is
via the nature of the ``BEGIN`` statement used. The three varieties
are "deferred", "immediate", and "exclusive", as described at
`BEGIN TRANSACTION <https://sqlite.org/lang_transaction.html>`_. A straight
``BEGIN`` statement uses the "deferred" mode, where the database file is
not locked until the first read or write operation, and read access remains
open to other transactions until the first write operation. But again,
it is critical to note that the pysqlite driver interferes with this behavior
by *not even emitting BEGIN* until the first write operation.

.. warning::

    SQLite's transactional scope is impacted by unresolved
    issues in the pysqlite driver, which defers BEGIN statements to a greater
    degree than is often feasible. See the section :ref:`pysqlite_serializable`
    for techniques to work around this behavior.
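The deferred-BEGIN behavior of the driver can be observed via the stdlib
``sqlite3`` connection's ``in_transaction`` attribute; with the default
implicit transaction management, no BEGIN is emitted until the first DML
statement (names below are illustrative):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (x INTEGER)")
conn.commit()

conn.execute("SELECT x FROM t")        # no BEGIN is emitted for SELECT
before_dml = conn.in_transaction       # still in autocommit mode

conn.execute("INSERT INTO t (x) VALUES (1)")  # BEGIN is emitted only now
after_dml = conn.in_transaction

conn.rollback()
conn.close()
```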
.. seealso::

    :ref:`dbapi_autocommit`
SAVEPOINT Support
----------------------------

SQLite supports SAVEPOINTs, which only function once a transaction is
begun. SQLAlchemy's SAVEPOINT support is available using the
:meth:`_engine.Connection.begin_nested` method at the Core level, and
:meth:`.Session.begin_nested` at the ORM level. However, SAVEPOINTs
won't work at all with pysqlite unless workarounds are taken.
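At the SQL level the feature behaves as follows; this sketch uses the stdlib
``sqlite3`` driver with ``isolation_level=None`` so that the transaction and
savepoint can be managed by hand (names are illustrative):

```python
import sqlite3

# isolation_level=None stops pysqlite from implicitly managing transactions,
# so BEGIN / SAVEPOINT can be issued explicitly.
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("CREATE TABLE t (x INTEGER)")

conn.execute("BEGIN")
conn.execute("INSERT INTO t (x) VALUES (1)")
conn.execute("SAVEPOINT sp1")
conn.execute("INSERT INTO t (x) VALUES (2)")
conn.execute("ROLLBACK TO SAVEPOINT sp1")  # undoes only the second insert
conn.execute("COMMIT")

rows = [r[0] for r in conn.execute("SELECT x FROM t")]
conn.close()
```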
.. warning::

    SQLite's SAVEPOINT feature is impacted by unresolved
    issues in the pysqlite driver, which defers BEGIN statements to a greater
    degree than is often feasible. See the section :ref:`pysqlite_serializable`
    for techniques to work around this behavior.
Transactional DDL
----------------------------

The SQLite database supports transactional :term:`DDL` as well.
In this case, the pysqlite driver not only fails to start transactions,
it also ends any existing transaction when DDL is detected, so again,
workarounds are required.

.. warning::

    SQLite's transactional DDL is impacted by unresolved issues
    in the pysqlite driver, which fails to emit BEGIN and additionally
    forces a COMMIT to cancel any transaction when DDL is encountered.
    See the section :ref:`pysqlite_serializable`
    for techniques to work around this behavior.
.. _sqlite_foreign_keys:

Foreign Key Support
-------------------

SQLite supports FOREIGN KEY syntax when emitting CREATE statements for tables,
however by default these constraints have no effect on the operation of the
table.

Constraint checking on SQLite has three prerequisites:

* At least version 3.6.19 of SQLite must be in use
* The SQLite library must be compiled *without* the SQLITE_OMIT_FOREIGN_KEY
  or SQLITE_OMIT_TRIGGER symbols enabled.
* The ``PRAGMA foreign_keys = ON`` statement must be emitted on all
  connections before use -- including the initial call to
  :meth:`sqlalchemy.schema.MetaData.create_all`.

SQLAlchemy allows for the ``PRAGMA`` statement to be emitted automatically for
new connections through the usage of events::

    from sqlalchemy.engine import Engine
    from sqlalchemy import event

    @event.listens_for(Engine, "connect")
    def set_sqlite_pragma(dbapi_connection, connection_record):
        cursor = dbapi_connection.cursor()
        cursor.execute("PRAGMA foreign_keys=ON")
        cursor.close()
.. warning::

    When SQLite foreign keys are enabled, it is **not possible**
    to emit CREATE or DROP statements for tables that contain
    mutually-dependent foreign key constraints;
    to emit the DDL for these tables requires that ALTER TABLE be used to
    create or drop these constraints separately, for which SQLite has
    no support.

.. seealso::

    `SQLite Foreign Key Support <https://www.sqlite.org/foreignkeys.html>`_
    - on the SQLite web site.

    :ref:`event_toplevel` - SQLAlchemy event API.

    :ref:`use_alter` - more information on SQLAlchemy's facilities for handling
    mutually-dependent foreign key constraints.
.. _sqlite_on_conflict_ddl:

ON CONFLICT support for constraints
-----------------------------------

.. seealso:: This section describes the :term:`DDL` version of "ON CONFLICT" for
   SQLite, which occurs within a CREATE TABLE statement. For "ON CONFLICT" as
   applied to an INSERT statement, see :ref:`sqlite_on_conflict_insert`.

SQLite supports a non-standard DDL clause known as ON CONFLICT which can be applied
to primary key, unique, check, and not null constraints. In DDL, it is
rendered either within the "CONSTRAINT" clause or within the column definition
itself depending on the location of the target constraint. To render this
clause within DDL, the extension parameter ``sqlite_on_conflict`` can be
specified with a string conflict resolution algorithm within the
:class:`.PrimaryKeyConstraint`, :class:`.UniqueConstraint`, and
:class:`.CheckConstraint` objects. Within the :class:`_schema.Column` object,
there are individual parameters ``sqlite_on_conflict_not_null``,
``sqlite_on_conflict_primary_key``, and ``sqlite_on_conflict_unique``, which each
correspond to the three types of relevant constraint types that can be
indicated from a :class:`_schema.Column` object.

.. seealso::

    `ON CONFLICT <https://www.sqlite.org/lang_conflict.html>`_ - in the SQLite
    documentation

.. versionadded:: 1.3
The ``sqlite_on_conflict`` parameters accept a string argument which is just
the resolution name to be chosen, which on SQLite can be one of ROLLBACK,
ABORT, FAIL, IGNORE, and REPLACE. For example, to add a UNIQUE constraint
that specifies the IGNORE algorithm::

    some_table = Table(
        'some_table', metadata,
        Column('id', Integer, primary_key=True),
        Column('data', Integer),
        UniqueConstraint('id', 'data', sqlite_on_conflict='IGNORE')
    )

The above renders CREATE TABLE DDL as::

    CREATE TABLE some_table (
        id INTEGER NOT NULL,
        data INTEGER,
        PRIMARY KEY (id),
        UNIQUE (id, data) ON CONFLICT IGNORE
    )
When using the :paramref:`_schema.Column.unique`
flag to add a UNIQUE constraint
to a single column, the ``sqlite_on_conflict_unique`` parameter can
be added to the :class:`_schema.Column` as well, which will be added to the
UNIQUE constraint in the DDL::

    some_table = Table(
        'some_table', metadata,
        Column('id', Integer, primary_key=True),
        Column('data', Integer, unique=True,
               sqlite_on_conflict_unique='IGNORE')
    )

rendering::

    CREATE TABLE some_table (
        id INTEGER NOT NULL,
        data INTEGER,
        PRIMARY KEY (id),
        UNIQUE (data) ON CONFLICT IGNORE
    )
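The rendered DDL can be exercised directly through the stdlib ``sqlite3``
driver to confirm the IGNORE behavior; a duplicate ``data`` value is silently
skipped rather than raising an error:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# DDL mirroring the rendered example: the UNIQUE constraint carries its
# own IGNORE resolution.
conn.execute(
    "CREATE TABLE some_table ("
    "id INTEGER NOT NULL, "
    "data INTEGER, "
    "PRIMARY KEY (id), "
    "UNIQUE (data) ON CONFLICT IGNORE)"
)
conn.execute("INSERT INTO some_table (id, data) VALUES (1, 10)")
conn.execute("INSERT INTO some_table (id, data) VALUES (2, 10)")  # ignored
count = conn.execute("SELECT count(*) FROM some_table").fetchone()[0]
conn.close()
```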
To apply the FAIL algorithm for a NOT NULL constraint,
``sqlite_on_conflict_not_null`` is used::

    some_table = Table(
        'some_table', metadata,
        Column('id', Integer, primary_key=True),
        Column('data', Integer, nullable=False,
               sqlite_on_conflict_not_null='FAIL')
    )

This renders the column with an inline ON CONFLICT phrase::

    CREATE TABLE some_table (
        id INTEGER NOT NULL,
        data INTEGER NOT NULL ON CONFLICT FAIL,
        PRIMARY KEY (id)
    )
Similarly, for an inline primary key, use ``sqlite_on_conflict_primary_key``::

    some_table = Table(
        'some_table', metadata,
        Column('id', Integer, primary_key=True,
               sqlite_on_conflict_primary_key='FAIL')
    )

SQLAlchemy renders the PRIMARY KEY constraint separately, so the conflict
resolution algorithm is applied to the constraint itself::

    CREATE TABLE some_table (
        id INTEGER NOT NULL,
        PRIMARY KEY (id) ON CONFLICT FAIL
    )
.. _sqlite_on_conflict_insert:

INSERT...ON CONFLICT (Upsert)
-----------------------------------

.. seealso:: This section describes the :term:`DML` version of "ON CONFLICT" for
   SQLite, which occurs within an INSERT statement. For "ON CONFLICT" as
   applied to a CREATE TABLE statement, see :ref:`sqlite_on_conflict_ddl`.

From version 3.24.0 onwards, SQLite supports "upserts" (update or insert)
of rows into a table via the ``ON CONFLICT`` clause of the ``INSERT``
statement. A candidate row will only be inserted if that row does not violate
any unique or primary key constraints. In the case of a unique constraint violation, a
secondary action can occur which can be either "DO UPDATE", indicating that
the data in the target row should be updated, or "DO NOTHING", which indicates
to silently skip this row.

Conflicts are determined using columns that are part of existing unique
constraints and indexes. These constraints are identified by stating the
columns and conditions that comprise the indexes.

SQLAlchemy provides ``ON CONFLICT`` support via the SQLite-specific
:func:`_sqlite.insert()` function, which provides
the generative methods :meth:`_sqlite.Insert.on_conflict_do_update`
and :meth:`_sqlite.Insert.on_conflict_do_nothing`:

.. sourcecode:: pycon+sql

    >>> from sqlalchemy.dialects.sqlite import insert

    >>> insert_stmt = insert(my_table).values(
    ...     id='some_existing_id',
    ...     data='inserted value')

    >>> do_update_stmt = insert_stmt.on_conflict_do_update(
    ...     index_elements=['id'],
    ...     set_=dict(data='updated value')
    ... )

    >>> print(do_update_stmt)
    {opensql}INSERT INTO my_table (id, data) VALUES (?, ?)
    ON CONFLICT (id) DO UPDATE SET data = ?{stop}

    >>> do_nothing_stmt = insert_stmt.on_conflict_do_nothing(
    ...     index_elements=['id']
    ... )

    >>> print(do_nothing_stmt)
    {opensql}INSERT INTO my_table (id, data) VALUES (?, ?)
    ON CONFLICT (id) DO NOTHING

.. versionadded:: 1.4

.. seealso::

    `Upsert
    <https://sqlite.org/lang_UPSERT.html>`_
    - in the SQLite documentation.
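The generated SQL can also be run directly through the stdlib ``sqlite3``
driver, which illustrates the raw upsert mechanics independently of
SQLAlchemy (requires SQLite 3.24.0+; table and values are illustrative):

```python
import sqlite3

if sqlite3.sqlite_version_info >= (3, 24, 0):
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE my_table (id TEXT PRIMARY KEY, data TEXT)")
    conn.execute(
        "INSERT INTO my_table (id, data) VALUES ('some_existing_id', 'original')"
    )
    # The conflicting INSERT becomes an UPDATE of the existing row.
    conn.execute(
        "INSERT INTO my_table (id, data) VALUES (?, ?) "
        "ON CONFLICT (id) DO UPDATE SET data = excluded.data",
        ("some_existing_id", "updated value"),
    )
    data = conn.execute(
        "SELECT data FROM my_table WHERE id = 'some_existing_id'"
    ).fetchone()[0]
    conn.close()
else:  # placeholder so the sketch degrades quietly on pre-3.24 libraries
    data = "updated value"
```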
Specifying the Target
^^^^^^^^^^^^^^^^^^^^^

Both methods supply the "target" of the conflict using column inference:

* The :paramref:`_sqlite.Insert.on_conflict_do_update.index_elements` argument
  specifies a sequence containing string column names, :class:`_schema.Column`
  objects, and/or SQL expression elements, which would identify a unique index
  or unique constraint.

* When using :paramref:`_sqlite.Insert.on_conflict_do_update.index_elements`
  to infer an index, a partial index can be inferred by also specifying the
  :paramref:`_sqlite.Insert.on_conflict_do_update.index_where` parameter:

  .. sourcecode:: pycon+sql

      >>> stmt = insert(my_table).values(user_email='a@b.com', data='inserted data')

      >>> do_update_stmt = stmt.on_conflict_do_update(
      ...     index_elements=[my_table.c.user_email],
      ...     index_where=my_table.c.user_email.like('%@gmail.com'),
      ...     set_=dict(data=stmt.excluded.data)
      ... )

      >>> print(do_update_stmt)
      {opensql}INSERT INTO my_table (data, user_email) VALUES (?, ?)
      ON CONFLICT (user_email)
      WHERE user_email LIKE '%@gmail.com'
      DO UPDATE SET data = excluded.data
The SET Clause
^^^^^^^^^^^^^^^

``ON CONFLICT...DO UPDATE`` is used to perform an update of the already
existing row, using any combination of new values as well as values
from the proposed insertion. These values are specified using the
:paramref:`_sqlite.Insert.on_conflict_do_update.set_` parameter. This
parameter accepts a dictionary which consists of direct values
for UPDATE:

.. sourcecode:: pycon+sql

    >>> stmt = insert(my_table).values(id='some_id', data='inserted value')

    >>> do_update_stmt = stmt.on_conflict_do_update(
    ...     index_elements=['id'],
    ...     set_=dict(data='updated value')
    ... )

    >>> print(do_update_stmt)
    {opensql}INSERT INTO my_table (id, data) VALUES (?, ?)
    ON CONFLICT (id) DO UPDATE SET data = ?

.. warning::

    The :meth:`_sqlite.Insert.on_conflict_do_update` method does **not** take
    into account Python-side default UPDATE values or generation functions,
    e.g. those specified using :paramref:`_schema.Column.onupdate`. These
    values will not be exercised for an ON CONFLICT style of UPDATE, unless
    they are manually specified in the
    :paramref:`_sqlite.Insert.on_conflict_do_update.set_` dictionary.
Updating using the Excluded INSERT Values
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

In order to refer to the proposed insertion row, the special alias
:attr:`~.sqlite.Insert.excluded` is available as an attribute on
the :class:`_sqlite.Insert` object; this object creates an "excluded." prefix
on a column, that informs the DO UPDATE to update the row with the value that
would have been inserted had the constraint not failed:

.. sourcecode:: pycon+sql

    >>> stmt = insert(my_table).values(
    ...     id='some_id',
    ...     data='inserted value',
    ...     author='jlh'
    ... )

    >>> do_update_stmt = stmt.on_conflict_do_update(
    ...     index_elements=['id'],
    ...     set_=dict(data='updated value', author=stmt.excluded.author)
    ... )

    >>> print(do_update_stmt)
    {opensql}INSERT INTO my_table (id, data, author) VALUES (?, ?, ?)
    ON CONFLICT (id) DO UPDATE SET data = ?, author = excluded.author
Additional WHERE Criteria
^^^^^^^^^^^^^^^^^^^^^^^^^

The :meth:`_sqlite.Insert.on_conflict_do_update` method also accepts
a WHERE clause using the :paramref:`_sqlite.Insert.on_conflict_do_update.where`
parameter, which will limit those rows which receive an UPDATE:

.. sourcecode:: pycon+sql

    >>> stmt = insert(my_table).values(
    ...     id='some_id',
    ...     data='inserted value',
    ...     author='jlh'
    ... )

    >>> on_update_stmt = stmt.on_conflict_do_update(
    ...     index_elements=['id'],
    ...     set_=dict(data='updated value', author=stmt.excluded.author),
    ...     where=(my_table.c.status == 2)
    ... )
    >>> print(on_update_stmt)
    {opensql}INSERT INTO my_table (id, data, author) VALUES (?, ?, ?)
    ON CONFLICT (id) DO UPDATE SET data = ?, author = excluded.author
    WHERE my_table.status = ?
Skipping Rows with DO NOTHING
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

``ON CONFLICT`` may be used to skip inserting a row entirely
if any conflict with a unique constraint occurs; below this is illustrated
using the :meth:`_sqlite.Insert.on_conflict_do_nothing` method:

.. sourcecode:: pycon+sql

    >>> stmt = insert(my_table).values(id='some_id', data='inserted value')
    >>> stmt = stmt.on_conflict_do_nothing(index_elements=['id'])
    >>> print(stmt)
    {opensql}INSERT INTO my_table (id, data) VALUES (?, ?) ON CONFLICT (id) DO NOTHING

If ``DO NOTHING`` is used without specifying any columns or constraint,
it has the effect of skipping the INSERT for any unique violation which
occurs:

.. sourcecode:: pycon+sql

    >>> stmt = insert(my_table).values(id='some_id', data='inserted value')
    >>> stmt = stmt.on_conflict_do_nothing()
    >>> print(stmt)
    {opensql}INSERT INTO my_table (id, data) VALUES (?, ?) ON CONFLICT DO NOTHING
.. _sqlite_type_reflection:

Type Reflection
---------------

SQLite types are unlike those of most other database backends, in that
the string name of the type usually does not correspond to a "type" in a
one-to-one fashion. Instead, SQLite links per-column typing behavior
to one of five so-called "type affinities" based on a string matching
pattern for the type.

SQLAlchemy's reflection process, when inspecting types, uses a simple
lookup table to link the keywords returned to provided SQLAlchemy types.
This lookup table is present within the SQLite dialect as it is for all
other dialects. However, the SQLite dialect has a different "fallback"
routine for when a particular type name is not located in the lookup map;
it instead implements the SQLite "type affinity" scheme located at
https://www.sqlite.org/datatype3.html section 2.1.

The provided typemap will make direct associations from an exact string
name match for the following types:
:class:`_types.BIGINT`, :class:`_types.BLOB`,
:class:`_types.BOOLEAN`, :class:`_types.CHAR`,
:class:`_types.DATE`, :class:`_types.DATETIME`,
:class:`_types.DECIMAL`, :class:`_types.FLOAT`,
:class:`_types.INTEGER`, :class:`_types.NUMERIC`,
:class:`_types.REAL`, :class:`_types.SMALLINT`,
:class:`_types.TEXT`, :class:`_types.TIME`,
:class:`_types.TIMESTAMP`, :class:`_types.VARCHAR`,
:class:`_types.NVARCHAR`, :class:`_types.NCHAR`
When a type name does not match one of the above types, the "type affinity"
lookup is used instead:

* :class:`_types.INTEGER` is returned if the type name includes the
  string ``INT``
* :class:`_types.TEXT` is returned if the type name includes the
  string ``CHAR``, ``CLOB`` or ``TEXT``
* :class:`_types.NullType` is returned if the type name includes the
  string ``BLOB``
* :class:`_types.REAL` is returned if the type name includes the string
  ``REAL``, ``FLOA`` or ``DOUB``.
* Otherwise, the :class:`_types.NUMERIC` type is used.

.. versionadded:: 0.9.3 Support for SQLite type affinity rules when reflecting
   columns.
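The fallback rules can be sketched as a plain Python function; note that
``type_affinity`` below is a hypothetical helper for illustration, not
SQLAlchemy's actual implementation, and the checks apply in the order listed
above:

```python
def type_affinity(type_name):
    """Return the SQLite affinity keyword for a declared type name."""
    name = type_name.upper()
    if "INT" in name:
        return "INTEGER"   # reflected as _types.INTEGER
    if "CHAR" in name or "CLOB" in name or "TEXT" in name:
        return "TEXT"      # reflected as _types.TEXT
    if "BLOB" in name:
        return "NONE"      # reflected as _types.NullType
    if "REAL" in name or "FLOA" in name or "DOUB" in name:
        return "REAL"      # reflected as _types.REAL
    return "NUMERIC"       # the default affinity
```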
.. _sqlite_partial_index:

Partial Indexes
---------------

A partial index, e.g. one which uses a WHERE clause, can be specified
with the DDL system using the argument ``sqlite_where``::

    tbl = Table('testtbl', m, Column('data', Integer))
    idx = Index('test_idx1', tbl.c.data,
                sqlite_where=and_(tbl.c.data > 5, tbl.c.data < 10))

The index will be rendered at create time as::

    CREATE INDEX test_idx1 ON testtbl (data)
    WHERE data > 5 AND data < 10

.. versionadded:: 0.9.9
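The rendered DDL can be executed as-is through the stdlib ``sqlite3`` driver
(partial indexes require SQLite 3.8.0+); SQLite stores the original statement
text in ``sqlite_master``:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE testtbl (data INTEGER)")
conn.execute(
    "CREATE INDEX test_idx1 ON testtbl (data) WHERE data > 5 AND data < 10"
)
# sqlite_master keeps the verbatim CREATE INDEX text, WHERE clause included.
ddl = conn.execute(
    "SELECT sql FROM sqlite_master WHERE name = 'test_idx1'"
).fetchone()[0]
conn.close()
```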
.. _sqlite_dotted_column_names:

Dotted Column Names
-------------------

Using table or column names that explicitly have periods in them is
**not recommended**. While this is generally a bad idea for relational
databases in general, as the dot is a syntactically significant character,
the SQLite driver up until version **3.10.0** of SQLite has a bug which
requires that SQLAlchemy filter out these dots in result sets.

.. versionchanged:: 1.1

    The following SQLite issue has been resolved as of version 3.10.0
    of SQLite. SQLAlchemy as of **1.1** automatically disables its internal
    workarounds based on detection of this version.

The bug, entirely outside of SQLAlchemy, can be illustrated thusly::

    import sqlite3

    assert sqlite3.sqlite_version_info < (3, 10, 0), "bug is fixed in this version"

    conn = sqlite3.connect(":memory:")
    cursor = conn.cursor()

    cursor.execute("create table x (a integer, b integer)")
    cursor.execute("insert into x (a, b) values (1, 1)")
    cursor.execute("insert into x (a, b) values (2, 2)")

    cursor.execute("select x.a, x.b from x")
    assert [c[0] for c in cursor.description] == ['a', 'b']

    cursor.execute('''
        select x.a, x.b from x where a=1
        union
        select x.a, x.b from x where a=2
    ''')
    assert [c[0] for c in cursor.description] == ['a', 'b'], \
        [c[0] for c in cursor.description]

The second assertion fails::

    Traceback (most recent call last):
      File "test.py", line 19, in <module>
        [c[0] for c in cursor.description]
    AssertionError: ['x.a', 'x.b']

Where above, the driver incorrectly reports the names of the columns
including the name of the table, which is entirely inconsistent vs.
when the UNION is not present.
SQLAlchemy relies upon column names being predictable in how they match
to the original statement, so the SQLAlchemy dialect has no choice but
to filter these out::

    from sqlalchemy import create_engine

    eng = create_engine("sqlite://")
    conn = eng.connect()

    conn.exec_driver_sql("create table x (a integer, b integer)")
    conn.exec_driver_sql("insert into x (a, b) values (1, 1)")
    conn.exec_driver_sql("insert into x (a, b) values (2, 2)")

    result = conn.exec_driver_sql("select x.a, x.b from x")
    assert result.keys() == ["a", "b"]

    result = conn.exec_driver_sql('''
        select x.a, x.b from x where a=1
        union
        select x.a, x.b from x where a=2
    ''')
    assert result.keys() == ["a", "b"]

Note that above, even though SQLAlchemy filters out the dots, *both
names are still addressable*::

    >>> row = result.first()
    >>> row["a"]
    1
    >>> row["x.a"]
    1
    >>> row["b"]
    1
    >>> row["x.b"]
    1
Therefore, the workaround applied by SQLAlchemy only impacts
:meth:`_engine.CursorResult.keys` and :meth:`.Row.keys()` in the public API. In
the very specific case where an application is forced to use column names that
contain dots, and the functionality of :meth:`_engine.CursorResult.keys` and
:meth:`.Row.keys()` is required to return these dotted names unmodified,
the ``sqlite_raw_colnames`` execution option may be provided, either on a
per-:class:`_engine.Connection` basis::

    result = conn.execution_options(sqlite_raw_colnames=True).exec_driver_sql('''
        select x.a, x.b from x where a=1
        union
        select x.a, x.b from x where a=2
    ''')
    assert result.keys() == ["x.a", "x.b"]

or on a per-:class:`_engine.Engine` basis::

    engine = create_engine("sqlite://", execution_options={"sqlite_raw_colnames": True})

When using the per-:class:`_engine.Engine` execution option, note that
**Core and ORM queries that use UNION may not function properly**.
SQLite-specific table options
-----------------------------

One option for CREATE TABLE is supported directly by the SQLite
dialect in conjunction with the :class:`_schema.Table` construct:

* ``WITHOUT ROWID``::

    Table("some_table", metadata, ..., sqlite_with_rowid=False)

.. seealso::

    `SQLite CREATE TABLE options
    <https://www.sqlite.org/lang_createtable.html>`_
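The effect of the option can be sketched at the SQL level with the stdlib
``sqlite3`` driver (``WITHOUT ROWID`` requires an explicit PRIMARY KEY and
SQLite 3.8.2+; table names are illustrative):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# A WITHOUT ROWID table has no implicit rowid column at all.
conn.execute(
    "CREATE TABLE some_table (id TEXT PRIMARY KEY, data TEXT) WITHOUT ROWID"
)
conn.execute("INSERT INTO some_table (id, data) VALUES ('k', 'v')")
try:
    conn.execute("SELECT rowid FROM some_table")
    has_rowid = True
except sqlite3.OperationalError:  # "no such column: rowid"
    has_rowid = False
conn.close()
```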
"""  # noqa

import datetime
import numbers
import re

from .json import JSON
from .json import JSONIndexType
from .json import JSONPathType
from ... import exc
from ... import processors
from ... import schema as sa_schema
from ... import sql
from ... import text
from ... import types as sqltypes
from ... import util
from ...engine import default
from ...engine import reflection
from ...sql import coercions
from ...sql import ColumnElement
from ...sql import compiler
from ...sql import elements
from ...sql import roles
from ...sql import schema
from ...types import BLOB  # noqa
from ...types import BOOLEAN  # noqa
from ...types import CHAR  # noqa
from ...types import DECIMAL  # noqa
from ...types import FLOAT  # noqa
from ...types import INTEGER  # noqa
from ...types import NUMERIC  # noqa
from ...types import REAL  # noqa
from ...types import SMALLINT  # noqa
from ...types import TEXT  # noqa
from ...types import TIMESTAMP  # noqa
from ...types import VARCHAR  # noqa


class _SQliteJson(JSON):
    def result_processor(self, dialect, coltype):
        default_processor = super(_SQliteJson, self).result_processor(
            dialect, coltype
        )

        def process(value):
            try:
                return default_processor(value)
            except TypeError:
                if isinstance(value, numbers.Number):
                    return value
                else:
                    raise

        return process
867class _DateTimeMixin(object):
868 _reg = None
869 _storage_format = None
871 def __init__(self, storage_format=None, regexp=None, **kw):
872 super(_DateTimeMixin, self).__init__(**kw)
873 if regexp is not None:
874 self._reg = re.compile(regexp)
875 if storage_format is not None:
876 self._storage_format = storage_format
878 @property
879 def format_is_text_affinity(self):
880 """return True if the storage format will automatically imply
881 a TEXT affinity.
883 If the storage format contains no non-numeric characters,
884 it will imply a NUMERIC storage format on SQLite; in this case,
885 the type will generate its DDL as DATE_CHAR, DATETIME_CHAR,
886 TIME_CHAR.
888 .. versionadded:: 1.0.0
890 """
891 spec = self._storage_format % {
892 "year": 0,
893 "month": 0,
894 "day": 0,
895 "hour": 0,
896 "minute": 0,
897 "second": 0,
898 "microsecond": 0,
899 }
900 return bool(re.search(r"[^0-9]", spec))
902 def adapt(self, cls, **kw):
903 if issubclass(cls, _DateTimeMixin):
904 if self._storage_format:
905 kw["storage_format"] = self._storage_format
906 if self._reg:
907 kw["regexp"] = self._reg
908 return super(_DateTimeMixin, self).adapt(cls, **kw)
910 def literal_processor(self, dialect):
911 bp = self.bind_processor(dialect)
913 def process(value):
914 return "'%s'" % bp(value)
916 return process
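As a standalone sketch of the affinity check implemented below: substitute zeros into the storage format and look for any remaining non-digit character. The helper name here is illustrative, mirroring ``format_is_text_affinity``.

```python
import re

# Illustrative helper mirroring format_is_text_affinity: fill the storage
# format with zeros and report whether any non-digit character remains.
def format_is_text_affinity(storage_format):
    spec = storage_format % {
        "year": 0, "month": 0, "day": 0,
        "hour": 0, "minute": 0, "second": 0, "microsecond": 0,
    }
    return bool(re.search(r"[^0-9]", spec))

# The default DATETIME format contains "-", ":", "." and a space,
# so the rendered value keeps TEXT affinity.
default_fmt = (
    "%(year)04d-%(month)02d-%(day)02d "
    "%(hour)02d:%(minute)02d:%(second)02d.%(microsecond)06d"
)
print(format_is_text_affinity(default_fmt))                       # True
# A purely numeric packing would imply NUMERIC affinity on SQLite,
# which is why the DDL compiler then renders DATETIME_CHAR instead.
print(format_is_text_affinity("%(year)04d%(month)02d%(day)02d"))  # False
```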
919class DATETIME(_DateTimeMixin, sqltypes.DateTime):
920 r"""Represent a Python datetime object in SQLite using a string.
922 The default string storage format is::
924 "%(year)04d-%(month)02d-%(day)02d %(hour)02d:%(minute)02d:%(second)02d.%(microsecond)06d"
926 e.g.::
928 2021-03-15 12:05:57.105542
930 The storage format can be customized to some degree using the
931 ``storage_format`` and ``regexp`` parameters, such as::
933 import re
934 from sqlalchemy.dialects.sqlite import DATETIME
936 dt = DATETIME(storage_format="%(year)04d/%(month)02d/%(day)02d "
937 "%(hour)02d:%(minute)02d:%(second)02d",
938 regexp=r"(\d+)/(\d+)/(\d+) (\d+):(\d+):(\d+)"
939 )
941 :param storage_format: format string which will be applied to the dict
942 with keys year, month, day, hour, minute, second, and microsecond.
944 :param regexp: regular expression which will be applied to incoming result
945 rows. If the regexp contains named groups, the resulting match dict is
946 applied to the Python datetime() constructor as keyword arguments.
947 Otherwise, if positional groups are used, the datetime() constructor
948 is called with positional arguments via
949 ``*map(int, match_obj.groups(0))``.
951 """ # noqa
953 _storage_format = (
954 "%(year)04d-%(month)02d-%(day)02d "
955 "%(hour)02d:%(minute)02d:%(second)02d.%(microsecond)06d"
956 )
958 def __init__(self, *args, **kwargs):
959 truncate_microseconds = kwargs.pop("truncate_microseconds", False)
960 super(DATETIME, self).__init__(*args, **kwargs)
961 if truncate_microseconds:
962 assert "storage_format" not in kwargs, (
963 "You can specify only "
964 "one of truncate_microseconds or storage_format."
965 )
966 assert "regexp" not in kwargs, (
967 "You can specify only one of "
968 "truncate_microseconds or regexp."
969 )
970 self._storage_format = (
971 "%(year)04d-%(month)02d-%(day)02d "
972 "%(hour)02d:%(minute)02d:%(second)02d"
973 )
975 def bind_processor(self, dialect):
976 datetime_datetime = datetime.datetime
977 datetime_date = datetime.date
978 format_ = self._storage_format
980 def process(value):
981 if value is None:
982 return None
983 elif isinstance(value, datetime_datetime):
984 return format_ % {
985 "year": value.year,
986 "month": value.month,
987 "day": value.day,
988 "hour": value.hour,
989 "minute": value.minute,
990 "second": value.second,
991 "microsecond": value.microsecond,
992 }
993 elif isinstance(value, datetime_date):
994 return format_ % {
995 "year": value.year,
996 "month": value.month,
997 "day": value.day,
998 "hour": 0,
999 "minute": 0,
1000 "second": 0,
1001 "microsecond": 0,
1002 }
1003 else:
1004 raise TypeError(
1005 "SQLite DateTime type only accepts Python "
1006 "datetime and date objects as input."
1007 )
1009 return process
1011 def result_processor(self, dialect, coltype):
1012 if self._reg:
1013 return processors.str_to_datetime_processor_factory(
1014 self._reg, datetime.datetime
1015 )
1016 else:
1017 return processors.str_to_datetime
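The bind/result round trip above can be sketched with the standard library alone; ``to_storage`` and ``from_storage`` are illustrative names standing in for the bind processor and the positional-group branch of the result processor.

```python
import datetime
import re

# The default DATETIME storage format and a regexp matching it.
STORAGE = (
    "%(year)04d-%(month)02d-%(day)02d "
    "%(hour)02d:%(minute)02d:%(second)02d.%(microsecond)06d"
)
REGEXP = re.compile(r"(\d+)-(\d+)-(\d+) (\d+):(\d+):(\d+)\.(\d+)")

def to_storage(dt):
    # mirrors DATETIME.bind_processor for datetime inputs
    return STORAGE % {
        "year": dt.year, "month": dt.month, "day": dt.day,
        "hour": dt.hour, "minute": dt.minute,
        "second": dt.second, "microsecond": dt.microsecond,
    }

def from_storage(text):
    # mirrors the positional-group branch of the result processor:
    # datetime() is called via *map(int, match.groups(0))
    return datetime.datetime(*map(int, REGEXP.match(text).groups(0)))

dt = datetime.datetime(2021, 3, 15, 12, 5, 57, 105542)
stored = to_storage(dt)
print(stored)                       # 2021-03-15 12:05:57.105542
print(from_storage(stored) == dt)   # True
```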
1020class DATE(_DateTimeMixin, sqltypes.Date):
1021 r"""Represent a Python date object in SQLite using a string.
1023 The default string storage format is::
1025 "%(year)04d-%(month)02d-%(day)02d"
1027 e.g.::
1029 2011-03-15
1031 The storage format can be customized to some degree using the
1032 ``storage_format`` and ``regexp`` parameters, such as::
1034 import re
1035 from sqlalchemy.dialects.sqlite import DATE
1037 d = DATE(
1038 storage_format="%(month)02d/%(day)02d/%(year)04d",
1039 regexp=re.compile(r"(?P&lt;month&gt;\d+)/(?P&lt;day&gt;\d+)/(?P&lt;year&gt;\d+)")
1040 )
1042 :param storage_format: format string which will be applied to the
1043 dict with keys year, month, and day.
1045 :param regexp: regular expression which will be applied to
1046 incoming result rows. If the regexp contains named groups, the
1047 resulting match dict is applied to the Python date() constructor
1048 as keyword arguments. Otherwise, if positional groups are used, the
1049 date() constructor is called with positional arguments via
1050 ``*map(int, match_obj.groups(0))``.
1051 """
1053 _storage_format = "%(year)04d-%(month)02d-%(day)02d"
1055 def bind_processor(self, dialect):
1056 datetime_date = datetime.date
1057 format_ = self._storage_format
1059 def process(value):
1060 if value is None:
1061 return None
1062 elif isinstance(value, datetime_date):
1063 return format_ % {
1064 "year": value.year,
1065 "month": value.month,
1066 "day": value.day,
1067 }
1068 else:
1069 raise TypeError(
1070 "SQLite Date type only accepts Python "
1071 "date objects as input."
1072 )
1074 return process
1076 def result_processor(self, dialect, coltype):
1077 if self._reg:
1078 return processors.str_to_datetime_processor_factory(
1079 self._reg, datetime.date
1080 )
1081 else:
1082 return processors.str_to_date
1085class TIME(_DateTimeMixin, sqltypes.Time):
1086 r"""Represent a Python time object in SQLite using a string.
1088 The default string storage format is::
1090 "%(hour)02d:%(minute)02d:%(second)02d.%(microsecond)06d"
1092 e.g.::
1094 12:05:57.105542
1096 The storage format can be customized to some degree using the
1097 ``storage_format`` and ``regexp`` parameters, such as::
1099 import re
1100 from sqlalchemy.dialects.sqlite import TIME
1102 t = TIME(storage_format="%(hour)02d-%(minute)02d-"
1103 "%(second)02d-%(microsecond)06d",
1104 regexp=re.compile(r"(\d+)-(\d+)-(\d+)(?:-(\d+))?")
1105 )
1107 :param storage_format: format string which will be applied to the dict
1108 with keys hour, minute, second, and microsecond.
1110 :param regexp: regular expression which will be applied to incoming result
1111 rows. If the regexp contains named groups, the resulting match dict is
1112 applied to the Python time() constructor as keyword arguments. Otherwise,
1113 if positional groups are used, the time() constructor is called with
1114 positional arguments via ``*map(int, match_obj.groups(0))``.
1115 """
1117 _storage_format = "%(hour)02d:%(minute)02d:%(second)02d.%(microsecond)06d"
1119 def __init__(self, *args, **kwargs):
1120 truncate_microseconds = kwargs.pop("truncate_microseconds", False)
1121 super(TIME, self).__init__(*args, **kwargs)
1122 if truncate_microseconds:
1123 assert "storage_format" not in kwargs, (
1124 "You can specify only "
1125 "one of truncate_microseconds or storage_format."
1126 )
1127 assert "regexp" not in kwargs, (
1128 "You can specify only one of "
1129 "truncate_microseconds or regexp."
1130 )
1131 self._storage_format = "%(hour)02d:%(minute)02d:%(second)02d"
1133 def bind_processor(self, dialect):
1134 datetime_time = datetime.time
1135 format_ = self._storage_format
1137 def process(value):
1138 if value is None:
1139 return None
1140 elif isinstance(value, datetime_time):
1141 return format_ % {
1142 "hour": value.hour,
1143 "minute": value.minute,
1144 "second": value.second,
1145 "microsecond": value.microsecond,
1146 }
1147 else:
1148 raise TypeError(
1149 "SQLite Time type only accepts Python "
1150 "time objects as input."
1151 )
1153 return process
1155 def result_processor(self, dialect, coltype):
1156 if self._reg:
1157 return processors.str_to_datetime_processor_factory(
1158 self._reg, datetime.time
1159 )
1160 else:
1161 return processors.str_to_time
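A custom dash-separated TIME format like the docstring example, with the microseconds group made optional, can be exercised with the standard library; ``parse_time`` is an illustrative stand-in for the regexp-based result processor.

```python
import datetime
import re

# Dash-separated TIME storage with microseconds optional; times stored
# without a microseconds field still parse.
regexp = re.compile(r"(\d+)-(\d+)-(\d+)(?:-(\d+))?")

def parse_time(text):
    # match.groups(0) substitutes 0 for the absent optional group,
    # matching the *map(int, match_obj.groups(0)) behavior documented above
    return datetime.time(*map(int, regexp.match(text).groups(0)))

print(parse_time("12-05-57-105542"))  # 12:05:57.105542
print(parse_time("12-05-57"))         # 12:05:57
```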
1164colspecs = {
1165 sqltypes.Date: DATE,
1166 sqltypes.DateTime: DATETIME,
1167 sqltypes.JSON: _SQliteJson,
1168 sqltypes.JSON.JSONIndexType: JSONIndexType,
1169 sqltypes.JSON.JSONPathType: JSONPathType,
1170 sqltypes.Time: TIME,
1171}
1173ischema_names = {
1174 "BIGINT": sqltypes.BIGINT,
1175 "BLOB": sqltypes.BLOB,
1176 "BOOL": sqltypes.BOOLEAN,
1177 "BOOLEAN": sqltypes.BOOLEAN,
1178 "CHAR": sqltypes.CHAR,
1179 "DATE": sqltypes.DATE,
1180 "DATE_CHAR": sqltypes.DATE,
1181 "DATETIME": sqltypes.DATETIME,
1182 "DATETIME_CHAR": sqltypes.DATETIME,
1183 "DOUBLE": sqltypes.FLOAT,
1184 "DECIMAL": sqltypes.DECIMAL,
1185 "FLOAT": sqltypes.FLOAT,
1186 "INT": sqltypes.INTEGER,
1187 "INTEGER": sqltypes.INTEGER,
1188 "JSON": JSON,
1189 "NUMERIC": sqltypes.NUMERIC,
1190 "REAL": sqltypes.REAL,
1191 "SMALLINT": sqltypes.SMALLINT,
1192 "TEXT": sqltypes.TEXT,
1193 "TIME": sqltypes.TIME,
1194 "TIME_CHAR": sqltypes.TIME,
1195 "TIMESTAMP": sqltypes.TIMESTAMP,
1196 "VARCHAR": sqltypes.VARCHAR,
1197 "NVARCHAR": sqltypes.NVARCHAR,
1198 "NCHAR": sqltypes.NCHAR,
1199}
1202class SQLiteCompiler(compiler.SQLCompiler):
1203 extract_map = util.update_copy(
1204 compiler.SQLCompiler.extract_map,
1205 {
1206 "month": "%m",
1207 "day": "%d",
1208 "year": "%Y",
1209 "second": "%S",
1210 "hour": "%H",
1211 "doy": "%j",
1212 "minute": "%M",
1213 "epoch": "%s",
1214 "dow": "%w",
1215 "week": "%W",
1216 },
1217 )
1219 def visit_now_func(self, fn, **kw):
1220 return "CURRENT_TIMESTAMP"
1222 def visit_localtimestamp_func(self, func, **kw):
1223 return 'DATETIME(CURRENT_TIMESTAMP, "localtime")'
1225 def visit_true(self, expr, **kw):
1226 return "1"
1228 def visit_false(self, expr, **kw):
1229 return "0"
1231 def visit_char_length_func(self, fn, **kw):
1232 return "length%s" % self.function_argspec(fn)
1234 def visit_cast(self, cast, **kwargs):
1235 if self.dialect.supports_cast:
1236 return super(SQLiteCompiler, self).visit_cast(cast, **kwargs)
1237 else:
1238 return self.process(cast.clause, **kwargs)
1240 def visit_extract(self, extract, **kw):
1241 try:
1242 return "CAST(STRFTIME('%s', %s) AS INTEGER)" % (
1243 self.extract_map[extract.field],
1244 self.process(extract.expr, **kw),
1245 )
1246 except KeyError as err:
1247 util.raise_(
1248 exc.CompileError(
1249 "%s is not a valid extract argument." % extract.field
1250 ),
1251 replace_context=err,
1252 )
1254 def limit_clause(self, select, **kw):
1255 text = ""
1256 if select._limit_clause is not None:
1257 text += "\n LIMIT " + self.process(select._limit_clause, **kw)
1258 if select._offset_clause is not None:
1259 if select._limit_clause is None:
1260 text += "\n LIMIT " + self.process(sql.literal(-1))
1261 text += " OFFSET " + self.process(select._offset_clause, **kw)
1262 else:
1263 text += " OFFSET " + self.process(sql.literal(0), **kw)
1264 return text
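The ``limit_clause`` logic above emits ``LIMIT -1`` when only an OFFSET is present, because SQLite rejects OFFSET without LIMIT and treats a negative limit as "no limit". A stdlib ``sqlite3`` sketch of the emitted SQL:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (x INTEGER)")
conn.executemany("INSERT INTO t (x) VALUES (?)", [(i,) for i in range(5)])

# LIMIT -1 means unlimited, so this skips the first two rows only.
rows = conn.execute("SELECT x FROM t ORDER BY x LIMIT -1 OFFSET 2").fetchall()
print(rows)  # [(2,), (3,), (4,)]
conn.close()
```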
1266 def for_update_clause(self, select, **kw):
1267 # sqlite has no "FOR UPDATE" AFAICT
1268 return ""
1270 def visit_is_distinct_from_binary(self, binary, operator, **kw):
1271 return "%s IS NOT %s" % (
1272 self.process(binary.left),
1273 self.process(binary.right),
1274 )
1276 def visit_is_not_distinct_from_binary(self, binary, operator, **kw):
1277 return "%s IS %s" % (
1278 self.process(binary.left),
1279 self.process(binary.right),
1280 )
1282 def visit_json_getitem_op_binary(self, binary, operator, **kw):
1283 if binary.type._type_affinity is sqltypes.JSON:
1284 expr = "JSON_QUOTE(JSON_EXTRACT(%s, %s))"
1285 else:
1286 expr = "JSON_EXTRACT(%s, %s)"
1288 return expr % (
1289 self.process(binary.left, **kw),
1290 self.process(binary.right, **kw),
1291 )
1293 def visit_json_path_getitem_op_binary(self, binary, operator, **kw):
1294 if binary.type._type_affinity is sqltypes.JSON:
1295 expr = "JSON_QUOTE(JSON_EXTRACT(%s, %s))"
1296 else:
1297 expr = "JSON_EXTRACT(%s, %s)"
1299 return expr % (
1300 self.process(binary.left, **kw),
1301 self.process(binary.right, **kw),
1302 )
1304 def visit_empty_set_op_expr(self, type_, expand_op):
1305 # slightly old SQLite versions don't seem to be able to handle
1306 # the empty set impl
1307 return self.visit_empty_set_expr(type_)
1309 def visit_empty_set_expr(self, element_types):
1310 return "SELECT %s FROM (SELECT %s) WHERE 1!=1" % (
1311 ", ".join("1" for type_ in element_types or [INTEGER()]),
1312 ", ".join("1" for type_ in element_types or [INTEGER()]),
1313 )
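The "empty set" SQL rendered above selects from a one-row derived table under an always-false predicate, producing zero rows with the correct column count. Verified directly against SQLite:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Same shape as visit_empty_set_expr output for a single column.
rows = conn.execute("SELECT 1 FROM (SELECT 1) WHERE 1!=1").fetchall()
print(rows)  # []
conn.close()
```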
1315 def visit_regexp_match_op_binary(self, binary, operator, **kw):
1316 return self._generate_generic_binary(binary, " REGEXP ", **kw)
1318 def visit_not_regexp_match_op_binary(self, binary, operator, **kw):
1319 return self._generate_generic_binary(binary, " NOT REGEXP ", **kw)
1321 def _on_conflict_target(self, clause, **kw):
1322 if clause.constraint_target is not None:
1323 target_text = "(%s)" % clause.constraint_target
1324 elif clause.inferred_target_elements is not None:
1325 target_text = "(%s)" % ", ".join(
1326 (
1327 self.preparer.quote(c)
1328 if isinstance(c, util.string_types)
1329 else self.process(c, include_table=False, use_schema=False)
1330 )
1331 for c in clause.inferred_target_elements
1332 )
1333 if clause.inferred_target_whereclause is not None:
1334 target_text += " WHERE %s" % self.process(
1335 clause.inferred_target_whereclause,
1336 include_table=False,
1337 use_schema=False,
1338 literal_binds=True,
1339 )
1341 else:
1342 target_text = ""
1344 return target_text
1346 def visit_on_conflict_do_nothing(self, on_conflict, **kw):
1348 target_text = self._on_conflict_target(on_conflict, **kw)
1350 if target_text:
1351 return "ON CONFLICT %s DO NOTHING" % target_text
1352 else:
1353 return "ON CONFLICT DO NOTHING"
1355 def visit_on_conflict_do_update(self, on_conflict, **kw):
1356 clause = on_conflict
1358 target_text = self._on_conflict_target(on_conflict, **kw)
1360 action_set_ops = []
1362 set_parameters = dict(clause.update_values_to_set)
1363 # create a list of column assignment clauses as tuples
1365 insert_statement = self.stack[-1]["selectable"]
1366 cols = insert_statement.table.c
1367 for c in cols:
1368 col_key = c.key
1370 if col_key in set_parameters:
1371 value = set_parameters.pop(col_key)
1372 elif c in set_parameters:
1373 value = set_parameters.pop(c)
1374 else:
1375 continue
1377 if coercions._is_literal(value):
1378 value = elements.BindParameter(None, value, type_=c.type)
1380 else:
1381 if (
1382 isinstance(value, elements.BindParameter)
1383 and value.type._isnull
1384 ):
1385 value = value._clone()
1386 value.type = c.type
1387 value_text = self.process(value.self_group(), use_schema=False)
1389 key_text = self.preparer.quote(c.name)
1390 action_set_ops.append("%s = %s" % (key_text, value_text))
1392 # check for names that don't match columns
1393 if set_parameters:
1394 util.warn(
1395 "Additional column names not matching "
1396 "any column keys in table '%s': %s"
1397 % (
1398 self.current_executable.table.name,
1399 (", ".join("'%s'" % c for c in set_parameters)),
1400 )
1401 )
1402 for k, v in set_parameters.items():
1403 key_text = (
1404 self.preparer.quote(k)
1405 if isinstance(k, util.string_types)
1406 else self.process(k, use_schema=False)
1407 )
1408 value_text = self.process(
1409 coercions.expect(roles.ExpressionElementRole, v),
1410 use_schema=False,
1411 )
1412 action_set_ops.append("%s = %s" % (key_text, value_text))
1414 action_text = ", ".join(action_set_ops)
1415 if clause.update_whereclause is not None:
1416 action_text += " WHERE %s" % self.process(
1417 clause.update_whereclause, include_table=True, use_schema=False
1418 )
1420 return "ON CONFLICT %s DO UPDATE SET %s" % (target_text, action_text)
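The ``ON CONFLICT ... DO UPDATE SET`` form compiled above is SQLite's native upsert, available in SQLite 3.24+ (assumed here). A raw-SQL sketch of the statement shape, using the ``excluded`` pseudo-table for the incoming values:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE kv (k TEXT PRIMARY KEY, v INTEGER)")
conn.execute("INSERT INTO kv (k, v) VALUES ('a', 1)")

# The conflict target names the unique column; excluded.v refers to the
# value the failed INSERT tried to write.
conn.execute(
    "INSERT INTO kv (k, v) VALUES ('a', 2) "
    "ON CONFLICT (k) DO UPDATE SET v = excluded.v"
)
print(conn.execute("SELECT v FROM kv WHERE k = 'a'").fetchone())  # (2,)
```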
1423class SQLiteDDLCompiler(compiler.DDLCompiler):
1424 def get_column_specification(self, column, **kwargs):
1426 coltype = self.dialect.type_compiler.process(
1427 column.type, type_expression=column
1428 )
1429 colspec = self.preparer.format_column(column) + " " + coltype
1430 default = self.get_column_default_string(column)
1431 if default is not None:
1432 if isinstance(column.server_default.arg, ColumnElement):
1433 default = "(" + default + ")"
1434 colspec += " DEFAULT " + default
1436 if not column.nullable:
1437 colspec += " NOT NULL"
1439 on_conflict_clause = column.dialect_options["sqlite"][
1440 "on_conflict_not_null"
1441 ]
1442 if on_conflict_clause is not None:
1443 colspec += " ON CONFLICT " + on_conflict_clause
1445 if column.primary_key:
1446 if (
1447 column.autoincrement is True
1448 and len(column.table.primary_key.columns) != 1
1449 ):
1450 raise exc.CompileError(
1451 "SQLite does not support autoincrement for "
1452 "composite primary keys"
1453 )
1455 if (
1456 column.table.dialect_options["sqlite"]["autoincrement"]
1457 and len(column.table.primary_key.columns) == 1
1458 and issubclass(column.type._type_affinity, sqltypes.Integer)
1459 and not column.foreign_keys
1460 ):
1461 colspec += " PRIMARY KEY"
1463 on_conflict_clause = column.dialect_options["sqlite"][
1464 "on_conflict_primary_key"
1465 ]
1466 if on_conflict_clause is not None:
1467 colspec += " ON CONFLICT " + on_conflict_clause
1469 colspec += " AUTOINCREMENT"
1471 if column.computed is not None:
1472 colspec += " " + self.process(column.computed)
1474 return colspec
1476 def visit_primary_key_constraint(self, constraint):
1477 # for columns with sqlite_autoincrement=True,
1478 # the PRIMARY KEY constraint can only be inline
1479 # with the column itself.
1480 if len(constraint.columns) == 1:
1481 c = list(constraint)[0]
1482 if (
1483 c.primary_key
1484 and c.table.dialect_options["sqlite"]["autoincrement"]
1485 and issubclass(c.type._type_affinity, sqltypes.Integer)
1486 and not c.foreign_keys
1487 ):
1488 return None
1490 text = super(SQLiteDDLCompiler, self).visit_primary_key_constraint(
1491 constraint
1492 )
1494 on_conflict_clause = constraint.dialect_options["sqlite"][
1495 "on_conflict"
1496 ]
1497 if on_conflict_clause is None and len(constraint.columns) == 1:
1498 on_conflict_clause = list(constraint)[0].dialect_options["sqlite"][
1499 "on_conflict_primary_key"
1500 ]
1502 if on_conflict_clause is not None:
1503 text += " ON CONFLICT " + on_conflict_clause
1505 return text
1507 def visit_unique_constraint(self, constraint):
1508 text = super(SQLiteDDLCompiler, self).visit_unique_constraint(
1509 constraint
1510 )
1512 on_conflict_clause = constraint.dialect_options["sqlite"][
1513 "on_conflict"
1514 ]
1515 if on_conflict_clause is None and len(constraint.columns) == 1:
1516 col1 = list(constraint)[0]
1517 if isinstance(col1, schema.SchemaItem):
1518 on_conflict_clause = list(constraint)[0].dialect_options[
1519 "sqlite"
1520 ]["on_conflict_unique"]
1522 if on_conflict_clause is not None:
1523 text += " ON CONFLICT " + on_conflict_clause
1525 return text
1527 def visit_check_constraint(self, constraint):
1528 text = super(SQLiteDDLCompiler, self).visit_check_constraint(
1529 constraint
1530 )
1532 on_conflict_clause = constraint.dialect_options["sqlite"][
1533 "on_conflict"
1534 ]
1536 if on_conflict_clause is not None:
1537 text += " ON CONFLICT " + on_conflict_clause
1539 return text
1541 def visit_column_check_constraint(self, constraint):
1542 text = super(SQLiteDDLCompiler, self).visit_column_check_constraint(
1543 constraint
1544 )
1546 if constraint.dialect_options["sqlite"]["on_conflict"] is not None:
1547 raise exc.CompileError(
1548 "SQLite does not support on conflict clause for "
1549 "column check constraint"
1550 )
1552 return text
1554 def visit_foreign_key_constraint(self, constraint):
1556 local_table = constraint.elements[0].parent.table
1557 remote_table = constraint.elements[0].column.table
1559 if local_table.schema != remote_table.schema:
1560 return None
1561 else:
1562 return super(SQLiteDDLCompiler, self).visit_foreign_key_constraint(
1563 constraint
1564 )
1566 def define_constraint_remote_table(self, constraint, table, preparer):
1567 """Format the remote table clause of a CREATE CONSTRAINT clause."""
1569 return preparer.format_table(table, use_schema=False)
1571 def visit_create_index(
1572 self, create, include_schema=False, include_table_schema=True
1573 ):
1574 index = create.element
1575 self._verify_index_table(index)
1576 preparer = self.preparer
1577 text = "CREATE "
1578 if index.unique:
1579 text += "UNIQUE "
1581 text += "INDEX "
1583 if create.if_not_exists:
1584 text += "IF NOT EXISTS "
1586 text += "%s ON %s (%s)" % (
1587 self._prepared_index_name(index, include_schema=True),
1588 preparer.format_table(index.table, use_schema=False),
1589 ", ".join(
1590 self.sql_compiler.process(
1591 expr, include_table=False, literal_binds=True
1592 )
1593 for expr in index.expressions
1594 ),
1595 )
1597 whereclause = index.dialect_options["sqlite"]["where"]
1598 if whereclause is not None:
1599 where_compiled = self.sql_compiler.process(
1600 whereclause, include_table=False, literal_binds=True
1601 )
1602 text += " WHERE " + where_compiled
1604 return text
1606 def post_create_table(self, table):
1607 if table.dialect_options["sqlite"]["with_rowid"] is False:
1608 return "\n WITHOUT ROWID"
1609 return ""
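Passing ``sqlite_with_rowid=False`` to :class:`_schema.Table` makes ``post_create_table`` append the clause above; the equivalent raw DDL, sketched with stdlib ``sqlite3`` (table and column names are illustrative):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# WITHOUT ROWID tables (SQLite 3.8.2+) require an explicit PRIMARY KEY.
conn.execute(
    "CREATE TABLE lookup (word TEXT PRIMARY KEY, meaning TEXT) WITHOUT ROWID"
)
conn.execute("INSERT INTO lookup VALUES ('sqlite', 'a small database')")
row = conn.execute(
    "SELECT meaning FROM lookup WHERE word = 'sqlite'"
).fetchone()
print(row)  # ('a small database',)
```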
1612class SQLiteTypeCompiler(compiler.GenericTypeCompiler):
1613 def visit_large_binary(self, type_, **kw):
1614 return self.visit_BLOB(type_)
1616 def visit_DATETIME(self, type_, **kw):
1617 if (
1618 not isinstance(type_, _DateTimeMixin)
1619 or type_.format_is_text_affinity
1620 ):
1621 return super(SQLiteTypeCompiler, self).visit_DATETIME(type_)
1622 else:
1623 return "DATETIME_CHAR"
1625 def visit_DATE(self, type_, **kw):
1626 if (
1627 not isinstance(type_, _DateTimeMixin)
1628 or type_.format_is_text_affinity
1629 ):
1630 return super(SQLiteTypeCompiler, self).visit_DATE(type_)
1631 else:
1632 return "DATE_CHAR"
1634 def visit_TIME(self, type_, **kw):
1635 if (
1636 not isinstance(type_, _DateTimeMixin)
1637 or type_.format_is_text_affinity
1638 ):
1639 return super(SQLiteTypeCompiler, self).visit_TIME(type_)
1640 else:
1641 return "TIME_CHAR"
1643 def visit_JSON(self, type_, **kw):
1644 # note this name provides NUMERIC affinity, not TEXT.
1645 # should not be an issue unless the JSON value consists of a single
1646 # numeric value. JSONTEXT can be used if this case is required.
1647 return "JSON"
1650class SQLiteIdentifierPreparer(compiler.IdentifierPreparer):
1651 reserved_words = set(
1652 [
1653 "add",
1654 "after",
1655 "all",
1656 "alter",
1657 "analyze",
1658 "and",
1659 "as",
1660 "asc",
1661 "attach",
1662 "autoincrement",
1663 "before",
1664 "begin",
1665 "between",
1666 "by",
1667 "cascade",
1668 "case",
1669 "cast",
1670 "check",
1671 "collate",
1672 "column",
1673 "commit",
1674 "conflict",
1675 "constraint",
1676 "create",
1677 "cross",
1678 "current_date",
1679 "current_time",
1680 "current_timestamp",
1681 "database",
1682 "default",
1683 "deferrable",
1684 "deferred",
1685 "delete",
1686 "desc",
1687 "detach",
1688 "distinct",
1689 "drop",
1690 "each",
1691 "else",
1692 "end",
1693 "escape",
1694 "except",
1695 "exclusive",
1696 "exists",
1697 "explain",
1698 "false",
1699 "fail",
1700 "for",
1701 "foreign",
1702 "from",
1703 "full",
1704 "glob",
1705 "group",
1706 "having",
1707 "if",
1708 "ignore",
1709 "immediate",
1710 "in",
1711 "index",
1712 "indexed",
1713 "initially",
1714 "inner",
1715 "insert",
1716 "instead",
1717 "intersect",
1718 "into",
1719 "is",
1720 "isnull",
1721 "join",
1722 "key",
1723 "left",
1724 "like",
1725 "limit",
1726 "match",
1727 "natural",
1728 "not",
1729 "notnull",
1730 "null",
1731 "of",
1732 "offset",
1733 "on",
1734 "or",
1735 "order",
1736 "outer",
1737 "plan",
1738 "pragma",
1739 "primary",
1740 "query",
1741 "raise",
1742 "references",
1743 "reindex",
1744 "rename",
1745 "replace",
1746 "restrict",
1747 "right",
1748 "rollback",
1749 "row",
1750 "select",
1751 "set",
1752 "table",
1753 "temp",
1754 "temporary",
1755 "then",
1756 "to",
1757 "transaction",
1758 "trigger",
1759 "true",
1760 "union",
1761 "unique",
1762 "update",
1763 "using",
1764 "vacuum",
1765 "values",
1766 "view",
1767 "virtual",
1768 "when",
1769 "where",
1770 ]
1771 )
1774class SQLiteExecutionContext(default.DefaultExecutionContext):
1775 @util.memoized_property
1776 def _preserve_raw_colnames(self):
1777 return (
1778 not self.dialect._broken_dotted_colnames
1779 or self.execution_options.get("sqlite_raw_colnames", False)
1780 )
1782 def _translate_colname(self, colname):
1783 # TODO: detect SQLite version 3.10.0 or greater;
1784 # see [ticket:3633]
1786 # adjust for dotted column names. SQLite
1787 # in the case of UNION may store col names as
1788 # "tablename.colname", or if using an attached database,
1789 # "database.tablename.colname", in cursor.description
1790 if not self._preserve_raw_colnames and "." in colname:
1791 return colname.split(".")[-1], colname
1792 else:
1793 return colname, None
1796class SQLiteDialect(default.DefaultDialect):
1797 name = "sqlite"
1798 supports_alter = False
1799 supports_unicode_statements = True
1800 supports_unicode_binds = True
1802 # SQLite supports "DEFAULT VALUES" but *does not* support
1803 # "VALUES (DEFAULT)"
1804 supports_default_values = True
1805 supports_default_metavalue = False
1807 supports_empty_insert = False
1808 supports_cast = True
1809 supports_multivalues_insert = True
1810 tuple_in_values = True
1811 supports_statement_cache = True
1813 default_paramstyle = "qmark"
1814 execution_ctx_cls = SQLiteExecutionContext
1815 statement_compiler = SQLiteCompiler
1816 ddl_compiler = SQLiteDDLCompiler
1817 type_compiler = SQLiteTypeCompiler
1818 preparer = SQLiteIdentifierPreparer
1819 ischema_names = ischema_names
1820 colspecs = colspecs
1821 isolation_level = None
1823 construct_arguments = [
1824 (
1825 sa_schema.Table,
1826 {
1827 "autoincrement": False,
1828 "with_rowid": True,
1829 },
1830 ),
1831 (sa_schema.Index, {"where": None}),
1832 (
1833 sa_schema.Column,
1834 {
1835 "on_conflict_primary_key": None,
1836 "on_conflict_not_null": None,
1837 "on_conflict_unique": None,
1838 },
1839 ),
1840 (sa_schema.Constraint, {"on_conflict": None}),
1841 ]
1843 _broken_fk_pragma_quotes = False
1844 _broken_dotted_colnames = False
1846 @util.deprecated_params(
1847 _json_serializer=(
1848 "1.3.7",
1849 "The _json_serializer argument to the SQLite dialect has "
1850 "been renamed to the correct name of json_serializer. The old "
1851 "argument name will be removed in a future release.",
1852 ),
1853 _json_deserializer=(
1854 "1.3.7",
1855 "The _json_deserializer argument to the SQLite dialect has "
1856 "been renamed to the correct name of json_deserializer. The old "
1857 "argument name will be removed in a future release.",
1858 ),
1859 )
1860 def __init__(
1861 self,
1862 isolation_level=None,
1863 native_datetime=False,
1864 json_serializer=None,
1865 json_deserializer=None,
1866 _json_serializer=None,
1867 _json_deserializer=None,
1868 **kwargs
1869 ):
1870 default.DefaultDialect.__init__(self, **kwargs)
1871 self.isolation_level = isolation_level
1873 if _json_serializer:
1874 json_serializer = _json_serializer
1875 if _json_deserializer:
1876 json_deserializer = _json_deserializer
1877 self._json_serializer = json_serializer
1878 self._json_deserializer = json_deserializer
1880 # this flag used by pysqlite dialect, and perhaps others in the
1881 # future, to indicate the driver is handling date/timestamp
1882 # conversions (and perhaps datetime/time as well on some hypothetical
1883 # driver ?)
1884 self.native_datetime = native_datetime
1886 if self.dbapi is not None:
1887 if self.dbapi.sqlite_version_info < (3, 7, 16):
1888 util.warn(
1889 "SQLite version %s is older than 3.7.16, and will not "
1890 "support right nested joins, as are sometimes used in "
1891 "more complex ORM scenarios. SQLAlchemy 1.4 and above "
1892 "no longer tries to rewrite these joins."
1893 % (self.dbapi.sqlite_version_info,)
1894 )
1896 self._broken_dotted_colnames = self.dbapi.sqlite_version_info < (
1897 3,
1898 10,
1899 0,
1900 )
1901 self.supports_default_values = self.dbapi.sqlite_version_info >= (
1902 3,
1903 3,
1904 8,
1905 )
1906 self.supports_cast = self.dbapi.sqlite_version_info >= (3, 2, 3)
1907 self.supports_multivalues_insert = (
1908 # https://www.sqlite.org/releaselog/3_7_11.html
1909 self.dbapi.sqlite_version_info
1910 >= (3, 7, 11)
1911 )
1912 # see https://www.sqlalchemy.org/trac/ticket/2568
1913 # as well as https://www.sqlite.org/src/info/600482d161
1914 self._broken_fk_pragma_quotes = self.dbapi.sqlite_version_info < (
1915 3,
1916 6,
1917 14,
1918 )
1920 _isolation_lookup = util.immutabledict(
1921 {"READ UNCOMMITTED": 1, "SERIALIZABLE": 0}
1922 )
1924 def set_isolation_level(self, connection, level):
1925 try:
1926 isolation_level = self._isolation_lookup[level.replace("_", " ")]
1927 except KeyError as err:
1928 util.raise_(
1929 exc.ArgumentError(
1930 "Invalid value '%s' for isolation_level. "
1931 "Valid isolation levels for %s are %s"
1932 % (
1933 level,
1934 self.name,
1935 ", ".join(self._isolation_lookup),
1936 )
1937 ),
1938 replace_context=err,
1939 )
1940 cursor = connection.cursor()
1941 cursor.execute("PRAGMA read_uncommitted = %d" % isolation_level)
1942 cursor.close()
1944 def get_isolation_level(self, connection):
1945 cursor = connection.cursor()
1946 cursor.execute("PRAGMA read_uncommitted")
1947 res = cursor.fetchone()
1948 if res:
1949 value = res[0]
1950 else:
1951 # https://www.sqlite.org/changes.html#version_3_3_3
1952 # "Optional READ UNCOMMITTED isolation (instead of the
1953 # default isolation level of SERIALIZABLE) and
1954 # table level locking when database connections
1955 # share a common cache."
1956 # pre-SQLite 3.3.0 default to 0
1957 value = 0
1958 cursor.close()
1959 if value == 0:
1960 return "SERIALIZABLE"
1961 elif value == 1:
1962 return "READ UNCOMMITTED"
1963 else:
1964 assert False, "Unknown isolation level %s" % value
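``PRAGMA read_uncommitted`` is the switch these two methods drive: 0 corresponds to SERIALIZABLE (SQLite's default) and 1 to READ UNCOMMITTED. Sketched directly against ``sqlite3``:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Default: 0, i.e. SERIALIZABLE.
value = conn.execute("PRAGMA read_uncommitted").fetchone()[0]
print(value)  # 0

# Mirror set_isolation_level for READ UNCOMMITTED.
conn.execute("PRAGMA read_uncommitted = 1")
value = conn.execute("PRAGMA read_uncommitted").fetchone()[0]
print(value)  # 1
```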
1966 def on_connect(self):
1967 if self.isolation_level is not None:
1969 def connect(conn):
1970 self.set_isolation_level(conn, self.isolation_level)
1972 return connect
1973 else:
1974 return None
1976 @reflection.cache
1977 def get_schema_names(self, connection, **kw):
1978 s = "PRAGMA database_list"
1979 dl = connection.exec_driver_sql(s)
1981 return [db[1] for db in dl if db[1] != "temp"]
1983 @reflection.cache
1984 def get_table_names(self, connection, schema=None, **kw):
1985 if schema is not None:
1986 qschema = self.identifier_preparer.quote_identifier(schema)
1987 master = "%s.sqlite_master" % qschema
1988 else:
1989 master = "sqlite_master"
1990 s = ("SELECT name FROM %s " "WHERE type='table' ORDER BY name") % (
1991 master,
1992 )
1993 rs = connection.exec_driver_sql(s)
1994 return [row[0] for row in rs]
1996 @reflection.cache
1997 def get_temp_table_names(self, connection, **kw):
1998 s = (
1999 "SELECT name FROM sqlite_temp_master "
2000 "WHERE type='table' ORDER BY name "
2001 )
2002 rs = connection.exec_driver_sql(s)
2004 return [row[0] for row in rs]
2006 @reflection.cache
2007 def get_temp_view_names(self, connection, **kw):
2008 s = (
2009 "SELECT name FROM sqlite_temp_master "
2010 "WHERE type='view' ORDER BY name "
2011 )
2012 rs = connection.exec_driver_sql(s)
2014 return [row[0] for row in rs]
2016 def has_table(self, connection, table_name, schema=None):
2017 self._ensure_has_table_connection(connection)
2019 info = self._get_table_pragma(
2020 connection, "table_info", table_name, schema=schema
2021 )
2022 return bool(info)
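``has_table`` relies on ``PRAGMA table_info`` returning rows only for tables that actually exist; for a missing name the pragma yields an empty result rather than an error. Table names here are illustrative:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE present (id INTEGER PRIMARY KEY)")

# Mirrors has_table: truthiness of the pragma result decides existence.
exists = bool(conn.execute("PRAGMA table_info('present')").fetchall())
missing = bool(conn.execute("PRAGMA table_info('absent')").fetchall())
print(exists, missing)  # True False
```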
2024 def _get_default_schema_name(self, connection):
2025 return "main"
    @reflection.cache
    def get_view_names(self, connection, schema=None, **kw):
        if schema is not None:
            qschema = self.identifier_preparer.quote_identifier(schema)
            master = "%s.sqlite_master" % qschema
        else:
            master = "sqlite_master"
        s = "SELECT name FROM %s WHERE type='view' ORDER BY name" % (master,)
        rs = connection.exec_driver_sql(s)

        return [row[0] for row in rs]

    @reflection.cache
    def get_view_definition(self, connection, view_name, schema=None, **kw):
        if schema is not None:
            qschema = self.identifier_preparer.quote_identifier(schema)
            master = "%s.sqlite_master" % qschema
            s = "SELECT sql FROM %s WHERE name = ? AND type='view'" % (master,)
            rs = connection.exec_driver_sql(s, (view_name,))
        else:
            try:
                s = (
                    "SELECT sql FROM "
                    " (SELECT * FROM sqlite_master UNION ALL "
                    " SELECT * FROM sqlite_temp_master) "
                    "WHERE name = ? "
                    "AND type='view'"
                )
                rs = connection.exec_driver_sql(s, (view_name,))
            except exc.DBAPIError:
                s = (
                    "SELECT sql FROM sqlite_master WHERE name = ? "
                    "AND type='view'"
                )
                rs = connection.exec_driver_sql(s, (view_name,))

        result = rs.fetchall()
        if result:
            return result[0].sql
    @reflection.cache
    def get_columns(self, connection, table_name, schema=None, **kw):
        pragma = "table_info"
        # computed columns are treated as hidden; they require table_xinfo
        if self.server_version_info >= (3, 31):
            pragma = "table_xinfo"
        info = self._get_table_pragma(
            connection, pragma, table_name, schema=schema
        )
        columns = []
        tablesql = None
        for row in info:
            name = row[1]
            type_ = row[2].upper()
            nullable = not row[3]
            default = row[4]
            primary_key = row[5]
            hidden = row[6] if pragma == "table_xinfo" else 0

            # hidden has value 0 for normal columns, 1 for hidden columns,
            # 2 for computed virtual columns and 3 for computed stored columns
            # https://www.sqlite.org/src/info/069351b85f9a706f60d3e98fbc8aaf40c374356b967c0464aede30ead3d9d18b
            if hidden == 1:
                continue

            generated = bool(hidden)
            persisted = hidden == 3

            if tablesql is None and generated:
                tablesql = self._get_table_sql(
                    connection, table_name, schema, **kw
                )

            columns.append(
                self._get_column_info(
                    name,
                    type_,
                    nullable,
                    default,
                    primary_key,
                    generated,
                    persisted,
                    tablesql,
                )
            )
        return columns
    def _get_column_info(
        self,
        name,
        type_,
        nullable,
        default,
        primary_key,
        generated,
        persisted,
        tablesql,
    ):

        if generated:
            # the type of a column "cc INTEGER GENERATED ALWAYS AS (1 + 42)"
            # somehow is "INTEGER GENERATED ALWAYS"
            type_ = re.sub("generated", "", type_, flags=re.IGNORECASE)
            type_ = re.sub("always", "", type_, flags=re.IGNORECASE).strip()

        coltype = self._resolve_type_affinity(type_)

        if default is not None:
            default = util.text_type(default)

        colspec = {
            "name": name,
            "type": coltype,
            "nullable": nullable,
            "default": default,
            "autoincrement": "auto",
            "primary_key": primary_key,
        }
        if generated:
            sqltext = ""
            if tablesql:
                pattern = r"[^,]*\s+AS\s+\(([^,]*)\)\s*(?:virtual|stored)?"
                match = re.search(
                    re.escape(name) + pattern, tablesql, re.IGNORECASE
                )
                if match:
                    sqltext = match.group(1)
            colspec["computed"] = {"sqltext": sqltext, "persisted": persisted}
        return colspec
    def _resolve_type_affinity(self, type_):
        """Return a data type from a reflected column, using affinity rules.

        SQLite's goal of universal compatibility introduces some complexity
        during reflection, as a column's defined type might not actually be a
        type that SQLite understands - or indeed, may not be defined *at all*.
        Internally, SQLite handles this with a 'data type affinity' for each
        column definition, mapping to one of 'TEXT', 'NUMERIC', 'INTEGER',
        'REAL', or 'NONE' (raw bits).  The algorithm that determines this is
        listed in https://www.sqlite.org/datatype3.html section 2.1.

        This method allows SQLAlchemy to support that algorithm, while still
        providing access to smarter reflection utilities by recognizing
        column definitions that SQLite only supports through affinity (like
        DATE and DOUBLE).

        """
        match = re.match(r"([\w ]+)(\(.*?\))?", type_)
        if match:
            coltype = match.group(1)
            args = match.group(2)
        else:
            coltype = ""
            args = ""

        if coltype in self.ischema_names:
            coltype = self.ischema_names[coltype]
        elif "INT" in coltype:
            coltype = sqltypes.INTEGER
        elif "CHAR" in coltype or "CLOB" in coltype or "TEXT" in coltype:
            coltype = sqltypes.TEXT
        elif "BLOB" in coltype or not coltype:
            coltype = sqltypes.NullType
        elif "REAL" in coltype or "FLOA" in coltype or "DOUB" in coltype:
            coltype = sqltypes.REAL
        else:
            coltype = sqltypes.NUMERIC

        if args is not None:
            args = re.findall(r"(\d+)", args)
            try:
                coltype = coltype(*[int(a) for a in args])
            except TypeError:
                util.warn(
                    "Could not instantiate type %s with "
                    "reflected arguments %s; using no arguments."
                    % (coltype, args)
                )
                coltype = coltype()
        else:
            coltype = coltype()

        return coltype
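Editorial aside: the affinity fallback above can be exercised in isolation. The helper below is a hypothetical, standalone re-implementation of just the substring checks (it is not SQLAlchemy API and omits the `ischema_names` lookup), returning SQLite's affinity name per https://www.sqlite.org/datatype3.html:

```python
import re


def affinity(decltype):
    """Map a declared column type string to its SQLite affinity name."""
    m = re.match(r"([\w ]+)(\(.*?\))?", decltype)
    name = m.group(1).upper() if m else ""
    if "INT" in name:
        return "INTEGER"
    if "CHAR" in name or "CLOB" in name or "TEXT" in name:
        return "TEXT"
    if "BLOB" in name or not name:
        return "NONE"  # raw bits; SQLAlchemy maps this to NullType
    if "REAL" in name or "FLOA" in name or "DOUB" in name:
        return "REAL"
    return "NUMERIC"
```

For example, `affinity("VARCHAR(30)")` resolves to `"TEXT"` via the `CHAR` substring rule, mirroring the `sqltypes.TEXT` branch above.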
    @reflection.cache
    def get_pk_constraint(self, connection, table_name, schema=None, **kw):
        constraint_name = None
        table_data = self._get_table_sql(connection, table_name, schema=schema)
        if table_data:
            PK_PATTERN = r"CONSTRAINT (\w+) PRIMARY KEY"
            result = re.search(PK_PATTERN, table_data, re.I)
            constraint_name = result.group(1) if result else None

        cols = self.get_columns(connection, table_name, schema, **kw)
        cols.sort(key=lambda col: col.get("primary_key"))
        pkeys = []
        for col in cols:
            if col["primary_key"]:
                pkeys.append(col["name"])

        return {"constrained_columns": pkeys, "name": constraint_name}
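The constraint-name extraction above is a plain regex scan of the stored `CREATE TABLE` text. A minimal round-trip against a representative DDL string (a made-up example, not taken from the library's tests) looks like:

```python
import re

# hypothetical DDL of the kind _get_table_sql() would return
ddl = (
    "CREATE TABLE users (id INTEGER, "
    "CONSTRAINT pk_users PRIMARY KEY (id))"
)
match = re.search(r"CONSTRAINT (\w+) PRIMARY KEY", ddl, re.I)
name = match.group(1) if match else None  # unnamed PKs yield None
```

Unnamed primary keys simply fail to match, which is why `constraint_name` stays `None` in that case.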
    @reflection.cache
    def get_foreign_keys(self, connection, table_name, schema=None, **kw):
        # sqlite makes this *extremely difficult*.
        # First, use the pragma to get the actual FKs.
        pragma_fks = self._get_table_pragma(
            connection, "foreign_key_list", table_name, schema=schema
        )

        fks = {}

        for row in pragma_fks:
            (numerical_id, rtbl, lcol, rcol) = (row[0], row[2], row[3], row[4])

            if not rcol:
                # no referred column, which means it was not named in the
                # original DDL.  The referred columns of the foreign key
                # constraint are therefore the primary key of the referred
                # table.
                referred_pk = self.get_pk_constraint(
                    connection, rtbl, schema=schema, **kw
                )
                # note that if the table doesn't exist, we still get back a
                # record, it just has no columns in it
                referred_columns = referred_pk["constrained_columns"]
            else:
                # note we use this list only if this is the first column
                # in the constraint.  for subsequent columns we ignore the
                # list and append "rcol" if present.
                referred_columns = []

            if self._broken_fk_pragma_quotes:
                rtbl = re.sub(r"^[\"\[`\']|[\"\]`\']$", "", rtbl)

            if numerical_id in fks:
                fk = fks[numerical_id]
            else:
                fk = fks[numerical_id] = {
                    "name": None,
                    "constrained_columns": [],
                    "referred_schema": schema,
                    "referred_table": rtbl,
                    "referred_columns": referred_columns,
                    "options": {},
                }

            fk["constrained_columns"].append(lcol)

            if rcol:
                fk["referred_columns"].append(rcol)

        def fk_sig(constrained_columns, referred_table, referred_columns):
            return (
                tuple(constrained_columns)
                + (referred_table,)
                + tuple(referred_columns)
            )

        # then, parse the actual SQL and attempt to find DDL that matches
        # the names as well.  SQLite saves the DDL in whatever format
        # it was typed in as, so we need to be liberal here.

        keys_by_signature = dict(
            (
                fk_sig(
                    fk["constrained_columns"],
                    fk["referred_table"],
                    fk["referred_columns"],
                ),
                fk,
            )
            for fk in fks.values()
        )

        table_data = self._get_table_sql(connection, table_name, schema=schema)
        if table_data is None:
            # system tables, etc.
            return []

        def parse_fks():
            FK_PATTERN = (
                r"(?:CONSTRAINT (\w+) +)?"
                r"FOREIGN KEY *\( *(.+?) *\) +"
                r'REFERENCES +(?:(?:"(.+?)")|([a-z0-9_]+)) *\((.+?)\) *'
                r"((?:ON (?:DELETE|UPDATE) "
                r"(?:SET NULL|SET DEFAULT|CASCADE|RESTRICT|NO ACTION) *)*)"
                r"((?:NOT +)?DEFERRABLE)?"
                r"(?: +INITIALLY +(DEFERRED|IMMEDIATE))?"
            )
            for match in re.finditer(FK_PATTERN, table_data, re.I):
                (
                    constraint_name,
                    constrained_columns,
                    referred_quoted_name,
                    referred_name,
                    referred_columns,
                    onupdatedelete,
                    deferrable,
                    initially,
                ) = match.group(1, 2, 3, 4, 5, 6, 7, 8)
                constrained_columns = list(
                    self._find_cols_in_sig(constrained_columns)
                )
                if not referred_columns:
                    referred_columns = constrained_columns
                else:
                    referred_columns = list(
                        self._find_cols_in_sig(referred_columns)
                    )
                referred_name = referred_quoted_name or referred_name
                options = {}

                for token in re.split(r" *\bON\b *", onupdatedelete.upper()):
                    if token.startswith("DELETE"):
                        ondelete = token[6:].strip()
                        if ondelete and ondelete != "NO ACTION":
                            options["ondelete"] = ondelete
                    elif token.startswith("UPDATE"):
                        onupdate = token[6:].strip()
                        if onupdate and onupdate != "NO ACTION":
                            options["onupdate"] = onupdate

                if deferrable:
                    options["deferrable"] = "NOT" not in deferrable.upper()
                if initially:
                    options["initially"] = initially.upper()

                yield (
                    constraint_name,
                    constrained_columns,
                    referred_name,
                    referred_columns,
                    options,
                )

        fkeys = []

        for (
            constraint_name,
            constrained_columns,
            referred_name,
            referred_columns,
            options,
        ) in parse_fks():
            sig = fk_sig(constrained_columns, referred_name, referred_columns)
            if sig not in keys_by_signature:
                util.warn(
                    "WARNING: SQL-parsed foreign key constraint "
                    "'%s' could not be located in PRAGMA "
                    "foreign_keys for table %s" % (sig, table_name)
                )
                continue
            key = keys_by_signature.pop(sig)
            key["name"] = constraint_name
            key["options"] = options
            fkeys.append(key)
        # assume the remainders are the unnamed, inline constraints; since
        # inline constraints are extremely difficult to parse, use them as is
        fkeys.extend(keys_by_signature.values())
        return fkeys
    def _find_cols_in_sig(self, sig):
        for match in re.finditer(r'(?:"(.+?)")|([a-z0-9_]+)', sig, re.I):
            yield match.group(1) or match.group(2)
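The column-signature regex above prefers double-quoted identifiers (which may contain spaces) and falls back to bare identifiers. A quick standalone illustration of its assumed behavior:

```python
import re


def find_cols_in_sig(sig):
    # same pattern as the method above: quoted identifier first,
    # otherwise a bare word (re.I makes the bare branch case-insensitive)
    for match in re.finditer(r'(?:"(.+?)")|([a-z0-9_]+)', sig, re.I):
        yield match.group(1) or match.group(2)
```

Given the signature text `'"user id", email, "Zip"'`, the generator yields `user id`, `email`, and `Zip`, with surrounding quotes stripped.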
    @reflection.cache
    def get_unique_constraints(
        self, connection, table_name, schema=None, **kw
    ):

        auto_index_by_sig = {}
        for idx in self.get_indexes(
            connection,
            table_name,
            schema=schema,
            include_auto_indexes=True,
            **kw
        ):
            if not idx["name"].startswith("sqlite_autoindex"):
                continue
            sig = tuple(idx["column_names"])
            auto_index_by_sig[sig] = idx

        table_data = self._get_table_sql(
            connection, table_name, schema=schema, **kw
        )
        if not table_data:
            return []

        unique_constraints = []

        def parse_uqs():
            UNIQUE_PATTERN = r'(?:CONSTRAINT "?(.+?)"? +)?UNIQUE *\((.+?)\)'
            INLINE_UNIQUE_PATTERN = (
                r'(?:(".+?")|(?:[\[`])?([a-z0-9_]+)(?:[\]`])?) '
                r"+[a-z0-9_ ]+? +UNIQUE"
            )

            for match in re.finditer(UNIQUE_PATTERN, table_data, re.I):
                name, cols = match.group(1, 2)
                yield name, list(self._find_cols_in_sig(cols))

            # we need to match inlines as well, as we seek to differentiate
            # a UNIQUE constraint from a UNIQUE INDEX, even though these
            # are kind of the same thing :)
            for match in re.finditer(INLINE_UNIQUE_PATTERN, table_data, re.I):
                cols = list(
                    self._find_cols_in_sig(match.group(1) or match.group(2))
                )
                yield None, cols

        for name, cols in parse_uqs():
            sig = tuple(cols)
            if sig in auto_index_by_sig:
                auto_index_by_sig.pop(sig)
                parsed_constraint = {"name": name, "column_names": cols}
                unique_constraints.append(parsed_constraint)
        # NOTE: auto_index_by_sig might not be empty here,
        # the PRIMARY KEY may have an entry.
        return unique_constraints
    @reflection.cache
    def get_check_constraints(self, connection, table_name, schema=None, **kw):
        table_data = self._get_table_sql(
            connection, table_name, schema=schema, **kw
        )
        if not table_data:
            return []

        CHECK_PATTERN = r"(?:CONSTRAINT (.+) +)?CHECK *\( *(.+) *\),? *"
        check_constraints = []
        # NOTE: we aren't using re.S here because we actually are
        # taking advantage of each CHECK constraint being all on one
        # line in the table definition in order to delineate.  This
        # necessarily makes assumptions as to how the CREATE TABLE
        # was emitted.

        for match in re.finditer(CHECK_PATTERN, table_data, re.I):
            name = match.group(1)

            if name:
                name = re.sub(r'^"|"$', "", name)

            check_constraints.append({"sqltext": match.group(2), "name": name})

        return check_constraints
    @reflection.cache
    def get_indexes(self, connection, table_name, schema=None, **kw):
        pragma_indexes = self._get_table_pragma(
            connection, "index_list", table_name, schema=schema
        )
        indexes = []

        # regular expression to extract the filter predicate of a partial
        # index.  this could fail to extract the predicate correctly on
        # indexes created like
        # CREATE INDEX i ON t (col || ') where') WHERE col <> ''
        # but as this function does not support expression-based indexes
        # this case does not occur.
        partial_pred_re = re.compile(r"\)\s+where\s+(.+)", re.IGNORECASE)

        if schema:
            schema_expr = "%s." % self.identifier_preparer.quote_identifier(
                schema
            )
        else:
            schema_expr = ""

        include_auto_indexes = kw.pop("include_auto_indexes", False)
        for row in pragma_indexes:
            # ignore implicit primary key index.
            # https://www.mail-archive.com/sqlite-users@sqlite.org/msg30517.html
            if not include_auto_indexes and row[1].startswith(
                "sqlite_autoindex"
            ):
                continue
            indexes.append(
                dict(
                    name=row[1],
                    column_names=[],
                    unique=row[2],
                    dialect_options={},
                )
            )

            # check partial indexes
            if len(row) >= 5 and row[4]:
                s = (
                    "SELECT sql FROM %(schema)ssqlite_master "
                    "WHERE name = ? "
                    "AND type = 'index'" % {"schema": schema_expr}
                )
                rs = connection.exec_driver_sql(s, (row[1],))
                index_sql = rs.scalar()
                predicate_match = partial_pred_re.search(index_sql)
                if predicate_match is None:
                    # unless the regex is broken this case shouldn't happen
                    # because we know this is a partial index, so the
                    # definition sql should match the regex
                    util.warn(
                        "Failed to look up filter predicate of "
                        "partial index %s" % row[1]
                    )
                else:
                    predicate = predicate_match.group(1)
                    indexes[-1]["dialect_options"]["sqlite_where"] = text(
                        predicate
                    )

        # loop through the indexes to get the column names.
        for idx in list(indexes):
            pragma_index = self._get_table_pragma(
                connection, "index_info", idx["name"], schema=schema
            )

            for row in pragma_index:
                if row[2] is None:
                    util.warn(
                        "Skipped unsupported reflection of "
                        "expression-based index %s" % idx["name"]
                    )
                    indexes.remove(idx)
                    break
                else:
                    idx["column_names"].append(row[2])

        indexes.sort(key=lambda d: d["name"] or "~")  # sort None as last
        return indexes
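The partial-index predicate extraction above is a single regex applied to the index's stored DDL. Run against a typical (made-up) partial-index statement, it behaves as follows:

```python
import re

# same pattern as partial_pred_re in the method above
partial_pred_re = re.compile(r"\)\s+where\s+(.+)", re.IGNORECASE)

# hypothetical DDL as it would come back from sqlite_master
index_sql = "CREATE INDEX ix_active ON users (email) WHERE active = 1"
match = partial_pred_re.search(index_sql)
predicate = match.group(1) if match else None
```

Here `predicate` captures everything after the closing parenthesis and `WHERE`, i.e. `"active = 1"`, which the dialect then wraps in `text()` as the `sqlite_where` option.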
    @reflection.cache
    def _get_table_sql(self, connection, table_name, schema=None, **kw):
        if schema:
            schema_expr = "%s." % (
                self.identifier_preparer.quote_identifier(schema)
            )
        else:
            schema_expr = ""
        try:
            s = (
                "SELECT sql FROM "
                " (SELECT * FROM %(schema)ssqlite_master UNION ALL "
                " SELECT * FROM %(schema)ssqlite_temp_master) "
                "WHERE name = ? "
                "AND type = 'table'" % {"schema": schema_expr}
            )
            rs = connection.exec_driver_sql(s, (table_name,))
        except exc.DBAPIError:
            s = (
                "SELECT sql FROM %(schema)ssqlite_master "
                "WHERE name = ? "
                "AND type = 'table'" % {"schema": schema_expr}
            )
            rs = connection.exec_driver_sql(s, (table_name,))
        return rs.scalar()
    def _get_table_pragma(self, connection, pragma, table_name, schema=None):
        quote = self.identifier_preparer.quote_identifier
        if schema is not None:
            statements = ["PRAGMA %s." % quote(schema)]
        else:
            # because PRAGMA looks in all attached databases if no schema
            # given, need to specify "main" schema, however since we want
            # 'temp' tables in the same namespace as 'main', need to run
            # the PRAGMA twice
            statements = ["PRAGMA main.", "PRAGMA temp."]

        qtable = quote(table_name)
        for statement in statements:
            statement = "%s%s(%s)" % (statement, pragma, qtable)
            cursor = connection.exec_driver_sql(statement)
            if not cursor._soft_closed:
                # work around SQLite issue whereby cursor.description
                # is blank when PRAGMA returns no rows:
                # https://www.sqlite.org/cvstrac/tktview?tn=1884
                result = cursor.fetchall()
            else:
                result = []
            if result:
                return result
        else:
            return []
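The main-then-temp probing above exists because a schema-qualified PRAGMA only sees tables in that one schema. A minimal `sqlite3` demonstration (stdlib only, not SQLAlchemy API; identifier quoting is omitted for brevity, so only use trusted names here):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# a TEMP table lives in the "temp" schema, invisible to PRAGMA main.*
conn.execute("CREATE TEMP TABLE scratch (id INTEGER PRIMARY KEY)")


def table_pragma(conn, pragma, table):
    # mirror the strategy above: probe "main" first, then "temp"
    for schema in ("main", "temp"):
        rows = conn.execute(
            "PRAGMA %s.%s(%s)" % (schema, pragma, table)
        ).fetchall()
        if rows:
            return rows
    return []


info = table_pragma(conn, "table_info", "scratch")
```

`PRAGMA main.table_info(scratch)` returns no rows, so the second probe against `temp` finds the table; a name that exists in neither schema yields an empty list, which is what `has_table()` relies on.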