Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/sqlalchemy/dialects/mssql/base.py: 33%
1054 statements
coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
1# mssql/base.py
2# Copyright (C) 2005-2023 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7"""
8.. dialect:: mssql
9 :name: Microsoft SQL Server
10 :full_support: 2017
11 :normal_support: 2012+
12 :best_effort: 2005+
14.. _mssql_external_dialects:
16External Dialects
17-----------------
19In addition to the above DBAPI layers with native SQLAlchemy support, there
20are third-party dialects for other DBAPI layers that are compatible
21with SQL Server. See the "External Dialects" list on the
22:ref:`dialect_toplevel` page.
24.. _mssql_identity:
26Auto Increment Behavior / IDENTITY Columns
27------------------------------------------
29SQL Server provides so-called "auto incrementing" behavior using the
30``IDENTITY`` construct, which can be placed on any single integer column in a
31table. SQLAlchemy considers ``IDENTITY`` within its default "autoincrement"
32behavior for an integer primary key column, described at
33:paramref:`_schema.Column.autoincrement`. This means that by default,
34the first integer primary key column in a :class:`_schema.Table` will be
35considered to be the identity column - unless it is associated with a
36:class:`.Sequence` - and will generate DDL as such::
38 from sqlalchemy import Table, MetaData, Column, Integer
40 m = MetaData()
41 t = Table('t', m,
42 Column('id', Integer, primary_key=True),
43 Column('x', Integer))
44 m.create_all(engine)
46The above example will generate DDL as:
48.. sourcecode:: sql
50 CREATE TABLE t (
51 id INTEGER NOT NULL IDENTITY,
52 x INTEGER NULL,
53 PRIMARY KEY (id)
54 )
56For the case where this default generation of ``IDENTITY`` is not desired,
57specify ``False`` for the :paramref:`_schema.Column.autoincrement` flag,
58on the first integer primary key column::
60 m = MetaData()
61 t = Table('t', m,
62 Column('id', Integer, primary_key=True, autoincrement=False),
63 Column('x', Integer))
64 m.create_all(engine)
66To add the ``IDENTITY`` keyword to a non-primary key column, specify
67``True`` for the :paramref:`_schema.Column.autoincrement` flag on the desired
68:class:`_schema.Column` object, and ensure that
69:paramref:`_schema.Column.autoincrement`
70is set to ``False`` on any integer primary key column::
72 m = MetaData()
73 t = Table('t', m,
74 Column('id', Integer, primary_key=True, autoincrement=False),
75 Column('x', Integer, autoincrement=True))
76 m.create_all(engine)
78.. versionchanged:: 1.4 Added :class:`_schema.Identity` construct
79 in a :class:`_schema.Column` to specify the start and increment
80 parameters of an IDENTITY. These replace
81 the use of the :class:`.Sequence` object in order to specify these values.
.. deprecated:: 1.4

    The ``mssql_identity_start`` and ``mssql_identity_increment`` parameters
    to :class:`_schema.Column` are deprecated and should be replaced by
    an :class:`_schema.Identity` object. Specifying both ways of configuring
    an IDENTITY will result in a compile error.
    These options are also no longer returned as part of the
    ``dialect_options`` key in :meth:`_reflection.Inspector.get_columns`.
    Use the information in the ``identity`` key instead.
93.. deprecated:: 1.3
95 The use of :class:`.Sequence` to specify IDENTITY characteristics is
96 deprecated and will be removed in a future release. Please use
97 the :class:`_schema.Identity` object parameters
98 :paramref:`_schema.Identity.start` and
99 :paramref:`_schema.Identity.increment`.
101.. versionchanged:: 1.4 Removed the ability to use a :class:`.Sequence`
102 object to modify IDENTITY characteristics. :class:`.Sequence` objects
103 now only manipulate true T-SQL SEQUENCE types.
105.. note::
107 There can only be one IDENTITY column on the table. When using
108 ``autoincrement=True`` to enable the IDENTITY keyword, SQLAlchemy does not
109 guard against multiple columns specifying the option simultaneously. The
110 SQL Server database will instead reject the ``CREATE TABLE`` statement.
.. note::

    An INSERT statement which attempts to provide a value for a column that is
    marked with IDENTITY will be rejected by SQL Server. In order for the
    value to be accepted, a session-level option "SET IDENTITY_INSERT" must be
    enabled. The SQLAlchemy SQL Server dialect will perform this operation
    automatically when using a core :class:`_expression.Insert`
    construct; if the
    execution specifies a value for the IDENTITY column, the "IDENTITY_INSERT"
    option will be enabled for the span of that statement's invocation. However,
    this scenario is not high performing and should not be relied upon for
    normal use. If a table doesn't actually require IDENTITY behavior in its
    integer primary key column, the keyword should be disabled when creating
    the table by ensuring that ``autoincrement=False`` is set.
127Controlling "Start" and "Increment"
128^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Specific control over the "start" and "increment" values for
the ``IDENTITY`` generator is provided using the
:paramref:`_schema.Identity.start` and :paramref:`_schema.Identity.increment`
parameters passed to the :class:`_schema.Identity` object::
    from sqlalchemy import Table, MetaData, Integer, String, Column, Identity

    metadata = MetaData()

    test = Table(
        'test', metadata,
        Column(
            'id',
            Integer,
            # positional arguments must precede keyword arguments
            Identity(start=100, increment=10),
            primary_key=True
        ),
        Column('name', String(20))
    )
148The CREATE TABLE for the above :class:`_schema.Table` object would be:
150.. sourcecode:: sql
    CREATE TABLE test (
      id INTEGER NOT NULL IDENTITY(100,10) PRIMARY KEY,
      name VARCHAR(20) NULL
    )
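The rendering of ``IDENTITY`` can be inspected without a database connection by compiling the table against the SQL Server dialect. The following is a minimal sketch, assuming SQLAlchemy 1.4 or later is installed; the table and variable names are illustrative only, and the exact whitespace and constraint layout of the printed DDL may differ from the listing above.

```python
from sqlalchemy import Column, Identity, Integer, MetaData, String, Table
from sqlalchemy.dialects import mssql
from sqlalchemy.schema import CreateTable

metadata = MetaData()

# Illustrative table; Identity is passed positionally, before primary_key.
test = Table(
    "test",
    metadata,
    Column("id", Integer, Identity(start=100, increment=10), primary_key=True),
    Column("name", String(20)),
)

# Compile against the SQL Server dialect only; no engine or connection is used.
ddl = str(CreateTable(test).compile(dialect=mssql.dialect()))
print(ddl)
```

Compiling against a dialect object this way is convenient for checking DDL in unit tests where no SQL Server instance is available.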
.. note::

    The :class:`_schema.Identity` object supports many other parameters in
    addition to ``start`` and ``increment``. These are not supported by
    SQL Server and will be ignored when generating the CREATE TABLE DDL.
163.. versionchanged:: 1.3.19 The :class:`_schema.Identity` object is
164 now used to affect the
165 ``IDENTITY`` generator for a :class:`_schema.Column` under SQL Server.
166 Previously, the :class:`.Sequence` object was used. As SQL Server now
167 supports real sequences as a separate construct, :class:`.Sequence` will be
168 functional in the normal way starting from SQLAlchemy version 1.4.
171Using IDENTITY with Non-Integer numeric types
172^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
174SQL Server also allows ``IDENTITY`` to be used with ``NUMERIC`` columns. To
175implement this pattern smoothly in SQLAlchemy, the primary datatype of the
176column should remain as ``Integer``, however the underlying implementation
177type deployed to the SQL Server database can be specified as ``Numeric`` using
178:meth:`.TypeEngine.with_variant`::
180 from sqlalchemy import Column
181 from sqlalchemy import Integer
182 from sqlalchemy import Numeric
183 from sqlalchemy import String
184 from sqlalchemy.ext.declarative import declarative_base
186 Base = declarative_base()
188 class TestTable(Base):
189 __tablename__ = "test"
190 id = Column(
191 Integer().with_variant(Numeric(10, 0), "mssql"),
192 primary_key=True,
193 autoincrement=True,
194 )
195 name = Column(String)
197In the above example, ``Integer().with_variant()`` provides clear usage
198information that accurately describes the intent of the code. The general
199restriction that ``autoincrement`` only applies to ``Integer`` is established
200at the metadata level and not at the per-dialect level.
202When using the above pattern, the primary key identifier that comes back from
203the insertion of a row, which is also the value that would be assigned to an
204ORM object such as ``TestTable`` above, will be an instance of ``Decimal()``
205and not ``int`` when using SQL Server. The numeric return type of the
206:class:`_types.Numeric` type can be changed to return floats by passing False
207to :paramref:`_types.Numeric.asdecimal`. To normalize the return type of the
208above ``Numeric(10, 0)`` to return Python ints (which also support "long"
209integer values in Python 3), use :class:`_types.TypeDecorator` as follows::
211 from sqlalchemy import TypeDecorator
213 class NumericAsInteger(TypeDecorator):
214 '''normalize floating point return values into ints'''
216 impl = Numeric(10, 0, asdecimal=False)
217 cache_ok = True
219 def process_result_value(self, value, dialect):
220 if value is not None:
221 value = int(value)
222 return value
224 class TestTable(Base):
225 __tablename__ = "test"
226 id = Column(
227 Integer().with_variant(NumericAsInteger, "mssql"),
228 primary_key=True,
229 autoincrement=True,
230 )
231 name = Column(String)
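The conversion performed by ``process_result_value`` above can be tried in isolation. The sketch below uses a hypothetical standalone function, ``normalize_numeric_pk``, that applies the same logic to the kinds of values a driver might hand back; it is an illustration and not part of SQLAlchemy.

```python
from decimal import Decimal


def normalize_numeric_pk(value):
    """Return an int for a numeric key value, passing None through unchanged."""
    if value is not None:
        value = int(value)
    return value


print(normalize_numeric_pk(Decimal("42")))  # Decimal input
print(normalize_numeric_pk(42.0))           # float input (asdecimal=False)
print(normalize_numeric_pk(None))           # NULL stays None
```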
234INSERT behavior
235^^^^^^^^^^^^^^^^
237Handling of the ``IDENTITY`` column at INSERT time involves two key
238techniques. The most common is being able to fetch the "last inserted value"
239for a given ``IDENTITY`` column, a process which SQLAlchemy performs
240implicitly in many cases, most importantly within the ORM.
242The process for fetching this value has several variants:
244* In the vast majority of cases, RETURNING is used in conjunction with INSERT
245 statements on SQL Server in order to get newly generated primary key values:
247 .. sourcecode:: sql
249 INSERT INTO t (x) OUTPUT inserted.id VALUES (?)
251* When RETURNING is not available or has been disabled via
252 ``implicit_returning=False``, either the ``scope_identity()`` function or
253 the ``@@identity`` variable is used; behavior varies by backend:
255 * when using PyODBC, the phrase ``; select scope_identity()`` will be
256 appended to the end of the INSERT statement; a second result set will be
257 fetched in order to receive the value. Given a table as::
259 t = Table('t', m, Column('id', Integer, primary_key=True),
260 Column('x', Integer),
261 implicit_returning=False)
263 an INSERT will look like:
265 .. sourcecode:: sql
267 INSERT INTO t (x) VALUES (?); select scope_identity()
269 * Other dialects such as pymssql will call upon
270 ``SELECT scope_identity() AS lastrowid`` subsequent to an INSERT
271 statement. If the flag ``use_scope_identity=False`` is passed to
272 :func:`_sa.create_engine`,
273 the statement ``SELECT @@identity AS lastrowid``
274 is used instead.
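The variants listed above can be summarized as a small decision function. This is a simplified model written for illustration, not the dialect's actual code; the function name, the ``driver`` argument and the returned labels are all assumptions.

```python
def last_inserted_id_strategy(
    implicit_returning=True, driver="pyodbc", use_scope_identity=True
):
    """Return (mechanism, sql_fragment) describing how the new key is fetched."""
    if implicit_returning:
        # Default case: the value comes back from the INSERT itself.
        return ("output", "OUTPUT inserted.id")
    if not use_scope_identity:
        # Explicitly requested fallback to the @@identity variable.
        return ("follow_up", "SELECT @@identity AS lastrowid")
    if driver == "pyodbc":
        # Appended to the INSERT; a second result set carries the value.
        return ("embedded", "; select scope_identity()")
    # Other drivers run a separate statement after the INSERT.
    return ("follow_up", "SELECT scope_identity() AS lastrowid")


print(last_inserted_id_strategy())
print(last_inserted_id_strategy(implicit_returning=False))
print(last_inserted_id_strategy(implicit_returning=False, driver="pymssql"))
```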
276A table that contains an ``IDENTITY`` column will prohibit an INSERT statement
277that refers to the identity column explicitly. The SQLAlchemy dialect will
278detect when an INSERT construct, created using a core
279:func:`_expression.insert`
280construct (not a plain string SQL), refers to the identity column, and
281in this case will emit ``SET IDENTITY_INSERT ON`` prior to the insert
282statement proceeding, and ``SET IDENTITY_INSERT OFF`` subsequent to the
283execution. Given this example::
285 m = MetaData()
286 t = Table('t', m, Column('id', Integer, primary_key=True),
287 Column('x', Integer))
288 m.create_all(engine)
    with engine.begin() as conn:
        # pass multiple parameter sets as a single list
        conn.execute(t.insert(), [{'id': 1, 'x': 1}, {'id': 2, 'x': 2}])
293The above column will be created with IDENTITY, however the INSERT statement
294we emit is specifying explicit values. In the echo output we can see
295how SQLAlchemy handles this:
297.. sourcecode:: sql
299 CREATE TABLE t (
300 id INTEGER NOT NULL IDENTITY(1,1),
301 x INTEGER NULL,
302 PRIMARY KEY (id)
303 )
305 COMMIT
306 SET IDENTITY_INSERT t ON
307 INSERT INTO t (id, x) VALUES (?, ?)
308 ((1, 1), (2, 2))
309 SET IDENTITY_INSERT t OFF
310 COMMIT
314This is an auxiliary use case suitable for testing and bulk insert scenarios.
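The statement sequence shown in the echo output can be modeled with a few lines of plain Python. The helper below, ``statements_for_insert``, is hypothetical and only mirrors the behavior described in this section: the ``SET IDENTITY_INSERT`` pair is emitted only when the INSERT names the identity column.

```python
def statements_for_insert(table, identity_column, columns):
    """Return the SQL statements emitted for an INSERT naming ``columns``."""
    insert = "INSERT INTO {} ({}) VALUES ({})".format(
        table, ", ".join(columns), ", ".join("?" for _ in columns)
    )
    if identity_column in columns:
        # Explicit identity values: wrap the INSERT in the session option.
        return [
            "SET IDENTITY_INSERT {} ON".format(table),
            insert,
            "SET IDENTITY_INSERT {} OFF".format(table),
        ]
    return [insert]


for statement in statements_for_insert("t", "id", ["id", "x"]):
    print(statement)
```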
316SEQUENCE support
317----------------
The :class:`.Sequence` object now creates "real" sequences, i.e.,
``CREATE SEQUENCE``. To provide compatibility with other dialects,
:class:`.Sequence` defaults to a start value of 1, even though the
T-SQL default is -9223372036854775808.
324.. versionadded:: 1.4.0
326MAX on VARCHAR / NVARCHAR
327-------------------------
329SQL Server supports the special string "MAX" within the
330:class:`_types.VARCHAR` and :class:`_types.NVARCHAR` datatypes,
331to indicate "maximum length possible". The dialect currently handles this as
332a length of "None" in the base type, rather than supplying a
333dialect-specific version of these types, so that a base type
334specified such as ``VARCHAR(None)`` can assume "unlengthed" behavior on
335more than one backend without using dialect-specific types.
337To build a SQL Server VARCHAR or NVARCHAR with MAX length, use None::
339 my_table = Table(
340 'my_table', metadata,
341 Column('my_data', VARCHAR(None)),
342 Column('my_n_data', NVARCHAR(None))
343 )
346Collation Support
347-----------------
349Character collations are supported by the base string types,
350specified by the string argument "collation"::
352 from sqlalchemy import VARCHAR
353 Column('login', VARCHAR(32, collation='Latin1_General_CI_AS'))
355When such a column is associated with a :class:`_schema.Table`, the
356CREATE TABLE statement for this column will yield::
358 login VARCHAR(32) COLLATE Latin1_General_CI_AS NULL
360LIMIT/OFFSET Support
361--------------------
363MSSQL has added support for LIMIT / OFFSET as of SQL Server 2012, via the
364"OFFSET n ROWS" and "FETCH NEXT n ROWS" clauses. SQLAlchemy supports these
365syntaxes automatically if SQL Server 2012 or greater is detected.
367.. versionchanged:: 1.4 support added for SQL Server "OFFSET n ROWS" and
368 "FETCH NEXT n ROWS" syntax.
370For statements that specify only LIMIT and no OFFSET, all versions of SQL
371Server support the TOP keyword. This syntax is used for all SQL Server
372versions when no OFFSET clause is present. A statement such as::
374 select(some_table).limit(5)
376will render similarly to::
378 SELECT TOP 5 col1, col2.. FROM table
380For versions of SQL Server prior to SQL Server 2012, a statement that uses
381LIMIT and OFFSET, or just OFFSET alone, will be rendered using the
382``ROW_NUMBER()`` window function. A statement such as::
384 select(some_table).order_by(some_table.c.col3).limit(5).offset(10)
386will render similarly to::
388 SELECT anon_1.col1, anon_1.col2 FROM (SELECT col1, col2,
389 ROW_NUMBER() OVER (ORDER BY col3) AS
390 mssql_rn FROM table WHERE t.x = :x_1) AS
391 anon_1 WHERE mssql_rn > :param_1 AND mssql_rn <= :param_2 + :param_1
393Note that when using LIMIT and/or OFFSET, whether using the older
394or newer SQL Server syntaxes, the statement must have an ORDER BY as well,
395else a :class:`.CompileError` is raised.
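The ``mssql_rn`` bounds in the ``ROW_NUMBER()`` form follow directly from the LIMIT and OFFSET values: a row is kept when ``mssql_rn > offset`` and ``mssql_rn <= limit + offset``. The sketch below applies the same arithmetic to an in-memory list that is assumed to be already ordered; the function names are illustrative.

```python
def row_number_window(limit, offset):
    """Return (lower, upper) such that kept rows satisfy lower < rn <= upper."""
    lower = offset or 0
    upper = None if limit is None else limit + lower
    return lower, upper


def paginate(ordered_rows, limit=None, offset=None):
    """Emulate the ROW_NUMBER() filter over rows that are already ordered."""
    lower, upper = row_number_window(limit, offset)
    return [
        row
        for rn, row in enumerate(ordered_rows, start=1)  # rn plays mssql_rn
        if rn > lower and (upper is None or rn <= upper)
    ]


print(row_number_window(5, 10))
print(paginate(list(range(1, 31)), limit=5, offset=10))
```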
397.. _mssql_isolation_level:
399Transaction Isolation Level
400---------------------------
All SQL Server dialects support setting the transaction isolation level,
either via the :paramref:`_sa.create_engine.isolation_level` parameter
accepted by :func:`_sa.create_engine`,
or via the :paramref:`.Connection.execution_options.isolation_level`
argument passed to
:meth:`_engine.Connection.execution_options`.
This feature works by issuing the
command ``SET TRANSACTION ISOLATION LEVEL <level>`` for
each new connection.
413To set isolation level using :func:`_sa.create_engine`::
415 engine = create_engine(
416 "mssql+pyodbc://scott:tiger@ms_2008",
417 isolation_level="REPEATABLE READ"
418 )
420To set using per-connection execution options::
422 connection = engine.connect()
423 connection = connection.execution_options(
424 isolation_level="READ COMMITTED"
425 )
427Valid values for ``isolation_level`` include:
429* ``AUTOCOMMIT`` - pyodbc / pymssql-specific
430* ``READ COMMITTED``
431* ``READ UNCOMMITTED``
432* ``REPEATABLE READ``
433* ``SERIALIZABLE``
434* ``SNAPSHOT`` - specific to SQL Server
436There are also more options for isolation level configurations, such as
437"sub-engine" objects linked to a main :class:`_engine.Engine` which each apply
438different isolation level settings. See the discussion at
439:ref:`dbapi_autocommit` for background.
441.. seealso::
443 :ref:`dbapi_autocommit`
445.. _mssql_reset_on_return:
447Temporary Table / Resource Reset for Connection Pooling
448-------------------------------------------------------
450The :class:`.QueuePool` connection pool implementation used
451by the SQLAlchemy :class:`_sa.Engine` object includes
452:ref:`reset on return <pool_reset_on_return>` behavior that will invoke
453the DBAPI ``.rollback()`` method when connections are returned to the pool.
While this rollback will clear out the immediate state used by the previous
transaction, it does not cover a wider range of session-level state, including
temporary tables as well as other server state such as prepared statement
handles and statement caches. The undocumented SQL Server procedure
``sp_reset_connection`` is a known workaround for this issue; it resets
most of the session state that builds up on a connection, including
temporary tables.
462To install ``sp_reset_connection`` as the means of performing reset-on-return,
463the :meth:`.PoolEvents.reset` event hook may be used, as demonstrated in the
464example below (**requires SQLAlchemy 1.4.43 or greater**). The
465:paramref:`_sa.create_engine.pool_reset_on_return` parameter is set to ``None``
466so that the custom scheme can replace the default behavior completely. The
467custom hook implementation calls ``.rollback()`` in any case, as it's usually
468important that the DBAPI's own tracking of commit/rollback will remain
469consistent with the state of the transaction::
471 from sqlalchemy import create_engine
472 from sqlalchemy import event
474 mssql_engine = create_engine(
475 "mssql+pyodbc://scott:tiger^5HHH@mssql2017:1433/test?driver=ODBC+Driver+17+for+SQL+Server",
477 # disable default reset-on-return scheme
478 pool_reset_on_return=None,
479 )
482 @event.listens_for(mssql_engine, "reset")
483 def _reset_mssql(dbapi_connection, connection_record, reset_state):
484 dbapi_connection.execute("{call sys.sp_reset_connection}")
486 # so that the DBAPI itself knows that the connection has been
487 # reset
488 dbapi_connection.rollback()
490.. versionchanged:: 1.4.43 Ensured the :meth:`.PoolEvents.reset` event
491 is invoked for all "reset" occurrences, so that it's appropriate
492 as a place for custom "reset" handlers. Previous schemes which
493 use the :meth:`.PoolEvents.checkin` handler remain usable as well.
495.. seealso::
497 :ref:`pool_reset_on_return` - in the :ref:`pooling_toplevel` documentation
499Nullability
500-----------
501MSSQL has support for three levels of column nullability. The default
502nullability allows nulls and is explicit in the CREATE TABLE
503construct::
505 name VARCHAR(20) NULL
507If ``nullable=None`` is specified then no specification is made. In
508other words the database's configured default is used. This will
509render::
511 name VARCHAR(20)
513If ``nullable`` is ``True`` or ``False`` then the column will be
514``NULL`` or ``NOT NULL`` respectively.
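The three nullability levels map onto the rendered column specification as follows. This is a plain-Python model of the rule described above, with hypothetical helper names, and is not the dialect's own code.

```python
def nullability_clause(nullable):
    """Map the tri-state ``nullable`` setting to its DDL keyword."""
    if nullable is None:
        return ""  # no specification; the database default applies
    return "NULL" if nullable else "NOT NULL"


def column_spec(name, type_, nullable=True):
    """Join the non-empty parts of a column specification."""
    parts = (name, type_, nullability_clause(nullable))
    return " ".join(part for part in parts if part)


print(column_spec("name", "VARCHAR(20)"))
print(column_spec("name", "VARCHAR(20)", nullable=None))
print(column_spec("name", "VARCHAR(20)", nullable=False))
```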
516Date / Time Handling
517--------------------
518DATE and TIME are supported. Bind parameters are converted
519to datetime.datetime() objects as required by most MSSQL drivers,
520and results are processed from strings if needed.
521The DATE and TIME types are not available for MSSQL 2005 and
522previous - if a server version below 2008 is detected, DDL
523for these types will be issued as DATETIME.
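The bind-parameter conversion mentioned above amounts to promoting ``date`` and ``time`` values to ``datetime.datetime``. The following is a sketch under stated assumptions: the function names are hypothetical, and the placeholder date of 1900-01-01 used for time values is an assumption made for illustration.

```python
import datetime

# Assumption: placeholder date combined with TIME values.
ZERO_DATE = datetime.date(1900, 1, 1)


def date_to_datetime(value):
    """Promote a plain date to a datetime at midnight; pass others through."""
    # An exact type check leaves datetime.datetime values untouched.
    if type(value) is datetime.date:
        return datetime.datetime(value.year, value.month, value.day)
    return value


def time_to_datetime(value):
    """Combine a time with the placeholder date; pass others through."""
    if isinstance(value, datetime.time):
        return datetime.datetime.combine(ZERO_DATE, value)
    return value


print(date_to_datetime(datetime.date(2023, 6, 7)))
print(time_to_datetime(datetime.time(6, 35)))
```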
525.. _mssql_large_type_deprecation:
527Large Text/Binary Type Deprecation
528----------------------------------
Per
`SQL Server 2012/2014 Documentation <https://technet.microsoft.com/en-us/library/ms187993.aspx>`_,
the ``NTEXT``, ``TEXT`` and ``IMAGE`` datatypes are to be removed from SQL
Server in a future release. SQLAlchemy normally relates these types to the
:class:`.UnicodeText`, :class:`_types.Text` and
:class:`.LargeBinary` datatypes.

In order to accommodate this change, a new flag ``deprecate_large_types``
is added to the dialect, which will be automatically set based on detection
of the server version in use, if not otherwise set by the user. The
behavior of this flag is as follows:

* When this flag is ``True``, the :class:`.UnicodeText`,
  :class:`_types.Text` and
  :class:`.LargeBinary` datatypes, when used to render DDL, will render the
  types ``NVARCHAR(max)``, ``VARCHAR(max)``, and ``VARBINARY(max)``,
  respectively. This is a new behavior as of the addition of this flag.

* When this flag is ``False``, the :class:`.UnicodeText`,
  :class:`_types.Text` and
  :class:`.LargeBinary` datatypes, when used to render DDL, will render the
  types ``NTEXT``, ``TEXT``, and ``IMAGE``,
  respectively. This is the long-standing behavior of these types.
554* The flag begins with the value ``None``, before a database connection is
555 established. If the dialect is used to render DDL without the flag being
556 set, it is interpreted the same as ``False``.
558* On first connection, the dialect detects if SQL Server version 2012 or
559 greater is in use; if the flag is still at ``None``, it sets it to ``True``
560 or ``False`` based on whether 2012 or greater is detected.
562* The flag can be set to either ``True`` or ``False`` when the dialect
563 is created, typically via :func:`_sa.create_engine`::
565 eng = create_engine("mssql+pymssql://user:pass@host/db",
566 deprecate_large_types=True)
568* Complete control over whether the "old" or "new" types are rendered is
569 available in all SQLAlchemy versions by using the UPPERCASE type objects
570 instead: :class:`_types.NVARCHAR`, :class:`_types.VARCHAR`,
571 :class:`_types.VARBINARY`, :class:`_types.TEXT`, :class:`_mssql.NTEXT`,
572 :class:`_mssql.IMAGE`
573 will always remain fixed and always output exactly that
574 type.
576.. versionadded:: 1.0.0
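The flag resolution described in the list above can be expressed compactly. The sketch below is a model of the documented rules rather than the dialect's implementation; the function names are hypothetical, the result dictionary is keyed by the generic type names, and the version tuple ``(11,)`` stands for SQL Server 2012.

```python
MS_2012_VERSION = (11,)


def resolve_deprecate_large_types(flag, server_version_info):
    """Apply the first-connection rule: only an unset flag is auto-detected."""
    if flag is None:
        return server_version_info >= MS_2012_VERSION
    return flag


def ddl_type_names(flag):
    """Return the DDL names rendered for the generic large types."""
    if flag:
        return {
            "UnicodeText": "NVARCHAR(max)",
            "Text": "VARCHAR(max)",
            "LargeBinary": "VARBINARY(max)",
        }
    # Both False and None (flag never set) render the legacy types.
    return {"UnicodeText": "NTEXT", "Text": "TEXT", "LargeBinary": "IMAGE"}


print(resolve_deprecate_large_types(None, (14, 0, 1000)))
print(ddl_type_names(None))
```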
578.. _multipart_schema_names:
580Multipart Schema Names
581----------------------
583SQL Server schemas sometimes require multiple parts to their "schema"
584qualifier, that is, including the database name and owner name as separate
585tokens, such as ``mydatabase.dbo.some_table``. These multipart names can be set
586at once using the :paramref:`_schema.Table.schema` argument of
587:class:`_schema.Table`::
589 Table(
590 "some_table", metadata,
591 Column("q", String(50)),
592 schema="mydatabase.dbo"
593 )
595When performing operations such as table or component reflection, a schema
596argument that contains a dot will be split into separate
597"database" and "owner" components in order to correctly query the SQL
598Server information schema tables, as these two values are stored separately.
599Additionally, when rendering the schema name for DDL or SQL, the two
600components will be quoted separately for case sensitive names and other
601special characters. Given an argument as below::
603 Table(
604 "some_table", metadata,
605 Column("q", String(50)),
606 schema="MyDataBase.dbo"
607 )
609The above schema would be rendered as ``[MyDataBase].dbo``, and also in
610reflection, would be reflected using "dbo" as the owner and "MyDataBase"
611as the database name.
613To control how the schema name is broken into database / owner,
614specify brackets (which in SQL Server are quoting characters) in the name.
615Below, the "owner" will be considered as ``MyDataBase.dbo`` and the
616"database" will be None::
618 Table(
619 "some_table", metadata,
620 Column("q", String(50)),
621 schema="[MyDataBase.dbo]"
622 )
624To individually specify both database and owner name with special characters
625or embedded dots, use two sets of brackets::
627 Table(
628 "some_table", metadata,
629 Column("q", String(50)),
630 schema="[MyDataBase.Period].[MyOwner.Dot]"
631 )
634.. versionchanged:: 1.2 the SQL Server dialect now treats brackets as
635 identifier delimiters splitting the schema into separate database
636 and owner tokens, to allow dots within either name itself.
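The splitting rules for multipart schema names can be illustrated with a small tokenizer. This is a simplified sketch, not the dialect's own parser: ``split_schema`` is a hypothetical name, and the function returns a ``(database, owner)`` pair for the examples given in this section.

```python
import re

# A token is either a bracketed name (dots allowed inside) or a bare name.
_TOKEN = re.compile(r"\[([^\]]*)\]|([^.\[\]]+)")


def split_schema(schema):
    """Split a schema argument into (database, owner)."""
    tokens = [bracketed or bare for bracketed, bare in _TOKEN.findall(schema)]
    if len(tokens) > 1:
        # The last token is the owner; everything before it names the database.
        return ".".join(tokens[:-1]), tokens[-1]
    if tokens:
        return None, tokens[0]
    return None, None


print(split_schema("mydatabase.dbo"))
print(split_schema("[MyDataBase.dbo]"))
print(split_schema("[MyDataBase.Period].[MyOwner.Dot]"))
```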
638.. _legacy_schema_rendering:
640Legacy Schema Mode
641------------------
643Very old versions of the MSSQL dialect introduced the behavior such that a
644schema-qualified table would be auto-aliased when used in a
645SELECT statement; given a table::
647 account_table = Table(
648 'account', metadata,
649 Column('id', Integer, primary_key=True),
650 Column('info', String(100)),
651 schema="customer_schema"
652 )
654this legacy mode of rendering would assume that "customer_schema.account"
655would not be accepted by all parts of the SQL statement, as illustrated
656below::
658 >>> eng = create_engine("mssql+pymssql://mydsn", legacy_schema_aliasing=True)
659 >>> print(account_table.select().compile(eng))
660 SELECT account_1.id, account_1.info
661 FROM customer_schema.account AS account_1
663This mode of behavior is now off by default, as it appears to have served
664no purpose; however in the case that legacy applications rely upon it,
665it is available using the ``legacy_schema_aliasing`` argument to
666:func:`_sa.create_engine` as illustrated above.
668.. versionchanged:: 1.1 the ``legacy_schema_aliasing`` flag introduced
669 in version 1.0.5 to allow disabling of legacy mode for schemas now
670 defaults to False.
672.. deprecated:: 1.4
674 The ``legacy_schema_aliasing`` flag is now
675 deprecated and will be removed in a future release.
677.. _mssql_indexes:
679Clustered Index Support
680-----------------------
The MSSQL dialect supports clustered indexes (and primary keys) via the
``mssql_clustered`` option. This option is available to :class:`.Index`,
:class:`.UniqueConstraint`, and :class:`.PrimaryKeyConstraint`.
686To generate a clustered index::
688 Index("my_index", table.c.x, mssql_clustered=True)
690which renders the index as ``CREATE CLUSTERED INDEX my_index ON table (x)``.
692To generate a clustered primary key use::
694 Table('my_table', metadata,
695 Column('x', ...),
696 Column('y', ...),
697 PrimaryKeyConstraint("x", "y", mssql_clustered=True))
699which will render the table, for example, as::
701 CREATE TABLE my_table (x INTEGER NOT NULL, y INTEGER NOT NULL,
702 PRIMARY KEY CLUSTERED (x, y))
704Similarly, we can generate a clustered unique constraint using::
706 Table('my_table', metadata,
707 Column('x', ...),
708 Column('y', ...),
709 PrimaryKeyConstraint("x"),
710 UniqueConstraint("y", mssql_clustered=True),
711 )
713To explicitly request a non-clustered primary key (for example, when
714a separate clustered index is desired), use::
716 Table('my_table', metadata,
717 Column('x', ...),
718 Column('y', ...),
719 PrimaryKeyConstraint("x", "y", mssql_clustered=False))
721which will render the table, for example, as::
723 CREATE TABLE my_table (x INTEGER NOT NULL, y INTEGER NOT NULL,
724 PRIMARY KEY NONCLUSTERED (x, y))
726.. versionchanged:: 1.1 the ``mssql_clustered`` option now defaults
727 to None, rather than False. ``mssql_clustered=False`` now explicitly
728 renders the NONCLUSTERED clause, whereas None omits the CLUSTERED
729 clause entirely, allowing SQL Server defaults to take effect.
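The tri-state behavior of ``mssql_clustered`` can be modeled as below. This is a hedged sketch of the rule only, applied here to ``CREATE INDEX``; the helper ``create_index_ddl`` is hypothetical and does not reproduce the dialect's DDL compiler.

```python
def create_index_ddl(name, table, columns, clustered=None, unique=False):
    """Render CREATE INDEX with an optional CLUSTERED / NONCLUSTERED keyword."""
    parts = ["CREATE"]
    if unique:
        parts.append("UNIQUE")
    if clustered is not None:
        # True and False are both explicit; None defers to SQL Server defaults.
        parts.append("CLUSTERED" if clustered else "NONCLUSTERED")
    parts.append("INDEX {} ON {} ({})".format(name, table, ", ".join(columns)))
    return " ".join(parts)


print(create_index_ddl("my_index", "table", ["x"], clustered=True))
print(create_index_ddl("my_index", "table", ["x"], clustered=False))
print(create_index_ddl("my_index", "table", ["x"]))
```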
732MSSQL-Specific Index Options
733-----------------------------
735In addition to clustering, the MSSQL dialect supports other special options
736for :class:`.Index`.
738INCLUDE
739^^^^^^^
741The ``mssql_include`` option renders INCLUDE(colname) for the given string
742names::
744 Index("my_index", table.c.x, mssql_include=['y'])
746would render the index as ``CREATE INDEX my_index ON table (x) INCLUDE (y)``
748.. _mssql_index_where:
750Filtered Indexes
751^^^^^^^^^^^^^^^^
The ``mssql_where`` option renders WHERE(condition) for the given
expression::
756 Index("my_index", table.c.x, mssql_where=table.c.x > 10)
758would render the index as ``CREATE INDEX my_index ON table (x) WHERE x > 10``.
760.. versionadded:: 1.3.4
762Index ordering
763^^^^^^^^^^^^^^
765Index ordering is available via functional expressions, such as::
767 Index("my_index", table.c.x.desc())
769would render the index as ``CREATE INDEX my_index ON table (x DESC)``
771.. seealso::
773 :ref:`schema_indexes_functional`
775Compatibility Levels
776--------------------
MSSQL supports the notion of setting compatibility levels at the
database level. This makes it possible, for instance, to run a database
that is compatible with SQL2000 on a SQL2005 database
server. ``server_version_info`` will always return the database
server version information (in this case SQL2005) and not the
compatibility level information. Because of this, when running under
a backwards compatibility mode, SQLAlchemy may attempt to use T-SQL
statements that the database server is unable to parse.
786Triggers
787--------
789SQLAlchemy by default uses OUTPUT INSERTED to get at newly
790generated primary key values via IDENTITY columns or other
791server side defaults. MS-SQL does not
792allow the usage of OUTPUT INSERTED on tables that have triggers.
793To disable the usage of OUTPUT INSERTED on a per-table basis,
794specify ``implicit_returning=False`` for each :class:`_schema.Table`
795which has triggers::
797 Table('mytable', metadata,
798 Column('id', Integer, primary_key=True),
799 # ...,
800 implicit_returning=False
801 )
803Declarative form::
805 class MyClass(Base):
806 # ...
807 __table_args__ = {'implicit_returning':False}
810This option can also be specified engine-wide using the
811``implicit_returning=False`` argument on :func:`_sa.create_engine`.
813.. _mssql_rowcount_versioning:
815Rowcount Support / ORM Versioning
816---------------------------------
818The SQL Server drivers may have limited ability to return the number
819of rows updated from an UPDATE or DELETE statement.
As of this writing, the PyODBC driver is not able to return a rowcount when
OUTPUT INSERTED is used. This impacts the SQLAlchemy ORM's versioning feature
in many cases where server-side value generators are in use: while the
versioning operations can succeed, the ORM cannot always check that an UPDATE
or DELETE statement matched the number of rows expected, which is how it
verifies that the version identifier matched. When this condition occurs, a
warning will be emitted but the operation will proceed.
829The use of OUTPUT INSERTED can be disabled by setting the
830:paramref:`_schema.Table.implicit_returning` flag to ``False`` on a particular
831:class:`_schema.Table`, which in declarative looks like::
833 class MyTable(Base):
834 __tablename__ = 'mytable'
835 id = Column(Integer, primary_key=True)
836 stuff = Column(String(10))
837 timestamp = Column(TIMESTAMP(), default=text('DEFAULT'))
838 __mapper_args__ = {
839 'version_id_col': timestamp,
840 'version_id_generator': False,
841 }
842 __table_args__ = {
843 'implicit_returning': False
844 }
846Enabling Snapshot Isolation
847---------------------------
849SQL Server has a default transaction
850isolation mode that locks entire tables, and causes even mildly concurrent
851applications to have long held locks and frequent deadlocks.
852Enabling snapshot isolation for the database as a whole is recommended
853for modern levels of concurrency support. This is accomplished via the
854following ALTER DATABASE commands executed at the SQL prompt::
856 ALTER DATABASE MyDatabase SET ALLOW_SNAPSHOT_ISOLATION ON
858 ALTER DATABASE MyDatabase SET READ_COMMITTED_SNAPSHOT ON
860Background on SQL Server snapshot isolation is available at
861https://msdn.microsoft.com/en-us/library/ms175095.aspx.
"""  # noqa

import codecs
import datetime
import operator
import re

from . import information_schema as ischema
from .json import JSON
from .json import JSONIndexType
from .json import JSONPathType
from ... import exc
from ... import Identity
from ... import schema as sa_schema
from ... import Sequence
from ... import sql
from ... import text
from ... import types as sqltypes
from ... import util
from ...engine import cursor as _cursor
from ...engine import default
from ...engine import reflection
from ...sql import coercions
from ...sql import compiler
from ...sql import elements
from ...sql import expression
from ...sql import func
from ...sql import quoted_name
from ...sql import roles
from ...sql import util as sql_util
from ...types import BIGINT
from ...types import BINARY
from ...types import CHAR
from ...types import DATE
from ...types import DATETIME
from ...types import DECIMAL
from ...types import FLOAT
from ...types import INTEGER
from ...types import NCHAR
from ...types import NUMERIC
from ...types import NVARCHAR
from ...types import SMALLINT
from ...types import TEXT
from ...types import VARCHAR
from ...util import compat
from ...util import update_wrapper
from ...util.langhelpers import public_factory


# https://sqlserverbuilds.blogspot.com/
MS_2017_VERSION = (14,)
MS_2016_VERSION = (13,)
MS_2014_VERSION = (12,)
MS_2012_VERSION = (11,)
MS_2008_VERSION = (10,)
MS_2005_VERSION = (9,)
MS_2000_VERSION = (8,)

921RESERVED_WORDS = set(
922 [
923 "add",
924 "all",
925 "alter",
926 "and",
927 "any",
928 "as",
929 "asc",
930 "authorization",
931 "backup",
932 "begin",
933 "between",
934 "break",
935 "browse",
936 "bulk",
937 "by",
938 "cascade",
939 "case",
940 "check",
941 "checkpoint",
942 "close",
943 "clustered",
944 "coalesce",
945 "collate",
946 "column",
947 "commit",
948 "compute",
949 "constraint",
950 "contains",
951 "containstable",
952 "continue",
953 "convert",
954 "create",
955 "cross",
956 "current",
957 "current_date",
958 "current_time",
959 "current_timestamp",
960 "current_user",
961 "cursor",
962 "database",
963 "dbcc",
964 "deallocate",
965 "declare",
966 "default",
967 "delete",
968 "deny",
969 "desc",
970 "disk",
971 "distinct",
972 "distributed",
973 "double",
974 "drop",
975 "dump",
976 "else",
977 "end",
978 "errlvl",
979 "escape",
980 "except",
981 "exec",
982 "execute",
983 "exists",
984 "exit",
985 "external",
986 "fetch",
987 "file",
988 "fillfactor",
989 "for",
990 "foreign",
991 "freetext",
992 "freetexttable",
993 "from",
994 "full",
995 "function",
996 "goto",
997 "grant",
998 "group",
999 "having",
1000 "holdlock",
1001 "identity",
1002 "identity_insert",
1003 "identitycol",
1004 "if",
1005 "in",
1006 "index",
1007 "inner",
1008 "insert",
1009 "intersect",
1010 "into",
1011 "is",
1012 "join",
1013 "key",
1014 "kill",
1015 "left",
1016 "like",
1017 "lineno",
1018 "load",
1019 "merge",
1020 "national",
1021 "nocheck",
1022 "nonclustered",
1023 "not",
1024 "null",
1025 "nullif",
1026 "of",
1027 "off",
1028 "offsets",
1029 "on",
1030 "open",
1031 "opendatasource",
1032 "openquery",
1033 "openrowset",
1034 "openxml",
1035 "option",
1036 "or",
1037 "order",
1038 "outer",
1039 "over",
1040 "percent",
1041 "pivot",
1042 "plan",
1043 "precision",
1044 "primary",
1045 "print",
1046 "proc",
1047 "procedure",
1048 "public",
1049 "raiserror",
1050 "read",
1051 "readtext",
1052 "reconfigure",
1053 "references",
1054 "replication",
1055 "restore",
1056 "restrict",
1057 "return",
1058 "revert",
1059 "revoke",
1060 "right",
1061 "rollback",
1062 "rowcount",
1063 "rowguidcol",
1064 "rule",
1065 "save",
1066 "schema",
1067 "securityaudit",
1068 "select",
1069 "session_user",
1070 "set",
1071 "setuser",
1072 "shutdown",
1073 "some",
1074 "statistics",
1075 "system_user",
1076 "table",
1077 "tablesample",
1078 "textsize",
1079 "then",
1080 "to",
1081 "top",
1082 "tran",
1083 "transaction",
1084 "trigger",
1085 "truncate",
1086 "tsequal",
1087 "union",
1088 "unique",
1089 "unpivot",
1090 "update",
1091 "updatetext",
1092 "use",
1093 "user",
1094 "values",
1095 "varying",
1096 "view",
1097 "waitfor",
1098 "when",
1099 "where",
1100 "while",
1101 "with",
1102 "writetext",
1103 ]
1104)


class REAL(sqltypes.REAL):
    __visit_name__ = "REAL"

    def __init__(self, **kw):
        # REAL is a synonym for FLOAT(24) on SQL server.
        # it is only accepted as the word "REAL" in DDL, the numeric
        # precision value is not allowed to be present
        kw.setdefault("precision", 24)
        super(REAL, self).__init__(**kw)


class TINYINT(sqltypes.Integer):
    __visit_name__ = "TINYINT"


# MSSQL DATE/TIME types have varied behavior, sometimes returning
# strings.  MSDate/TIME check for everything, and always
# filter bind parameters into datetime objects (required by pyodbc,
# not sure about other dialects).


class _MSDate(sqltypes.Date):
    def bind_processor(self, dialect):
        def process(value):
            if type(value) == datetime.date:
                return datetime.datetime(value.year, value.month, value.day)
            else:
                return value

        return process

    _reg = re.compile(r"(\d+)-(\d+)-(\d+)")

    def result_processor(self, dialect, coltype):
        def process(value):
            if isinstance(value, datetime.datetime):
                return value.date()
            elif isinstance(value, util.string_types):
                m = self._reg.match(value)
                if not m:
                    raise ValueError(
                        "could not parse %r as a date value" % (value,)
                    )
                return datetime.date(*[int(x or 0) for x in m.groups()])
            else:
                return value

        return process


class TIME(sqltypes.TIME):
    def __init__(self, precision=None, **kwargs):
        self.precision = precision
        super(TIME, self).__init__()

    __zero_date = datetime.date(1900, 1, 1)

    def bind_processor(self, dialect):
        def process(value):
            if isinstance(value, datetime.datetime):
                value = datetime.datetime.combine(
                    self.__zero_date, value.time()
                )
            elif isinstance(value, datetime.time):
                """issue #5339
                per: https://github.com/mkleehammer/pyodbc/wiki/Tips-and-Tricks-by-Database-Platform#time-columns
                pass TIME value as string
                """  # noqa
                value = str(value)
            return value

        return process

    _reg = re.compile(r"(\d+):(\d+):(\d+)(?:\.(\d{0,6}))?")

    def result_processor(self, dialect, coltype):
        def process(value):
            if isinstance(value, datetime.datetime):
                return value.time()
            elif isinstance(value, util.string_types):
                m = self._reg.match(value)
                if not m:
                    raise ValueError(
                        "could not parse %r as a time value" % (value,)
                    )
                return datetime.time(*[int(x or 0) for x in m.groups()])
            else:
                return value

        return process


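The string branch of the `TIME` result processor can be tried in isolation. The following is a minimal standalone sketch, not part of the module: `TIME_RE` and `parse_time` are hypothetical names, and the regular expression is copied from `TIME._reg` above.

```python
import datetime
import re

# Same pattern as TIME._reg: hours, minutes, seconds, optional fraction.
TIME_RE = re.compile(r"(\d+):(\d+):(\d+)(?:\.(\d{0,6}))?")


def parse_time(value):
    m = TIME_RE.match(value)
    if not m:
        raise ValueError("could not parse %r as a time value" % (value,))
    # A missing fraction group is None and becomes 0.
    return datetime.time(*[int(x or 0) for x in m.groups()])
```

As in the processor above, the captured fraction digits are passed to `datetime.time` as the microsecond argument without any scaling.
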
1199_MSTime = TIME
1202class _BASETIMEIMPL(TIME):
1203 __visit_name__ = "_BASETIMEIMPL"
1206class _DateTimeBase(object):
1207 def bind_processor(self, dialect):
1208 def process(value):
1209 if type(value) == datetime.date:
1210 return datetime.datetime(value.year, value.month, value.day)
1211 else:
1212 return value
1214 return process
1217class _MSDateTime(_DateTimeBase, sqltypes.DateTime):
1218 pass
1221class SMALLDATETIME(_DateTimeBase, sqltypes.DateTime):
1222 __visit_name__ = "SMALLDATETIME"
1225class DATETIME2(_DateTimeBase, sqltypes.DateTime):
1226 __visit_name__ = "DATETIME2"
1228 def __init__(self, precision=None, **kw):
1229 super(DATETIME2, self).__init__(**kw)
1230 self.precision = precision
1233class DATETIMEOFFSET(_DateTimeBase, sqltypes.DateTime):
1234 __visit_name__ = "DATETIMEOFFSET"
1236 def __init__(self, precision=None, **kw):
1237 super(DATETIMEOFFSET, self).__init__(**kw)
1238 self.precision = precision
1241class _UnicodeLiteral(object):
1242 def literal_processor(self, dialect):
1243 def process(value):
1245 value = value.replace("'", "''")
1247 if dialect.identifier_preparer._double_percents:
1248 value = value.replace("%", "%%")
1250 return "N'%s'" % value
1252 return process
1255class _MSUnicode(_UnicodeLiteral, sqltypes.Unicode):
1256 pass
1259class _MSUnicodeText(_UnicodeLiteral, sqltypes.UnicodeText):
1260 pass
class TIMESTAMP(sqltypes._Binary):
    """Implement the SQL Server TIMESTAMP type.

    Note this is **completely different** from the SQL Standard
    TIMESTAMP type, which is not supported by SQL Server.  It
    is a read-only datatype that does not support INSERT of values.

    .. versionadded:: 1.2

    .. seealso::

        :class:`_mssql.ROWVERSION`

    """

    __visit_name__ = "TIMESTAMP"

    # expected by _Binary to be present
    length = None

    def __init__(self, convert_int=False):
        """Construct a TIMESTAMP or ROWVERSION type.

        :param convert_int: if True, binary integer values will
         be converted to integers on read.

        .. versionadded:: 1.2

        """
        self.convert_int = convert_int

    def result_processor(self, dialect, coltype):
        super_ = super(TIMESTAMP, self).result_processor(dialect, coltype)
        if self.convert_int:

            def process(value):
                value = super_(value)
                if value is not None:
                    # https://stackoverflow.com/a/30403242/34549
                    value = int(codecs.encode(value, "hex"), 16)
                return value

            return process
        else:
            return super_


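The `convert_int` branch above turns the raw bytes of a TIMESTAMP / ROWVERSION value into a Python integer by hex-encoding them and parsing the result in base 16. A minimal standalone sketch of that step follows; the function name `rowversion_to_int` is hypothetical and used only for illustration.

```python
import codecs


def rowversion_to_int(value):
    # Mirrors the convert_int branch: hex-encode the bytes, parse base 16.
    if value is None:
        return None
    return int(codecs.encode(value, "hex"), 16)
```

For a `bytes` input this should give the same result as `int.from_bytes(value, "big")`, since the hex digits are read most significant byte first.
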
1310class ROWVERSION(TIMESTAMP):
1311 """Implement the SQL Server ROWVERSION type.
1313 The ROWVERSION datatype is a SQL Server synonym for the TIMESTAMP
1314 datatype, however current SQL Server documentation suggests using
1315 ROWVERSION for new datatypes going forward.
1317 The ROWVERSION datatype does **not** reflect (e.g. introspect) from the
1318 database as itself; the returned datatype will be
1319 :class:`_mssql.TIMESTAMP`.
1321 This is a read-only datatype that does not support INSERT of values.
1323 .. versionadded:: 1.2
1325 .. seealso::
1327 :class:`_mssql.TIMESTAMP`
1329 """
1331 __visit_name__ = "ROWVERSION"
1334class NTEXT(sqltypes.UnicodeText):
1336 """MSSQL NTEXT type, for variable-length unicode text up to 2^30
1337 characters."""
1339 __visit_name__ = "NTEXT"
1342class VARBINARY(sqltypes.VARBINARY, sqltypes.LargeBinary):
1343 """The MSSQL VARBINARY type.
1345 This type adds additional features to the core :class:`_types.VARBINARY`
1346 type, including "deprecate_large_types" mode where
1347 either ``VARBINARY(max)`` or IMAGE is rendered, as well as the SQL
1348 Server ``FILESTREAM`` option.
1350 .. versionadded:: 1.0.0
1352 .. seealso::
1354 :ref:`mssql_large_type_deprecation`
1356 """
1358 __visit_name__ = "VARBINARY"
1360 def __init__(self, length=None, filestream=False):
1361 """
1362 Construct a VARBINARY type.
1364 :param length: optional, a length for the column for use in
1365 DDL statements, for those binary types that accept a length,
1366 such as the MySQL BLOB type.
1368 :param filestream=False: if True, renders the ``FILESTREAM`` keyword
1369 in the table definition. In this case ``length`` must be ``None``
1370 or ``'max'``.
1372 .. versionadded:: 1.4.31
1374 """
1376 self.filestream = filestream
1377 if self.filestream and length not in (None, "max"):
1378 raise ValueError(
1379 "length must be None or 'max' when setting filestream"
1380 )
1381 super(VARBINARY, self).__init__(length=length)
1384class IMAGE(sqltypes.LargeBinary):
1385 __visit_name__ = "IMAGE"
1388class XML(sqltypes.Text):
1389 """MSSQL XML type.
1391 This is a placeholder type for reflection purposes that does not include
1392 any Python-side datatype support. It also does not currently support
1393 additional arguments, such as "CONTENT", "DOCUMENT",
1394 "xml_schema_collection".
1396 .. versionadded:: 1.1.11
1398 """
1400 __visit_name__ = "XML"
1403class BIT(sqltypes.Boolean):
1404 """MSSQL BIT type.
1406 Both pyodbc and pymssql return values from BIT columns as
1407 Python <class 'bool'> so just subclass Boolean.
1409 """
1411 __visit_name__ = "BIT"
1414class MONEY(sqltypes.TypeEngine):
1415 __visit_name__ = "MONEY"
1418class SMALLMONEY(sqltypes.TypeEngine):
1419 __visit_name__ = "SMALLMONEY"
1422class UNIQUEIDENTIFIER(sqltypes.TypeEngine):
1423 __visit_name__ = "UNIQUEIDENTIFIER"
1426class SQL_VARIANT(sqltypes.TypeEngine):
1427 __visit_name__ = "SQL_VARIANT"
1430class TryCast(sql.elements.Cast):
1431 """Represent a SQL Server TRY_CAST expression."""
1433 __visit_name__ = "try_cast"
1435 stringify_dialect = "mssql"
1436 inherit_cache = True
1438 def __init__(self, *arg, **kw):
1439 """Create a TRY_CAST expression.
1441 :class:`.TryCast` is a subclass of SQLAlchemy's :class:`.Cast`
1442 construct, and works in the same way, except that the SQL expression
1443 rendered is "TRY_CAST" rather than "CAST"::
1445 from sqlalchemy import select
1446 from sqlalchemy import Numeric
1447 from sqlalchemy.dialects.mssql import try_cast
1449 stmt = select(
1450 try_cast(product_table.c.unit_price, Numeric(10, 4))
1451 )
1453 The above would render::
1455 SELECT TRY_CAST (product_table.unit_price AS NUMERIC(10, 4))
1456 FROM product_table
1458 .. versionadded:: 1.3.7
1460 """
1461 super(TryCast, self).__init__(*arg, **kw)
1464try_cast = public_factory(TryCast, ".dialects.mssql.try_cast")
1466# old names.
1467MSDateTime = _MSDateTime
1468MSDate = _MSDate
1469MSReal = REAL
1470MSTinyInteger = TINYINT
1471MSTime = TIME
1472MSSmallDateTime = SMALLDATETIME
1473MSDateTime2 = DATETIME2
1474MSDateTimeOffset = DATETIMEOFFSET
1475MSText = TEXT
1476MSNText = NTEXT
1477MSString = VARCHAR
1478MSNVarchar = NVARCHAR
1479MSChar = CHAR
1480MSNChar = NCHAR
1481MSBinary = BINARY
1482MSVarBinary = VARBINARY
1483MSImage = IMAGE
1484MSBit = BIT
1485MSMoney = MONEY
1486MSSmallMoney = SMALLMONEY
1487MSUniqueIdentifier = UNIQUEIDENTIFIER
1488MSVariant = SQL_VARIANT
1490ischema_names = {
1491 "int": INTEGER,
1492 "bigint": BIGINT,
1493 "smallint": SMALLINT,
1494 "tinyint": TINYINT,
1495 "varchar": VARCHAR,
1496 "nvarchar": NVARCHAR,
1497 "char": CHAR,
1498 "nchar": NCHAR,
1499 "text": TEXT,
1500 "ntext": NTEXT,
1501 "decimal": DECIMAL,
1502 "numeric": NUMERIC,
1503 "float": FLOAT,
1504 "datetime": DATETIME,
1505 "datetime2": DATETIME2,
1506 "datetimeoffset": DATETIMEOFFSET,
1507 "date": DATE,
1508 "time": TIME,
1509 "smalldatetime": SMALLDATETIME,
1510 "binary": BINARY,
1511 "varbinary": VARBINARY,
1512 "bit": BIT,
1513 "real": REAL,
1514 "image": IMAGE,
1515 "xml": XML,
1516 "timestamp": TIMESTAMP,
1517 "money": MONEY,
1518 "smallmoney": SMALLMONEY,
1519 "uniqueidentifier": UNIQUEIDENTIFIER,
1520 "sql_variant": SQL_VARIANT,
1521}
1524class MSTypeCompiler(compiler.GenericTypeCompiler):
1525 def _extend(self, spec, type_, length=None):
1526 """Extend a string-type declaration with standard SQL
1527 COLLATE annotations.
1529 """
1531 if getattr(type_, "collation", None):
1532 collation = "COLLATE %s" % type_.collation
1533 else:
1534 collation = None
1536 if not length:
1537 length = type_.length
1539 if length:
1540 spec = spec + "(%s)" % length
1542 return " ".join([c for c in (spec, collation) if c is not None])
1544 def visit_FLOAT(self, type_, **kw):
1545 precision = getattr(type_, "precision", None)
1546 if precision is None:
1547 return "FLOAT"
1548 else:
1549 return "FLOAT(%(precision)s)" % {"precision": precision}
1551 def visit_TINYINT(self, type_, **kw):
1552 return "TINYINT"
1554 def visit_TIME(self, type_, **kw):
1555 precision = getattr(type_, "precision", None)
1556 if precision is not None:
1557 return "TIME(%s)" % precision
1558 else:
1559 return "TIME"
1561 def visit_TIMESTAMP(self, type_, **kw):
1562 return "TIMESTAMP"
1564 def visit_ROWVERSION(self, type_, **kw):
1565 return "ROWVERSION"
1567 def visit_datetime(self, type_, **kw):
1568 if type_.timezone:
1569 return self.visit_DATETIMEOFFSET(type_, **kw)
1570 else:
1571 return self.visit_DATETIME(type_, **kw)
1573 def visit_DATETIMEOFFSET(self, type_, **kw):
1574 precision = getattr(type_, "precision", None)
1575 if precision is not None:
1576 return "DATETIMEOFFSET(%s)" % type_.precision
1577 else:
1578 return "DATETIMEOFFSET"
1580 def visit_DATETIME2(self, type_, **kw):
1581 precision = getattr(type_, "precision", None)
1582 if precision is not None:
1583 return "DATETIME2(%s)" % precision
1584 else:
1585 return "DATETIME2"
1587 def visit_SMALLDATETIME(self, type_, **kw):
1588 return "SMALLDATETIME"
1590 def visit_unicode(self, type_, **kw):
1591 return self.visit_NVARCHAR(type_, **kw)
1593 def visit_text(self, type_, **kw):
1594 if self.dialect.deprecate_large_types:
1595 return self.visit_VARCHAR(type_, **kw)
1596 else:
1597 return self.visit_TEXT(type_, **kw)
1599 def visit_unicode_text(self, type_, **kw):
1600 if self.dialect.deprecate_large_types:
1601 return self.visit_NVARCHAR(type_, **kw)
1602 else:
1603 return self.visit_NTEXT(type_, **kw)
1605 def visit_NTEXT(self, type_, **kw):
1606 return self._extend("NTEXT", type_)
1608 def visit_TEXT(self, type_, **kw):
1609 return self._extend("TEXT", type_)
1611 def visit_VARCHAR(self, type_, **kw):
1612 return self._extend("VARCHAR", type_, length=type_.length or "max")
1614 def visit_CHAR(self, type_, **kw):
1615 return self._extend("CHAR", type_)
1617 def visit_NCHAR(self, type_, **kw):
1618 return self._extend("NCHAR", type_)
1620 def visit_NVARCHAR(self, type_, **kw):
1621 return self._extend("NVARCHAR", type_, length=type_.length or "max")
1623 def visit_date(self, type_, **kw):
1624 if self.dialect.server_version_info < MS_2008_VERSION:
1625 return self.visit_DATETIME(type_, **kw)
1626 else:
1627 return self.visit_DATE(type_, **kw)
1629 def visit__BASETIMEIMPL(self, type_, **kw):
1630 return self.visit_time(type_, **kw)
1632 def visit_time(self, type_, **kw):
1633 if self.dialect.server_version_info < MS_2008_VERSION:
1634 return self.visit_DATETIME(type_, **kw)
1635 else:
1636 return self.visit_TIME(type_, **kw)
1638 def visit_large_binary(self, type_, **kw):
1639 if self.dialect.deprecate_large_types:
1640 return self.visit_VARBINARY(type_, **kw)
1641 else:
1642 return self.visit_IMAGE(type_, **kw)
1644 def visit_IMAGE(self, type_, **kw):
1645 return "IMAGE"
1647 def visit_XML(self, type_, **kw):
1648 return "XML"
1650 def visit_VARBINARY(self, type_, **kw):
1651 text = self._extend("VARBINARY", type_, length=type_.length or "max")
1652 if getattr(type_, "filestream", False):
1653 text += " FILESTREAM"
1654 return text
1656 def visit_boolean(self, type_, **kw):
1657 return self.visit_BIT(type_)
1659 def visit_BIT(self, type_, **kw):
1660 return "BIT"
1662 def visit_JSON(self, type_, **kw):
1663 # this is a bit of a break with SQLAlchemy's convention of
1664 # "UPPERCASE name goes to UPPERCASE type name with no modification"
1665 return self._extend("NVARCHAR", type_, length="max")
1667 def visit_MONEY(self, type_, **kw):
1668 return "MONEY"
1670 def visit_SMALLMONEY(self, type_, **kw):
1671 return "SMALLMONEY"
1673 def visit_UNIQUEIDENTIFIER(self, type_, **kw):
1674 return "UNIQUEIDENTIFIER"
1676 def visit_SQL_VARIANT(self, type_, **kw):
1677 return "SQL_VARIANT"
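The `_extend` helper in `MSTypeCompiler` above appends an optional length and an optional COLLATE clause to a type name. The sketch below restates that string logic as a free function so it can be run without a dialect. The name `extend` and its plain `length` / `collation` arguments are assumptions for illustration; the real method reads both from the type object.

```python
def extend(spec, length=None, collation=None):
    # Append "(length)" when a length is given, then an optional COLLATE.
    if length:
        spec = spec + "(%s)" % length
    collate = "COLLATE %s" % collation if collation else None
    return " ".join(c for c in (spec, collate) if c is not None)
```

This is why `visit_VARCHAR` and `visit_NVARCHAR` pass `length=type_.length or "max"`: a type without a length renders as `VARCHAR(max)` rather than a bare `VARCHAR`.
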
1680class MSExecutionContext(default.DefaultExecutionContext):
1681 _enable_identity_insert = False
1682 _select_lastrowid = False
1683 _lastrowid = None
1684 _rowcount = None
1686 def _opt_encode(self, statement):
1688 if not self.dialect.supports_unicode_statements:
1689 encoded = self.dialect._encoder(statement)[0]
1690 else:
1691 encoded = statement
1693 if self.compiled and self.compiled.schema_translate_map:
1695 rst = self.compiled.preparer._render_schema_translates
1696 encoded = rst(encoded, self.compiled.schema_translate_map)
1698 return encoded
1700 def pre_exec(self):
1701 """Activate IDENTITY_INSERT if needed."""
1703 if self.isinsert:
1704 tbl = self.compiled.compile_state.dml_table
1705 id_column = tbl._autoincrement_column
1706 insert_has_identity = (id_column is not None) and (
1707 not isinstance(id_column.default, Sequence)
1708 )
1710 if insert_has_identity:
1711 compile_state = self.compiled.dml_compile_state
1712 self._enable_identity_insert = (
1713 id_column.key in self.compiled_parameters[0]
1714 ) or (
1715 compile_state._dict_parameters
1716 and (id_column.key in compile_state._insert_col_keys)
1717 )
1719 else:
1720 self._enable_identity_insert = False
1722 self._select_lastrowid = (
1723 not self.compiled.inline
1724 and insert_has_identity
1725 and not self.compiled.returning
1726 and not self._enable_identity_insert
1727 and not self.executemany
1728 )
1730 if self._enable_identity_insert:
1731 self.root_connection._cursor_execute(
1732 self.cursor,
1733 self._opt_encode(
1734 "SET IDENTITY_INSERT %s ON"
1735 % self.identifier_preparer.format_table(tbl)
1736 ),
1737 (),
1738 self,
1739 )
1741 def post_exec(self):
1742 """Disable IDENTITY_INSERT if enabled."""
1744 conn = self.root_connection
1746 if self.isinsert or self.isupdate or self.isdelete:
1747 self._rowcount = self.cursor.rowcount
1749 if self._select_lastrowid:
1750 if self.dialect.use_scope_identity:
1751 conn._cursor_execute(
1752 self.cursor,
1753 "SELECT scope_identity() AS lastrowid",
1754 (),
1755 self,
1756 )
1757 else:
1758 conn._cursor_execute(
1759 self.cursor, "SELECT @@identity AS lastrowid", (), self
1760 )
1761 # fetchall() ensures the cursor is consumed without closing it
1762 row = self.cursor.fetchall()[0]
1763 self._lastrowid = int(row[0])
1765 elif (
1766 self.isinsert or self.isupdate or self.isdelete
1767 ) and self.compiled.returning:
1768 self.cursor_fetch_strategy = (
1769 _cursor.FullyBufferedCursorFetchStrategy(
1770 self.cursor,
1771 self.cursor.description,
1772 self.cursor.fetchall(),
1773 )
1774 )
1776 if self._enable_identity_insert:
1777 conn._cursor_execute(
1778 self.cursor,
1779 self._opt_encode(
1780 "SET IDENTITY_INSERT %s OFF"
1781 % self.identifier_preparer.format_table(
1782 self.compiled.compile_state.dml_table
1783 )
1784 ),
1785 (),
1786 self,
1787 )
1789 def get_lastrowid(self):
1790 return self._lastrowid
1792 @property
1793 def rowcount(self):
1794 if self._rowcount is not None:
1795 return self._rowcount
1796 else:
1797 return self.cursor.rowcount
1799 def handle_dbapi_exception(self, e):
1800 if self._enable_identity_insert:
1801 try:
1802 self.cursor.execute(
1803 self._opt_encode(
1804 "SET IDENTITY_INSERT %s OFF"
1805 % self.identifier_preparer.format_table(
1806 self.compiled.compile_state.dml_table
1807 )
1808 )
1809 )
1810 except Exception:
1811 pass
1813 def fire_sequence(self, seq, type_):
1814 return self._execute_scalar(
1815 (
1816 "SELECT NEXT VALUE FOR %s"
1817 % self.identifier_preparer.format_sequence(seq)
1818 ),
1819 type_,
1820 )
1822 def get_insert_default(self, column):
1823 if (
1824 isinstance(column, sa_schema.Column)
1825 and column is column.table._autoincrement_column
1826 and isinstance(column.default, sa_schema.Sequence)
1827 and column.default.optional
1828 ):
1829 return None
1830 return super(MSExecutionContext, self).get_insert_default(column)
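`pre_exec` and `post_exec` in `MSExecutionContext` above bracket an INSERT that supplies an explicit value for the identity column with `SET IDENTITY_INSERT ... ON` and `... OFF`. The resulting statement order can be sketched as follows. `wrap_identity_insert` is a hypothetical helper, and the table name is assumed to be already quoted (the real code uses `identifier_preparer.format_table`).

```python
def wrap_identity_insert(table, insert_sql):
    # Statements in execution order: enable, run the INSERT, disable.
    return [
        "SET IDENTITY_INSERT %s ON" % table,
        insert_sql,
        "SET IDENTITY_INSERT %s OFF" % table,
    ]
```

The sketch omits error handling. In the class above, `handle_dbapi_exception` also attempts the `OFF` statement when the INSERT fails.
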
1833class MSSQLCompiler(compiler.SQLCompiler):
1834 returning_precedes_values = True
1836 extract_map = util.update_copy(
1837 compiler.SQLCompiler.extract_map,
1838 {
1839 "doy": "dayofyear",
1840 "dow": "weekday",
1841 "milliseconds": "millisecond",
1842 "microseconds": "microsecond",
1843 },
1844 )
1846 def __init__(self, *args, **kwargs):
1847 self.tablealiases = {}
1848 super(MSSQLCompiler, self).__init__(*args, **kwargs)
1850 def _with_legacy_schema_aliasing(fn):
1851 def decorate(self, *arg, **kw):
1852 if self.dialect.legacy_schema_aliasing:
1853 return fn(self, *arg, **kw)
1854 else:
1855 super_ = getattr(super(MSSQLCompiler, self), fn.__name__)
1856 return super_(*arg, **kw)
1858 return decorate
1860 def visit_now_func(self, fn, **kw):
1861 return "CURRENT_TIMESTAMP"
1863 def visit_current_date_func(self, fn, **kw):
1864 return "GETDATE()"
1866 def visit_length_func(self, fn, **kw):
1867 return "LEN%s" % self.function_argspec(fn, **kw)
1869 def visit_char_length_func(self, fn, **kw):
1870 return "LEN%s" % self.function_argspec(fn, **kw)
1872 def visit_concat_op_binary(self, binary, operator, **kw):
1873 return "%s + %s" % (
1874 self.process(binary.left, **kw),
1875 self.process(binary.right, **kw),
1876 )
1878 def visit_true(self, expr, **kw):
1879 return "1"
1881 def visit_false(self, expr, **kw):
1882 return "0"
1884 def visit_match_op_binary(self, binary, operator, **kw):
1885 return "CONTAINS (%s, %s)" % (
1886 self.process(binary.left, **kw),
1887 self.process(binary.right, **kw),
1888 )
    def get_select_precolumns(self, select, **kw):
        """MS-SQL puts TOP, its version of LIMIT, here."""

        s = super(MSSQLCompiler, self).get_select_precolumns(select, **kw)

        if select._has_row_limiting_clause and self._use_top(select):
            # ODBC drivers and possibly others
            # don't support bind params in the SELECT clause on SQL Server.
            # so have to use literal here.
            kw["literal_execute"] = True
            s += "TOP %s " % self.process(
                self._get_limit_or_fetch(select), **kw
            )
            if select._fetch_clause is not None:
                if select._fetch_clause_options["percent"]:
                    s += "PERCENT "
                if select._fetch_clause_options["with_ties"]:
                    s += "WITH TIES "

        return s

    def get_from_hint_text(self, table, text):
        return text

    def get_crud_hint_text(self, table, text):
        return text

    def _get_limit_or_fetch(self, select):
        if select._fetch_clause is None:
            return select._limit_clause
        else:
            return select._fetch_clause

    def _use_top(self, select):
        return (select._offset_clause is None) and (
            select._simple_int_clause(select._limit_clause)
            or (
                # limit can use TOP when it is by itself. fetch only uses TOP
                # when it needs to because of PERCENT and/or WITH TIES
                select._simple_int_clause(select._fetch_clause)
                and (
                    select._fetch_clause_options["percent"]
                    or select._fetch_clause_options["with_ties"]
                )
            )
        )

    def fetch_clause(self, cs, **kwargs):
        return ""

    def limit_clause(self, cs, **kwargs):
        return ""

    def _check_can_use_fetch_limit(self, select):
        # to use ROW_NUMBER(), an ORDER BY is required.
        # OFFSET and FETCH are options of the ORDER BY clause
        if not select._order_by_clause.clauses:
            raise exc.CompileError(
                "MSSQL requires an order_by when "
                "using an OFFSET or a non-simple "
                "LIMIT clause"
            )

        if select._fetch_clause_options is not None and (
            select._fetch_clause_options["percent"]
            or select._fetch_clause_options["with_ties"]
        ):
            raise exc.CompileError(
                "MSSQL needs TOP to use PERCENT and/or WITH TIES. "
                "Only simple fetch without offset can be used."
            )

    def _row_limit_clause(self, select, **kw):
        """MSSQL 2012 supports the OFFSET/FETCH operators.
        Use them instead of a subquery with row_number.

        """

        if self.dialect._supports_offset_fetch and not self._use_top(select):
            self._check_can_use_fetch_limit(select)

            text = ""

            if select._offset_clause is not None:
                offset_str = self.process(select._offset_clause, **kw)
            else:
                offset_str = "0"
            text += "\n OFFSET %s ROWS" % offset_str

            limit = self._get_limit_or_fetch(select)

            if limit is not None:
                text += "\n FETCH FIRST %s ROWS ONLY" % self.process(
                    limit, **kw
                )
            return text
        else:
            return ""

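The text produced by `_row_limit_clause` above can be illustrated with a small standalone function. This is a simplified sketch: `row_limit_clause` is a hypothetical name, and it formats plain integers, whereas the compiler renders the limit and offset through `self.process()`.

```python
def row_limit_clause(limit=None, offset=None):
    # OFFSET is always rendered; it defaults to 0 when only a limit is set.
    text = "\n OFFSET %s ROWS" % (offset if offset is not None else 0)
    if limit is not None:
        text += "\n FETCH FIRST %s ROWS ONLY" % limit
    return text
```

Because OFFSET and FETCH are options of ORDER BY on SQL Server, the real method first calls `_check_can_use_fetch_limit`, which raises `CompileError` when the SELECT has no ORDER BY.
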
1989 def visit_try_cast(self, element, **kw):
1990 return "TRY_CAST (%s AS %s)" % (
1991 self.process(element.clause, **kw),
1992 self.process(element.typeclause, **kw),
1993 )
    def translate_select_structure(self, select_stmt, **kwargs):
        """Look for LIMIT and OFFSET in a select statement and, if
        present, wrap it in a subquery with a ``row_number()`` criterion.
        MSSQL 2012 and above are excluded.

        """
        select = select_stmt

        if (
            select._has_row_limiting_clause
            and not self.dialect._supports_offset_fetch
            and not self._use_top(select)
            and not getattr(select, "_mssql_visit", None)
        ):
            self._check_can_use_fetch_limit(select)

            _order_by_clauses = [
                sql_util.unwrap_label_reference(elem)
                for elem in select._order_by_clause.clauses
            ]

            limit_clause = self._get_limit_or_fetch(select)
            offset_clause = select._offset_clause

            select = select._generate()
            select._mssql_visit = True
            select = (
                select.add_columns(
                    sql.func.ROW_NUMBER()
                    .over(order_by=_order_by_clauses)
                    .label("mssql_rn")
                )
                .order_by(None)
                .alias()
            )

            mssql_rn = sql.column("mssql_rn")
            limitselect = sql.select(
                *[c for c in select.c if c.key != "mssql_rn"]
            )
            if offset_clause is not None:
                limitselect = limitselect.where(mssql_rn > offset_clause)
                if limit_clause is not None:
                    limitselect = limitselect.where(
                        mssql_rn <= (limit_clause + offset_clause)
                    )
            else:
                limitselect = limitselect.where(mssql_rn <= (limit_clause))
            return limitselect
        else:
            return select

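The `ROW_NUMBER()` fallback above filters on `mssql_rn > offset` and `mssql_rn <= limit + offset`. The arithmetic can be checked on a plain Python list. Both function names below are hypothetical, and the list stands in for an already ordered result set.

```python
def row_number_window(limit=None, offset=None):
    # Returns (exclusive lower bound, inclusive upper bound) on mssql_rn;
    # None means "no bound".
    if offset is not None:
        upper = limit + offset if limit is not None else None
        return offset, upper
    return None, limit


def apply_window(rows, limit=None, offset=None):
    lower, upper = row_number_window(limit, offset)
    return [
        row
        for rn, row in enumerate(rows, start=1)  # ROW_NUMBER() is 1-based
        if (lower is None or rn > lower) and (upper is None or rn <= upper)
    ]
```

With `limit=3, offset=2` the kept row numbers are 3, 4 and 5, which matches what `OFFSET 2 ROWS FETCH FIRST 3 ROWS ONLY` would return on a server that supports it.
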
2047 @_with_legacy_schema_aliasing
2048 def visit_table(self, table, mssql_aliased=False, iscrud=False, **kwargs):
2049 if mssql_aliased is table or iscrud:
2050 return super(MSSQLCompiler, self).visit_table(table, **kwargs)
2052 # alias schema-qualified tables
2053 alias = self._schema_aliased_table(table)
2054 if alias is not None:
2055 return self.process(alias, mssql_aliased=table, **kwargs)
2056 else:
2057 return super(MSSQLCompiler, self).visit_table(table, **kwargs)
2059 @_with_legacy_schema_aliasing
2060 def visit_alias(self, alias, **kw):
2061 # translate for schema-qualified table aliases
2062 kw["mssql_aliased"] = alias.element
2063 return super(MSSQLCompiler, self).visit_alias(alias, **kw)
2065 @_with_legacy_schema_aliasing
2066 def visit_column(self, column, add_to_result_map=None, **kw):
2067 if (
2068 column.table is not None
2069 and (not self.isupdate and not self.isdelete)
2070 or self.is_subquery()
2071 ):
2072 # translate for schema-qualified table aliases
2073 t = self._schema_aliased_table(column.table)
2074 if t is not None:
2075 converted = elements._corresponding_column_or_error(t, column)
2076 if add_to_result_map is not None:
2077 add_to_result_map(
2078 column.name,
2079 column.name,
2080 (column, column.name, column.key),
2081 column.type,
2082 )
2084 return super(MSSQLCompiler, self).visit_column(converted, **kw)
2086 return super(MSSQLCompiler, self).visit_column(
2087 column, add_to_result_map=add_to_result_map, **kw
2088 )
2090 def _schema_aliased_table(self, table):
2091 if getattr(table, "schema", None) is not None:
2092 if table not in self.tablealiases:
2093 self.tablealiases[table] = table.alias()
2094 return self.tablealiases[table]
2095 else:
2096 return None
2098 def visit_extract(self, extract, **kw):
2099 field = self.extract_map.get(extract.field, extract.field)
2100 return "DATEPART(%s, %s)" % (field, self.process(extract.expr, **kw))
2102 def visit_savepoint(self, savepoint_stmt):
2103 return "SAVE TRANSACTION %s" % self.preparer.format_savepoint(
2104 savepoint_stmt
2105 )
2107 def visit_rollback_to_savepoint(self, savepoint_stmt):
2108 return "ROLLBACK TRANSACTION %s" % self.preparer.format_savepoint(
2109 savepoint_stmt
2110 )
2112 def visit_binary(self, binary, **kwargs):
2113 """Move bind parameters to the right-hand side of an operator, where
2114 possible.
2116 """
2117 if (
2118 isinstance(binary.left, expression.BindParameter)
2119 and binary.operator == operator.eq
2120 and not isinstance(binary.right, expression.BindParameter)
2121 ):
2122 return self.process(
2123 expression.BinaryExpression(
2124 binary.right, binary.left, binary.operator
2125 ),
2126 **kwargs
2127 )
2128 return super(MSSQLCompiler, self).visit_binary(binary, **kwargs)
2130 def returning_clause(self, stmt, returning_cols):
2131 # SQL server returning clause requires that the columns refer to
2132 # the virtual table names "inserted" or "deleted". Here, we make
2133 # a simple alias of our table with that name, and then adapt the
2134 # columns we have from the list of RETURNING columns to that new name
2135 # so that they render as "inserted.<colname>" / "deleted.<colname>".
2137 if self.isinsert or self.isupdate:
2138 target = stmt.table.alias("inserted")
2139 else:
2140 target = stmt.table.alias("deleted")
2142 adapter = sql_util.ClauseAdapter(target)
2144 # adapter.traverse() takes a column from our target table and returns
2145 # the one that is linked to the "inserted" / "deleted" tables. So in
2146 # order to retrieve these values back from the result (e.g. like
2147 # row[column]), tell the compiler to also add the original unadapted
2148 # column to the result map. Before #4877, these were (unknowingly)
2149 # falling back to string name matching in the result set, which
2150 # relied on catching an expensive KeyError in order to match.
2152 columns = [
2153 self._label_returning_column(
2154 stmt,
2155 adapter.traverse(c),
2156 {"result_map_targets": (c,)},
2157 fallback_label_name=c._non_anon_label,
2158 )
2159 for c in expression._select_iterables(returning_cols)
2160 ]
2162 return "OUTPUT " + ", ".join(columns)
2164 def get_cte_preamble(self, recursive):
2165 # SQL Server finds it too inconvenient to accept
2166 # an entirely optional, SQL standard specified,
2167 # "RECURSIVE" word with their "WITH",
2168 # so here we go
2169 return "WITH"
2171 def label_select_column(self, select, column, asfrom):
2172 if isinstance(column, expression.Function):
2173 return column.label(None)
2174 else:
2175 return super(MSSQLCompiler, self).label_select_column(
2176 select, column, asfrom
2177 )
2179 def for_update_clause(self, select, **kw):
2180 # "FOR UPDATE" is only allowed on "DECLARE CURSOR" which
2181 # SQLAlchemy doesn't use
2182 return ""
2184 def order_by_clause(self, select, **kw):
2185 # MSSQL only allows ORDER BY in subqueries if there is a LIMIT (TOP)
2185 # or, where the server supports it, an OFFSET / FETCH
2186 if (
2187 self.is_subquery()
2188 and not select._limit
2189 and (
2190 select._offset is None
2191 or not self.dialect._supports_offset_fetch
2192 )
2193 ):
2194 # avoid processing the order by clause if we won't end up
2195 # using it, because we don't want all the bind params tacked
2196 # onto the positional list if that is what the dbapi requires
2197 return ""
2199 order_by = self.process(select._order_by_clause, **kw)
2201 if order_by:
2202 return " ORDER BY " + order_by
2203 else:
2204 return ""
2206 def update_from_clause(
2207 self, update_stmt, from_table, extra_froms, from_hints, **kw
2208 ):
2209 """Render the UPDATE..FROM clause specific to MSSQL.
2211 In MSSQL, if the UPDATE statement involves an alias of the table to
2212 be updated, then the table itself must be added to the FROM list as
2213 well. Otherwise, it is optional. Here, we add it regardless.
2215 """
2216 return "FROM " + ", ".join(
2217 t._compiler_dispatch(self, asfrom=True, fromhints=from_hints, **kw)
2218 for t in [from_table] + extra_froms
2219 )
2221 def delete_table_clause(self, delete_stmt, from_table, extra_froms):
2222 """If we have extra froms make sure we render any alias as hint."""
2223 ashint = False
2224 if extra_froms:
2225 ashint = True
2226 return from_table._compiler_dispatch(
2227 self, asfrom=True, iscrud=True, ashint=ashint
2228 )
2230 def delete_extra_from_clause(
2231 self, delete_stmt, from_table, extra_froms, from_hints, **kw
2232 ):
2233 """Render the DELETE .. FROM clause specific to MSSQL.
2235 Yes, it has the FROM keyword twice.
2237 """
2238 return "FROM " + ", ".join(
2239 t._compiler_dispatch(self, asfrom=True, fromhints=from_hints, **kw)
2240 for t in [from_table] + extra_froms
2241 )
2243 def visit_empty_set_expr(self, type_):
2244 return "SELECT 1 WHERE 1!=1"
2246 def visit_is_distinct_from_binary(self, binary, operator, **kw):
2247 return "NOT EXISTS (SELECT %s INTERSECT SELECT %s)" % (
2248 self.process(binary.left),
2249 self.process(binary.right),
2250 )
2252 def visit_is_not_distinct_from_binary(self, binary, operator, **kw):
2253 return "EXISTS (SELECT %s INTERSECT SELECT %s)" % (
2254 self.process(binary.left),
2255 self.process(binary.right),
2256 )
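# The two visit methods above lean on the fact that SQL set operators treat
# NULLs as equal when matching rows, so INTERSECT gives a NULL-safe equality
# test. A minimal sketch follows; it uses SQLite from the standard library as
# a stand-in engine (an assumption made only so the example can run without a
# SQL Server instance), and the helper names are hypothetical.

```python
import sqlite3


def is_not_distinct_from(left_sql, right_sql):
    # Same string template as visit_is_not_distinct_from_binary().
    return "EXISTS (SELECT %s INTERSECT SELECT %s)" % (left_sql, right_sql)


def is_distinct_from(left_sql, right_sql):
    # Same string template as visit_is_distinct_from_binary().
    return "NOT " + is_not_distinct_from(left_sql, right_sql)


def evaluate(predicate_sql, params):
    # Run the predicate as a scalar SELECT against an in-memory database.
    conn = sqlite3.connect(":memory:")
    try:
        row = conn.execute("SELECT " + predicate_sql, params).fetchone()
        return bool(row[0])
    finally:
        conn.close()
```

# With plain "=", comparing NULL to NULL yields NULL rather than true; the
# INTERSECT form reports the two NULLs as not distinct.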
2258 def _render_json_extract_from_binary(self, binary, operator, **kw):
2259 # note we are intentionally calling upon the process() calls in the
2260 # order in which they appear in the SQL String as this is used
2261 # by positional parameter rendering
2263 if binary.type._type_affinity is sqltypes.JSON:
2264 return "JSON_QUERY(%s, %s)" % (
2265 self.process(binary.left, **kw),
2266 self.process(binary.right, **kw),
2267 )
2269 # as with other dialects, start with an explicit test for NULL
2270 case_expression = "CASE JSON_VALUE(%s, %s) WHEN NULL THEN NULL" % (
2271 self.process(binary.left, **kw),
2272 self.process(binary.right, **kw),
2273 )
2275 if binary.type._type_affinity is sqltypes.Integer:
2276 type_expression = "ELSE CAST(JSON_VALUE(%s, %s) AS INTEGER)" % (
2277 self.process(binary.left, **kw),
2278 self.process(binary.right, **kw),
2279 )
2280 elif binary.type._type_affinity is sqltypes.Numeric:
2281 type_expression = "ELSE CAST(JSON_VALUE(%s, %s) AS %s)" % (
2282 self.process(binary.left, **kw),
2283 self.process(binary.right, **kw),
2284 "FLOAT"
2285 if isinstance(binary.type, sqltypes.Float)
2286 else "NUMERIC(%s, %s)"
2287 % (binary.type.precision, binary.type.scale),
2288 )
2289 elif binary.type._type_affinity is sqltypes.Boolean:
2290 # the NULL handling is particularly weird with boolean, so
2291 # explicitly return numeric (BIT) constants
2292 type_expression = (
2293 "WHEN 'true' THEN 1 WHEN 'false' THEN 0 ELSE NULL"
2294 )
2295 elif binary.type._type_affinity is sqltypes.String:
2296 # TODO: does this comment (from mysql) apply to here, too?
2297 # this fails with a JSON value that's a four byte unicode
2298 # string. SQLite has the same problem at the moment
2299 type_expression = "ELSE JSON_VALUE(%s, %s)" % (
2300 self.process(binary.left, **kw),
2301 self.process(binary.right, **kw),
2302 )
2303 else:
2304 # other affinity....this is not expected right now
2305 type_expression = "ELSE JSON_QUERY(%s, %s)" % (
2306 self.process(binary.left, **kw),
2307 self.process(binary.right, **kw),
2308 )
2310 return case_expression + " " + type_expression + " END"
2312 def visit_json_getitem_op_binary(self, binary, operator, **kw):
2313 return self._render_json_extract_from_binary(binary, operator, **kw)
2315 def visit_json_path_getitem_op_binary(self, binary, operator, **kw):
2316 return self._render_json_extract_from_binary(binary, operator, **kw)
2318 def visit_sequence(self, seq, **kw):
2319 return "NEXT VALUE FOR %s" % self.preparer.format_sequence(seq)
2322class MSSQLStrictCompiler(MSSQLCompiler):
2324 """A subclass of MSSQLCompiler which disables the usage of bind
2325 parameters where not allowed natively by MS-SQL.
2327 A dialect may use this compiler on a platform where native
2328 binds are used.
2330 """
2332 ansi_bind_rules = True
2334 def visit_in_op_binary(self, binary, operator, **kw):
2335 kw["literal_execute"] = True
2336 return "%s IN %s" % (
2337 self.process(binary.left, **kw),
2338 self.process(binary.right, **kw),
2339 )
2341 def visit_not_in_op_binary(self, binary, operator, **kw):
2342 kw["literal_execute"] = True
2343 return "%s NOT IN %s" % (
2344 self.process(binary.left, **kw),
2345 self.process(binary.right, **kw),
2346 )
2348 def render_literal_value(self, value, type_):
2349 """
2350 For date and datetime values, convert to a string
2351 format acceptable to MSSQL. That seems to be the
2352 so-called ODBC canonical date format which looks
2353 like this:
2355 yyyy-mm-dd hh:mi:ss.mmm(24h)
2357 For other data types, call the base class implementation.
2358 """
2359 # datetime.datetime is a subclass of datetime.date, so this covers both
2360 if issubclass(type(value), datetime.date):
2361 # SQL Server wants single quotes around the date string.
2362 return "'" + str(value) + "'"
2363 else:
2364 return super(MSSQLStrictCompiler, self).render_literal_value(
2365 value, type_
2366 )
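# A minimal sketch of the date branch of render_literal_value() above, using
# only the standard library. The function name render_date_literal is
# hypothetical; the real method falls back to the base class for other types,
# which is replaced here by a TypeError.

```python
import datetime


def render_date_literal(value):
    # datetime.datetime subclasses datetime.date, so one check covers both.
    if isinstance(value, datetime.date):
        # str() yields ISO-like text; SQL Server wants it single-quoted.
        return "'" + str(value) + "'"
    raise TypeError("not a date or datetime: %r" % (value,))
```

# Note that str() on a datetime with a non-zero microsecond component emits
# six fractional digits, not the three shown in the docstring's format.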
2369class MSDDLCompiler(compiler.DDLCompiler):
2370 def get_column_specification(self, column, **kwargs):
2371 colspec = self.preparer.format_column(column)
2373 # type is not accepted in a computed column
2374 if column.computed is not None:
2375 colspec += " " + self.process(column.computed)
2376 else:
2377 colspec += " " + self.dialect.type_compiler.process(
2378 column.type, type_expression=column
2379 )
2381 if column.nullable is not None:
2382 if (
2383 not column.nullable
2384 or column.primary_key
2385 or isinstance(column.default, sa_schema.Sequence)
2386 or column.autoincrement is True
2387 or column.identity
2388 ):
2389 colspec += " NOT NULL"
2390 elif column.computed is None:
2391 # don't specify "NULL" for computed columns
2392 colspec += " NULL"
2394 if column.table is None:
2395 raise exc.CompileError(
2396 "mssql requires Table-bound columns "
2397 "in order to generate DDL"
2398 )
2400 d_opt = column.dialect_options["mssql"]
2401 start = d_opt["identity_start"]
2402 increment = d_opt["identity_increment"]
2403 if start is not None or increment is not None:
2404 if column.identity:
2405 raise exc.CompileError(
2406 "Cannot specify options 'mssql_identity_start' and/or "
2407 "'mssql_identity_increment' while also using the "
2408 "'Identity' construct."
2409 )
2410 util.warn_deprecated(
2411 "The dialect options 'mssql_identity_start' and "
2412 "'mssql_identity_increment' are deprecated. "
2413 "Use the 'Identity' object instead.",
2414 "1.4",
2415 )
2417 if column.identity:
2418 colspec += self.process(column.identity, **kwargs)
2419 elif (
2420 column is column.table._autoincrement_column
2421 or column.autoincrement is True
2422 ) and (
2423 not isinstance(column.default, Sequence) or column.default.optional
2424 ):
2425 colspec += self.process(Identity(start=start, increment=increment))
2426 else:
2427 default = self.get_column_default_string(column)
2428 if default is not None:
2429 colspec += " DEFAULT " + default
2431 return colspec
2433 def visit_create_index(self, create, include_schema=False):
2434 index = create.element
2435 self._verify_index_table(index)
2436 preparer = self.preparer
2437 text = "CREATE "
2438 if index.unique:
2439 text += "UNIQUE "
2441 # handle clustering option
2442 clustered = index.dialect_options["mssql"]["clustered"]
2443 if clustered is not None:
2444 if clustered:
2445 text += "CLUSTERED "
2446 else:
2447 text += "NONCLUSTERED "
2449 text += "INDEX %s ON %s (%s)" % (
2450 self._prepared_index_name(index, include_schema=include_schema),
2451 preparer.format_table(index.table),
2452 ", ".join(
2453 self.sql_compiler.process(
2454 expr, include_table=False, literal_binds=True
2455 )
2456 for expr in index.expressions
2457 ),
2458 )
2460 # handle other included columns
2461 if index.dialect_options["mssql"]["include"]:
2462 inclusions = [
2463 index.table.c[col]
2464 if isinstance(col, util.string_types)
2465 else col
2466 for col in index.dialect_options["mssql"]["include"]
2467 ]
2469 text += " INCLUDE (%s)" % ", ".join(
2470 [preparer.quote(c.name) for c in inclusions]
2471 )
2473 whereclause = index.dialect_options["mssql"]["where"]
2475 if whereclause is not None:
2476 whereclause = coercions.expect(
2477 roles.DDLExpressionRole, whereclause
2478 )
2480 where_compiled = self.sql_compiler.process(
2481 whereclause, include_table=False, literal_binds=True
2482 )
2483 text += " WHERE " + where_compiled
2485 return text
2487 def visit_drop_index(self, drop):
2488 return "\nDROP INDEX %s ON %s" % (
2489 self._prepared_index_name(drop.element, include_schema=False),
2490 self.preparer.format_table(drop.element.table),
2491 )
2493 def visit_primary_key_constraint(self, constraint):
2494 if len(constraint) == 0:
2495 return ""
2496 text = ""
2497 if constraint.name is not None:
2498 text += "CONSTRAINT %s " % self.preparer.format_constraint(
2499 constraint
2500 )
2501 text += "PRIMARY KEY "
2503 clustered = constraint.dialect_options["mssql"]["clustered"]
2504 if clustered is not None:
2505 if clustered:
2506 text += "CLUSTERED "
2507 else:
2508 text += "NONCLUSTERED "
2510 text += "(%s)" % ", ".join(
2511 self.preparer.quote(c.name) for c in constraint
2512 )
2513 text += self.define_constraint_deferrability(constraint)
2514 return text
2516 def visit_unique_constraint(self, constraint):
2517 if len(constraint) == 0:
2518 return ""
2519 text = ""
2520 if constraint.name is not None:
2521 formatted_name = self.preparer.format_constraint(constraint)
2522 if formatted_name is not None:
2523 text += "CONSTRAINT %s " % formatted_name
2524 text += "UNIQUE "
2526 clustered = constraint.dialect_options["mssql"]["clustered"]
2527 if clustered is not None:
2528 if clustered:
2529 text += "CLUSTERED "
2530 else:
2531 text += "NONCLUSTERED "
2533 text += "(%s)" % ", ".join(
2534 self.preparer.quote(c.name) for c in constraint
2535 )
2536 text += self.define_constraint_deferrability(constraint)
2537 return text
2539 def visit_computed_column(self, generated):
2540 text = "AS (%s)" % self.sql_compiler.process(
2541 generated.sqltext, include_table=False, literal_binds=True
2542 )
2543 # explicitly check for True|False since None means server default
2544 if generated.persisted is True:
2545 text += " PERSISTED"
2546 return text
2548 def visit_create_sequence(self, create, **kw):
2549 prefix = None
2550 if create.element.data_type is not None:
2551 data_type = create.element.data_type
2552 prefix = " AS %s" % self.type_compiler.process(data_type)
2553 return super(MSDDLCompiler, self).visit_create_sequence(
2554 create, prefix=prefix, **kw
2555 )
2557 def visit_identity_column(self, identity, **kw):
2558 text = " IDENTITY"
2559 if identity.start is not None or identity.increment is not None:
2560 start = 1 if identity.start is None else identity.start
2561 increment = 1 if identity.increment is None else identity.increment
2562 text += "(%s,%s)" % (start, increment)
2563 return text
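# A minimal standalone sketch of the defaulting rule in
# visit_identity_column() above. The function name render_identity is
# hypothetical; it takes the two values directly instead of an Identity
# object.

```python
def render_identity(start=None, increment=None):
    text = " IDENTITY"
    # Emit the (seed, increment) pair only if at least one was given,
    # filling the missing one with 1.
    if start is not None or increment is not None:
        start = 1 if start is None else start
        increment = 1 if increment is None else increment
        text += "(%s,%s)" % (start, increment)
    return text
```

# When neither value is given, the bare keyword is rendered and the server
# applies its own defaults.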
2566class MSIdentifierPreparer(compiler.IdentifierPreparer):
2567 reserved_words = RESERVED_WORDS
2569 def __init__(self, dialect):
2570 super(MSIdentifierPreparer, self).__init__(
2571 dialect,
2572 initial_quote="[",
2573 final_quote="]",
2574 quote_case_sensitive_collations=False,
2575 )
2577 def _escape_identifier(self, value):
2578 return value.replace("]", "]]")
2580 def _unescape_identifier(self, value):
2581 return value.replace("]]", "]")
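# A minimal sketch of the bracket quoting implemented by the two methods
# above: a closing bracket inside an identifier is doubled, and the whole
# name is wrapped in "[" and "]". The function names are hypothetical.

```python
def quote_identifier(name):
    # Double any "]" so it cannot terminate the quoted identifier early.
    return "[" + name.replace("]", "]]") + "]"


def unquote_identifier(quoted):
    # Strip the outer brackets, then collapse doubled "]" back to one.
    return quoted[1:-1].replace("]]", "]")
```

# Only "]" needs escaping; an opening "[" inside the name is left as-is.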
2583 def quote_schema(self, schema, force=None):
2584 """Prepare a quoted schema name, including the database name if present."""
2586 # need to re-implement the deprecation warning entirely
2587 if force is not None:
2588 # not using the util.deprecated_params() decorator in this
2589 # case because of the additional function call overhead on this
2590 # very performance-critical spot.
2591 util.warn_deprecated(
2592 "The IdentifierPreparer.quote_schema.force parameter is "
2593 "deprecated and will be removed in a future release. This "
2594 "flag has no effect on the behavior of the "
2595 "IdentifierPreparer.quote method; please refer to "
2596 "quoted_name().",
2597 version="1.3",
2598 )
2600 dbname, owner = _schema_elements(schema)
2601 if dbname:
2602 result = "%s.%s" % (self.quote(dbname), self.quote(owner))
2603 elif owner:
2604 result = self.quote(owner)
2605 else:
2606 result = ""
2607 return result
2610def _db_plus_owner_listing(fn):
2611 def wrap(dialect, connection, schema=None, **kw):
2612 dbname, owner = _owner_plus_db(dialect, schema)
2613 return _switch_db(
2614 dbname,
2615 connection,
2616 fn,
2617 dialect,
2618 connection,
2619 dbname,
2620 owner,
2621 schema,
2622 **kw
2623 )
2625 return update_wrapper(wrap, fn)
2628def _db_plus_owner(fn):
2629 def wrap(dialect, connection, tablename, schema=None, **kw):
2630 dbname, owner = _owner_plus_db(dialect, schema)
2631 return _switch_db(
2632 dbname,
2633 connection,
2634 fn,
2635 dialect,
2636 connection,
2637 tablename,
2638 dbname,
2639 owner,
2640 schema,
2641 **kw
2642 )
2644 return update_wrapper(wrap, fn)
2647def _switch_db(dbname, connection, fn, *arg, **kw):
2648 if dbname:
2649 current_db = connection.exec_driver_sql("select db_name()").scalar()
2650 if current_db != dbname:
2651 connection.exec_driver_sql(
2652 "use %s" % connection.dialect.identifier_preparer.quote(dbname)
2653 )
2654 try:
2655 return fn(*arg, **kw)
2656 finally:
2657 if dbname and current_db != dbname:
2658 connection.exec_driver_sql(
2659 "use %s"
2660 % connection.dialect.identifier_preparer.quote(current_db)
2661 )
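# A minimal sketch of the switch-and-restore pattern used by _switch_db()
# above. FakeConnection is a hypothetical stand-in that records "use"
# statements instead of talking to a server; switch_db mirrors the control
# flow only.

```python
class FakeConnection:
    """Hypothetical connection that tracks the current database."""

    def __init__(self, current_db):
        self.current_db = current_db
        self.log = []

    def use(self, dbname):
        self.log.append("use [%s]" % dbname)
        self.current_db = dbname


def switch_db(dbname, connection, fn, *args, **kw):
    original = connection.current_db
    # Switch only when a database was requested and it differs.
    switched = bool(dbname) and original != dbname
    if switched:
        connection.use(dbname)
    try:
        return fn(*args, **kw)
    finally:
        # Restore the original database even if fn() raised.
        if switched:
            connection.use(original)
```

# The try / finally is what guarantees the connection is handed back on the
# database it started on.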
2664def _owner_plus_db(dialect, schema):
2665 if not schema:
2666 return None, dialect.default_schema_name
2667 else:
2668 return _schema_elements(schema)
2671_memoized_schema = util.LRUCache()
2674def _schema_elements(schema):
2675 if isinstance(schema, quoted_name) and schema.quote:
2676 return None, schema
2678 if schema in _memoized_schema:
2679 return _memoized_schema[schema]
2681 # tests for this function are in:
2682 # test/dialect/mssql/test_reflection.py ->
2683 # OwnerPlusDBTest.test_owner_database_pairs
2684 # test/dialect/mssql/test_compiler.py -> test_force_schema_*
2685 # test/dialect/mssql/test_compiler.py -> test_schema_many_tokens_*
2686 #
2688 if schema.startswith("__[SCHEMA_"):
2689 return None, schema
2691 push = []
2692 symbol = ""
2693 bracket = False
2694 has_brackets = False
2695 for token in re.split(r"(\[|\]|\.)", schema):
2696 if not token:
2697 continue
2698 if token == "[":
2699 bracket = True
2700 has_brackets = True
2701 elif token == "]":
2702 bracket = False
2703 elif not bracket and token == ".":
2704 if has_brackets:
2705 push.append("[%s]" % symbol)
2706 else:
2707 push.append(symbol)
2708 symbol = ""
2709 has_brackets = False
2710 else:
2711 symbol += token
2712 if symbol:
2713 push.append(symbol)
2714 if len(push) > 1:
2715 dbname, owner = ".".join(push[0:-1]), push[-1]
2717 # test for internal brackets
2718 if re.match(r".*\].*\[.*", dbname[1:-1]):
2719 dbname = quoted_name(dbname, quote=False)
2720 else:
2721 dbname = dbname.lstrip("[").rstrip("]")
2723 elif len(push):
2724 dbname, owner = None, push[0]
2725 else:
2726 dbname, owner = None, None
2728 _memoized_schema[schema] = dbname, owner
2729 return dbname, owner
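# A simplified sketch of the tokenizing loop in _schema_elements() above,
# using only the standard library. The function name split_schema is
# hypothetical, and the sketch omits the memoization, the quoted_name
# handling and the "__[SCHEMA_" shortcut of the original.

```python
import re


def split_schema(schema):
    push, symbol = [], ""
    bracket = has_brackets = False
    for token in re.split(r"(\[|\]|\.)", schema):
        if not token:
            continue
        if token == "[":
            bracket = has_brackets = True
        elif token == "]":
            bracket = False
        elif not bracket and token == ".":
            # A dot outside brackets ends the current name.
            push.append("[%s]" % symbol if has_brackets else symbol)
            symbol = ""
            has_brackets = False
        else:
            # Dots inside brackets are part of the name.
            symbol += token
    if symbol:
        push.append(symbol)
    if len(push) > 1:
        dbname, owner = ".".join(push[:-1]), push[-1]
        # Strip outer brackets unless the name has internal "][" pairs.
        if not re.match(r".*\].*\[.*", dbname[1:-1]):
            dbname = dbname.lstrip("[").rstrip("]")
        return dbname, owner
    elif push:
        return None, push[0]
    return None, None
```

# The last dot-separated token becomes the owner; everything before it is
# treated as the database name.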
2732class MSDialect(default.DefaultDialect):
2733 # will assume it's at least mssql2005
2734 name = "mssql"
2735 supports_statement_cache = True
2736 supports_default_values = True
2737 supports_empty_insert = False
2738 execution_ctx_cls = MSExecutionContext
2739 use_scope_identity = True
2740 max_identifier_length = 128
2741 schema_name = "dbo"
2743 implicit_returning = True
2744 full_returning = True
2746 colspecs = {
2747 sqltypes.DateTime: _MSDateTime,
2748 sqltypes.Date: _MSDate,
2749 sqltypes.JSON: JSON,
2750 sqltypes.JSON.JSONIndexType: JSONIndexType,
2751 sqltypes.JSON.JSONPathType: JSONPathType,
2752 sqltypes.Time: _BASETIMEIMPL,
2753 sqltypes.Unicode: _MSUnicode,
2754 sqltypes.UnicodeText: _MSUnicodeText,
2755 DATETIMEOFFSET: DATETIMEOFFSET,
2756 DATETIME2: DATETIME2,
2757 SMALLDATETIME: SMALLDATETIME,
2758 DATETIME: DATETIME,
2759 }
2761 engine_config_types = default.DefaultDialect.engine_config_types.union(
2762 {"legacy_schema_aliasing": util.asbool}
2763 )
2765 ischema_names = ischema_names
2767 supports_sequences = True
2768 sequences_optional = True
2769 # T-SQL's actual default is -9223372036854775808
2770 default_sequence_base = 1
2772 supports_native_boolean = False
2773 non_native_boolean_check_constraint = False
2774 supports_unicode_binds = True
2775 postfetch_lastrowid = True
2776 _supports_offset_fetch = False
2777 _supports_nvarchar_max = False
2779 legacy_schema_aliasing = False
2781 server_version_info = ()
2783 statement_compiler = MSSQLCompiler
2784 ddl_compiler = MSDDLCompiler
2785 type_compiler = MSTypeCompiler
2786 preparer = MSIdentifierPreparer
2788 construct_arguments = [
2789 (sa_schema.PrimaryKeyConstraint, {"clustered": None}),
2790 (sa_schema.UniqueConstraint, {"clustered": None}),
2791 (sa_schema.Index, {"clustered": None, "include": None, "where": None}),
2792 (
2793 sa_schema.Column,
2794 {"identity_start": None, "identity_increment": None},
2795 ),
2796 ]
2798 def __init__(
2799 self,
2800 query_timeout=None,
2801 use_scope_identity=True,
2802 schema_name="dbo",
2803 isolation_level=None,
2804 deprecate_large_types=None,
2805 json_serializer=None,
2806 json_deserializer=None,
2807 legacy_schema_aliasing=None,
2808 ignore_no_transaction_on_rollback=False,
2809 **opts
2810 ):
2811 self.query_timeout = int(query_timeout or 0)
2812 self.schema_name = schema_name
2814 self.use_scope_identity = use_scope_identity
2815 self.deprecate_large_types = deprecate_large_types
2816 self.ignore_no_transaction_on_rollback = (
2817 ignore_no_transaction_on_rollback
2818 )
2820 if legacy_schema_aliasing is not None:
2821 util.warn_deprecated(
2822 "The legacy_schema_aliasing parameter is "
2823 "deprecated and will be removed in a future release.",
2824 "1.4",
2825 )
2826 self.legacy_schema_aliasing = legacy_schema_aliasing
2828 super(MSDialect, self).__init__(**opts)
2830 self.isolation_level = isolation_level
2831 self._json_serializer = json_serializer
2832 self._json_deserializer = json_deserializer
2834 def do_savepoint(self, connection, name):
2835 # give the DBAPI a push
2836 connection.exec_driver_sql("IF @@TRANCOUNT = 0 BEGIN TRANSACTION")
2837 super(MSDialect, self).do_savepoint(connection, name)
2839 def do_release_savepoint(self, connection, name):
2840 # SQL Server does not support RELEASE SAVEPOINT
2841 pass
2843 def do_rollback(self, dbapi_connection):
2844 try:
2845 super(MSDialect, self).do_rollback(dbapi_connection)
2846 except self.dbapi.ProgrammingError as e:
2847 if self.ignore_no_transaction_on_rollback and re.match(
2848 r".*\b111214\b", str(e)
2849 ):
2850 util.warn(
2851 "ProgrammingError 111214 "
2852 "'No corresponding transaction found.' "
2853 "has been suppressed via "
2854 "ignore_no_transaction_on_rollback=True"
2855 )
2856 else:
2857 raise
2859 _isolation_lookup = set(
2860 [
2861 "SERIALIZABLE",
2862 "READ UNCOMMITTED",
2863 "READ COMMITTED",
2864 "REPEATABLE READ",
2865 "SNAPSHOT",
2866 ]
2867 )
2869 def set_isolation_level(self, connection, level):
2870 level = level.replace("_", " ")
2871 if level not in self._isolation_lookup:
2872 raise exc.ArgumentError(
2873 "Invalid value '%s' for isolation_level. "
2874 "Valid isolation levels for %s are %s"
2875 % (level, self.name, ", ".join(self._isolation_lookup))
2876 )
2877 cursor = connection.cursor()
2878 cursor.execute("SET TRANSACTION ISOLATION LEVEL %s" % level)
2879 cursor.close()
2880 if level == "SNAPSHOT":
2881 connection.commit()
2883 def get_isolation_level(self, dbapi_connection):
2884 cursor = dbapi_connection.cursor()
2885 view_name = "sys.system_views"
2886 try:
2887 cursor.execute(
2888 (
2889 "SELECT name FROM {} WHERE name IN "
2890 "('dm_exec_sessions', 'dm_pdw_nodes_exec_sessions')"
2891 ).format(view_name)
2892 )
2893 row = cursor.fetchone()
2894 if not row:
2895 raise NotImplementedError(
2896 "Can't fetch isolation level on this particular "
2897 "SQL Server version."
2898 )
2900 view_name = "sys.{}".format(row[0])
2902 cursor.execute(
2903 """
2904 SELECT CASE transaction_isolation_level
2905 WHEN 0 THEN NULL
2906 WHEN 1 THEN 'READ UNCOMMITTED'
2907 WHEN 2 THEN 'READ COMMITTED'
2908 WHEN 3 THEN 'REPEATABLE READ'
2909 WHEN 4 THEN 'SERIALIZABLE'
2910 WHEN 5 THEN 'SNAPSHOT' END
2911 AS TRANSACTION_ISOLATION_LEVEL
2912 FROM {}
2913 where session_id = @@SPID
2914 """.format(
2915 view_name
2916 )
2917 )
2918 except self.dbapi.Error as err:
2919 util.raise_(
2920 NotImplementedError(
2921 "Can't fetch isolation level; encountered error {} when "
2922 'attempting to query the "{}" view.'.format(err, view_name)
2923 ),
2924 from_=err,
2925 )
2926 else:
2927 row = cursor.fetchone()
2928 return row[0].upper()
2929 finally:
2930 cursor.close()
2932 def initialize(self, connection):
2933 super(MSDialect, self).initialize(connection)
2934 self._setup_version_attributes()
2935 self._setup_supports_nvarchar_max(connection)
2937 def on_connect(self):
2938 if self.isolation_level is not None:
2940 def connect(conn):
2941 self.set_isolation_level(conn, self.isolation_level)
2943 return connect
2944 else:
2945 return None
2947 def _setup_version_attributes(self):
2948 if self.server_version_info[0] not in list(range(8, 17)):
2949 util.warn(
2950 "Unrecognized server version info '%s'. Some SQL Server "
2951 "features may not function properly."
2952 % ".".join(str(x) for x in self.server_version_info)
2953 )
2955 if self.server_version_info >= MS_2008_VERSION:
2956 self.supports_multivalues_insert = True
2957 if self.deprecate_large_types is None:
2958 self.deprecate_large_types = (
2959 self.server_version_info >= MS_2012_VERSION
2960 )
2962 self._supports_offset_fetch = (
2963 self.server_version_info and self.server_version_info[0] >= 11
2964 )
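# A minimal sketch of the tuple comparisons used in
# _setup_version_attributes() above. The values (10,) and (11,) for
# MS_2008_VERSION and MS_2012_VERSION are assumptions, since those constants
# are defined outside this section; version_flags is a hypothetical helper
# that returns the computed flags instead of setting dialect attributes, and
# it ignores any user-supplied deprecate_large_types value.

```python
MS_2008_VERSION = (10,)  # assumed value
MS_2012_VERSION = (11,)  # assumed value


def version_flags(server_version_info):
    return {
        # Tuples compare element by element, so (10, 50) >= (10,) is True.
        "supports_multivalues_insert": server_version_info >= MS_2008_VERSION,
        "deprecate_large_types": server_version_info >= MS_2012_VERSION,
        # Guard against an empty tuple before indexing the major version.
        "supports_offset_fetch": bool(server_version_info)
        and server_version_info[0] >= 11,
    }
```

# An empty version tuple compares lower than any one-element tuple, so every
# flag stays off until the server version is known.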
2966 def _setup_supports_nvarchar_max(self, connection):
2967 try:
2968 connection.scalar(
2969 sql.text("SELECT CAST('test max support' AS NVARCHAR(max))")
2970 )
2971 except exc.DBAPIError:
2972 self._supports_nvarchar_max = False
2973 else:
2974 self._supports_nvarchar_max = True
2976 def _get_default_schema_name(self, connection):
2977 query = sql.text("SELECT schema_name()")
2978 default_schema_name = connection.scalar(query)
2979 if default_schema_name is not None:
2980 # guard against the case where the default_schema_name is being
2981 # fed back into a table reflection function.
2982 return quoted_name(default_schema_name, quote=True)
2983 else:
2984 return self.schema_name
2986 @_db_plus_owner
2987 def has_table(self, connection, tablename, dbname, owner, schema):
2988 self._ensure_has_table_connection(connection)
2990 if tablename.startswith("#"): # temporary table
2991 # mssql does not support temporary views
2992 # SQL Error [4103] [S0001]: "#v": Temporary views are not allowed
2993 return bool(
2994 connection.scalar(
2995 # U filters on user tables only.
2996 text("SELECT object_id(:table_name, 'U')"),
2997 {"table_name": "tempdb.dbo.[{}]".format(tablename)},
2998 )
2999 )
3001 else:
3002 tables = ischema.tables
3004 s = sql.select(tables.c.table_name, tables.c.table_type).where(
3005 tables.c.table_name == tablename,
3006 )
3008 if owner:
3009 s = s.where(tables.c.table_schema == owner)
3011 c = connection.execute(s)
3013 return c.first() is not None
3015 @_db_plus_owner
3016 def has_sequence(self, connection, sequencename, dbname, owner, schema):
3017 sequences = ischema.sequences
3019 s = sql.select(sequences.c.sequence_name).where(
3020 sequences.c.sequence_name == sequencename
3021 )
3023 if owner:
3024 s = s.where(sequences.c.sequence_schema == owner)
3026 c = connection.execute(s)
3028 return c.first() is not None
3030 @reflection.cache
3031 @_db_plus_owner_listing
3032 def get_sequence_names(self, connection, dbname, owner, schema, **kw):
3033 sequences = ischema.sequences
3035 s = sql.select(sequences.c.sequence_name)
3036 if owner:
3037 s = s.where(sequences.c.sequence_schema == owner)
3039 c = connection.execute(s)
3041 return [row[0] for row in c]
3043 @reflection.cache
3044 def get_schema_names(self, connection, **kw):
3045 s = sql.select(ischema.schemata.c.schema_name).order_by(
3046 ischema.schemata.c.schema_name
3047 )
3048 schema_names = [r[0] for r in connection.execute(s)]
3049 return schema_names
3051 @reflection.cache
3052 @_db_plus_owner_listing
3053 def get_table_names(self, connection, dbname, owner, schema, **kw):
3054 tables = ischema.tables
3055 s = (
3056 sql.select(tables.c.table_name)
3057 .where(
3058 sql.and_(
3059 tables.c.table_schema == owner,
3060 tables.c.table_type == "BASE TABLE",
3061 )
3062 )
3063 .order_by(tables.c.table_name)
3064 )
3065 table_names = [r[0] for r in connection.execute(s)]
3066 return table_names
3068 @reflection.cache
3069 @_db_plus_owner_listing
3070 def get_view_names(self, connection, dbname, owner, schema, **kw):
3071 tables = ischema.tables
3072 s = (
3073 sql.select(tables.c.table_name)
3074 .where(
3075 sql.and_(
3076 tables.c.table_schema == owner,
3077 tables.c.table_type == "VIEW",
3078 )
3079 )
3080 .order_by(tables.c.table_name)
3081 )
3082 view_names = [r[0] for r in connection.execute(s)]
3083 return view_names
3085 @reflection.cache
3086 @_db_plus_owner
3087 def get_indexes(self, connection, tablename, dbname, owner, schema, **kw):
3088 filter_definition = (
3089 "ind.filter_definition"
3090 if self.server_version_info >= MS_2008_VERSION
3091 else "NULL as filter_definition"
3092 )
3093 rp = connection.execution_options(future_result=True).execute(
3094 sql.text(
3095 "select ind.index_id, ind.is_unique, ind.name, "
3096 "%s "
3097 "from sys.indexes as ind join sys.tables as tab on "
3098 "ind.object_id=tab.object_id "
3099 "join sys.schemas as sch on sch.schema_id=tab.schema_id "
3100 "where tab.name = :tabname "
3101 "and sch.name=:schname "
3102 "and ind.is_primary_key=0 and ind.type != 0"
3103 % filter_definition
3104 )
3105 .bindparams(
3106 sql.bindparam("tabname", tablename, ischema.CoerceUnicode()),
3107 sql.bindparam("schname", owner, ischema.CoerceUnicode()),
3108 )
3109 .columns(name=sqltypes.Unicode())
3110 )
3111 indexes = {}
3112 for row in rp.mappings():
3113 indexes[row["index_id"]] = {
3114 "name": row["name"],
3115 "unique": row["is_unique"] == 1,
3116 "column_names": [],
3117 "include_columns": [],
3118 }
3120 if row["filter_definition"] is not None:
3121 indexes[row["index_id"]].setdefault("dialect_options", {})[
3122 "mssql_where"
3123 ] = row["filter_definition"]
3125 rp = connection.execution_options(future_result=True).execute(
3126 sql.text(
3127 "select ind_col.index_id, ind_col.object_id, col.name, "
3128 "ind_col.is_included_column "
3129 "from sys.columns as col "
3130 "join sys.tables as tab on tab.object_id=col.object_id "
3131 "join sys.index_columns as ind_col on "
3132 "(ind_col.column_id=col.column_id and "
3133 "ind_col.object_id=tab.object_id) "
3134 "join sys.schemas as sch on sch.schema_id=tab.schema_id "
3135 "where tab.name=:tabname "
3136 "and sch.name=:schname"
3137 )
3138 .bindparams(
3139 sql.bindparam("tabname", tablename, ischema.CoerceUnicode()),
3140 sql.bindparam("schname", owner, ischema.CoerceUnicode()),
3141 )
3142 .columns(name=sqltypes.Unicode())
3143 )
3144 for row in rp.mappings():
3145 if row["index_id"] in indexes:
3146 if row["is_included_column"]:
3147 indexes[row["index_id"]]["include_columns"].append(
3148 row["name"]
3149 )
3150 else:
3151 indexes[row["index_id"]]["column_names"].append(
3152 row["name"]
3153 )
3154 for index_info in indexes.values():
3155 # NOTE: "root level" include_columns is legacy, now part of
3156 # dialect_options (issue #7382)
3157 index_info.setdefault("dialect_options", {})[
3158 "mssql_include"
3159 ] = index_info["include_columns"]
3161 return list(indexes.values())
3163 @reflection.cache
3164 @_db_plus_owner
3165 def get_view_definition(
3166 self, connection, viewname, dbname, owner, schema, **kw
3167 ):
3168 rp = connection.execute(
3169 sql.text(
3170 "select definition from sys.sql_modules as mod, "
3171 "sys.views as views, "
3172 "sys.schemas as sch"
3173 " where "
3174 "mod.object_id=views.object_id and "
3175 "views.schema_id=sch.schema_id and "
3176 "views.name=:viewname and sch.name=:schname"
3177 ).bindparams(
3178 sql.bindparam("viewname", viewname, ischema.CoerceUnicode()),
3179 sql.bindparam("schname", owner, ischema.CoerceUnicode()),
3180 )
3181 )
3183 if rp:
3184 view_def = rp.scalar()
3185 return view_def
3187 def _temp_table_name_like_pattern(self, tablename):
3188 # LIKE uses '%' to match zero or more characters and '_' to match any
3189 # single character. We want to match literal underscores, so T-SQL
3190 # requires that we enclose them in square brackets.
3191 return tablename + (
3192 ("[_][_][_]%") if not tablename.startswith("##") else ""
3193 )
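# A minimal standalone sketch of the pattern built by
# _temp_table_name_like_pattern() above. The function name is hypothetical.
# The reading that local temporary tables are stored in tempdb under a name
# followed by underscores and a suffix is inferred from the pattern itself
# and should be treated as an assumption.

```python
def temp_table_like_pattern(tablename):
    # "[_]" matches a literal underscore in T-SQL LIKE; "%" matches the rest.
    # Names starting with "##" (global temp tables) are matched exactly.
    suffix = "[_][_][_]%" if not tablename.startswith("##") else ""
    return tablename + suffix
```

# Requiring three literal underscores keeps "#tmp" from also matching a
# different table such as "#tmp2".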
    def _get_internal_temp_table_name(self, connection, tablename):
        # it's likely that schema is always "dbo", but since we can
        # get it here, let's get it.
        # see https://stackoverflow.com/questions/8311959/
        # specifying-schema-for-temporary-tables

        try:
            return connection.execute(
                sql.text(
                    "select table_schema, table_name "
                    "from tempdb.information_schema.tables "
                    "where table_name like :p1"
                ),
                {"p1": self._temp_table_name_like_pattern(tablename)},
            ).one()
        except exc.MultipleResultsFound as me:
            util.raise_(
                exc.UnreflectableTableError(
                    "Found more than one temporary table named '%s' in tempdb "
                    "at this time. Cannot reliably resolve that name to its "
                    "internal table name." % tablename
                ),
                replace_context=me,
            )
        except exc.NoResultFound as ne:
            util.raise_(
                exc.NoSuchTableError(
                    "Unable to find a temporary table named '%s' in tempdb."
                    % tablename
                ),
                replace_context=ne,
            )

    @reflection.cache
    @_db_plus_owner
    def get_columns(self, connection, tablename, dbname, owner, schema, **kw):
        is_temp_table = tablename.startswith("#")
        if is_temp_table:
            owner, tablename = self._get_internal_temp_table_name(
                connection, tablename
            )

            columns = ischema.mssql_temp_table_columns
        else:
            columns = ischema.columns

        computed_cols = ischema.computed_columns
        identity_cols = ischema.identity_columns
        if owner:
            whereclause = sql.and_(
                columns.c.table_name == tablename,
                columns.c.table_schema == owner,
            )
            full_name = columns.c.table_schema + "." + columns.c.table_name
        else:
            whereclause = columns.c.table_name == tablename
            full_name = columns.c.table_name

        join = columns.join(
            computed_cols,
            onclause=sql.and_(
                computed_cols.c.object_id == func.object_id(full_name),
                computed_cols.c.name
                == columns.c.column_name.collate("DATABASE_DEFAULT"),
            ),
            isouter=True,
        ).join(
            identity_cols,
            onclause=sql.and_(
                identity_cols.c.object_id == func.object_id(full_name),
                identity_cols.c.name
                == columns.c.column_name.collate("DATABASE_DEFAULT"),
            ),
            isouter=True,
        )

        if self._supports_nvarchar_max:
            computed_definition = computed_cols.c.definition
        else:
            # tds_version 4.2 does not support NVARCHAR(MAX)
            computed_definition = sql.cast(
                computed_cols.c.definition, NVARCHAR(4000)
            )

        s = (
            sql.select(
                columns,
                computed_definition,
                computed_cols.c.is_persisted,
                identity_cols.c.is_identity,
                identity_cols.c.seed_value,
                identity_cols.c.increment_value,
            )
            .where(whereclause)
            .select_from(join)
            .order_by(columns.c.ordinal_position)
        )

        c = connection.execution_options(future_result=True).execute(s)

        cols = []
        for row in c.mappings():
            name = row[columns.c.column_name]
            type_ = row[columns.c.data_type]
            nullable = row[columns.c.is_nullable] == "YES"
            charlen = row[columns.c.character_maximum_length]
            numericprec = row[columns.c.numeric_precision]
            numericscale = row[columns.c.numeric_scale]
            default = row[columns.c.column_default]
            collation = row[columns.c.collation_name]
            definition = row[computed_definition]
            is_persisted = row[computed_cols.c.is_persisted]
            is_identity = row[identity_cols.c.is_identity]
            identity_start = row[identity_cols.c.seed_value]
            identity_increment = row[identity_cols.c.increment_value]

            coltype = self.ischema_names.get(type_, None)

            kwargs = {}
            if coltype in (
                MSString,
                MSChar,
                MSNVarchar,
                MSNChar,
                MSText,
                MSNText,
                MSBinary,
                MSVarBinary,
                sqltypes.LargeBinary,
            ):
                if charlen == -1:
                    charlen = None
                kwargs["length"] = charlen
                if collation:
                    kwargs["collation"] = collation

            if coltype is None:
                util.warn(
                    "Did not recognize type '%s' of column '%s'"
                    % (type_, name)
                )
                coltype = sqltypes.NULLTYPE
            else:
                if issubclass(coltype, sqltypes.Numeric):
                    kwargs["precision"] = numericprec

                    if not issubclass(coltype, sqltypes.Float):
                        kwargs["scale"] = numericscale

                coltype = coltype(**kwargs)
            cdict = {
                "name": name,
                "type": coltype,
                "nullable": nullable,
                "default": default,
                "autoincrement": is_identity is not None,
            }

            if definition is not None and is_persisted is not None:
                cdict["computed"] = {
                    "sqltext": definition,
                    "persisted": is_persisted,
                }

            if is_identity is not None:
                # identity_start and identity_increment are Decimal or None
                if identity_start is None or identity_increment is None:
                    cdict["identity"] = {}
                else:
                    if isinstance(coltype, sqltypes.BigInteger):
                        start = compat.long_type(identity_start)
                        increment = compat.long_type(identity_increment)
                    elif isinstance(coltype, sqltypes.Integer):
                        start = int(identity_start)
                        increment = int(identity_increment)
                    else:
                        start = identity_start
                        increment = identity_increment

                    cdict["identity"] = {
                        "start": start,
                        "increment": increment,
                    }

            cols.append(cdict)

        return cols

    @reflection.cache
    @_db_plus_owner
    def get_pk_constraint(
        self, connection, tablename, dbname, owner, schema, **kw
    ):
        pkeys = []
        TC = ischema.constraints
        C = ischema.key_constraints.alias("C")

        # Primary key constraints
        s = (
            sql.select(
                C.c.column_name, TC.c.constraint_type, C.c.constraint_name
            )
            .where(
                sql.and_(
                    TC.c.constraint_name == C.c.constraint_name,
                    TC.c.table_schema == C.c.table_schema,
                    C.c.table_name == tablename,
                    C.c.table_schema == owner,
                ),
            )
            .order_by(TC.c.constraint_name, C.c.ordinal_position)
        )
        c = connection.execution_options(future_result=True).execute(s)
        constraint_name = None
        for row in c.mappings():
            if "PRIMARY" in row[TC.c.constraint_type.name]:
                pkeys.append(row["COLUMN_NAME"])
                if constraint_name is None:
                    constraint_name = row[C.c.constraint_name.name]
        return {"constrained_columns": pkeys, "name": constraint_name}

    @reflection.cache
    @_db_plus_owner
    def get_foreign_keys(
        self, connection, tablename, dbname, owner, schema, **kw
    ):
        # Foreign key constraints
        s = (
            text(
                """\
WITH fk_info AS (
    SELECT
        ischema_ref_con.constraint_schema,
        ischema_ref_con.constraint_name,
        ischema_key_col.ordinal_position,
        ischema_key_col.table_schema,
        ischema_key_col.table_name,
        ischema_ref_con.unique_constraint_schema,
        ischema_ref_con.unique_constraint_name,
        ischema_ref_con.match_option,
        ischema_ref_con.update_rule,
        ischema_ref_con.delete_rule,
        ischema_key_col.column_name AS constrained_column
    FROM
        INFORMATION_SCHEMA.REFERENTIAL_CONSTRAINTS ischema_ref_con
        INNER JOIN
        INFORMATION_SCHEMA.KEY_COLUMN_USAGE ischema_key_col ON
            ischema_key_col.table_schema = ischema_ref_con.constraint_schema
            AND ischema_key_col.constraint_name =
            ischema_ref_con.constraint_name
    WHERE ischema_key_col.table_name = :tablename
        AND ischema_key_col.table_schema = :owner
),
constraint_info AS (
    SELECT
        ischema_key_col.constraint_schema,
        ischema_key_col.constraint_name,
        ischema_key_col.ordinal_position,
        ischema_key_col.table_schema,
        ischema_key_col.table_name,
        ischema_key_col.column_name
    FROM
        INFORMATION_SCHEMA.KEY_COLUMN_USAGE ischema_key_col
),
index_info AS (
    SELECT
        sys.schemas.name AS index_schema,
        sys.indexes.name AS index_name,
        sys.index_columns.key_ordinal AS ordinal_position,
        sys.schemas.name AS table_schema,
        sys.objects.name AS table_name,
        sys.columns.name AS column_name
    FROM
        sys.indexes
        INNER JOIN
        sys.objects ON
            sys.objects.object_id = sys.indexes.object_id
        INNER JOIN
        sys.schemas ON
            sys.schemas.schema_id = sys.objects.schema_id
        INNER JOIN
        sys.index_columns ON
            sys.index_columns.object_id = sys.objects.object_id
            AND sys.index_columns.index_id = sys.indexes.index_id
        INNER JOIN
        sys.columns ON
            sys.columns.object_id = sys.indexes.object_id
            AND sys.columns.column_id = sys.index_columns.column_id
)
    SELECT
        fk_info.constraint_schema,
        fk_info.constraint_name,
        fk_info.ordinal_position,
        fk_info.constrained_column,
        constraint_info.table_schema AS referred_table_schema,
        constraint_info.table_name AS referred_table_name,
        constraint_info.column_name AS referred_column,
        fk_info.match_option,
        fk_info.update_rule,
        fk_info.delete_rule
    FROM
        fk_info INNER JOIN constraint_info ON
            constraint_info.constraint_schema =
                fk_info.unique_constraint_schema
            AND constraint_info.constraint_name =
                fk_info.unique_constraint_name
            AND constraint_info.ordinal_position = fk_info.ordinal_position
    UNION
    SELECT
        fk_info.constraint_schema,
        fk_info.constraint_name,
        fk_info.ordinal_position,
        fk_info.constrained_column,
        index_info.table_schema AS referred_table_schema,
        index_info.table_name AS referred_table_name,
        index_info.column_name AS referred_column,
        fk_info.match_option,
        fk_info.update_rule,
        fk_info.delete_rule
    FROM
        fk_info INNER JOIN index_info ON
            index_info.index_schema = fk_info.unique_constraint_schema
            AND index_info.index_name = fk_info.unique_constraint_name
            AND index_info.ordinal_position = fk_info.ordinal_position

    ORDER BY fk_info.constraint_schema, fk_info.constraint_name,
        fk_info.ordinal_position
"""
            )
            .bindparams(
                sql.bindparam("tablename", tablename, ischema.CoerceUnicode()),
                sql.bindparam("owner", owner, ischema.CoerceUnicode()),
            )
            .columns(
                constraint_schema=sqltypes.Unicode(),
                constraint_name=sqltypes.Unicode(),
                table_schema=sqltypes.Unicode(),
                table_name=sqltypes.Unicode(),
                constrained_column=sqltypes.Unicode(),
                referred_table_schema=sqltypes.Unicode(),
                referred_table_name=sqltypes.Unicode(),
                referred_column=sqltypes.Unicode(),
            )
        )

        # group rows by constraint name, to handle multi-column FKs;
        # each new name gets a fresh record from fkey_rec()
        def fkey_rec():
            return {
                "name": None,
                "constrained_columns": [],
                "referred_schema": None,
                "referred_table": None,
                "referred_columns": [],
                "options": {},
            }

        fkeys = util.defaultdict(fkey_rec)

        for r in connection.execute(s).fetchall():
            (
                _,  # constraint schema
                rfknm,
                _,  # ordinal position
                scol,
                rschema,
                rtbl,
                rcol,
                # TODO: we support match=<keyword> for foreign keys so
                # we can support this also, PG has match=FULL for example
                # but this seems to not be a valid value for SQL Server
                _,  # match rule
                fkuprule,
                fkdelrule,
            ) = r

            rec = fkeys[rfknm]
            rec["name"] = rfknm

            if fkuprule != "NO ACTION":
                rec["options"]["onupdate"] = fkuprule

            if fkdelrule != "NO ACTION":
                rec["options"]["ondelete"] = fkdelrule

            if not rec["referred_table"]:
                rec["referred_table"] = rtbl
                if schema is not None or owner != rschema:
                    if dbname:
                        rschema = dbname + "." + rschema
                    rec["referred_schema"] = rschema

            local_cols, remote_cols = (
                rec["constrained_columns"],
                rec["referred_columns"],
            )

            local_cols.append(scol)
            remote_cols.append(rcol)

        return list(fkeys.values())
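
# The row-grouping step in get_foreign_keys() can be sketched on its own as
# follows. This is a simplified illustration, not the dialect's code:
# ``group_fk_rows`` and the six-column row layout are assumptions made for
# the example, and the standard library ``collections.defaultdict`` is used
# directly. Rows are expected to arrive ordered by constraint name and
# ordinal position, as the ORDER BY in the query above arranges.

```python
from collections import defaultdict


def group_fk_rows(rows):
    """Fold one-row-per-column results into one record per constraint."""

    def fkey_rec():
        return {
            "name": None,
            "constrained_columns": [],
            "referred_table": None,
            "referred_columns": [],
            "options": {},
        }

    fkeys = defaultdict(fkey_rec)

    for name, local_col, ref_table, ref_col, update_rule, delete_rule in rows:
        rec = fkeys[name]  # creates a fresh record on first sight
        rec["name"] = name

        # "NO ACTION" is the default rule, so it is not recorded.
        if update_rule != "NO ACTION":
            rec["options"]["onupdate"] = update_rule
        if delete_rule != "NO ACTION":
            rec["options"]["ondelete"] = delete_rule

        if not rec["referred_table"]:
            rec["referred_table"] = ref_table

        rec["constrained_columns"].append(local_col)
        rec["referred_columns"].append(ref_col)

    return list(fkeys.values())


# Hypothetical sample rows: a two-column key and a single-column key.
sample_rows = [
    ("fk_order_item", "order_id", "orders", "id", "NO ACTION", "CASCADE"),
    ("fk_order_item", "order_rev", "orders", "rev", "NO ACTION", "CASCADE"),
    ("fk_owner", "owner_id", "users", "id", "NO ACTION", "NO ACTION"),
]
grouped = group_fk_rows(sample_rows)
```

# Keying the dictionary on the constraint name means that a composite key
# yields a single record whose column lists stay aligned position by
# position.
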