Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/sqlalchemy/dialects/mssql/base.py: 33%
1056 statements
coverage.py v7.0.1, created at 2022-12-25 06:11 +0000
1# mssql/base.py
2# Copyright (C) 2005-2022 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7"""
8.. dialect:: mssql
9 :name: Microsoft SQL Server
10 :full_support: 2017
11 :normal_support: 2012+
12 :best_effort: 2005+
14.. _mssql_external_dialects:
16External Dialects
17-----------------
19In addition to the above DBAPI layers with native SQLAlchemy support, there
20are third-party dialects for other DBAPI layers that are compatible
21with SQL Server. See the "External Dialects" list on the
22:ref:`dialect_toplevel` page.
24.. _mssql_identity:
26Auto Increment Behavior / IDENTITY Columns
27------------------------------------------
29SQL Server provides so-called "auto incrementing" behavior using the
30``IDENTITY`` construct, which can be placed on any single integer column in a
31table. SQLAlchemy considers ``IDENTITY`` within its default "autoincrement"
32behavior for an integer primary key column, described at
33:paramref:`_schema.Column.autoincrement`. This means that by default,
34the first integer primary key column in a :class:`_schema.Table` will be
35considered to be the identity column - unless it is associated with a
36:class:`.Sequence` - and will generate DDL as such::
38 from sqlalchemy import Table, MetaData, Column, Integer
40 m = MetaData()
41 t = Table('t', m,
42 Column('id', Integer, primary_key=True),
43 Column('x', Integer))
44 m.create_all(engine)
46The above example will generate DDL as:
48.. sourcecode:: sql
50 CREATE TABLE t (
51 id INTEGER NOT NULL IDENTITY,
52 x INTEGER NULL,
53 PRIMARY KEY (id)
54 )
56For the case where this default generation of ``IDENTITY`` is not desired,
57specify ``False`` for the :paramref:`_schema.Column.autoincrement` flag,
58on the first integer primary key column::
60 m = MetaData()
61 t = Table('t', m,
62 Column('id', Integer, primary_key=True, autoincrement=False),
63 Column('x', Integer))
64 m.create_all(engine)
66To add the ``IDENTITY`` keyword to a non-primary key column, specify
67``True`` for the :paramref:`_schema.Column.autoincrement` flag on the desired
68:class:`_schema.Column` object, and ensure that
69:paramref:`_schema.Column.autoincrement`
70is set to ``False`` on any integer primary key column::
72 m = MetaData()
73 t = Table('t', m,
74 Column('id', Integer, primary_key=True, autoincrement=False),
75 Column('x', Integer, autoincrement=True))
76 m.create_all(engine)
78.. versionchanged:: 1.4 Added :class:`_schema.Identity` construct
79 in a :class:`_schema.Column` to specify the start and increment
80 parameters of an IDENTITY. These replace
81 the use of the :class:`.Sequence` object in order to specify these values.
83.. deprecated:: 1.4
85 The ``mssql_identity_start`` and ``mssql_identity_increment`` parameters
86 to :class:`_schema.Column` are deprecated and should be replaced by
87 an :class:`_schema.Identity` object. Specifying both ways of configuring
88 an IDENTITY will result in a compile error.
89 These options are also no longer returned as part of the
90 ``dialect_options`` key in :meth:`_reflection.Inspector.get_columns`.
91 Use the information in the ``identity`` key instead.
93.. deprecated:: 1.3
95 The use of :class:`.Sequence` to specify IDENTITY characteristics is
96 deprecated and will be removed in a future release. Please use
97 the :class:`_schema.Identity` object parameters
98 :paramref:`_schema.Identity.start` and
99 :paramref:`_schema.Identity.increment`.
101.. versionchanged:: 1.4 Removed the ability to use a :class:`.Sequence`
102 object to modify IDENTITY characteristics. :class:`.Sequence` objects
103 now only manipulate true T-SQL SEQUENCE types.
105.. note::
107 There can only be one IDENTITY column on the table. When using
108 ``autoincrement=True`` to enable the IDENTITY keyword, SQLAlchemy does not
109 guard against multiple columns specifying the option simultaneously. The
110 SQL Server database will instead reject the ``CREATE TABLE`` statement.
112.. note::
114 An INSERT statement which attempts to provide a value for a column that is
115 marked with IDENTITY will be rejected by SQL Server. In order for the
116 value to be accepted, a session-level option "SET IDENTITY_INSERT" must be
117 enabled. The SQLAlchemy SQL Server dialect will perform this operation
118 automatically when using a core :class:`_expression.Insert`
119 construct; if the
120 execution specifies a value for the IDENTITY column, the "IDENTITY_INSERT"
121 option will be enabled for the span of that statement's invocation. However,
122 this scenario is not high performing and should not be relied upon for
123 normal use. If a table doesn't actually require IDENTITY behavior in its
124 integer primary key column, the keyword should be disabled when creating
125 the table by ensuring that ``autoincrement=False`` is set.
127Controlling "Start" and "Increment"
128^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
130Specific control over the "start" and "increment" values for
131the ``IDENTITY`` generator are provided using the
132:paramref:`_schema.Identity.start` and :paramref:`_schema.Identity.increment`
133parameters passed to the :class:`_schema.Identity` object::
135 from sqlalchemy import Table, Integer, Column, Identity
137 test = Table(
138 'test', metadata,
139 Column(
140 'id',
141 Integer,
142 Identity(start=100, increment=10),
143 primary_key=True,
144 ),
145 Column('name', String(20))
146 )
148The CREATE TABLE for the above :class:`_schema.Table` object would be:
150.. sourcecode:: sql
152 CREATE TABLE test (
153 id INTEGER NOT NULL IDENTITY(100,10) PRIMARY KEY,
154 name VARCHAR(20) NULL
155 )
157.. note::
159 The :class:`_schema.Identity` object supports many other parameters in
160 addition to ``start`` and ``increment``. These are not supported by
161 SQL Server and will be ignored when generating the CREATE TABLE DDL.
163.. versionchanged:: 1.3.19 The :class:`_schema.Identity` object is
164 now used to affect the
165 ``IDENTITY`` generator for a :class:`_schema.Column` under SQL Server.
166 Previously, the :class:`.Sequence` object was used. As SQL Server now
167 supports real sequences as a separate construct, :class:`.Sequence` will be
168 functional in the normal way starting from SQLAlchemy version 1.4.
171Using IDENTITY with Non-Integer numeric types
172^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
174SQL Server also allows ``IDENTITY`` to be used with ``NUMERIC`` columns. To
175implement this pattern smoothly in SQLAlchemy, the primary datatype of the
176column should remain as ``Integer``, however the underlying implementation
177type deployed to the SQL Server database can be specified as ``Numeric`` using
178:meth:`.TypeEngine.with_variant`::
180 from sqlalchemy import Column
181 from sqlalchemy import Integer
182 from sqlalchemy import Numeric
183 from sqlalchemy import String
184 from sqlalchemy.ext.declarative import declarative_base
186 Base = declarative_base()
188 class TestTable(Base):
189 __tablename__ = "test"
190 id = Column(
191 Integer().with_variant(Numeric(10, 0), "mssql"),
192 primary_key=True,
193 autoincrement=True,
194 )
195 name = Column(String)
197In the above example, ``Integer().with_variant()`` provides clear usage
198information that accurately describes the intent of the code. The general
199restriction that ``autoincrement`` only applies to ``Integer`` is established
200at the metadata level and not at the per-dialect level.
202When using the above pattern, the primary key identifier that comes back from
203the insertion of a row, which is also the value that would be assigned to an
204ORM object such as ``TestTable`` above, will be an instance of ``Decimal()``
205and not ``int`` when using SQL Server. The numeric return type of the
206:class:`_types.Numeric` type can be changed to return floats by passing False
207to :paramref:`_types.Numeric.asdecimal`. To normalize the return type of the
208above ``Numeric(10, 0)`` to return Python ints (which also support "long"
209integer values in Python 3), use :class:`_types.TypeDecorator` as follows::
211 from sqlalchemy import TypeDecorator
213 class NumericAsInteger(TypeDecorator):
214 '''normalize floating point return values into ints'''
216 impl = Numeric(10, 0, asdecimal=False)
217 cache_ok = True
219 def process_result_value(self, value, dialect):
220 if value is not None:
221 value = int(value)
222 return value
224 class TestTable(Base):
225 __tablename__ = "test"
226 id = Column(
227 Integer().with_variant(NumericAsInteger, "mssql"),
228 primary_key=True,
229 autoincrement=True,
230 )
231 name = Column(String)
234INSERT behavior
235^^^^^^^^^^^^^^^^
237Handling of the ``IDENTITY`` column at INSERT time involves two key
238techniques. The most common is being able to fetch the "last inserted value"
239for a given ``IDENTITY`` column, a process which SQLAlchemy performs
240implicitly in many cases, most importantly within the ORM.
242The process for fetching this value has several variants:
244* In the vast majority of cases, RETURNING is used in conjunction with INSERT
245 statements on SQL Server in order to get newly generated primary key values:
247 .. sourcecode:: sql
249 INSERT INTO t (x) OUTPUT inserted.id VALUES (?)
251* When RETURNING is not available or has been disabled via
252 ``implicit_returning=False``, either the ``scope_identity()`` function or
253 the ``@@identity`` variable is used; behavior varies by backend:
255 * when using PyODBC, the phrase ``; select scope_identity()`` will be
256 appended to the end of the INSERT statement; a second result set will be
257 fetched in order to receive the value. Given a table as::
259 t = Table('t', m, Column('id', Integer, primary_key=True),
260 Column('x', Integer),
261 implicit_returning=False)
263 an INSERT will look like:
265 .. sourcecode:: sql
267 INSERT INTO t (x) VALUES (?); select scope_identity()
269 * Other dialects such as pymssql will call upon
270 ``SELECT scope_identity() AS lastrowid`` subsequent to an INSERT
271 statement. If the flag ``use_scope_identity=False`` is passed to
272 :func:`_sa.create_engine`,
273 the statement ``SELECT @@identity AS lastrowid``
274 is used instead.
276A table that contains an ``IDENTITY`` column will prohibit an INSERT statement
277that refers to the identity column explicitly. The SQLAlchemy dialect will
278detect when an INSERT construct, created using a core
279:func:`_expression.insert`
280construct (not a plain string SQL), refers to the identity column, and
281in this case will emit ``SET IDENTITY_INSERT ON`` prior to the insert
282statement proceeding, and ``SET IDENTITY_INSERT OFF`` subsequent to the
283execution. Given this example::
285 m = MetaData()
286 t = Table('t', m, Column('id', Integer, primary_key=True),
287 Column('x', Integer))
288 m.create_all(engine)
290 with engine.begin() as conn:
291 conn.execute(t.insert(), {'id': 1, 'x':1}, {'id':2, 'x':2})
293The above column will be created with IDENTITY, however the INSERT statement
294we emit is specifying explicit values. In the echo output we can see
295how SQLAlchemy handles this:
297.. sourcecode:: sql
299 CREATE TABLE t (
300 id INTEGER NOT NULL IDENTITY(1,1),
301 x INTEGER NULL,
302 PRIMARY KEY (id)
303 )
305 COMMIT
306 SET IDENTITY_INSERT t ON
307 INSERT INTO t (id, x) VALUES (?, ?)
308 ((1, 1), (2, 2))
309 SET IDENTITY_INSERT t OFF
310 COMMIT
314This is an auxiliary use case suitable for testing and bulk insert scenarios.
316SEQUENCE support
317----------------
319The :class:`.Sequence` object now creates "real" sequences, i.e.,
320``CREATE SEQUENCE``. To provide compatibility with other dialects,
321:class:`.Sequence` defaults to a start value of 1, even though the
322 T-SQL default is -9223372036854775808.
324.. versionadded:: 1.4.0
326MAX on VARCHAR / NVARCHAR
327-------------------------
329SQL Server supports the special string "MAX" within the
330:class:`_types.VARCHAR` and :class:`_types.NVARCHAR` datatypes,
331to indicate "maximum length possible". The dialect currently handles this as
332a length of "None" in the base type, rather than supplying a
333dialect-specific version of these types, so that a base type
334specified such as ``VARCHAR(None)`` can assume "unlengthed" behavior on
335more than one backend without using dialect-specific types.
337To build a SQL Server VARCHAR or NVARCHAR with MAX length, use None::
339 my_table = Table(
340 'my_table', metadata,
341 Column('my_data', VARCHAR(None)),
342 Column('my_n_data', NVARCHAR(None))
343 )
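The "MAX" convention can be illustrated with a small sketch (``render_varchar`` below is a hypothetical helper, not the dialect's actual type compiler), showing how a length of ``None`` maps to ``max``:

```python
def render_varchar(length):
    # A length of None renders as VARCHAR(max), mirroring how the
    # dialect treats "unlengthed" string types; an integer length
    # renders normally.
    return "VARCHAR(max)" if length is None else "VARCHAR(%d)" % length
```

In the real dialect, this decision is made while compiling the type during DDL generation; the sketch only captures the mapping rule.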
346Collation Support
347-----------------
349Character collations are supported by the base string types,
350specified by the string argument "collation"::
352 from sqlalchemy import VARCHAR
353 Column('login', VARCHAR(32, collation='Latin1_General_CI_AS'))
355When such a column is associated with a :class:`_schema.Table`, the
356CREATE TABLE statement for this column will yield::
358 login VARCHAR(32) COLLATE Latin1_General_CI_AS NULL
360LIMIT/OFFSET Support
361--------------------
363MSSQL has added support for LIMIT / OFFSET as of SQL Server 2012, via the
364"OFFSET n ROWS" and "FETCH NEXT n ROWS" clauses. SQLAlchemy supports these
365syntaxes automatically if SQL Server 2012 or greater is detected.
367.. versionchanged:: 1.4 support added for SQL Server "OFFSET n ROWS" and
368 "FETCH NEXT n ROWS" syntax.
370For statements that specify only LIMIT and no OFFSET, all versions of SQL
371Server support the TOP keyword. This syntax is used for all SQL Server
372versions when no OFFSET clause is present. A statement such as::
374 select(some_table).limit(5)
376will render similarly to::
378 SELECT TOP 5 col1, col2.. FROM table
380For versions of SQL Server prior to SQL Server 2012, a statement that uses
381LIMIT and OFFSET, or just OFFSET alone, will be rendered using the
382``ROW_NUMBER()`` window function. A statement such as::
384 select(some_table).order_by(some_table.c.col3).limit(5).offset(10)
386will render similarly to::
388 SELECT anon_1.col1, anon_1.col2 FROM (SELECT col1, col2,
389 ROW_NUMBER() OVER (ORDER BY col3) AS mssql_rn
390 FROM table) AS anon_1
391 WHERE mssql_rn > :param_1 AND mssql_rn <= :param_2 + :param_1
393Note that when using LIMIT and/or OFFSET, whether using the older
394or newer SQL Server syntaxes, the statement must have an ORDER BY as well,
395else a :class:`.CompileError` is raised.
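The ROW_NUMBER() rewrite can be approximated with a string-building sketch (``rownumber_paginate`` is a hypothetical helper; the actual compiler operates on SQL expression trees, not strings):

```python
def rownumber_paginate(cols, table, order_by, limit, offset):
    # Wrap the original SELECT in a subquery that numbers rows by the
    # ORDER BY expression, then filter on the row-number window.
    inner = "SELECT %s, ROW_NUMBER() OVER (ORDER BY %s) AS mssql_rn FROM %s" % (
        ", ".join(cols),
        order_by,
        table,
    )
    outer_cols = ", ".join("anon_1.%s" % c for c in cols)
    return (
        "SELECT %s FROM (%s) AS anon_1 "
        "WHERE mssql_rn > %d AND mssql_rn <= %d"
        % (outer_cols, inner, offset, offset + limit)
    )
```

Calling ``rownumber_paginate(["col1", "col2"], "some_table", "col3", 5, 10)`` yields a statement of the same shape as the rendered SQL above, with the offset and offset-plus-limit bound into the row-number window.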
397.. _mssql_isolation_level:
399Transaction Isolation Level
400---------------------------
402All SQL Server dialects support setting of transaction isolation level
403both via a dialect-specific parameter
404:paramref:`_sa.create_engine.isolation_level`
405accepted by :func:`_sa.create_engine`,
406as well as the :paramref:`.Connection.execution_options.isolation_level`
407argument as passed to
408:meth:`_engine.Connection.execution_options`.
409This feature works by issuing the
410command ``SET TRANSACTION ISOLATION LEVEL <level>`` for
411each new connection.
413To set isolation level using :func:`_sa.create_engine`::
415 engine = create_engine(
416 "mssql+pyodbc://scott:tiger@ms_2008",
417 isolation_level="REPEATABLE READ"
418 )
420To set using per-connection execution options::
422 connection = engine.connect()
423 connection = connection.execution_options(
424 isolation_level="READ COMMITTED"
425 )
427Valid values for ``isolation_level`` include:
429* ``AUTOCOMMIT`` - pyodbc / pymssql-specific
430* ``READ COMMITTED``
431* ``READ UNCOMMITTED``
432* ``REPEATABLE READ``
433* ``SERIALIZABLE``
434* ``SNAPSHOT`` - specific to SQL Server
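The level names above can be validated before interpolation into the ``SET TRANSACTION ISOLATION LEVEL`` command; a minimal sketch (hypothetical helper, not SQLAlchemy's internal API):

```python
# Valid isolation level names for the SQL Server dialect, as listed above.
VALID_ISOLATION_LEVELS = {
    "AUTOCOMMIT",
    "READ COMMITTED",
    "READ UNCOMMITTED",
    "REPEATABLE READ",
    "SERIALIZABLE",
    "SNAPSHOT",
}


def isolation_command(level):
    # Normalize user input, then build the command issued for each
    # new connection.
    level = level.upper().replace("_", " ")
    if level not in VALID_ISOLATION_LEVELS:
        raise ValueError("invalid isolation level: %r" % level)
    return "SET TRANSACTION ISOLATION LEVEL %s" % level
```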
436There are also more options for isolation level configurations, such as
437"sub-engine" objects linked to a main :class:`_engine.Engine` which each apply
438different isolation level settings. See the discussion at
439:ref:`dbapi_autocommit` for background.
441.. seealso::
443 :ref:`dbapi_autocommit`
445.. _mssql_reset_on_return:
447Temporary Table / Resource Reset for Connection Pooling
448-------------------------------------------------------
450The :class:`.QueuePool` connection pool implementation used
451by the SQLAlchemy :class:`_sa.Engine` object includes
452:ref:`reset on return <pool_reset_on_return>` behavior that will invoke
453the DBAPI ``.rollback()`` method when connections are returned to the pool.
454While this rollback will clear out the immediate state used by the previous
455transaction, it does not cover a wider range of session-level state, including
456temporary tables as well as other server state such as prepared statement
457 handles and statement caches. An undocumented SQL Server procedure,
458 ``sp_reset_connection``, provides a workaround, resetting most of the
459 session state that builds up on a connection, including
460 temporary tables.
462To install ``sp_reset_connection`` as the means of performing reset-on-return,
463the :meth:`.PoolEvents.reset` event hook may be used, as demonstrated in the
464example below (**requires SQLAlchemy 1.4.43 or greater**). The
465:paramref:`_sa.create_engine.pool_reset_on_return` parameter is set to ``None``
466so that the custom scheme can replace the default behavior completely. The
467custom hook implementation calls ``.rollback()`` in any case, as it's usually
468important that the DBAPI's own tracking of commit/rollback will remain
469consistent with the state of the transaction::
471 from sqlalchemy import create_engine
472 from sqlalchemy import event
474 mssql_engine = create_engine(
475 "mssql+pyodbc://scott:tiger^5HHH@mssql2017:1433/test?driver=ODBC+Driver+17+for+SQL+Server",
477 # disable default reset-on-return scheme
478 pool_reset_on_return=None,
479 )
482 @event.listens_for(mssql_engine, "reset")
483 def _reset_mssql(dbapi_connection, connection_record, reset_state):
484 dbapi_connection.execute("{call sys.sp_reset_connection}")
486 # so that the DBAPI itself knows that the connection has been
487 # reset
488 dbapi_connection.rollback()
490.. versionchanged:: 1.4.43 Ensured the :meth:`.PoolEvents.reset` event
491 is invoked for all "reset" occurrences, so that it's appropriate
492 as a place for custom "reset" handlers. Previous schemes which
493 use the :meth:`.PoolEvents.checkin` handler remain usable as well.
495.. seealso::
497 :ref:`pool_reset_on_return` - in the :ref:`pooling_toplevel` documentation
499Nullability
500-----------
501MSSQL has support for three levels of column nullability. The default
502nullability allows nulls and is explicit in the CREATE TABLE
503construct::
505 name VARCHAR(20) NULL
507If ``nullable=None`` is specified then no specification is made. In
508other words the database's configured default is used. This will
509render::
511 name VARCHAR(20)
513If ``nullable`` is ``True`` or ``False`` then the column will be
514``NULL`` or ``NOT NULL`` respectively.
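The three-way mapping can be sketched as follows (``nullability_clause`` is a hypothetical helper mirroring the rules above):

```python
def nullability_clause(nullable):
    # True -> "NULL", False -> "NOT NULL", None -> "" so that the
    # database's configured default applies.
    if nullable is None:
        return ""
    return "NULL" if nullable else "NOT NULL"
```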
516Date / Time Handling
517--------------------
518DATE and TIME are supported. Bind parameters are converted
519to datetime.datetime() objects as required by most MSSQL drivers,
520and results are processed from strings if needed.
521The DATE and TIME types are not available for MSSQL 2005 and
522previous - if a server version below 2008 is detected, DDL
523for these types will be issued as DATETIME.
525.. _mssql_large_type_deprecation:
527Large Text/Binary Type Deprecation
528----------------------------------
530Per
531`SQL Server 2012/2014 Documentation <https://technet.microsoft.com/en-us/library/ms187993.aspx>`_,
532the ``NTEXT``, ``TEXT`` and ``IMAGE`` datatypes are to be removed from SQL
533Server in a future release. SQLAlchemy normally relates these types to the
534:class:`.UnicodeText`, :class:`_expression.TextClause` and
535:class:`.LargeBinary` datatypes.
537In order to accommodate this change, a new flag ``deprecate_large_types``
538is added to the dialect, which will be automatically set based on detection
539of the server version in use, if not otherwise set by the user. The
540behavior of this flag is as follows:
542* When this flag is ``True``, the :class:`.UnicodeText`,
543 :class:`_expression.TextClause` and
544 :class:`.LargeBinary` datatypes, when used to render DDL, will render the
545 types ``NVARCHAR(max)``, ``VARCHAR(max)``, and ``VARBINARY(max)``,
546 respectively. This is a new behavior as of the addition of this flag.
548* When this flag is ``False``, the :class:`.UnicodeText`,
549 :class:`_expression.TextClause` and
550 :class:`.LargeBinary` datatypes, when used to render DDL, will render the
551 types ``NTEXT``, ``TEXT``, and ``IMAGE``,
552 respectively. This is the long-standing behavior of these types.
554* The flag begins with the value ``None``, before a database connection is
555 established. If the dialect is used to render DDL without the flag being
556 set, it is interpreted the same as ``False``.
558* On first connection, the dialect detects if SQL Server version 2012 or
559 greater is in use; if the flag is still at ``None``, it sets it to ``True``
560 or ``False`` based on whether 2012 or greater is detected.
562* The flag can be set to either ``True`` or ``False`` when the dialect
563 is created, typically via :func:`_sa.create_engine`::
565 eng = create_engine("mssql+pymssql://user:pass@host/db",
566 deprecate_large_types=True)
568* Complete control over whether the "old" or "new" types are rendered is
569 available in all SQLAlchemy versions by using the UPPERCASE type objects
570 instead: :class:`_types.NVARCHAR`, :class:`_types.VARCHAR`,
571 :class:`_types.VARBINARY`, :class:`_types.TEXT`, :class:`_mssql.NTEXT`,
572 :class:`_mssql.IMAGE`
573 will always remain fixed and always output exactly that
574 type.
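The flag resolution described in the bullets above can be sketched for one of the types (``large_text_ddl`` is a hypothetical helper; version tuples follow the dialect's convention in which SQL Server 2012 is major version 11):

```python
def large_text_ddl(deprecate_large_types, server_version_info=None):
    # A flag of None defers to the server version when one is known:
    # 2012 or greater selects the "new" types. Without a connection,
    # None is interpreted the same as False.
    flag = deprecate_large_types
    if flag is None and server_version_info is not None:
        flag = server_version_info >= (11,)
    return "VARCHAR(max)" if flag else "TEXT"
```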
576.. versionadded:: 1.0.0
578.. _multipart_schema_names:
580Multipart Schema Names
581----------------------
583SQL Server schemas sometimes require multiple parts to their "schema"
584qualifier, that is, including the database name and owner name as separate
585tokens, such as ``mydatabase.dbo.some_table``. These multipart names can be set
586at once using the :paramref:`_schema.Table.schema` argument of
587:class:`_schema.Table`::
589 Table(
590 "some_table", metadata,
591 Column("q", String(50)),
592 schema="mydatabase.dbo"
593 )
595When performing operations such as table or component reflection, a schema
596argument that contains a dot will be split into separate
597"database" and "owner" components in order to correctly query the SQL
598Server information schema tables, as these two values are stored separately.
599Additionally, when rendering the schema name for DDL or SQL, the two
600components will be quoted separately for case sensitive names and other
601special characters. Given an argument as below::
603 Table(
604 "some_table", metadata,
605 Column("q", String(50)),
606 schema="MyDataBase.dbo"
607 )
609The above schema would be rendered as ``[MyDataBase].dbo``, and also in
610reflection, would be reflected using "dbo" as the owner and "MyDataBase"
611as the database name.
613To control how the schema name is broken into database / owner,
614specify brackets (which in SQL Server are quoting characters) in the name.
615Below, the "owner" will be considered as ``MyDataBase.dbo`` and the
616"database" will be None::
618 Table(
619 "some_table", metadata,
620 Column("q", String(50)),
621 schema="[MyDataBase.dbo]"
622 )
624To individually specify both database and owner name with special characters
625or embedded dots, use two sets of brackets::
627 Table(
628 "some_table", metadata,
629 Column("q", String(50)),
630 schema="[MyDataBase.Period].[MyOwner.Dot]"
631 )
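The splitting rule can be illustrated with a short sketch (``split_schema`` is a hypothetical helper; the dialect's actual parser additionally strips the brackets when quoting each component):

```python
def split_schema(schema):
    # Track bracket depth; only a dot outside [brackets] separates the
    # database token from the owner token.
    depth = 0
    for i, ch in enumerate(schema):
        if ch == "[":
            depth += 1
        elif ch == "]":
            depth -= 1
        elif ch == "." and depth == 0:
            return schema[:i], schema[i + 1:]
    # No unbracketed dot: the whole string is the owner, database is None.
    return None, schema
```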
634.. versionchanged:: 1.2 the SQL Server dialect now treats brackets as
635 identifier delimiters splitting the schema into separate database
636 and owner tokens, to allow dots within either name itself.
638.. _legacy_schema_rendering:
640Legacy Schema Mode
641------------------
643Very old versions of the MSSQL dialect introduced the behavior such that a
644schema-qualified table would be auto-aliased when used in a
645SELECT statement; given a table::
647 account_table = Table(
648 'account', metadata,
649 Column('id', Integer, primary_key=True),
650 Column('info', String(100)),
651 schema="customer_schema"
652 )
654this legacy mode of rendering would assume that "customer_schema.account"
655would not be accepted by all parts of the SQL statement, as illustrated
656below::
658 >>> eng = create_engine("mssql+pymssql://mydsn", legacy_schema_aliasing=True)
659 >>> print(account_table.select().compile(eng))
660 SELECT account_1.id, account_1.info
661 FROM customer_schema.account AS account_1
663This mode of behavior is now off by default, as it appears to have served
664no purpose; however in the case that legacy applications rely upon it,
665it is available using the ``legacy_schema_aliasing`` argument to
666:func:`_sa.create_engine` as illustrated above.
668.. versionchanged:: 1.1 the ``legacy_schema_aliasing`` flag introduced
669 in version 1.0.5 to allow disabling of legacy mode for schemas now
670 defaults to False.
672.. deprecated:: 1.4
674 The ``legacy_schema_aliasing`` flag is now
675 deprecated and will be removed in a future release.
677.. _mssql_indexes:
679Clustered Index Support
680-----------------------
682The MSSQL dialect supports clustered indexes (and primary keys) via the
683``mssql_clustered`` option. This option is available to :class:`.Index`,
684 :class:`.UniqueConstraint`, and :class:`.PrimaryKeyConstraint`.
686To generate a clustered index::
688 Index("my_index", table.c.x, mssql_clustered=True)
690which renders the index as ``CREATE CLUSTERED INDEX my_index ON table (x)``.
692To generate a clustered primary key use::
694 Table('my_table', metadata,
695 Column('x', ...),
696 Column('y', ...),
697 PrimaryKeyConstraint("x", "y", mssql_clustered=True))
699which will render the table, for example, as::
701 CREATE TABLE my_table (x INTEGER NOT NULL, y INTEGER NOT NULL,
702 PRIMARY KEY CLUSTERED (x, y))
704Similarly, we can generate a clustered unique constraint using::
706 Table('my_table', metadata,
707 Column('x', ...),
708 Column('y', ...),
709 PrimaryKeyConstraint("x"),
710 UniqueConstraint("y", mssql_clustered=True),
711 )
713To explicitly request a non-clustered primary key (for example, when
714a separate clustered index is desired), use::
716 Table('my_table', metadata,
717 Column('x', ...),
718 Column('y', ...),
719 PrimaryKeyConstraint("x", "y", mssql_clustered=False))
721which will render the table, for example, as::
723 CREATE TABLE my_table (x INTEGER NOT NULL, y INTEGER NOT NULL,
724 PRIMARY KEY NONCLUSTERED (x, y))
726.. versionchanged:: 1.1 the ``mssql_clustered`` option now defaults
727 to None, rather than False. ``mssql_clustered=False`` now explicitly
728 renders the NONCLUSTERED clause, whereas None omits the CLUSTERED
729 clause entirely, allowing SQL Server defaults to take effect.
732MSSQL-Specific Index Options
733-----------------------------
735In addition to clustering, the MSSQL dialect supports other special options
736for :class:`.Index`.
738INCLUDE
739^^^^^^^
741The ``mssql_include`` option renders INCLUDE(colname) for the given string
742names::
744 Index("my_index", table.c.x, mssql_include=['y'])
746would render the index as ``CREATE INDEX my_index ON table (x) INCLUDE (y)``
748.. _mssql_index_where:
750Filtered Indexes
751^^^^^^^^^^^^^^^^
753 The ``mssql_where`` option renders WHERE(condition) for the given
754 expression::
756 Index("my_index", table.c.x, mssql_where=table.c.x > 10)
758would render the index as ``CREATE INDEX my_index ON table (x) WHERE x > 10``.
760.. versionadded:: 1.3.4
762Index ordering
763^^^^^^^^^^^^^^
765Index ordering is available via functional expressions, such as::
767 Index("my_index", table.c.x.desc())
769 which would render the index as ``CREATE INDEX my_index ON table (x DESC)``.
771.. seealso::
773 :ref:`schema_indexes_functional`
775Compatibility Levels
776--------------------
777MSSQL supports the notion of setting compatibility levels at the
778 database level. This allows, for instance, running a database
779 that is compatible with SQL2000 on a SQL2005 database
780server. ``server_version_info`` will always return the database
781server version information (in this case SQL2005) and not the
782compatibility level information. Because of this, if running under
783 a backwards compatibility mode, SQLAlchemy may attempt to use T-SQL
784 statements that cannot be parsed by the database server.
786Triggers
787--------
789SQLAlchemy by default uses OUTPUT INSERTED to get at newly
790generated primary key values via IDENTITY columns or other
791server side defaults. MS-SQL does not
792allow the usage of OUTPUT INSERTED on tables that have triggers.
793To disable the usage of OUTPUT INSERTED on a per-table basis,
794specify ``implicit_returning=False`` for each :class:`_schema.Table`
795which has triggers::
797 Table('mytable', metadata,
798 Column('id', Integer, primary_key=True),
799 # ...,
800 implicit_returning=False
801 )
803Declarative form::
805 class MyClass(Base):
806 # ...
807 __table_args__ = {'implicit_returning':False}
810This option can also be specified engine-wide using the
811``implicit_returning=False`` argument on :func:`_sa.create_engine`.
813.. _mssql_rowcount_versioning:
815Rowcount Support / ORM Versioning
816---------------------------------
818The SQL Server drivers may have limited ability to return the number
819of rows updated from an UPDATE or DELETE statement.
821As of this writing, the PyODBC driver is not able to return a rowcount when
822OUTPUT INSERTED is used. This impacts the SQLAlchemy ORM's versioning feature
823in many cases where server-side value generators are in use in that while the
824versioning operations can succeed, the ORM cannot always check that an UPDATE
825or DELETE statement matched the number of rows expected, which is how it
826verifies that the version identifier matched. When this condition occurs, a
827warning will be emitted but the operation will proceed.
829The use of OUTPUT INSERTED can be disabled by setting the
830:paramref:`_schema.Table.implicit_returning` flag to ``False`` on a particular
831:class:`_schema.Table`, which in declarative looks like::
833 class MyTable(Base):
834 __tablename__ = 'mytable'
835 id = Column(Integer, primary_key=True)
836 stuff = Column(String(10))
837 timestamp = Column(TIMESTAMP(), default=text('DEFAULT'))
838 __mapper_args__ = {
839 'version_id_col': timestamp,
840 'version_id_generator': False,
841 }
842 __table_args__ = {
843 'implicit_returning': False
844 }
846Enabling Snapshot Isolation
847---------------------------
849SQL Server has a default transaction
850isolation mode that locks entire tables, and causes even mildly concurrent
851applications to have long held locks and frequent deadlocks.
852Enabling snapshot isolation for the database as a whole is recommended
853for modern levels of concurrency support. This is accomplished via the
854following ALTER DATABASE commands executed at the SQL prompt::
856 ALTER DATABASE MyDatabase SET ALLOW_SNAPSHOT_ISOLATION ON
858 ALTER DATABASE MyDatabase SET READ_COMMITTED_SNAPSHOT ON
860Background on SQL Server snapshot isolation is available at
861https://msdn.microsoft.com/en-us/library/ms175095.aspx.
"""  # noqa

import codecs
import datetime
import operator
import re

from . import information_schema as ischema
from .json import JSON
from .json import JSONIndexType
from .json import JSONPathType
from ... import exc
from ... import Identity
from ... import schema as sa_schema
from ... import Sequence
from ... import sql
from ... import text
from ... import types as sqltypes
from ... import util
from ...engine import cursor as _cursor
from ...engine import default
from ...engine import reflection
from ...sql import coercions
from ...sql import compiler
from ...sql import elements
from ...sql import expression
from ...sql import func
from ...sql import quoted_name
from ...sql import roles
from ...sql import util as sql_util
from ...types import BIGINT
from ...types import BINARY
from ...types import CHAR
from ...types import DATE
from ...types import DATETIME
from ...types import DECIMAL
from ...types import FLOAT
from ...types import INTEGER
from ...types import NCHAR
from ...types import NUMERIC
from ...types import NVARCHAR
from ...types import SMALLINT
from ...types import TEXT
from ...types import VARCHAR
from ...util import compat
from ...util import update_wrapper
from ...util.langhelpers import public_factory


# https://sqlserverbuilds.blogspot.com/
MS_2017_VERSION = (14,)
MS_2016_VERSION = (13,)
MS_2014_VERSION = (12,)
MS_2012_VERSION = (11,)
MS_2008_VERSION = (10,)
MS_2005_VERSION = (9,)
MS_2000_VERSION = (8,)

RESERVED_WORDS = set(
    [
        "add", "all", "alter", "and", "any", "as", "asc", "authorization",
        "backup", "begin", "between", "break", "browse", "bulk", "by",
        "cascade", "case", "check", "checkpoint", "close", "clustered",
        "coalesce", "collate", "column", "commit", "compute", "constraint",
        "contains", "containstable", "continue", "convert", "create",
        "cross", "current", "current_date", "current_time",
        "current_timestamp", "current_user", "cursor", "database", "dbcc",
        "deallocate", "declare", "default", "delete", "deny", "desc",
        "disk", "distinct", "distributed", "double", "drop", "dump",
        "else", "end", "errlvl", "escape", "except", "exec", "execute",
        "exists", "exit", "external", "fetch", "file", "fillfactor", "for",
        "foreign", "freetext", "freetexttable", "from", "full", "function",
        "goto", "grant", "group", "having", "holdlock", "identity",
        "identity_insert", "identitycol", "if", "in", "index", "inner",
        "insert", "intersect", "into", "is", "join", "key", "kill", "left",
        "like", "lineno", "load", "merge", "national", "nocheck",
        "nonclustered", "not", "null", "nullif", "of", "off", "offsets",
        "on", "open", "opendatasource", "openquery", "openrowset",
        "openxml", "option", "or", "order", "outer", "over", "percent",
        "pivot", "plan", "precision", "primary", "print", "proc",
        "procedure", "public", "raiserror", "read", "readtext",
        "reconfigure", "references", "replication", "restore", "restrict",
        "return", "revert", "revoke", "right", "rollback", "rowcount",
        "rowguidcol", "rule", "save", "schema", "securityaudit", "select",
        "session_user", "set", "setuser", "shutdown", "some", "statistics",
        "system_user", "table", "tablesample", "textsize", "then", "to",
        "top", "tran", "transaction", "trigger", "truncate", "tsequal",
        "union", "unique", "unpivot", "update", "updatetext", "use",
        "user", "values", "varying", "view", "waitfor", "when", "where",
        "while", "with", "writetext",
    ]
)


class REAL(sqltypes.REAL):
    __visit_name__ = "REAL"

    def __init__(self, **kw):
        # REAL is a synonym for FLOAT(24) on SQL server.
        # it is only accepted as the word "REAL" in DDL, the numeric
        # precision value is not allowed to be present
        kw.setdefault("precision", 24)
        super(REAL, self).__init__(**kw)


class TINYINT(sqltypes.Integer):
    __visit_name__ = "TINYINT"


# MSSQL DATE/TIME types have varied behavior, sometimes returning
# strings.  MSDate/TIME check for everything, and always
# filter bind parameters into datetime objects (required by pyodbc,
# not sure about other dialects).


class _MSDate(sqltypes.Date):
    def bind_processor(self, dialect):
        def process(value):
            if type(value) == datetime.date:
                return datetime.datetime(value.year, value.month, value.day)
            else:
                return value

        return process

    _reg = re.compile(r"(\d+)-(\d+)-(\d+)")

    def result_processor(self, dialect, coltype):
        def process(value):
            if isinstance(value, datetime.datetime):
                return value.date()
            elif isinstance(value, util.string_types):
                m = self._reg.match(value)
                if not m:
                    raise ValueError(
                        "could not parse %r as a date value" % (value,)
                    )
                return datetime.date(*[int(x or 0) for x in m.groups()])
            else:
                return value

        return process


class TIME(sqltypes.TIME):
    def __init__(self, precision=None, **kwargs):
        self.precision = precision
        super(TIME, self).__init__()

    __zero_date = datetime.date(1900, 1, 1)

    def bind_processor(self, dialect):
        def process(value):
            if isinstance(value, datetime.datetime):
                value = datetime.datetime.combine(
                    self.__zero_date, value.time()
                )
            elif isinstance(value, datetime.time):
                """issue #5339
                per: https://github.com/mkleehammer/pyodbc/wiki/Tips-and-Tricks-by-Database-Platform#time-columns
                pass TIME value as string
                """  # noqa
                value = str(value)
            return value

        return process

    _reg = re.compile(r"(\d+):(\d+):(\d+)(?:\.(\d{0,6}))?")

    def result_processor(self, dialect, coltype):
        def process(value):
            if isinstance(value, datetime.datetime):
                return value.time()
            elif isinstance(value, util.string_types):
                m = self._reg.match(value)
                if not m:
                    raise ValueError(
                        "could not parse %r as a time value" % (value,)
                    )
                return datetime.time(*[int(x or 0) for x in m.groups()])
            else:
                return value

        return process


_MSTime = TIME


class _BASETIMEIMPL(TIME):
    __visit_name__ = "_BASETIMEIMPL"


class _DateTimeBase(object):
    def bind_processor(self, dialect):
        def process(value):
            if type(value) == datetime.date:
                return datetime.datetime(value.year, value.month, value.day)
            else:
                return value

        return process


class _MSDateTime(_DateTimeBase, sqltypes.DateTime):
    pass


class SMALLDATETIME(_DateTimeBase, sqltypes.DateTime):
    __visit_name__ = "SMALLDATETIME"


class DATETIME2(_DateTimeBase, sqltypes.DateTime):
    __visit_name__ = "DATETIME2"

    def __init__(self, precision=None, **kw):
        super(DATETIME2, self).__init__(**kw)
        self.precision = precision


class DATETIMEOFFSET(_DateTimeBase, sqltypes.DateTime):
    __visit_name__ = "DATETIMEOFFSET"

    def __init__(self, precision=None, **kw):
        super(DATETIMEOFFSET, self).__init__(**kw)
        self.precision = precision


class _UnicodeLiteral(object):
    def literal_processor(self, dialect):
        def process(value):

            value = value.replace("'", "''")

            if dialect.identifier_preparer._double_percents:
                value = value.replace("%", "%%")

            return "N'%s'" % value

        return process


class _MSUnicode(_UnicodeLiteral, sqltypes.Unicode):
    pass


class _MSUnicodeText(_UnicodeLiteral, sqltypes.UnicodeText):
    pass


class TIMESTAMP(sqltypes._Binary):
    """Implement the SQL Server TIMESTAMP type.

    Note this is **completely different** than the SQL Standard
    TIMESTAMP type, which is not supported by SQL Server.  It
    is a read-only datatype that does not support INSERT of values.

    .. versionadded:: 1.2

    .. seealso::

        :class:`_mssql.ROWVERSION`

    """

    __visit_name__ = "TIMESTAMP"

    # expected by _Binary to be present
    length = None

    def __init__(self, convert_int=False):
        """Construct a TIMESTAMP or ROWVERSION type.

        :param convert_int: if True, binary integer values will
          be converted to integers on read.

        .. versionadded:: 1.2
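As a hedged sketch of the flag in use (the table and column names are
illustrative only):

```python
from sqlalchemy import Column, MetaData, Table
from sqlalchemy.dialects.mssql import TIMESTAMP

m = MetaData()

# with convert_int=True, the 8-byte binary rowversion value is
# returned as a Python int rather than a bytes object
t = Table("t", m, Column("row_ts", TIMESTAMP(convert_int=True)))
```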

        """
        self.convert_int = convert_int

    def result_processor(self, dialect, coltype):
        super_ = super(TIMESTAMP, self).result_processor(dialect, coltype)
        if self.convert_int:

            def process(value):
                value = super_(value)
                if value is not None:
                    # https://stackoverflow.com/a/30403242/34549
                    value = int(codecs.encode(value, "hex"), 16)
                return value

            return process
        else:
            return super_


class ROWVERSION(TIMESTAMP):
    """Implement the SQL Server ROWVERSION type.

    The ROWVERSION datatype is a SQL Server synonym for the TIMESTAMP
    datatype, however current SQL Server documentation suggests using
    ROWVERSION for new datatypes going forward.

    The ROWVERSION datatype does **not** reflect (e.g. introspect) from the
    database as itself; the returned datatype will be
    :class:`_mssql.TIMESTAMP`.

    This is a read-only datatype that does not support INSERT of values.

    .. versionadded:: 1.2

    .. seealso::

        :class:`_mssql.TIMESTAMP`
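Because the value is generated by the server on every row change, a common
use is as an ORM version counter, in the same pattern shown earlier in this
module's documentation for TIMESTAMP; a minimal sketch (the class and column
names are illustrative):

```python
from sqlalchemy import Column, Integer
from sqlalchemy.orm import declarative_base
from sqlalchemy.dialects.mssql import ROWVERSION

Base = declarative_base()


class Stuff(Base):
    __tablename__ = "stuff"
    id = Column(Integer, primary_key=True)
    row_version = Column(ROWVERSION, nullable=False)
    __mapper_args__ = {
        "version_id_col": row_version,
        # the server generates the value; SQLAlchemy only fetches it
        "version_id_generator": False,
    }
```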

    """

    __visit_name__ = "ROWVERSION"


class NTEXT(sqltypes.UnicodeText):

    """MSSQL NTEXT type, for variable-length unicode text up to 2^30
    characters."""

    __visit_name__ = "NTEXT"


class VARBINARY(sqltypes.VARBINARY, sqltypes.LargeBinary):
    """The MSSQL VARBINARY type.

    This type adds additional features to the core :class:`_types.VARBINARY`
    type, including "deprecate_large_types" mode where
    either ``VARBINARY(max)`` or IMAGE is rendered, as well as the SQL
    Server ``FILESTREAM`` option.

    .. versionadded:: 1.0.0

    .. seealso::

        :ref:`mssql_large_type_deprecation`

    """

    __visit_name__ = "VARBINARY"

    def __init__(self, length=None, filestream=False):
        """
        Construct a VARBINARY type.

        :param length: optional, a length for the column for use in
          DDL statements, for those binary types that accept a length,
          such as the MySQL BLOB type.

        :param filestream=False: if True, renders the ``FILESTREAM`` keyword
          in the table definition. In this case ``length`` must be ``None``
          or ``'max'``.

          .. versionadded:: 1.4.31

        """

        self.filestream = filestream
        if self.filestream and length not in (None, "max"):
            raise ValueError(
                "length must be None or 'max' when setting filestream"
            )
        super(VARBINARY, self).__init__(length=length)


class IMAGE(sqltypes.LargeBinary):
    __visit_name__ = "IMAGE"


class XML(sqltypes.Text):
    """MSSQL XML type.

    This is a placeholder type for reflection purposes that does not include
    any Python-side datatype support.  It also does not currently support
    additional arguments, such as "CONTENT", "DOCUMENT",
    "xml_schema_collection".

    .. versionadded:: 1.1.11

    """

    __visit_name__ = "XML"


class BIT(sqltypes.Boolean):
    """MSSQL BIT type.

    Both pyodbc and pymssql return values from BIT columns as
    Python <class 'bool'> so just subclass Boolean.

    """

    __visit_name__ = "BIT"


class MONEY(sqltypes.TypeEngine):
    __visit_name__ = "MONEY"


class SMALLMONEY(sqltypes.TypeEngine):
    __visit_name__ = "SMALLMONEY"


class UNIQUEIDENTIFIER(sqltypes.TypeEngine):
    __visit_name__ = "UNIQUEIDENTIFIER"


class SQL_VARIANT(sqltypes.TypeEngine):
    __visit_name__ = "SQL_VARIANT"


class TryCast(sql.elements.Cast):
    """Represent a SQL Server TRY_CAST expression."""

    __visit_name__ = "try_cast"

    stringify_dialect = "mssql"
    inherit_cache = True

    def __init__(self, *arg, **kw):
        """Create a TRY_CAST expression.

        :class:`.TryCast` is a subclass of SQLAlchemy's :class:`.Cast`
        construct, and works in the same way, except that the SQL expression
        rendered is "TRY_CAST" rather than "CAST"::

            from sqlalchemy import select
            from sqlalchemy import Numeric
            from sqlalchemy.dialects.mssql import try_cast

            stmt = select(
                try_cast(product_table.c.unit_price, Numeric(10, 4))
            )

        The above would render::

            SELECT TRY_CAST (product_table.unit_price AS NUMERIC(10, 4))
            FROM product_table

        .. versionadded:: 1.3.7

        """
        super(TryCast, self).__init__(*arg, **kw)


try_cast = public_factory(TryCast, ".dialects.mssql.try_cast")

# old names.
MSDateTime = _MSDateTime
MSDate = _MSDate
MSReal = REAL
MSTinyInteger = TINYINT
MSTime = TIME
MSSmallDateTime = SMALLDATETIME
MSDateTime2 = DATETIME2
MSDateTimeOffset = DATETIMEOFFSET
MSText = TEXT
MSNText = NTEXT
MSString = VARCHAR
MSNVarchar = NVARCHAR
MSChar = CHAR
MSNChar = NCHAR
MSBinary = BINARY
MSVarBinary = VARBINARY
MSImage = IMAGE
MSBit = BIT
MSMoney = MONEY
MSSmallMoney = SMALLMONEY
MSUniqueIdentifier = UNIQUEIDENTIFIER
MSVariant = SQL_VARIANT

ischema_names = {
    "int": INTEGER,
    "bigint": BIGINT,
    "smallint": SMALLINT,
    "tinyint": TINYINT,
    "varchar": VARCHAR,
    "nvarchar": NVARCHAR,
    "char": CHAR,
    "nchar": NCHAR,
    "text": TEXT,
    "ntext": NTEXT,
    "decimal": DECIMAL,
    "numeric": NUMERIC,
    "float": FLOAT,
    "datetime": DATETIME,
    "datetime2": DATETIME2,
    "datetimeoffset": DATETIMEOFFSET,
    "date": DATE,
    "time": TIME,
    "smalldatetime": SMALLDATETIME,
    "binary": BINARY,
    "varbinary": VARBINARY,
    "bit": BIT,
    "real": REAL,
    "image": IMAGE,
    "xml": XML,
    "timestamp": TIMESTAMP,
    "money": MONEY,
    "smallmoney": SMALLMONEY,
    "uniqueidentifier": UNIQUEIDENTIFIER,
    "sql_variant": SQL_VARIANT,
}


class MSTypeCompiler(compiler.GenericTypeCompiler):
    def _extend(self, spec, type_, length=None):
        """Extend a string-type declaration with standard SQL
        COLLATE annotations.

        """

        if getattr(type_, "collation", None):
            collation = "COLLATE %s" % type_.collation
        else:
            collation = None

        if not length:
            length = type_.length

        if length:
            spec = spec + "(%s)" % length

        return " ".join([c for c in (spec, collation) if c is not None])

    def visit_FLOAT(self, type_, **kw):
        precision = getattr(type_, "precision", None)
        if precision is None:
            return "FLOAT"
        else:
            return "FLOAT(%(precision)s)" % {"precision": precision}

    def visit_TINYINT(self, type_, **kw):
        return "TINYINT"

    def visit_TIME(self, type_, **kw):
        precision = getattr(type_, "precision", None)
        if precision is not None:
            return "TIME(%s)" % precision
        else:
            return "TIME"

    def visit_TIMESTAMP(self, type_, **kw):
        return "TIMESTAMP"

    def visit_ROWVERSION(self, type_, **kw):
        return "ROWVERSION"

    def visit_datetime(self, type_, **kw):
        if type_.timezone:
            return self.visit_DATETIMEOFFSET(type_, **kw)
        else:
            return self.visit_DATETIME(type_, **kw)

    def visit_DATETIMEOFFSET(self, type_, **kw):
        precision = getattr(type_, "precision", None)
        if precision is not None:
            return "DATETIMEOFFSET(%s)" % type_.precision
        else:
            return "DATETIMEOFFSET"

    def visit_DATETIME2(self, type_, **kw):
        precision = getattr(type_, "precision", None)
        if precision is not None:
            return "DATETIME2(%s)" % precision
        else:
            return "DATETIME2"

    def visit_SMALLDATETIME(self, type_, **kw):
        return "SMALLDATETIME"

    def visit_unicode(self, type_, **kw):
        return self.visit_NVARCHAR(type_, **kw)

    def visit_text(self, type_, **kw):
        if self.dialect.deprecate_large_types:
            return self.visit_VARCHAR(type_, **kw)
        else:
            return self.visit_TEXT(type_, **kw)

    def visit_unicode_text(self, type_, **kw):
        if self.dialect.deprecate_large_types:
            return self.visit_NVARCHAR(type_, **kw)
        else:
            return self.visit_NTEXT(type_, **kw)

    def visit_NTEXT(self, type_, **kw):
        return self._extend("NTEXT", type_)

    def visit_TEXT(self, type_, **kw):
        return self._extend("TEXT", type_)

    def visit_VARCHAR(self, type_, **kw):
        return self._extend("VARCHAR", type_, length=type_.length or "max")

    def visit_CHAR(self, type_, **kw):
        return self._extend("CHAR", type_)

    def visit_NCHAR(self, type_, **kw):
        return self._extend("NCHAR", type_)

    def visit_NVARCHAR(self, type_, **kw):
        return self._extend("NVARCHAR", type_, length=type_.length or "max")

    def visit_date(self, type_, **kw):
        if self.dialect.server_version_info < MS_2008_VERSION:
            return self.visit_DATETIME(type_, **kw)
        else:
            return self.visit_DATE(type_, **kw)

    def visit__BASETIMEIMPL(self, type_, **kw):
        return self.visit_time(type_, **kw)

    def visit_time(self, type_, **kw):
        if self.dialect.server_version_info < MS_2008_VERSION:
            return self.visit_DATETIME(type_, **kw)
        else:
            return self.visit_TIME(type_, **kw)

    def visit_large_binary(self, type_, **kw):
        if self.dialect.deprecate_large_types:
            return self.visit_VARBINARY(type_, **kw)
        else:
            return self.visit_IMAGE(type_, **kw)

    def visit_IMAGE(self, type_, **kw):
        return "IMAGE"

    def visit_XML(self, type_, **kw):
        return "XML"

    def visit_VARBINARY(self, type_, **kw):
        text = self._extend("VARBINARY", type_, length=type_.length or "max")
        if getattr(type_, "filestream", False):
            text += " FILESTREAM"
        return text

    def visit_boolean(self, type_, **kw):
        return self.visit_BIT(type_)

    def visit_BIT(self, type_, **kw):
        return "BIT"

    def visit_JSON(self, type_, **kw):
        # this is a bit of a break with SQLAlchemy's convention of
        # "UPPERCASE name goes to UPPERCASE type name with no modification"
        return self._extend("NVARCHAR", type_, length="max")

    def visit_MONEY(self, type_, **kw):
        return "MONEY"

    def visit_SMALLMONEY(self, type_, **kw):
        return "SMALLMONEY"

    def visit_UNIQUEIDENTIFIER(self, type_, **kw):
        return "UNIQUEIDENTIFIER"

    def visit_SQL_VARIANT(self, type_, **kw):
        return "SQL_VARIANT"


class MSExecutionContext(default.DefaultExecutionContext):
    _enable_identity_insert = False
    _select_lastrowid = False
    _lastrowid = None
    _rowcount = None

    def _opt_encode(self, statement):

        if not self.dialect.supports_unicode_statements:
            encoded = self.dialect._encoder(statement)[0]
        else:
            encoded = statement

        if self.compiled and self.compiled.schema_translate_map:

            rst = self.compiled.preparer._render_schema_translates
            encoded = rst(encoded, self.compiled.schema_translate_map)

        return encoded

    def pre_exec(self):
        """Activate IDENTITY_INSERT if needed."""

        if self.isinsert:
            tbl = self.compiled.compile_state.dml_table
            id_column = tbl._autoincrement_column
            insert_has_identity = (id_column is not None) and (
                not isinstance(id_column.default, Sequence)
            )

            if insert_has_identity:
                compile_state = self.compiled.dml_compile_state
                self._enable_identity_insert = (
                    id_column.key in self.compiled_parameters[0]
                ) or (
                    compile_state._dict_parameters
                    and (id_column.key in compile_state._insert_col_keys)
                )

            else:
                self._enable_identity_insert = False

            self._select_lastrowid = (
                not self.compiled.inline
                and insert_has_identity
                and not self.compiled.returning
                and not self._enable_identity_insert
                and not self.executemany
            )

            if self._enable_identity_insert:
                self.root_connection._cursor_execute(
                    self.cursor,
                    self._opt_encode(
                        "SET IDENTITY_INSERT %s ON"
                        % self.identifier_preparer.format_table(tbl)
                    ),
                    (),
                    self,
                )

    def post_exec(self):
        """Disable IDENTITY_INSERT if enabled."""

        conn = self.root_connection

        if self.isinsert or self.isupdate or self.isdelete:
            self._rowcount = self.cursor.rowcount

        if self._select_lastrowid:
            if self.dialect.use_scope_identity:
                conn._cursor_execute(
                    self.cursor,
                    "SELECT scope_identity() AS lastrowid",
                    (),
                    self,
                )
            else:
                conn._cursor_execute(
                    self.cursor, "SELECT @@identity AS lastrowid", (), self
                )
            # fetchall() ensures the cursor is consumed without closing it
            row = self.cursor.fetchall()[0]
            self._lastrowid = int(row[0])

        elif (
            self.isinsert or self.isupdate or self.isdelete
        ) and self.compiled.returning:
            self.cursor_fetch_strategy = (
                _cursor.FullyBufferedCursorFetchStrategy(
                    self.cursor,
                    self.cursor.description,
                    self.cursor.fetchall(),
                )
            )

        if self._enable_identity_insert:
            conn._cursor_execute(
                self.cursor,
                self._opt_encode(
                    "SET IDENTITY_INSERT %s OFF"
                    % self.identifier_preparer.format_table(
                        self.compiled.compile_state.dml_table
                    )
                ),
                (),
                self,
            )

    def get_lastrowid(self):
        return self._lastrowid

    @property
    def rowcount(self):
        if self._rowcount is not None:
            return self._rowcount
        else:
            return self.cursor.rowcount

    def handle_dbapi_exception(self, e):
        if self._enable_identity_insert:
            try:
                self.cursor.execute(
                    self._opt_encode(
                        "SET IDENTITY_INSERT %s OFF"
                        % self.identifier_preparer.format_table(
                            self.compiled.compile_state.dml_table
                        )
                    )
                )
            except Exception:
                pass

    def fire_sequence(self, seq, type_):
        return self._execute_scalar(
            (
                "SELECT NEXT VALUE FOR %s"
                % self.identifier_preparer.format_sequence(seq)
            ),
            type_,
        )

    def get_insert_default(self, column):
        if (
            isinstance(column, sa_schema.Column)
            and column is column.table._autoincrement_column
            and isinstance(column.default, sa_schema.Sequence)
            and column.default.optional
        ):
            return None
        return super(MSExecutionContext, self).get_insert_default(column)


class MSSQLCompiler(compiler.SQLCompiler):
    returning_precedes_values = True

    extract_map = util.update_copy(
        compiler.SQLCompiler.extract_map,
        {
            "doy": "dayofyear",
            "dow": "weekday",
            "milliseconds": "millisecond",
            "microseconds": "microsecond",
        },
    )

    def __init__(self, *args, **kwargs):
        self.tablealiases = {}
        super(MSSQLCompiler, self).__init__(*args, **kwargs)

    def _with_legacy_schema_aliasing(fn):
        def decorate(self, *arg, **kw):
            if self.dialect.legacy_schema_aliasing:
                return fn(self, *arg, **kw)
            else:
                super_ = getattr(super(MSSQLCompiler, self), fn.__name__)
                return super_(*arg, **kw)

        return decorate

    def visit_now_func(self, fn, **kw):
        return "CURRENT_TIMESTAMP"

    def visit_current_date_func(self, fn, **kw):
        return "GETDATE()"

    def visit_length_func(self, fn, **kw):
        return "LEN%s" % self.function_argspec(fn, **kw)

    def visit_char_length_func(self, fn, **kw):
        return "LEN%s" % self.function_argspec(fn, **kw)

    def visit_concat_op_binary(self, binary, operator, **kw):
        return "%s + %s" % (
            self.process(binary.left, **kw),
            self.process(binary.right, **kw),
        )

    def visit_true(self, expr, **kw):
        return "1"

    def visit_false(self, expr, **kw):
        return "0"

    def visit_match_op_binary(self, binary, operator, **kw):
        return "CONTAINS (%s, %s)" % (
            self.process(binary.left, **kw),
            self.process(binary.right, **kw),
        )

    def get_select_precolumns(self, select, **kw):
        """MS-SQL puts TOP, its version of LIMIT, here"""

        s = super(MSSQLCompiler, self).get_select_precolumns(select, **kw)

        if select._has_row_limiting_clause and self._use_top(select):
            # ODBC drivers and possibly others
            # don't support bind params in the SELECT clause on SQL Server.
            # so have to use literal here.
            kw["literal_execute"] = True
            s += "TOP %s " % self.process(
                self._get_limit_or_fetch(select), **kw
            )
            if select._fetch_clause is not None:
                if select._fetch_clause_options["percent"]:
                    s += "PERCENT "
                if select._fetch_clause_options["with_ties"]:
                    s += "WITH TIES "

        return s

    def get_from_hint_text(self, table, text):
        return text

    def get_crud_hint_text(self, table, text):
        return text

    def _get_limit_or_fetch(self, select):
        if select._fetch_clause is None:
            return select._limit_clause
        else:
            return select._fetch_clause

    def _use_top(self, select):
        return (select._offset_clause is None) and (
            select._simple_int_clause(select._limit_clause)
            or (
                # limit can use TOP when it is by itself.  fetch only uses
                # TOP when it needs to because of PERCENT and/or WITH TIES
                select._simple_int_clause(select._fetch_clause)
                and (
                    select._fetch_clause_options["percent"]
                    or select._fetch_clause_options["with_ties"]
                )
            )
        )

    def fetch_clause(self, cs, **kwargs):
        return ""

    def limit_clause(self, cs, **kwargs):
        return ""

    def _check_can_use_fetch_limit(self, select):
        # to use ROW_NUMBER(), an ORDER BY is required.
        # OFFSET and FETCH are options of the ORDER BY clause
        if not select._order_by_clause.clauses:
            raise exc.CompileError(
                "MSSQL requires an order_by when "
                "using an OFFSET or a non-simple "
                "LIMIT clause"
            )

        if select._fetch_clause_options is not None and (
            select._fetch_clause_options["percent"]
            or select._fetch_clause_options["with_ties"]
        ):
            raise exc.CompileError(
                "MSSQL needs TOP to use PERCENT and/or WITH TIES. "
                "Only simple fetch without offset can be used."
            )

    def _row_limit_clause(self, select, **kw):
        """MSSQL 2012 supports OFFSET/FETCH operators; use them
        instead of a subquery with row_number

        """

        if self.dialect._supports_offset_fetch and not self._use_top(select):
            self._check_can_use_fetch_limit(select)

            text = ""

            if select._offset_clause is not None:
                offset_str = self.process(select._offset_clause, **kw)
            else:
                offset_str = "0"
            text += "\n OFFSET %s ROWS" % offset_str

            limit = self._get_limit_or_fetch(select)

            if limit is not None:
                text += "\n FETCH FIRST %s ROWS ONLY" % self.process(
                    limit, **kw
                )
            return text
        else:
            return ""

    def visit_try_cast(self, element, **kw):
        return "TRY_CAST (%s AS %s)" % (
            self.process(element.clause, **kw),
            self.process(element.typeclause, **kw),
        )

    def translate_select_structure(self, select_stmt, **kwargs):
        """Look for ``LIMIT`` and OFFSET in a select statement, and if
        present, try to wrap it in a subquery with ``row_number()`` criterion.
        MSSQL 2012 and above are excluded

        """
        select = select_stmt

        if (
            select._has_row_limiting_clause
            and not self.dialect._supports_offset_fetch
            and not self._use_top(select)
            and not getattr(select, "_mssql_visit", None)
        ):
            self._check_can_use_fetch_limit(select)

            _order_by_clauses = [
                sql_util.unwrap_label_reference(elem)
                for elem in select._order_by_clause.clauses
            ]

            limit_clause = self._get_limit_or_fetch(select)
            offset_clause = select._offset_clause

            select = select._generate()
            select._mssql_visit = True
            select = (
                select.add_columns(
                    sql.func.ROW_NUMBER()
                    .over(order_by=_order_by_clauses)
                    .label("mssql_rn")
                )
                .order_by(None)
                .alias()
            )

            mssql_rn = sql.column("mssql_rn")
            limitselect = sql.select(
                *[c for c in select.c if c.key != "mssql_rn"]
            )
            if offset_clause is not None:
                limitselect = limitselect.where(mssql_rn > offset_clause)
                if limit_clause is not None:
                    limitselect = limitselect.where(
                        mssql_rn <= (limit_clause + offset_clause)
                    )
            else:
                limitselect = limitselect.where(mssql_rn <= (limit_clause))
            return limitselect
        else:
            return select

    @_with_legacy_schema_aliasing
    def visit_table(self, table, mssql_aliased=False, iscrud=False, **kwargs):
        if mssql_aliased is table or iscrud:
            return super(MSSQLCompiler, self).visit_table(table, **kwargs)

        # alias schema-qualified tables
        alias = self._schema_aliased_table(table)
        if alias is not None:
            return self.process(alias, mssql_aliased=table, **kwargs)
        else:
            return super(MSSQLCompiler, self).visit_table(table, **kwargs)

    @_with_legacy_schema_aliasing
    def visit_alias(self, alias, **kw):
        # translate for schema-qualified table aliases
        kw["mssql_aliased"] = alias.element
        return super(MSSQLCompiler, self).visit_alias(alias, **kw)

    @_with_legacy_schema_aliasing
    def visit_column(self, column, add_to_result_map=None, **kw):
        if (
            column.table is not None
            and (not self.isupdate and not self.isdelete)
            or self.is_subquery()
        ):
            # translate for schema-qualified table aliases
            t = self._schema_aliased_table(column.table)
            if t is not None:
                converted = elements._corresponding_column_or_error(t, column)
                if add_to_result_map is not None:
                    add_to_result_map(
                        column.name,
                        column.name,
                        (column, column.name, column.key),
                        column.type,
                    )

                return super(MSSQLCompiler, self).visit_column(converted, **kw)

        return super(MSSQLCompiler, self).visit_column(
            column, add_to_result_map=add_to_result_map, **kw
        )

    def _schema_aliased_table(self, table):
        if getattr(table, "schema", None) is not None:
            if table not in self.tablealiases:
                self.tablealiases[table] = table.alias()
            return self.tablealiases[table]
        else:
            return None

    def visit_extract(self, extract, **kw):
        field = self.extract_map.get(extract.field, extract.field)
        return "DATEPART(%s, %s)" % (field, self.process(extract.expr, **kw))

    def visit_savepoint(self, savepoint_stmt):
        return "SAVE TRANSACTION %s" % self.preparer.format_savepoint(
            savepoint_stmt
        )

    def visit_rollback_to_savepoint(self, savepoint_stmt):
        return "ROLLBACK TRANSACTION %s" % self.preparer.format_savepoint(
            savepoint_stmt
        )

    def visit_binary(self, binary, **kwargs):
        """Move bind parameters to the right-hand side of an operator, where
        possible.

        """
        if (
            isinstance(binary.left, expression.BindParameter)
            and binary.operator == operator.eq
            and not isinstance(binary.right, expression.BindParameter)
        ):
            return self.process(
                expression.BinaryExpression(
                    binary.right, binary.left, binary.operator
                ),
                **kwargs
            )
        return super(MSSQLCompiler, self).visit_binary(binary, **kwargs)
2130 def returning_clause(self, stmt, returning_cols):
2131 # SQL server returning clause requires that the columns refer to
2132 # the virtual table names "inserted" or "deleted". Here, we make
2133 # a simple alias of our table with that name, and then adapt the
2134 # columns we have from the list of RETURNING columns to that new name
2135 # so that they render as "inserted.<colname>" / "deleted.<colname>".
2137 if self.isinsert or self.isupdate:
2138 target = stmt.table.alias("inserted")
2139 else:
2140 target = stmt.table.alias("deleted")
2142 adapter = sql_util.ClauseAdapter(target)
2144 # adapter.traverse() takes a column from our target table and returns
2145 # the one that is linked to the "inserted" / "deleted" tables. So in
2146 # order to retrieve these values back from the result (e.g. like
2147 # row[column]), tell the compiler to also add the original unadapted
2148 # column to the result map. Before #4877, these were (unknowingly)
2149 # falling back using string name matching in the result set which
2150 # necessarily used an expensive KeyError in order to match.
2152 columns = [
2153 self._label_returning_column(
2154 stmt,
2155 adapter.traverse(c),
2156 {"result_map_targets": (c,)},
2157 fallback_label_name=c._non_anon_label,
2158 )
2159 for c in expression._select_iterables(returning_cols)
2160 ]
2162 return "OUTPUT " + ", ".join(columns)
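At the string level, the OUTPUT rendering above reduces to choosing between the `inserted` and `deleted` pseudo-tables; a minimal sketch, ignoring column adaptation and labeling:

```python
def output_clause(is_insert_or_update, colnames):
    # INSERT/UPDATE statements read new values from the "inserted"
    # pseudo-table; DELETE reads old values from "deleted".
    target = "inserted" if is_insert_or_update else "deleted"
    return "OUTPUT " + ", ".join("%s.%s" % (target, c) for c in colnames)
```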
2164 def get_cte_preamble(self, recursive):
2165 # SQL Server does not accept the SQL-standard, optional
2166 # "RECURSIVE" keyword after "WITH"; recursive CTEs are
2167 # rendered with a plain "WITH" as well
2169 return "WITH"
2171 def label_select_column(self, select, column, asfrom):
2172 if isinstance(column, expression.Function):
2173 return column.label(None)
2174 else:
2175 return super(MSSQLCompiler, self).label_select_column(
2176 select, column, asfrom
2177 )
2179 def for_update_clause(self, select, **kw):
2180 # "FOR UPDATE" is only allowed on "DECLARE CURSOR" which
2181 # SQLAlchemy doesn't use
2182 return ""
2184 def order_by_clause(self, select, **kw):
2185 # MSSQL only allows ORDER BY in subqueries if there is a LIMIT
2186 if (
2187 self.is_subquery()
2188 and not select._limit
2189 and (
2190 select._offset is None
2191 or not self.dialect._supports_offset_fetch
2192 )
2193 ):
2194 # avoid processing the order by clause if we won't end up
2195 # using it, because we don't want all the bind params tacked
2196 # onto the positional list if that is what the dbapi requires
2197 return ""
2199 order_by = self.process(select._order_by_clause, **kw)
2201 if order_by:
2202 return " ORDER BY " + order_by
2203 else:
2204 return ""
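The decision of when to suppress ORDER BY can be factored out as a predicate; a sketch of the rule encoded in `order_by_clause()` above:

```python
def emit_order_by(is_subquery, limit, offset, supports_offset_fetch):
    # In a subquery, ORDER BY is only legal alongside TOP or a usable
    # OFFSET ... FETCH clause; otherwise it is skipped entirely so the
    # bind parameters it contains never reach the positional list.
    if is_subquery and not limit and (
        offset is None or not supports_offset_fetch
    ):
        return False
    return True
```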
2206 def update_from_clause(
2207 self, update_stmt, from_table, extra_froms, from_hints, **kw
2208 ):
2209 """Render the UPDATE..FROM clause specific to MSSQL.
2211 In MSSQL, if the UPDATE statement involves an alias of the table to
2212 be updated, then the table itself must be added to the FROM list as
2213 well. Otherwise, it is optional. Here, we add it regardless.
2215 """
2216 return "FROM " + ", ".join(
2217 t._compiler_dispatch(self, asfrom=True, fromhints=from_hints, **kw)
2218 for t in [from_table] + extra_froms
2219 )
2221 def delete_table_clause(self, delete_stmt, from_table, extra_froms):
2222 """If we have extra froms make sure we render any alias as hint."""
2223 ashint = False
2224 if extra_froms:
2225 ashint = True
2226 return from_table._compiler_dispatch(
2227 self, asfrom=True, iscrud=True, ashint=ashint
2228 )
2230 def delete_extra_from_clause(
2231 self, delete_stmt, from_table, extra_froms, from_hints, **kw
2232 ):
2233 """Render the DELETE .. FROM clause specific to MSSQL.
2235 Yes, it has the FROM keyword twice.
2237 """
2238 return "FROM " + ", ".join(
2239 t._compiler_dispatch(self, asfrom=True, fromhints=from_hints, **kw)
2240 for t in [from_table] + extra_froms
2241 )
2243 def visit_empty_set_expr(self, type_):
2244 return "SELECT 1 WHERE 1!=1"
2246 def visit_is_distinct_from_binary(self, binary, operator, **kw):
2247 return "NOT EXISTS (SELECT %s INTERSECT SELECT %s)" % (
2248 self.process(binary.left),
2249 self.process(binary.right),
2250 )
2252 def visit_is_not_distinct_from_binary(self, binary, operator, **kw):
2253 return "EXISTS (SELECT %s INTERSECT SELECT %s)" % (
2254 self.process(binary.left),
2255 self.process(binary.right),
2256 )
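Both distinct-from variants hinge on the same trick: `INTERSECT` treats NULLs as equal, so `EXISTS (SELECT a INTERSECT SELECT b)` is true exactly when the two values are "not distinct". A string-level sketch:

```python
def distinct_from_sql(left_sql, right_sql, distinct=True):
    # EXISTS(...) is true when the values are equal or both NULL,
    # i.e. "not distinct"; negate it for IS DISTINCT FROM.
    inner = "SELECT %s INTERSECT SELECT %s" % (left_sql, right_sql)
    return ("NOT EXISTS (%s)" if distinct else "EXISTS (%s)") % inner
```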
2258 def _render_json_extract_from_binary(self, binary, operator, **kw):
2259 # note we are intentionally calling upon the process() calls in the
2260 # order in which they appear in the SQL String as this is used
2261 # by positional parameter rendering
2263 if binary.type._type_affinity is sqltypes.JSON:
2264 return "JSON_QUERY(%s, %s)" % (
2265 self.process(binary.left, **kw),
2266 self.process(binary.right, **kw),
2267 )
2269 # as with other dialects, start with an explicit test for NULL
2270 case_expression = "CASE JSON_VALUE(%s, %s) WHEN NULL THEN NULL" % (
2271 self.process(binary.left, **kw),
2272 self.process(binary.right, **kw),
2273 )
2275 if binary.type._type_affinity is sqltypes.Integer:
2276 type_expression = "ELSE CAST(JSON_VALUE(%s, %s) AS INTEGER)" % (
2277 self.process(binary.left, **kw),
2278 self.process(binary.right, **kw),
2279 )
2280 elif binary.type._type_affinity is sqltypes.Numeric:
2281 type_expression = "ELSE CAST(JSON_VALUE(%s, %s) AS %s)" % (
2282 self.process(binary.left, **kw),
2283 self.process(binary.right, **kw),
2284 "FLOAT"
2285 if isinstance(binary.type, sqltypes.Float)
2286 else "NUMERIC(%s, %s)"
2287 % (binary.type.precision, binary.type.scale),
2288 )
2289 elif binary.type._type_affinity is sqltypes.Boolean:
2290 # the NULL handling is particularly weird with boolean, so
2291 # explicitly return numeric (BIT) constants
2292 type_expression = (
2293 "WHEN 'true' THEN 1 WHEN 'false' THEN 0 ELSE NULL"
2294 )
2295 elif binary.type._type_affinity is sqltypes.String:
2296 # TODO: does this comment (from mysql) apply to here, too?
2297 # this fails with a JSON value that's a four byte unicode
2298 # string. SQLite has the same problem at the moment
2299 type_expression = "ELSE JSON_VALUE(%s, %s)" % (
2300 self.process(binary.left, **kw),
2301 self.process(binary.right, **kw),
2302 )
2303 else:
2304 # other affinity....this is not expected right now
2305 type_expression = "ELSE JSON_QUERY(%s, %s)" % (
2306 self.process(binary.left, **kw),
2307 self.process(binary.right, **kw),
2308 )
2310 return case_expression + " " + type_expression + " END"
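The CASE construction above can be sketched compactly; only the integer, boolean, and fall-through string branches are reproduced here, and the affinity is passed as a plain string rather than a SQLAlchemy type object.

```python
def json_value_case(col_sql, path_sql, affinity="integer"):
    # Start with an explicit NULL test, then cast or translate the
    # extracted scalar according to the expected type affinity.
    head = "CASE JSON_VALUE(%s, %s) WHEN NULL THEN NULL" % (col_sql, path_sql)
    if affinity == "integer":
        tail = "ELSE CAST(JSON_VALUE(%s, %s) AS INTEGER)" % (col_sql, path_sql)
    elif affinity == "boolean":
        # JSON booleans come back as 'true'/'false'; return BIT constants
        tail = "WHEN 'true' THEN 1 WHEN 'false' THEN 0 ELSE NULL"
    else:
        tail = "ELSE JSON_VALUE(%s, %s)" % (col_sql, path_sql)
    return "%s %s END" % (head, tail)
```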
2312 def visit_json_getitem_op_binary(self, binary, operator, **kw):
2313 return self._render_json_extract_from_binary(binary, operator, **kw)
2315 def visit_json_path_getitem_op_binary(self, binary, operator, **kw):
2316 return self._render_json_extract_from_binary(binary, operator, **kw)
2318 def visit_sequence(self, seq, **kw):
2319 return "NEXT VALUE FOR %s" % self.preparer.format_sequence(seq)
2322class MSSQLStrictCompiler(MSSQLCompiler):
2324 """A subclass of MSSQLCompiler which disables the usage of bind
2325 parameters where not allowed natively by MS-SQL.
2327 A dialect may use this compiler on a platform where native
2328 binds are used.
2330 """
2332 ansi_bind_rules = True
2334 def visit_in_op_binary(self, binary, operator, **kw):
2335 kw["literal_execute"] = True
2336 return "%s IN %s" % (
2337 self.process(binary.left, **kw),
2338 self.process(binary.right, **kw),
2339 )
2341 def visit_not_in_op_binary(self, binary, operator, **kw):
2342 kw["literal_execute"] = True
2343 return "%s NOT IN %s" % (
2344 self.process(binary.left, **kw),
2345 self.process(binary.right, **kw),
2346 )
2348 def render_literal_value(self, value, type_):
2349 """
2350 For date and datetime values, convert to a string
2351 format acceptable to MSSQL. That seems to be the
2352 so-called ODBC canonical date format which looks
2353 like this:
2355 yyyy-mm-dd hh:mi:ss.mmm(24h)
2357 For other data types, call the base class implementation.
2358 """
2359 # datetime and date are both subclasses of datetime.date
2360 if issubclass(type(value), datetime.date):
2361 # SQL Server wants single quotes around the date string.
2362 return "'" + str(value) + "'"
2363 else:
2364 return super(MSSQLStrictCompiler, self).render_literal_value(
2365 value, type_
2366 )
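The subclass relationship the comment relies on can be checked directly: `datetime.datetime` is a subclass of `datetime.date`, so a single `isinstance` check covers both, and `str()` already yields the ODBC-canonical `yyyy-mm-dd hh:mi:ss` form. A sketch of just the date branch:

```python
import datetime

def render_date_literal(value):
    # datetime.datetime is a subclass of datetime.date, so both date
    # and datetime values take the single-quoted branch.
    if isinstance(value, datetime.date):
        return "'" + str(value) + "'"
    raise TypeError("only date/datetime handled in this sketch")
```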
2369class MSDDLCompiler(compiler.DDLCompiler):
2370 def get_column_specification(self, column, **kwargs):
2371 colspec = self.preparer.format_column(column)
2373 # type is not accepted in a computed column
2374 if column.computed is not None:
2375 colspec += " " + self.process(column.computed)
2376 else:
2377 colspec += " " + self.dialect.type_compiler.process(
2378 column.type, type_expression=column
2379 )
2381 if column.nullable is not None:
2382 if (
2383 not column.nullable
2384 or column.primary_key
2385 or isinstance(column.default, sa_schema.Sequence)
2386 or column.autoincrement is True
2387 or column.identity
2388 ):
2389 colspec += " NOT NULL"
2390 elif column.computed is None:
2391 # don't specify "NULL" for computed columns
2392 colspec += " NULL"
2394 if column.table is None:
2395 raise exc.CompileError(
2396 "mssql requires Table-bound columns "
2397 "in order to generate DDL"
2398 )
2400 d_opt = column.dialect_options["mssql"]
2401 start = d_opt["identity_start"]
2402 increment = d_opt["identity_increment"]
2403 if start is not None or increment is not None:
2404 if column.identity:
2405 raise exc.CompileError(
2406 "Cannot specify options 'mssql_identity_start' and/or "
2407 "'mssql_identity_increment' while also using the "
2408 "'Identity' construct."
2409 )
2410 util.warn_deprecated(
2411 "The dialect options 'mssql_identity_start' and "
2412 "'mssql_identity_increment' are deprecated. "
2413 "Use the 'Identity' object instead.",
2414 "1.4",
2415 )
2417 if column.identity:
2418 colspec += self.process(column.identity, **kwargs)
2419 elif (
2420 column is column.table._autoincrement_column
2421 or column.autoincrement is True
2422 ) and (
2423 not isinstance(column.default, Sequence) or column.default.optional
2424 ):
2425 colspec += self.process(Identity(start=start, increment=increment))
2426 else:
2427 default = self.get_column_default_string(column)
2428 if default is not None:
2429 colspec += " DEFAULT " + default
2431 return colspec
2433 def visit_create_index(self, create, include_schema=False):
2434 index = create.element
2435 self._verify_index_table(index)
2436 preparer = self.preparer
2437 text = "CREATE "
2438 if index.unique:
2439 text += "UNIQUE "
2441 # handle clustering option
2442 clustered = index.dialect_options["mssql"]["clustered"]
2443 if clustered is not None:
2444 if clustered:
2445 text += "CLUSTERED "
2446 else:
2447 text += "NONCLUSTERED "
2449 text += "INDEX %s ON %s (%s)" % (
2450 self._prepared_index_name(index, include_schema=include_schema),
2451 preparer.format_table(index.table),
2452 ", ".join(
2453 self.sql_compiler.process(
2454 expr, include_table=False, literal_binds=True
2455 )
2456 for expr in index.expressions
2457 ),
2458 )
2460 # handle other included columns
2461 if index.dialect_options["mssql"]["include"]:
2462 inclusions = [
2463 index.table.c[col]
2464 if isinstance(col, util.string_types)
2465 else col
2466 for col in index.dialect_options["mssql"]["include"]
2467 ]
2469 text += " INCLUDE (%s)" % ", ".join(
2470 [preparer.quote(c.name) for c in inclusions]
2471 )
2473 whereclause = index.dialect_options["mssql"]["where"]
2475 if whereclause is not None:
2476 whereclause = coercions.expect(
2477 roles.DDLExpressionRole, whereclause
2478 )
2480 where_compiled = self.sql_compiler.process(
2481 whereclause, include_table=False, literal_binds=True
2482 )
2483 text += " WHERE " + where_compiled
2485 return text
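The DDL assembly in `visit_create_index` follows a simple append pattern; a string-level sketch with identifier quoting and expression compilation omitted for brevity:

```python
def create_index_sql(name, table, cols, unique=False, clustered=None,
                     include=None, where=None):
    # clustered=None means "let the server decide"; True/False render
    # CLUSTERED / NONCLUSTERED explicitly, as in the method above.
    text = "CREATE "
    if unique:
        text += "UNIQUE "
    if clustered is not None:
        text += "CLUSTERED " if clustered else "NONCLUSTERED "
    text += "INDEX %s ON %s (%s)" % (name, table, ", ".join(cols))
    if include:
        text += " INCLUDE (%s)" % ", ".join(include)
    if where:
        text += " WHERE " + where
    return text
```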
2487 def visit_drop_index(self, drop):
2488 return "\nDROP INDEX %s ON %s" % (
2489 self._prepared_index_name(drop.element, include_schema=False),
2490 self.preparer.format_table(drop.element.table),
2491 )
2493 def visit_primary_key_constraint(self, constraint):
2494 if len(constraint) == 0:
2495 return ""
2496 text = ""
2497 if constraint.name is not None:
2498 text += "CONSTRAINT %s " % self.preparer.format_constraint(
2499 constraint
2500 )
2501 text += "PRIMARY KEY "
2503 clustered = constraint.dialect_options["mssql"]["clustered"]
2504 if clustered is not None:
2505 if clustered:
2506 text += "CLUSTERED "
2507 else:
2508 text += "NONCLUSTERED "
2510 text += "(%s)" % ", ".join(
2511 self.preparer.quote(c.name) for c in constraint
2512 )
2513 text += self.define_constraint_deferrability(constraint)
2514 return text
2516 def visit_unique_constraint(self, constraint):
2517 if len(constraint) == 0:
2518 return ""
2519 text = ""
2520 if constraint.name is not None:
2521 formatted_name = self.preparer.format_constraint(constraint)
2522 if formatted_name is not None:
2523 text += "CONSTRAINT %s " % formatted_name
2524 text += "UNIQUE "
2526 clustered = constraint.dialect_options["mssql"]["clustered"]
2527 if clustered is not None:
2528 if clustered:
2529 text += "CLUSTERED "
2530 else:
2531 text += "NONCLUSTERED "
2533 text += "(%s)" % ", ".join(
2534 self.preparer.quote(c.name) for c in constraint
2535 )
2536 text += self.define_constraint_deferrability(constraint)
2537 return text
2539 def visit_computed_column(self, generated):
2540 text = "AS (%s)" % self.sql_compiler.process(
2541 generated.sqltext, include_table=False, literal_binds=True
2542 )
2543 # explicitly check for True|False since None means server default
2544 if generated.persisted is True:
2545 text += " PERSISTED"
2546 return text
2548 def visit_create_sequence(self, create, **kw):
2549 prefix = None
2550 if create.element.data_type is not None:
2551 data_type = create.element.data_type
2552 prefix = " AS %s" % self.type_compiler.process(data_type)
2553 return super(MSDDLCompiler, self).visit_create_sequence(
2554 create, prefix=prefix, **kw
2555 )
2557 def visit_identity_column(self, identity, **kw):
2558 text = " IDENTITY"
2559 if identity.start is not None or identity.increment is not None:
2560 start = 1 if identity.start is None else identity.start
2561 increment = 1 if identity.increment is None else identity.increment
2562 text += "(%s,%s)" % (start, increment)
2563 return text
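The `(start, increment)` pair is only rendered when at least one value was given; defaults of 1 fill the other slot. A sketch of that rule:

```python
def identity_ddl(start=None, increment=None):
    # Bare " IDENTITY" when neither value is supplied; otherwise both
    # slots are rendered, defaulting the missing one to 1.
    text = " IDENTITY"
    if start is not None or increment is not None:
        text += "(%s,%s)" % (
            1 if start is None else start,
            1 if increment is None else increment,
        )
    return text
```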
2566class MSIdentifierPreparer(compiler.IdentifierPreparer):
2567 reserved_words = RESERVED_WORDS
2569 def __init__(self, dialect):
2570 super(MSIdentifierPreparer, self).__init__(
2571 dialect,
2572 initial_quote="[",
2573 final_quote="]",
2574 quote_case_sensitive_collations=False,
2575 )
2577 def _escape_identifier(self, value):
2578 return value.replace("]", "]]")
2580 def _unescape_identifier(self, value):
2581 return value.replace("]]", "]")
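SQL Server's bracket quoting only needs to escape the closing bracket, by doubling it; the opening bracket is unambiguous inside a quoted name. A round-trip sketch of the two escape hooks:

```python
def quote_bracketed(ident):
    # A literal "]" inside the name is escaped by doubling it.
    return "[" + ident.replace("]", "]]") + "]"

def unquote_bracketed(quoted):
    # Inverse: strip the delimiters, then collapse doubled brackets.
    return quoted[1:-1].replace("]]", "]")
```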
2583 def quote_schema(self, schema, force=None):
2584 """Prepare a quoted table and schema name."""
2586 # need to re-implement the deprecation warning entirely
2587 if force is not None:
2588 # not using the util.deprecated_params() decorator in this
2589 # case because of the additional function call overhead on this
2590 # very performance-critical spot.
2591 util.warn_deprecated(
2592 "The IdentifierPreparer.quote_schema.force parameter is "
2593 "deprecated and will be removed in a future release. This "
2594 "flag has no effect on the behavior of the "
2595 "IdentifierPreparer.quote method; please refer to "
2596 "quoted_name().",
2597 version="1.3",
2598 )
2600 dbname, owner = _schema_elements(schema)
2601 if dbname:
2602 result = "%s.%s" % (self.quote(dbname), self.quote(owner))
2603 elif owner:
2604 result = self.quote(owner)
2605 else:
2606 result = ""
2607 return result
2610def _db_plus_owner_listing(fn):
2611 def wrap(dialect, connection, schema=None, **kw):
2612 dbname, owner = _owner_plus_db(dialect, schema)
2613 return _switch_db(
2614 dbname,
2615 connection,
2616 fn,
2617 dialect,
2618 connection,
2619 dbname,
2620 owner,
2621 schema,
2622 **kw
2623 )
2625 return update_wrapper(wrap, fn)
2628def _db_plus_owner(fn):
2629 def wrap(dialect, connection, tablename, schema=None, **kw):
2630 dbname, owner = _owner_plus_db(dialect, schema)
2631 return _switch_db(
2632 dbname,
2633 connection,
2634 fn,
2635 dialect,
2636 connection,
2637 tablename,
2638 dbname,
2639 owner,
2640 schema,
2641 **kw
2642 )
2644 return update_wrapper(wrap, fn)
2647def _switch_db(dbname, connection, fn, *arg, **kw):
2648 if dbname:
2649 current_db = connection.exec_driver_sql("select db_name()").scalar()
2650 if current_db != dbname:
2651 connection.exec_driver_sql(
2652 "use %s" % connection.dialect.identifier_preparer.quote(dbname)
2653 )
2654 try:
2655 return fn(*arg, **kw)
2656 finally:
2657 if dbname and current_db != dbname:
2658 connection.exec_driver_sql(
2659 "use %s"
2660 % connection.dialect.identifier_preparer.quote(current_db)
2661 )
2664def _owner_plus_db(dialect, schema):
2665 if not schema:
2666 return None, dialect.default_schema_name
2667 elif "." in schema:
2668 return _schema_elements(schema)
2669 else:
2670 return None, schema
2673_memoized_schema = util.LRUCache()
2676def _schema_elements(schema):
2677 if isinstance(schema, quoted_name) and schema.quote:
2678 return None, schema
2680 if schema in _memoized_schema:
2681 return _memoized_schema[schema]
2683 # tests for this function are in:
2684 # test/dialect/mssql/test_reflection.py ->
2685 # OwnerPlusDBTest.test_owner_database_pairs
2686 # test/dialect/mssql/test_compiler.py -> test_force_schema_*
2687 # test/dialect/mssql/test_compiler.py -> test_schema_many_tokens_*
2688 #
2690 if schema.startswith("__[SCHEMA_"):
2691 return None, schema
2693 push = []
2694 symbol = ""
2695 bracket = False
2696 has_brackets = False
2697 for token in re.split(r"(\[|\]|\.)", schema):
2698 if not token:
2699 continue
2700 if token == "[":
2701 bracket = True
2702 has_brackets = True
2703 elif token == "]":
2704 bracket = False
2705 elif not bracket and token == ".":
2706 if has_brackets:
2707 push.append("[%s]" % symbol)
2708 else:
2709 push.append(symbol)
2710 symbol = ""
2711 has_brackets = False
2712 else:
2713 symbol += token
2714 if symbol:
2715 push.append(symbol)
2716 if len(push) > 1:
2717 dbname, owner = ".".join(push[0:-1]), push[-1]
2719 # test for internal brackets
2720 if re.match(r".*\].*\[.*", dbname[1:-1]):
2721 dbname = quoted_name(dbname, quote=False)
2722 else:
2723 dbname = dbname.lstrip("[").rstrip("]")
2725 elif len(push):
2726 dbname, owner = None, push[0]
2727 else:
2728 dbname, owner = None, None
2730 _memoized_schema[schema] = dbname, owner
2731 return dbname, owner
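The tokenizer above splits only on dots that fall outside square brackets. A simplified mirror, dropping the memoization, the `quoted_name` short-circuits, and the internal-bracket edge case:

```python
import re

def split_schema(schema):
    # Split "db.owner" on dots outside [brackets]; everything before
    # the last component is the database name.
    push, symbol, bracket, has_brackets = [], "", False, False
    for token in re.split(r"(\[|\]|\.)", schema):
        if not token:
            continue
        if token == "[":
            bracket = has_brackets = True
        elif token == "]":
            bracket = False
        elif not bracket and token == ".":
            push.append("[%s]" % symbol if has_brackets else symbol)
            symbol, has_brackets = "", False
        else:
            symbol += token
    if symbol:
        push.append(symbol)
    if len(push) > 1:
        return ".".join(push[:-1]).lstrip("[").rstrip("]"), push[-1]
    return (None, push[0]) if push else (None, None)
```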
2734class MSDialect(default.DefaultDialect):
2735 # will assume it's at least mssql2005
2736 name = "mssql"
2737 supports_statement_cache = True
2738 supports_default_values = True
2739 supports_empty_insert = False
2740 execution_ctx_cls = MSExecutionContext
2741 use_scope_identity = True
2742 max_identifier_length = 128
2743 schema_name = "dbo"
2745 implicit_returning = True
2746 full_returning = True
2748 colspecs = {
2749 sqltypes.DateTime: _MSDateTime,
2750 sqltypes.Date: _MSDate,
2751 sqltypes.JSON: JSON,
2752 sqltypes.JSON.JSONIndexType: JSONIndexType,
2753 sqltypes.JSON.JSONPathType: JSONPathType,
2754 sqltypes.Time: _BASETIMEIMPL,
2755 sqltypes.Unicode: _MSUnicode,
2756 sqltypes.UnicodeText: _MSUnicodeText,
2757 DATETIMEOFFSET: DATETIMEOFFSET,
2758 DATETIME2: DATETIME2,
2759 SMALLDATETIME: SMALLDATETIME,
2760 DATETIME: DATETIME,
2761 }
2763 engine_config_types = default.DefaultDialect.engine_config_types.union(
2764 {"legacy_schema_aliasing": util.asbool}
2765 )
2767 ischema_names = ischema_names
2769 supports_sequences = True
2770 sequences_optional = True
2771 # T-SQL's actual default is -9223372036854775808
2772 default_sequence_base = 1
2774 supports_native_boolean = False
2775 non_native_boolean_check_constraint = False
2776 supports_unicode_binds = True
2777 postfetch_lastrowid = True
2778 _supports_offset_fetch = False
2779 _supports_nvarchar_max = False
2781 legacy_schema_aliasing = False
2783 server_version_info = ()
2785 statement_compiler = MSSQLCompiler
2786 ddl_compiler = MSDDLCompiler
2787 type_compiler = MSTypeCompiler
2788 preparer = MSIdentifierPreparer
2790 construct_arguments = [
2791 (sa_schema.PrimaryKeyConstraint, {"clustered": None}),
2792 (sa_schema.UniqueConstraint, {"clustered": None}),
2793 (sa_schema.Index, {"clustered": None, "include": None, "where": None}),
2794 (
2795 sa_schema.Column,
2796 {"identity_start": None, "identity_increment": None},
2797 ),
2798 ]
2800 def __init__(
2801 self,
2802 query_timeout=None,
2803 use_scope_identity=True,
2804 schema_name="dbo",
2805 isolation_level=None,
2806 deprecate_large_types=None,
2807 json_serializer=None,
2808 json_deserializer=None,
2809 legacy_schema_aliasing=None,
2810 ignore_no_transaction_on_rollback=False,
2811 **opts
2812 ):
2813 self.query_timeout = int(query_timeout or 0)
2814 self.schema_name = schema_name
2816 self.use_scope_identity = use_scope_identity
2817 self.deprecate_large_types = deprecate_large_types
2818 self.ignore_no_transaction_on_rollback = (
2819 ignore_no_transaction_on_rollback
2820 )
2822 if legacy_schema_aliasing is not None:
2823 util.warn_deprecated(
2824 "The legacy_schema_aliasing parameter is "
2825 "deprecated and will be removed in a future release.",
2826 "1.4",
2827 )
2828 self.legacy_schema_aliasing = legacy_schema_aliasing
2830 super(MSDialect, self).__init__(**opts)
2832 self.isolation_level = isolation_level
2833 self._json_serializer = json_serializer
2834 self._json_deserializer = json_deserializer
2836 def do_savepoint(self, connection, name):
2837 # give the DBAPI a push
2838 connection.exec_driver_sql("IF @@TRANCOUNT = 0 BEGIN TRANSACTION")
2839 super(MSDialect, self).do_savepoint(connection, name)
2841 def do_release_savepoint(self, connection, name):
2842 # SQL Server does not support RELEASE SAVEPOINT
2843 pass
2845 def do_rollback(self, dbapi_connection):
2846 try:
2847 super(MSDialect, self).do_rollback(dbapi_connection)
2848 except self.dbapi.ProgrammingError as e:
2849 if self.ignore_no_transaction_on_rollback and re.match(
2850 r".*\b111214\b", str(e)
2851 ):
2852 util.warn(
2853 "ProgrammingError 111214 "
2854 "'No corresponding transaction found.' "
2855 "has been suppressed via "
2856 "ignore_no_transaction_on_rollback=True"
2857 )
2858 else:
2859 raise
2861 _isolation_lookup = set(
2862 [
2863 "SERIALIZABLE",
2864 "READ UNCOMMITTED",
2865 "READ COMMITTED",
2866 "REPEATABLE READ",
2867 "SNAPSHOT",
2868 ]
2869 )
2871 def set_isolation_level(self, connection, level):
2872 level = level.replace("_", " ")
2873 if level not in self._isolation_lookup:
2874 raise exc.ArgumentError(
2875 "Invalid value '%s' for isolation_level. "
2876 "Valid isolation levels for %s are %s"
2877 % (level, self.name, ", ".join(self._isolation_lookup))
2878 )
2879 cursor = connection.cursor()
2880 cursor.execute("SET TRANSACTION ISOLATION LEVEL %s" % level)
2881 cursor.close()
2882 if level == "SNAPSHOT":
2883 connection.commit()
2885 def get_isolation_level(self, dbapi_connection):
2886 cursor = dbapi_connection.cursor()
2887 view_name = "sys.system_views"
2888 try:
2889 cursor.execute(
2890 (
2891 "SELECT name FROM {} WHERE name IN "
2892 "('dm_exec_sessions', 'dm_pdw_nodes_exec_sessions')"
2893 ).format(view_name)
2894 )
2895 row = cursor.fetchone()
2896 if not row:
2897 raise NotImplementedError(
2898 "Can't fetch isolation level on this particular "
2899 "SQL Server version."
2900 )
2902 view_name = "sys.{}".format(row[0])
2904 cursor.execute(
2905 """
2906 SELECT CASE transaction_isolation_level
2907 WHEN 0 THEN NULL
2908 WHEN 1 THEN 'READ UNCOMMITTED'
2909 WHEN 2 THEN 'READ COMMITTED'
2910 WHEN 3 THEN 'REPEATABLE READ'
2911 WHEN 4 THEN 'SERIALIZABLE'
2912 WHEN 5 THEN 'SNAPSHOT' END
2913 AS TRANSACTION_ISOLATION_LEVEL
2914 FROM {}
2915 where session_id = @@SPID
2916 """.format(
2917 view_name
2918 )
2919 )
2920 except self.dbapi.Error as err:
2921 util.raise_(
2922 NotImplementedError(
2923 "Can't fetch isolation level; encountered error {} when "
2924 'attempting to query the "{}" view.'.format(err, view_name)
2925 ),
2926 from_=err,
2927 )
2928 else:
2929 row = cursor.fetchone()
2930 return row[0].upper()
2931 finally:
2932 cursor.close()
2934 def initialize(self, connection):
2935 super(MSDialect, self).initialize(connection)
2936 self._setup_version_attributes()
2937 self._setup_supports_nvarchar_max(connection)
2939 def on_connect(self):
2940 if self.isolation_level is not None:
2942 def connect(conn):
2943 self.set_isolation_level(conn, self.isolation_level)
2945 return connect
2946 else:
2947 return None
2949 def _setup_version_attributes(self):
2950 if self.server_version_info[0] not in list(range(8, 17)):
2951 util.warn(
2952 "Unrecognized server version info '%s'. Some SQL Server "
2953 "features may not function properly."
2954 % ".".join(str(x) for x in self.server_version_info)
2955 )
2957 if self.server_version_info >= MS_2008_VERSION:
2958 self.supports_multivalues_insert = True
2959 if self.deprecate_large_types is None:
2960 self.deprecate_large_types = (
2961 self.server_version_info >= MS_2012_VERSION
2962 )
2964 self._supports_offset_fetch = (
2965 self.server_version_info and self.server_version_info[0] >= 11
2966 )
2968 def _setup_supports_nvarchar_max(self, connection):
2969 try:
2970 connection.scalar(
2971 sql.text("SELECT CAST('test max support' AS NVARCHAR(max))")
2972 )
2973 except exc.DBAPIError:
2974 self._supports_nvarchar_max = False
2975 else:
2976 self._supports_nvarchar_max = True
2978 def _get_default_schema_name(self, connection):
2979 query = sql.text("SELECT schema_name()")
2980 default_schema_name = connection.scalar(query)
2981 if default_schema_name is not None:
2982 # guard against the case where the default_schema_name is being
2983 # fed back into a table reflection function.
2984 return quoted_name(default_schema_name, quote=True)
2985 else:
2986 return self.schema_name
2988 @_db_plus_owner
2989 def has_table(self, connection, tablename, dbname, owner, schema):
2990 self._ensure_has_table_connection(connection)
2992 if tablename.startswith("#"): # temporary table
2993 # mssql does not support temporary views
2994 # SQL Error [4103] [S0001]: "#v": Temporary views are not allowed
2995 return bool(
2996 connection.scalar(
2997 # U filters on user tables only.
2998 text("SELECT object_id(:table_name, 'U')"),
2999 {"table_name": "tempdb.dbo.[{}]".format(tablename)},
3000 )
3001 )
3003 else:
3004 tables = ischema.tables
3006 s = sql.select(tables.c.table_name, tables.c.table_type).where(
3007 tables.c.table_name == tablename,
3008 )
3010 if owner:
3011 s = s.where(tables.c.table_schema == owner)
3013 c = connection.execute(s)
3015 return c.first() is not None
3017 @_db_plus_owner
3018 def has_sequence(self, connection, sequencename, dbname, owner, schema):
3019 sequences = ischema.sequences
3021 s = sql.select(sequences.c.sequence_name).where(
3022 sequences.c.sequence_name == sequencename
3023 )
3025 if owner:
3026 s = s.where(sequences.c.sequence_schema == owner)
3028 c = connection.execute(s)
3030 return c.first() is not None
3032 @reflection.cache
3033 @_db_plus_owner_listing
3034 def get_sequence_names(self, connection, dbname, owner, schema, **kw):
3035 sequences = ischema.sequences
3037 s = sql.select(sequences.c.sequence_name)
3038 if owner:
3039 s = s.where(sequences.c.sequence_schema == owner)
3041 c = connection.execute(s)
3043 return [row[0] for row in c]
3045 @reflection.cache
3046 def get_schema_names(self, connection, **kw):
3047 s = sql.select(ischema.schemata.c.schema_name).order_by(
3048 ischema.schemata.c.schema_name
3049 )
3050 schema_names = [r[0] for r in connection.execute(s)]
3051 return schema_names
3053 @reflection.cache
3054 @_db_plus_owner_listing
3055 def get_table_names(self, connection, dbname, owner, schema, **kw):
3056 tables = ischema.tables
3057 s = (
3058 sql.select(tables.c.table_name)
3059 .where(
3060 sql.and_(
3061 tables.c.table_schema == owner,
3062 tables.c.table_type == "BASE TABLE",
3063 )
3064 )
3065 .order_by(tables.c.table_name)
3066 )
3067 table_names = [r[0] for r in connection.execute(s)]
3068 return table_names
3070 @reflection.cache
3071 @_db_plus_owner_listing
3072 def get_view_names(self, connection, dbname, owner, schema, **kw):
3073 tables = ischema.tables
3074 s = (
3075 sql.select(tables.c.table_name)
3076 .where(
3077 sql.and_(
3078 tables.c.table_schema == owner,
3079 tables.c.table_type == "VIEW",
3080 )
3081 )
3082 .order_by(tables.c.table_name)
3083 )
3084 view_names = [r[0] for r in connection.execute(s)]
3085 return view_names
3087 @reflection.cache
3088 @_db_plus_owner
3089 def get_indexes(self, connection, tablename, dbname, owner, schema, **kw):
3090 filter_definition = (
3091 "ind.filter_definition"
3092 if self.server_version_info >= MS_2008_VERSION
3093 else "NULL as filter_definition"
3094 )
3095 rp = connection.execution_options(future_result=True).execute(
3096 sql.text(
3097 "select ind.index_id, ind.is_unique, ind.name, "
3098 "%s "
3099 "from sys.indexes as ind join sys.tables as tab on "
3100 "ind.object_id=tab.object_id "
3101 "join sys.schemas as sch on sch.schema_id=tab.schema_id "
3102 "where tab.name = :tabname "
3103 "and sch.name=:schname "
3104 "and ind.is_primary_key=0 and ind.type != 0"
3105 % filter_definition
3106 )
3107 .bindparams(
3108 sql.bindparam("tabname", tablename, ischema.CoerceUnicode()),
3109 sql.bindparam("schname", owner, ischema.CoerceUnicode()),
3110 )
3111 .columns(name=sqltypes.Unicode())
3112 )
3113 indexes = {}
3114 for row in rp.mappings():
3115 indexes[row["index_id"]] = {
3116 "name": row["name"],
3117 "unique": row["is_unique"] == 1,
3118 "column_names": [],
3119 "include_columns": [],
3120 }
3122 if row["filter_definition"] is not None:
3123 indexes[row["index_id"]].setdefault("dialect_options", {})[
3124 "mssql_where"
3125 ] = row["filter_definition"]
3127 rp = connection.execution_options(future_result=True).execute(
3128 sql.text(
3129 "select ind_col.index_id, ind_col.object_id, col.name, "
3130 "ind_col.is_included_column "
3131 "from sys.columns as col "
3132 "join sys.tables as tab on tab.object_id=col.object_id "
3133 "join sys.index_columns as ind_col on "
3134 "(ind_col.column_id=col.column_id and "
3135 "ind_col.object_id=tab.object_id) "
3136 "join sys.schemas as sch on sch.schema_id=tab.schema_id "
3137 "where tab.name=:tabname "
3138 "and sch.name=:schname"
3139 )
3140 .bindparams(
3141 sql.bindparam("tabname", tablename, ischema.CoerceUnicode()),
3142 sql.bindparam("schname", owner, ischema.CoerceUnicode()),
3143 )
3144 .columns(name=sqltypes.Unicode())
3145 )
3146 for row in rp.mappings():
3147 if row["index_id"] in indexes:
3148 if row["is_included_column"]:
3149 indexes[row["index_id"]]["include_columns"].append(
3150 row["name"]
3151 )
3152 else:
3153 indexes[row["index_id"]]["column_names"].append(
3154 row["name"]
3155 )
3156 for index_info in indexes.values():
3157 # NOTE: "root level" include_columns is legacy, now part of
3158 # dialect_options (issue #7382)
3159 index_info.setdefault("dialect_options", {})[
3160 "mssql_include"
3161 ] = index_info["include_columns"]
3163 return list(indexes.values())
3165 @reflection.cache
3166 @_db_plus_owner
3167 def get_view_definition(
3168 self, connection, viewname, dbname, owner, schema, **kw
3169 ):
3170 rp = connection.execute(
3171 sql.text(
3172 "select definition from sys.sql_modules as mod, "
3173 "sys.views as views, "
3174 "sys.schemas as sch"
3175 " where "
3176 "mod.object_id=views.object_id and "
3177 "views.schema_id=sch.schema_id and "
3178 "views.name=:viewname and sch.name=:schname"
3179 ).bindparams(
3180 sql.bindparam("viewname", viewname, ischema.CoerceUnicode()),
3181 sql.bindparam("schname", owner, ischema.CoerceUnicode()),
3182 )
3183 )
3185 if rp:
3186 view_def = rp.scalar()
3187 return view_def
3189 def _temp_table_name_like_pattern(self, tablename):
3190 # LIKE uses '%' to match zero or more characters and '_' to match any
3191 # single character. We want to match literal underscores, so T-SQL
3192 # requires that we enclose them in square brackets.
3193 return tablename + (
3194 ("[_][_][_]%") if not tablename.startswith("##") else ""
3195 )

    def _get_internal_temp_table_name(self, connection, tablename):
        # it's likely that schema is always "dbo", but since we can
        # get it here, let's get it.
        # see https://stackoverflow.com/questions/8311959/
        # specifying-schema-for-temporary-tables

        try:
            return connection.execute(
                sql.text(
                    "select table_schema, table_name "
                    "from tempdb.information_schema.tables "
                    "where table_name like :p1"
                ),
                {"p1": self._temp_table_name_like_pattern(tablename)},
            ).one()
        except exc.MultipleResultsFound as me:
            util.raise_(
                exc.UnreflectableTableError(
                    "Found more than one temporary table named '%s' in tempdb "
                    "at this time. Cannot reliably resolve that name to its "
                    "internal table name." % tablename
                ),
                replace_context=me,
            )
        except exc.NoResultFound as ne:
            util.raise_(
                exc.NoSuchTableError(
                    "Unable to find a temporary table named '%s' in tempdb."
                    % tablename
                ),
                replace_context=ne,
            )

    @reflection.cache
    @_db_plus_owner
    def get_columns(self, connection, tablename, dbname, owner, schema, **kw):
        is_temp_table = tablename.startswith("#")
        if is_temp_table:
            owner, tablename = self._get_internal_temp_table_name(
                connection, tablename
            )

            columns = ischema.mssql_temp_table_columns
        else:
            columns = ischema.columns

        computed_cols = ischema.computed_columns
        identity_cols = ischema.identity_columns
        if owner:
            whereclause = sql.and_(
                columns.c.table_name == tablename,
                columns.c.table_schema == owner,
            )
            full_name = columns.c.table_schema + "." + columns.c.table_name
        else:
            whereclause = columns.c.table_name == tablename
            full_name = columns.c.table_name

        join = columns.join(
            computed_cols,
            onclause=sql.and_(
                computed_cols.c.object_id == func.object_id(full_name),
                computed_cols.c.name
                == columns.c.column_name.collate("DATABASE_DEFAULT"),
            ),
            isouter=True,
        ).join(
            identity_cols,
            onclause=sql.and_(
                identity_cols.c.object_id == func.object_id(full_name),
                identity_cols.c.name
                == columns.c.column_name.collate("DATABASE_DEFAULT"),
            ),
            isouter=True,
        )

        if self._supports_nvarchar_max:
            computed_definition = computed_cols.c.definition
        else:
            # tds_version 4.2 does not support NVARCHAR(MAX)
            computed_definition = sql.cast(
                computed_cols.c.definition, NVARCHAR(4000)
            )

        s = (
            sql.select(
                columns,
                computed_definition,
                computed_cols.c.is_persisted,
                identity_cols.c.is_identity,
                identity_cols.c.seed_value,
                identity_cols.c.increment_value,
            )
            .where(whereclause)
            .select_from(join)
            .order_by(columns.c.ordinal_position)
        )

        c = connection.execution_options(future_result=True).execute(s)

        cols = []
        for row in c.mappings():
            name = row[columns.c.column_name]
            type_ = row[columns.c.data_type]
            nullable = row[columns.c.is_nullable] == "YES"
            charlen = row[columns.c.character_maximum_length]
            numericprec = row[columns.c.numeric_precision]
            numericscale = row[columns.c.numeric_scale]
            default = row[columns.c.column_default]
            collation = row[columns.c.collation_name]
            definition = row[computed_definition]
            is_persisted = row[computed_cols.c.is_persisted]
            is_identity = row[identity_cols.c.is_identity]
            identity_start = row[identity_cols.c.seed_value]
            identity_increment = row[identity_cols.c.increment_value]

            coltype = self.ischema_names.get(type_, None)

            kwargs = {}
            if coltype in (
                MSString,
                MSChar,
                MSNVarchar,
                MSNChar,
                MSText,
                MSNText,
                MSBinary,
                MSVarBinary,
                sqltypes.LargeBinary,
            ):
                if charlen == -1:
                    charlen = None
                kwargs["length"] = charlen
                if collation:
                    kwargs["collation"] = collation

            if coltype is None:
                util.warn(
                    "Did not recognize type '%s' of column '%s'"
                    % (type_, name)
                )
                coltype = sqltypes.NULLTYPE
            else:
                if issubclass(coltype, sqltypes.Numeric):
                    kwargs["precision"] = numericprec

                    if not issubclass(coltype, sqltypes.Float):
                        kwargs["scale"] = numericscale

                coltype = coltype(**kwargs)
            cdict = {
                "name": name,
                "type": coltype,
                "nullable": nullable,
                "default": default,
                "autoincrement": is_identity is not None,
            }

            if definition is not None and is_persisted is not None:
                cdict["computed"] = {
                    "sqltext": definition,
                    "persisted": is_persisted,
                }

            if is_identity is not None:
                # identity_start and identity_increment are Decimal or None
                if identity_start is None or identity_increment is None:
                    cdict["identity"] = {}
                else:
                    if isinstance(coltype, sqltypes.BigInteger):
                        start = compat.long_type(identity_start)
                        increment = compat.long_type(identity_increment)
                    elif isinstance(coltype, sqltypes.Integer):
                        start = int(identity_start)
                        increment = int(identity_increment)
                    else:
                        start = identity_start
                        increment = identity_increment

                    cdict["identity"] = {
                        "start": start,
                        "increment": increment,
                    }

            cols.append(cdict)

        return cols
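The identity branch above narrows the `Decimal` seed/increment values that `sys.identity_columns` reports into plain Python integers for integer-typed columns, leaving other types as `Decimal`. A minimal standalone sketch of that narrowing, with hypothetical input values:

```python
from decimal import Decimal

# Hypothetical reflected values, as sys.identity_columns would return them.
identity_start = Decimal("1")
identity_increment = Decimal("10")
is_integer_type = True  # assume the column reflected as INTEGER

# Same narrowing as the loop above: integer columns get ints, everything
# else keeps the Decimal values untouched.
if is_integer_type:
    start, increment = int(identity_start), int(identity_increment)
else:
    start, increment = identity_start, identity_increment

identity = {"start": start, "increment": increment}
print(identity)  # -> {'start': 1, 'increment': 10}
```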

    @reflection.cache
    @_db_plus_owner
    def get_pk_constraint(
        self, connection, tablename, dbname, owner, schema, **kw
    ):
        pkeys = []
        TC = ischema.constraints
        C = ischema.key_constraints.alias("C")

        # Primary key constraints
        s = (
            sql.select(
                C.c.column_name, TC.c.constraint_type, C.c.constraint_name
            )
            .where(
                sql.and_(
                    TC.c.constraint_name == C.c.constraint_name,
                    TC.c.table_schema == C.c.table_schema,
                    C.c.table_name == tablename,
                    C.c.table_schema == owner,
                ),
            )
            .order_by(TC.c.constraint_name, C.c.ordinal_position)
        )
        c = connection.execution_options(future_result=True).execute(s)
        constraint_name = None
        for row in c.mappings():
            if "PRIMARY" in row[TC.c.constraint_type.name]:
                pkeys.append(row["COLUMN_NAME"])
                if constraint_name is None:
                    constraint_name = row[C.c.constraint_name.name]
        return {"constrained_columns": pkeys, "name": constraint_name}
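The loop above keeps only PRIMARY KEY rows (the query also matches other constraint types) and records the constraint name from the first such row. A standalone sketch of that filtering, using hypothetical rows of `(constraint_type, column_name, constraint_name)` for a two-column primary key:

```python
# Hypothetical INFORMATION_SCHEMA rows, ordered by ordinal_position,
# mimicking what the query above would return.
rows = [
    ("PRIMARY KEY", "id", "PK_t"),
    ("PRIMARY KEY", "part", "PK_t"),
    ("UNIQUE", "x", "UQ_t_x"),
]

pkeys = []
constraint_name = None
for ctype, colname, cname in rows:
    if "PRIMARY" in ctype:
        pkeys.append(colname)
        if constraint_name is None:
            constraint_name = cname

result = {"constrained_columns": pkeys, "name": constraint_name}
print(result)  # -> {'constrained_columns': ['id', 'part'], 'name': 'PK_t'}
```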

    @reflection.cache
    @_db_plus_owner
    def get_foreign_keys(
        self, connection, tablename, dbname, owner, schema, **kw
    ):
        # Foreign key constraints
        s = (
            text(
                """\
WITH fk_info AS (
    SELECT
        ischema_ref_con.constraint_schema,
        ischema_ref_con.constraint_name,
        ischema_key_col.ordinal_position,
        ischema_key_col.table_schema,
        ischema_key_col.table_name,
        ischema_ref_con.unique_constraint_schema,
        ischema_ref_con.unique_constraint_name,
        ischema_ref_con.match_option,
        ischema_ref_con.update_rule,
        ischema_ref_con.delete_rule,
        ischema_key_col.column_name AS constrained_column
    FROM
        INFORMATION_SCHEMA.REFERENTIAL_CONSTRAINTS ischema_ref_con
        INNER JOIN
        INFORMATION_SCHEMA.KEY_COLUMN_USAGE ischema_key_col ON
            ischema_key_col.table_schema = ischema_ref_con.constraint_schema
            AND ischema_key_col.constraint_name =
            ischema_ref_con.constraint_name
    WHERE ischema_key_col.table_name = :tablename
        AND ischema_key_col.table_schema = :owner
),
constraint_info AS (
    SELECT
        ischema_key_col.constraint_schema,
        ischema_key_col.constraint_name,
        ischema_key_col.ordinal_position,
        ischema_key_col.table_schema,
        ischema_key_col.table_name,
        ischema_key_col.column_name
    FROM
        INFORMATION_SCHEMA.KEY_COLUMN_USAGE ischema_key_col
),
index_info AS (
    SELECT
        sys.schemas.name AS index_schema,
        sys.indexes.name AS index_name,
        sys.index_columns.key_ordinal AS ordinal_position,
        sys.schemas.name AS table_schema,
        sys.objects.name AS table_name,
        sys.columns.name AS column_name
    FROM
        sys.indexes
        INNER JOIN
        sys.objects ON
            sys.objects.object_id = sys.indexes.object_id
        INNER JOIN
        sys.schemas ON
            sys.schemas.schema_id = sys.objects.schema_id
        INNER JOIN
        sys.index_columns ON
            sys.index_columns.object_id = sys.objects.object_id
            AND sys.index_columns.index_id = sys.indexes.index_id
        INNER JOIN
        sys.columns ON
            sys.columns.object_id = sys.indexes.object_id
            AND sys.columns.column_id = sys.index_columns.column_id
)
SELECT
    fk_info.constraint_schema,
    fk_info.constraint_name,
    fk_info.ordinal_position,
    fk_info.constrained_column,
    constraint_info.table_schema AS referred_table_schema,
    constraint_info.table_name AS referred_table_name,
    constraint_info.column_name AS referred_column,
    fk_info.match_option,
    fk_info.update_rule,
    fk_info.delete_rule
FROM
    fk_info INNER JOIN constraint_info ON
        constraint_info.constraint_schema =
            fk_info.unique_constraint_schema
        AND constraint_info.constraint_name =
            fk_info.unique_constraint_name
        AND constraint_info.ordinal_position = fk_info.ordinal_position
UNION
SELECT
    fk_info.constraint_schema,
    fk_info.constraint_name,
    fk_info.ordinal_position,
    fk_info.constrained_column,
    index_info.table_schema AS referred_table_schema,
    index_info.table_name AS referred_table_name,
    index_info.column_name AS referred_column,
    fk_info.match_option,
    fk_info.update_rule,
    fk_info.delete_rule
FROM
    fk_info INNER JOIN index_info ON
        index_info.index_schema = fk_info.unique_constraint_schema
        AND index_info.index_name = fk_info.unique_constraint_name
        AND index_info.ordinal_position = fk_info.ordinal_position

ORDER BY fk_info.constraint_schema, fk_info.constraint_name,
    fk_info.ordinal_position
"""
            )
            .bindparams(
                sql.bindparam("tablename", tablename, ischema.CoerceUnicode()),
                sql.bindparam("owner", owner, ischema.CoerceUnicode()),
            )
            .columns(
                constraint_schema=sqltypes.Unicode(),
                constraint_name=sqltypes.Unicode(),
                table_schema=sqltypes.Unicode(),
                table_name=sqltypes.Unicode(),
                constrained_column=sqltypes.Unicode(),
                referred_table_schema=sqltypes.Unicode(),
                referred_table_name=sqltypes.Unicode(),
                referred_column=sqltypes.Unicode(),
            )
        )

        # group rows by constraint ID, to handle multi-column FKs
        fkeys = []

        def fkey_rec():
            return {
                "name": None,
                "constrained_columns": [],
                "referred_schema": None,
                "referred_table": None,
                "referred_columns": [],
                "options": {},
            }

        fkeys = util.defaultdict(fkey_rec)

        for r in connection.execute(s).fetchall():
            (
                _,  # constraint schema
                rfknm,
                _,  # ordinal position
                scol,
                rschema,
                rtbl,
                rcol,
                # TODO: we support match=<keyword> for foreign keys so
                # we can support this also, PG has match=FULL for example
                # but this seems to not be a valid value for SQL Server
                _,  # match rule
                fkuprule,
                fkdelrule,
            ) = r

            rec = fkeys[rfknm]
            rec["name"] = rfknm

            if fkuprule != "NO ACTION":
                rec["options"]["onupdate"] = fkuprule

            if fkdelrule != "NO ACTION":
                rec["options"]["ondelete"] = fkdelrule

            if not rec["referred_table"]:
                rec["referred_table"] = rtbl
                if schema is not None or owner != rschema:
                    if dbname:
                        rschema = dbname + "." + rschema
                    rec["referred_schema"] = rschema

            local_cols, remote_cols = (
                rec["constrained_columns"],
                rec["referred_columns"],
            )

            local_cols.append(scol)
            remote_cols.append(rcol)

        return list(fkeys.values())
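The grouping step above collects the per-column rows of a composite foreign key into one record keyed by constraint name, relying on the `ORDER BY ordinal_position` in the query to keep the column lists aligned. A standalone sketch of that grouping with `collections.defaultdict` and hypothetical rows of `(constraint_name, constrained_column, referred_column)`:

```python
from collections import defaultdict

def fkey_rec():
    # Minimal version of the record factory above.
    return {"name": None, "constrained_columns": [], "referred_columns": []}

# Hypothetical rows for one two-column FK, already ordered by
# ordinal_position so the column lists stay pairwise aligned.
rows = [("FK_ab", "a1", "b1"), ("FK_ab", "a2", "b2")]

fkeys = defaultdict(fkey_rec)
for rfknm, scol, rcol in rows:
    rec = fkeys[rfknm]  # creates the record on first sight of the name
    rec["name"] = rfknm
    rec["constrained_columns"].append(scol)
    rec["referred_columns"].append(rcol)

fk = list(fkeys.values())[0]
print(fk["constrained_columns"], fk["referred_columns"])
# -> ['a1', 'a2'] ['b1', 'b2']
```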