1# dialects/mssql/base.py
2# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: ignore-errors
8
9"""
10.. dialect:: mssql
11 :name: Microsoft SQL Server
12 :normal_support: 2012+
13 :best_effort: 2005+
14
15.. _mssql_external_dialects:
16
17External Dialects
18-----------------
19
20In addition to the above DBAPI layers with native SQLAlchemy support, there
21are third-party dialects for other DBAPI layers that are compatible
22with SQL Server. See the "External Dialects" list on the
23:ref:`dialect_toplevel` page.
24
25.. _mssql_identity:
26
27Auto Increment Behavior / IDENTITY Columns
28------------------------------------------
29
30SQL Server provides so-called "auto incrementing" behavior using the
31``IDENTITY`` construct, which can be placed on any single integer column in a
32table. SQLAlchemy considers ``IDENTITY`` within its default "autoincrement"
33behavior for an integer primary key column, described at
34:paramref:`_schema.Column.autoincrement`. This means that by default,
35the first integer primary key column in a :class:`_schema.Table` will be
36considered to be the identity column - unless it is associated with a
37:class:`.Sequence` - and will generate DDL as such::
38
39 from sqlalchemy import Table, MetaData, Column, Integer
40
41 m = MetaData()
42 t = Table(
43 "t",
44 m,
45 Column("id", Integer, primary_key=True),
46 Column("x", Integer),
47 )
48 m.create_all(engine)
49
50The above example will generate DDL as:
51
52.. sourcecode:: sql
53
54 CREATE TABLE t (
55 id INTEGER NOT NULL IDENTITY,
56 x INTEGER NULL,
57 PRIMARY KEY (id)
58 )
59
60For the case where this default generation of ``IDENTITY`` is not desired,
specify ``False`` for the :paramref:`_schema.Column.autoincrement` flag
on the first integer primary key column::
63
64 m = MetaData()
65 t = Table(
66 "t",
67 m,
68 Column("id", Integer, primary_key=True, autoincrement=False),
69 Column("x", Integer),
70 )
71 m.create_all(engine)
72
73To add the ``IDENTITY`` keyword to a non-primary key column, specify
74``True`` for the :paramref:`_schema.Column.autoincrement` flag on the desired
75:class:`_schema.Column` object, and ensure that
76:paramref:`_schema.Column.autoincrement`
77is set to ``False`` on any integer primary key column::
78
79 m = MetaData()
80 t = Table(
81 "t",
82 m,
83 Column("id", Integer, primary_key=True, autoincrement=False),
84 Column("x", Integer, autoincrement=True),
85 )
86 m.create_all(engine)
87
88.. versionchanged:: 1.4 Added :class:`_schema.Identity` construct
89 in a :class:`_schema.Column` to specify the start and increment
90 parameters of an IDENTITY. These replace
91 the use of the :class:`.Sequence` object in order to specify these values.
92
93.. deprecated:: 1.4
94
95 The ``mssql_identity_start`` and ``mssql_identity_increment`` parameters
   to :class:`_schema.Column` are deprecated and should be replaced by
97 an :class:`_schema.Identity` object. Specifying both ways of configuring
98 an IDENTITY will result in a compile error.
99 These options are also no longer returned as part of the
100 ``dialect_options`` key in :meth:`_reflection.Inspector.get_columns`.
101 Use the information in the ``identity`` key instead.
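
For example, a minimal sketch of reading this reflected ``identity``
information, assuming a connected ``engine`` and an existing table ``t``::

    from sqlalchemy import inspect

    insp = inspect(engine)
    for col in insp.get_columns("t"):
        if col.get("identity"):
            # e.g. {"start": 100, "increment": 10, ...}
            print(col["name"], col["identity"])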
102
103.. deprecated:: 1.3
104
105 The use of :class:`.Sequence` to specify IDENTITY characteristics is
106 deprecated and will be removed in a future release. Please use
107 the :class:`_schema.Identity` object parameters
108 :paramref:`_schema.Identity.start` and
109 :paramref:`_schema.Identity.increment`.
110
111.. versionchanged:: 1.4 Removed the ability to use a :class:`.Sequence`
112 object to modify IDENTITY characteristics. :class:`.Sequence` objects
113 now only manipulate true T-SQL SEQUENCE types.
114
115.. note::
116
117 There can only be one IDENTITY column on the table. When using
118 ``autoincrement=True`` to enable the IDENTITY keyword, SQLAlchemy does not
119 guard against multiple columns specifying the option simultaneously. The
120 SQL Server database will instead reject the ``CREATE TABLE`` statement.
121
122.. note::
123
124 An INSERT statement which attempts to provide a value for a column that is
125 marked with IDENTITY will be rejected by SQL Server. In order for the
126 value to be accepted, a session-level option "SET IDENTITY_INSERT" must be
127 enabled. The SQLAlchemy SQL Server dialect will perform this operation
128 automatically when using a core :class:`_expression.Insert`
129 construct; if the
130 execution specifies a value for the IDENTITY column, the "IDENTITY_INSERT"
   option will be enabled for the span of that statement's invocation.
   However, this scenario is not high performing and should not be relied
   upon for normal use. If a table doesn't actually require IDENTITY
   behavior in its integer primary key column, the keyword should be
   disabled when creating the table by ensuring that
   ``autoincrement=False`` is set.
136
137Controlling "Start" and "Increment"
138^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
139
140Specific control over the "start" and "increment" values for
141the ``IDENTITY`` generator are provided using the
142:paramref:`_schema.Identity.start` and :paramref:`_schema.Identity.increment`
143parameters passed to the :class:`_schema.Identity` object::
144
    from sqlalchemy import Column, Identity, Integer, String, Table
146
147 test = Table(
148 "test",
149 metadata,
        Column(
            "id", Integer, Identity(start=100, increment=10), primary_key=True
        ),
153 Column("name", String(20)),
154 )
155
156The CREATE TABLE for the above :class:`_schema.Table` object would be:
157
158.. sourcecode:: sql
159
160 CREATE TABLE test (
161 id INTEGER NOT NULL IDENTITY(100,10) PRIMARY KEY,
     name VARCHAR(20) NULL
   )
164
165.. note::
166
   The :class:`_schema.Identity` object supports many other parameters in
   addition to ``start`` and ``increment``. These are not supported by
   SQL Server and will be ignored when generating the CREATE TABLE DDL.
170
.. versionchanged:: 1.4 The :class:`_schema.Identity` object is
172 now used to affect the
173 ``IDENTITY`` generator for a :class:`_schema.Column` under SQL Server.
174 Previously, the :class:`.Sequence` object was used. As SQL Server now
175 supports real sequences as a separate construct, :class:`.Sequence` will be
176 functional in the normal way starting from SQLAlchemy version 1.4.
177
178
179Using IDENTITY with Non-Integer numeric types
180^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
181
182SQL Server also allows ``IDENTITY`` to be used with ``NUMERIC`` columns. To
183implement this pattern smoothly in SQLAlchemy, the primary datatype of the
184column should remain as ``Integer``, however the underlying implementation
185type deployed to the SQL Server database can be specified as ``Numeric`` using
186:meth:`.TypeEngine.with_variant`::
187
188 from sqlalchemy import Column
189 from sqlalchemy import Integer
190 from sqlalchemy import Numeric
191 from sqlalchemy import String
192 from sqlalchemy.ext.declarative import declarative_base
193
194 Base = declarative_base()
195
196
197 class TestTable(Base):
198 __tablename__ = "test"
199 id = Column(
200 Integer().with_variant(Numeric(10, 0), "mssql"),
201 primary_key=True,
202 autoincrement=True,
203 )
204 name = Column(String)
205
206In the above example, ``Integer().with_variant()`` provides clear usage
207information that accurately describes the intent of the code. The general
208restriction that ``autoincrement`` only applies to ``Integer`` is established
209at the metadata level and not at the per-dialect level.
210
211When using the above pattern, the primary key identifier that comes back from
212the insertion of a row, which is also the value that would be assigned to an
213ORM object such as ``TestTable`` above, will be an instance of ``Decimal()``
214and not ``int`` when using SQL Server. The numeric return type of the
215:class:`_types.Numeric` type can be changed to return floats by passing False
216to :paramref:`_types.Numeric.asdecimal`. To normalize the return type of the
217above ``Numeric(10, 0)`` to return Python ints (which also support "long"
218integer values in Python 3), use :class:`_types.TypeDecorator` as follows::
219
220 from sqlalchemy import TypeDecorator
221
222
223 class NumericAsInteger(TypeDecorator):
224 "normalize floating point return values into ints"
225
226 impl = Numeric(10, 0, asdecimal=False)
227 cache_ok = True
228
229 def process_result_value(self, value, dialect):
230 if value is not None:
231 value = int(value)
232 return value
233
234
235 class TestTable(Base):
236 __tablename__ = "test"
237 id = Column(
238 Integer().with_variant(NumericAsInteger, "mssql"),
239 primary_key=True,
240 autoincrement=True,
241 )
242 name = Column(String)
243
244.. _mssql_insert_behavior:
245
246INSERT behavior
247^^^^^^^^^^^^^^^^
248
249Handling of the ``IDENTITY`` column at INSERT time involves two key
250techniques. The most common is being able to fetch the "last inserted value"
251for a given ``IDENTITY`` column, a process which SQLAlchemy performs
252implicitly in many cases, most importantly within the ORM.
253
254The process for fetching this value has several variants:
255
256* In the vast majority of cases, RETURNING is used in conjunction with INSERT
257 statements on SQL Server in order to get newly generated primary key values:
258
259 .. sourcecode:: sql
260
261 INSERT INTO t (x) OUTPUT inserted.id VALUES (?)
262
263 As of SQLAlchemy 2.0, the :ref:`engine_insertmanyvalues` feature is also
264 used by default to optimize many-row INSERT statements; for SQL Server
  the feature takes place for both RETURNING and non-RETURNING
266 INSERT statements.
267
268 .. versionchanged:: 2.0.10 The :ref:`engine_insertmanyvalues` feature for
269 SQL Server was temporarily disabled for SQLAlchemy version 2.0.9 due to
270 issues with row ordering. As of 2.0.10 the feature is re-enabled, with
271 special case handling for the unit of work's requirement for RETURNING to
272 be ordered.
273
274* When RETURNING is not available or has been disabled via
275 ``implicit_returning=False``, either the ``scope_identity()`` function or
276 the ``@@identity`` variable is used; behavior varies by backend:
277
278 * when using PyODBC, the phrase ``; select scope_identity()`` will be
279 appended to the end of the INSERT statement; a second result set will be
280 fetched in order to receive the value. Given a table as::
281
282 t = Table(
283 "t",
284 metadata,
285 Column("id", Integer, primary_key=True),
286 Column("x", Integer),
287 implicit_returning=False,
288 )
289
290 an INSERT will look like:
291
292 .. sourcecode:: sql
293
294 INSERT INTO t (x) VALUES (?); select scope_identity()
295
296 * Other dialects such as pymssql will call upon
297 ``SELECT scope_identity() AS lastrowid`` subsequent to an INSERT
298 statement. If the flag ``use_scope_identity=False`` is passed to
299 :func:`_sa.create_engine`,
300 the statement ``SELECT @@identity AS lastrowid``
    is used instead, as in the sketch following this list.
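
For example, a minimal sketch that selects the ``@@identity`` approach,
assuming an illustrative connection URL::

    from sqlalchemy import create_engine

    engine = create_engine(
        "mssql+pymssql://scott:tiger@hostname/dbname",
        use_scope_identity=False,
    )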
302
303A table that contains an ``IDENTITY`` column will prohibit an INSERT statement
304that refers to the identity column explicitly. The SQLAlchemy dialect will
305detect when an INSERT construct, created using a core
306:func:`_expression.insert`
307construct (not a plain string SQL), refers to the identity column, and
308in this case will emit ``SET IDENTITY_INSERT ON`` prior to the insert
309statement proceeding, and ``SET IDENTITY_INSERT OFF`` subsequent to the
310execution. Given this example::
311
312 m = MetaData()
313 t = Table(
314 "t", m, Column("id", Integer, primary_key=True), Column("x", Integer)
315 )
316 m.create_all(engine)
317
318 with engine.begin() as conn:
319 conn.execute(t.insert(), {"id": 1, "x": 1}, {"id": 2, "x": 2})
320
321The above column will be created with IDENTITY, however the INSERT statement
322we emit is specifying explicit values. In the echo output we can see
323how SQLAlchemy handles this:
324
325.. sourcecode:: sql
326
327 CREATE TABLE t (
328 id INTEGER NOT NULL IDENTITY(1,1),
329 x INTEGER NULL,
330 PRIMARY KEY (id)
331 )
332
333 COMMIT
334 SET IDENTITY_INSERT t ON
335 INSERT INTO t (id, x) VALUES (?, ?)
336 ((1, 1), (2, 2))
337 SET IDENTITY_INSERT t OFF
338 COMMIT
339
342This is an auxiliary use case suitable for testing and bulk insert scenarios.
343
344SEQUENCE support
345----------------
346
347The :class:`.Sequence` object creates "real" sequences, i.e.,
348``CREATE SEQUENCE``:
349
350.. sourcecode:: pycon+sql
351
352 >>> from sqlalchemy import Sequence
353 >>> from sqlalchemy.schema import CreateSequence
354 >>> from sqlalchemy.dialects import mssql
355 >>> print(
356 ... CreateSequence(Sequence("my_seq", start=1)).compile(
357 ... dialect=mssql.dialect()
358 ... )
359 ... )
360 {printsql}CREATE SEQUENCE my_seq START WITH 1
361
362For integer primary key generation, SQL Server's ``IDENTITY`` construct should
generally be preferred over a sequence.
364
365.. tip::
366
367 The default start value for T-SQL is ``-2**63`` instead of 1 as
368 in most other SQL databases. Users should explicitly set the
369 :paramref:`.Sequence.start` to 1 if that's the expected default::
370
371 seq = Sequence("my_sequence", start=1)
372
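A :class:`.Sequence` may then be associated with a column in the usual way;
a minimal sketch, assuming a ``metadata`` collection::

    from sqlalchemy import Column, Integer, Sequence, Table

    t = Table(
        "t",
        metadata,
        Column(
            "id", Integer, Sequence("t_id_seq", start=1), primary_key=True
        ),
    )
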
373.. versionadded:: 1.4 added SQL Server support for :class:`.Sequence`
374
375.. versionchanged:: 2.0 The SQL Server dialect will no longer implicitly
376 render "START WITH 1" for ``CREATE SEQUENCE``, which was the behavior
377 first implemented in version 1.4.
378
379MAX on VARCHAR / NVARCHAR
380-------------------------
381
382SQL Server supports the special string "MAX" within the
383:class:`_types.VARCHAR` and :class:`_types.NVARCHAR` datatypes,
384to indicate "maximum length possible". The dialect currently handles this as
385a length of "None" in the base type, rather than supplying a
386dialect-specific version of these types, so that a base type
387specified such as ``VARCHAR(None)`` can assume "unlengthed" behavior on
388more than one backend without using dialect-specific types.
389
390To build a SQL Server VARCHAR or NVARCHAR with MAX length, use None::
391
392 my_table = Table(
393 "my_table",
394 metadata,
395 Column("my_data", VARCHAR(None)),
396 Column("my_n_data", NVARCHAR(None)),
397 )
398
399Collation Support
400-----------------
401
402Character collations are supported by the base string types,
403specified by the string argument "collation"::
404
405 from sqlalchemy import VARCHAR
406
407 Column("login", VARCHAR(32, collation="Latin1_General_CI_AS"))
408
409When such a column is associated with a :class:`_schema.Table`, the
410CREATE TABLE statement for this column will yield:
411
412.. sourcecode:: sql
413
414 login VARCHAR(32) COLLATE Latin1_General_CI_AS NULL
415
416LIMIT/OFFSET Support
417--------------------
418
419MSSQL has added support for LIMIT / OFFSET as of SQL Server 2012, via the
420"OFFSET n ROWS" and "FETCH NEXT n ROWS" clauses. SQLAlchemy supports these
421syntaxes automatically if SQL Server 2012 or greater is detected.
422
423.. versionchanged:: 1.4 support added for SQL Server "OFFSET n ROWS" and
424 "FETCH NEXT n ROWS" syntax.
425
426For statements that specify only LIMIT and no OFFSET, all versions of SQL
427Server support the TOP keyword. This syntax is used for all SQL Server
428versions when no OFFSET clause is present. A statement such as::
429
430 select(some_table).limit(5)
431
432will render similarly to:
433
434.. sourcecode:: sql
435
436 SELECT TOP 5 col1, col2.. FROM table
437
438For versions of SQL Server prior to SQL Server 2012, a statement that uses
439LIMIT and OFFSET, or just OFFSET alone, will be rendered using the
440``ROW_NUMBER()`` window function. A statement such as::
441
442 select(some_table).order_by(some_table.c.col3).limit(5).offset(10)
443
444will render similarly to:
445
446.. sourcecode:: sql
447
   SELECT anon_1.col1, anon_1.col2 FROM (SELECT col1, col2,
   ROW_NUMBER() OVER (ORDER BY col3) AS
   mssql_rn FROM table) AS
   anon_1 WHERE mssql_rn > :param_1 AND mssql_rn <= :param_2 + :param_1
452
453Note that when using LIMIT and/or OFFSET, whether using the older
454or newer SQL Server syntaxes, the statement must have an ORDER BY as well,
455else a :class:`.CompileError` is raised.
456
457.. _mssql_comment_support:
458
459DDL Comment Support
460--------------------
461
462Comment support, which includes DDL rendering for attributes such as
463:paramref:`_schema.Table.comment` and :paramref:`_schema.Column.comment`, as
464well as the ability to reflect these comments, is supported assuming a
465supported version of SQL Server is in use. If a non-supported version such as
466Azure Synapse is detected at first-connect time (based on the presence
467of the ``fn_listextendedproperty`` SQL function), comment support including
468rendering and table-comment reflection is disabled, as both features rely upon
469SQL Server stored procedures and functions that are not available on all
470backend types.
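
For example, comments may be configured in the usual way; a minimal
sketch::

    account = Table(
        "account",
        metadata,
        Column("id", Integer, primary_key=True, comment="primary key"),
        comment="user accounts",
    )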
471
472To force comment support to be on or off, bypassing autodetection, set the
473parameter ``supports_comments`` within :func:`_sa.create_engine`::
474
475 e = create_engine("mssql+pyodbc://u:p@dsn", supports_comments=False)
476
477.. versionadded:: 2.0 Added support for table and column comments for
478 the SQL Server dialect, including DDL generation and reflection.
479
480.. _mssql_isolation_level:
481
482Transaction Isolation Level
483---------------------------
484
485All SQL Server dialects support setting of transaction isolation level
486both via a dialect-specific parameter
487:paramref:`_sa.create_engine.isolation_level`
488accepted by :func:`_sa.create_engine`,
489as well as the :paramref:`.Connection.execution_options.isolation_level`
490argument as passed to
491:meth:`_engine.Connection.execution_options`.
492This feature works by issuing the
493command ``SET TRANSACTION ISOLATION LEVEL <level>`` for
494each new connection.
495
496To set isolation level using :func:`_sa.create_engine`::
497
498 engine = create_engine(
499 "mssql+pyodbc://scott:tiger@ms_2008", isolation_level="REPEATABLE READ"
500 )
501
502To set using per-connection execution options::
503
504 connection = engine.connect()
505 connection = connection.execution_options(isolation_level="READ COMMITTED")
506
507Valid values for ``isolation_level`` include:
508
509* ``AUTOCOMMIT`` - pyodbc / pymssql-specific
510* ``READ COMMITTED``
511* ``READ UNCOMMITTED``
512* ``REPEATABLE READ``
513* ``SERIALIZABLE``
514* ``SNAPSHOT`` - specific to SQL Server
515
516There are also more options for isolation level configurations, such as
517"sub-engine" objects linked to a main :class:`_engine.Engine` which each apply
518different isolation level settings. See the discussion at
519:ref:`dbapi_autocommit` for background.
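
For example, a minimal sketch of such a "sub-engine", which shares the
parent engine's connection pool while applying its own isolation level::

    eng = create_engine("mssql+pyodbc://scott:tiger@some_dsn")
    autocommit_engine = eng.execution_options(isolation_level="AUTOCOMMIT")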
520
521.. seealso::
522
523 :ref:`dbapi_autocommit`
524
525.. _mssql_reset_on_return:
526
527Temporary Table / Resource Reset for Connection Pooling
528-------------------------------------------------------
529
530The :class:`.QueuePool` connection pool implementation used
531by the SQLAlchemy :class:`.Engine` object includes
532:ref:`reset on return <pool_reset_on_return>` behavior that will invoke
533the DBAPI ``.rollback()`` method when connections are returned to the pool.
534While this rollback will clear out the immediate state used by the previous
535transaction, it does not cover a wider range of session-level state, including
536temporary tables as well as other server state such as prepared statement
537handles and statement caches. An undocumented SQL Server procedure known
538as ``sp_reset_connection`` is known to be a workaround for this issue which
539will reset most of the session state that builds up on a connection, including
540temporary tables.
541
542To install ``sp_reset_connection`` as the means of performing reset-on-return,
543the :meth:`.PoolEvents.reset` event hook may be used, as demonstrated in the
544example below. The :paramref:`_sa.create_engine.pool_reset_on_return` parameter
545is set to ``None`` so that the custom scheme can replace the default behavior
546completely. The custom hook implementation calls ``.rollback()`` in any case,
547as it's usually important that the DBAPI's own tracking of commit/rollback
548will remain consistent with the state of the transaction::
549
550 from sqlalchemy import create_engine
551 from sqlalchemy import event
552
553 mssql_engine = create_engine(
554 "mssql+pyodbc://scott:tiger^5HHH@mssql2017:1433/test?driver=ODBC+Driver+17+for+SQL+Server",
555 # disable default reset-on-return scheme
556 pool_reset_on_return=None,
557 )
558
559
560 @event.listens_for(mssql_engine, "reset")
561 def _reset_mssql(dbapi_connection, connection_record, reset_state):
562 if not reset_state.terminate_only:
563 dbapi_connection.execute("{call sys.sp_reset_connection}")
564
565 # so that the DBAPI itself knows that the connection has been
566 # reset
567 dbapi_connection.rollback()
568
569.. versionchanged:: 2.0.0b3 Added additional state arguments to
570 the :meth:`.PoolEvents.reset` event and additionally ensured the event
571 is invoked for all "reset" occurrences, so that it's appropriate
572 as a place for custom "reset" handlers. Previous schemes which
573 use the :meth:`.PoolEvents.checkin` handler remain usable as well.
574
575.. seealso::
576
577 :ref:`pool_reset_on_return` - in the :ref:`pooling_toplevel` documentation
578
579Nullability
580-----------
581MSSQL has support for three levels of column nullability. The default
582nullability allows nulls and is explicit in the CREATE TABLE
583construct:
584
585.. sourcecode:: sql
586
587 name VARCHAR(20) NULL
588
589If ``nullable=None`` is specified then no specification is made. In
590other words the database's configured default is used. This will
591render:
592
593.. sourcecode:: sql
594
595 name VARCHAR(20)
596
597If ``nullable`` is ``True`` or ``False`` then the column will be
598``NULL`` or ``NOT NULL`` respectively.
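
For example, to defer to the database's configured default for a column::

    Column("name", VARCHAR(20), nullable=None)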
599
600Date / Time Handling
601--------------------
602DATE and TIME are supported. Bind parameters are converted
603to datetime.datetime() objects as required by most MSSQL drivers,
604and results are processed from strings if needed.
605The DATE and TIME types are not available for MSSQL 2005 and
606previous - if a server version below 2008 is detected, DDL
607for these types will be issued as DATETIME.
608
609.. _mssql_large_type_deprecation:
610
611Large Text/Binary Type Deprecation
612----------------------------------
613
614Per
615`SQL Server 2012/2014 Documentation <https://technet.microsoft.com/en-us/library/ms187993.aspx>`_,
616the ``NTEXT``, ``TEXT`` and ``IMAGE`` datatypes are to be removed from SQL
617Server in a future release. SQLAlchemy normally relates these types to the
618:class:`.UnicodeText`, :class:`_expression.TextClause` and
619:class:`.LargeBinary` datatypes.
620
621In order to accommodate this change, a new flag ``deprecate_large_types``
622is added to the dialect, which will be automatically set based on detection
623of the server version in use, if not otherwise set by the user. The
624behavior of this flag is as follows:
625
626* When this flag is ``True``, the :class:`.UnicodeText`,
627 :class:`_expression.TextClause` and
628 :class:`.LargeBinary` datatypes, when used to render DDL, will render the
629 types ``NVARCHAR(max)``, ``VARCHAR(max)``, and ``VARBINARY(max)``,
630 respectively. This is a new behavior as of the addition of this flag.
631
632* When this flag is ``False``, the :class:`.UnicodeText`,
633 :class:`_expression.TextClause` and
634 :class:`.LargeBinary` datatypes, when used to render DDL, will render the
635 types ``NTEXT``, ``TEXT``, and ``IMAGE``,
636 respectively. This is the long-standing behavior of these types.
637
638* The flag begins with the value ``None``, before a database connection is
639 established. If the dialect is used to render DDL without the flag being
640 set, it is interpreted the same as ``False``.
641
642* On first connection, the dialect detects if SQL Server version 2012 or
643 greater is in use; if the flag is still at ``None``, it sets it to ``True``
644 or ``False`` based on whether 2012 or greater is detected.
645
646* The flag can be set to either ``True`` or ``False`` when the dialect
647 is created, typically via :func:`_sa.create_engine`::
648
649 eng = create_engine(
650 "mssql+pymssql://user:pass@host/db", deprecate_large_types=True
651 )
652
653* Complete control over whether the "old" or "new" types are rendered is
654 available in all SQLAlchemy versions by using the UPPERCASE type objects
655 instead: :class:`_types.NVARCHAR`, :class:`_types.VARCHAR`,
656 :class:`_types.VARBINARY`, :class:`_types.TEXT`, :class:`_mssql.NTEXT`,
657 :class:`_mssql.IMAGE`
  will always remain fixed and always output exactly that
  type, as in the sketch below.
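
A minimal sketch that pins the legacy types regardless of server version::

    from sqlalchemy.dialects.mssql import IMAGE, NTEXT, TEXT

    t = Table(
        "t",
        metadata,
        Column("a", TEXT),
        Column("b", NTEXT),
        Column("c", IMAGE),
    )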
660
661.. _multipart_schema_names:
662
663Multipart Schema Names
664----------------------
665
666SQL Server schemas sometimes require multiple parts to their "schema"
667qualifier, that is, including the database name and owner name as separate
668tokens, such as ``mydatabase.dbo.some_table``. These multipart names can be set
669at once using the :paramref:`_schema.Table.schema` argument of
670:class:`_schema.Table`::
671
672 Table(
673 "some_table",
674 metadata,
675 Column("q", String(50)),
676 schema="mydatabase.dbo",
677 )
678
679When performing operations such as table or component reflection, a schema
680argument that contains a dot will be split into separate
681"database" and "owner" components in order to correctly query the SQL
682Server information schema tables, as these two values are stored separately.
683Additionally, when rendering the schema name for DDL or SQL, the two
684components will be quoted separately for case sensitive names and other
685special characters. Given an argument as below::
686
687 Table(
688 "some_table",
689 metadata,
690 Column("q", String(50)),
691 schema="MyDataBase.dbo",
692 )
693
694The above schema would be rendered as ``[MyDataBase].dbo``, and also in
695reflection, would be reflected using "dbo" as the owner and "MyDataBase"
696as the database name.
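
The same multipart value may be used with reflection methods; a minimal
sketch, assuming a connected ``engine``::

    from sqlalchemy import inspect

    insp = inspect(engine)
    print(insp.get_table_names(schema="MyDataBase.dbo"))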
697
698To control how the schema name is broken into database / owner,
699specify brackets (which in SQL Server are quoting characters) in the name.
700Below, the "owner" will be considered as ``MyDataBase.dbo`` and the
701"database" will be None::
702
703 Table(
704 "some_table",
705 metadata,
706 Column("q", String(50)),
707 schema="[MyDataBase.dbo]",
708 )
709
710To individually specify both database and owner name with special characters
711or embedded dots, use two sets of brackets::
712
713 Table(
714 "some_table",
715 metadata,
716 Column("q", String(50)),
717 schema="[MyDataBase.Period].[MyOwner.Dot]",
718 )
719
720.. versionchanged:: 1.2 the SQL Server dialect now treats brackets as
721 identifier delimiters splitting the schema into separate database
722 and owner tokens, to allow dots within either name itself.
723
724.. _legacy_schema_rendering:
725
726Legacy Schema Mode
727------------------
728
729Very old versions of the MSSQL dialect introduced the behavior such that a
730schema-qualified table would be auto-aliased when used in a
731SELECT statement; given a table::
732
733 account_table = Table(
734 "account",
735 metadata,
736 Column("id", Integer, primary_key=True),
737 Column("info", String(100)),
738 schema="customer_schema",
739 )
740
741this legacy mode of rendering would assume that "customer_schema.account"
742would not be accepted by all parts of the SQL statement, as illustrated
743below:
744
745.. sourcecode:: pycon+sql
746
747 >>> eng = create_engine("mssql+pymssql://mydsn", legacy_schema_aliasing=True)
748 >>> print(account_table.select().compile(eng))
749 {printsql}SELECT account_1.id, account_1.info
750 FROM customer_schema.account AS account_1
751
752This mode of behavior is now off by default, as it appears to have served
753no purpose; however in the case that legacy applications rely upon it,
754it is available using the ``legacy_schema_aliasing`` argument to
755:func:`_sa.create_engine` as illustrated above.
756
757.. deprecated:: 1.4
758
759 The ``legacy_schema_aliasing`` flag is now
760 deprecated and will be removed in a future release.
761
762.. _mssql_indexes:
763
764Clustered Index Support
765-----------------------
766
767The MSSQL dialect supports clustered indexes (and primary keys) via the
768``mssql_clustered`` option. This option is available to :class:`.Index`,
:class:`.UniqueConstraint` and :class:`.PrimaryKeyConstraint`.
770For indexes this option can be combined with the ``mssql_columnstore`` one
771to create a clustered columnstore index.
772
773To generate a clustered index::
774
775 Index("my_index", table.c.x, mssql_clustered=True)
776
777which renders the index as ``CREATE CLUSTERED INDEX my_index ON table (x)``.
778
779To generate a clustered primary key use::
780
781 Table(
782 "my_table",
783 metadata,
784 Column("x", ...),
785 Column("y", ...),
786 PrimaryKeyConstraint("x", "y", mssql_clustered=True),
787 )
788
789which will render the table, for example, as:
790
791.. sourcecode:: sql
792
793 CREATE TABLE my_table (
794 x INTEGER NOT NULL,
795 y INTEGER NOT NULL,
796 PRIMARY KEY CLUSTERED (x, y)
797 )
798
799Similarly, we can generate a clustered unique constraint using::
800
801 Table(
802 "my_table",
803 metadata,
804 Column("x", ...),
805 Column("y", ...),
806 PrimaryKeyConstraint("x"),
807 UniqueConstraint("y", mssql_clustered=True),
808 )
809
810To explicitly request a non-clustered primary key (for example, when
811a separate clustered index is desired), use::
812
813 Table(
814 "my_table",
815 metadata,
816 Column("x", ...),
817 Column("y", ...),
818 PrimaryKeyConstraint("x", "y", mssql_clustered=False),
819 )
820
821which will render the table, for example, as:
822
823.. sourcecode:: sql
824
825 CREATE TABLE my_table (
826 x INTEGER NOT NULL,
827 y INTEGER NOT NULL,
828 PRIMARY KEY NONCLUSTERED (x, y)
829 )
830
831Columnstore Index Support
832-------------------------
833
834The MSSQL dialect supports columnstore indexes via the ``mssql_columnstore``
option. This option is available to :class:`.Index`. It may be combined with
836the ``mssql_clustered`` option to create a clustered columnstore index.
837
838To generate a columnstore index::
839
840 Index("my_index", table.c.x, mssql_columnstore=True)
841
842which renders the index as ``CREATE COLUMNSTORE INDEX my_index ON table (x)``.
843
To generate a clustered columnstore index, provide no columns::
845
846 idx = Index("my_index", mssql_clustered=True, mssql_columnstore=True)
847 # required to associate the index with the table
848 table.append_constraint(idx)
849
850the above renders the index as
851``CREATE CLUSTERED COLUMNSTORE INDEX my_index ON table``.
852
853.. versionadded:: 2.0.18
854
855MSSQL-Specific Index Options
856-----------------------------
857
858In addition to clustering, the MSSQL dialect supports other special options
859for :class:`.Index`.
860
861INCLUDE
862^^^^^^^
863
864The ``mssql_include`` option renders INCLUDE(colname) for the given string
865names::
866
867 Index("my_index", table.c.x, mssql_include=["y"])
868
869would render the index as ``CREATE INDEX my_index ON table (x) INCLUDE (y)``
870
871.. _mssql_index_where:
872
873Filtered Indexes
874^^^^^^^^^^^^^^^^
875
The ``mssql_where`` option renders a WHERE clause using the given criterion
expression::
878
879 Index("my_index", table.c.x, mssql_where=table.c.x > 10)
880
881would render the index as ``CREATE INDEX my_index ON table (x) WHERE x > 10``.
882
883.. versionadded:: 1.3.4
884
885Index ordering
886^^^^^^^^^^^^^^
887
888Index ordering is available via functional expressions, such as::
889
890 Index("my_index", table.c.x.desc())
891
892would render the index as ``CREATE INDEX my_index ON table (x DESC)``
893
894.. seealso::
895
896 :ref:`schema_indexes_functional`
897
898Compatibility Levels
899--------------------
900MSSQL supports the notion of setting compatibility levels at the
database level. This allows, for instance, a database that
is compatible with SQL2000 to run on a SQL2005 database
903server. ``server_version_info`` will always return the database
904server version information (in this case SQL2005) and not the
905compatibility level information. Because of this, if running under
906a backwards compatibility mode SQLAlchemy may attempt to use T-SQL
907statements that are unable to be parsed by the database server.
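
The detected server version may be inspected on a connected engine; a
minimal sketch::

    with engine.connect() as conn:
        print(conn.dialect.server_version_info)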
908
909.. _mssql_triggers:
910
911Triggers
912--------
913
914SQLAlchemy by default uses OUTPUT INSERTED to get at newly
915generated primary key values via IDENTITY columns or other
916server side defaults. MS-SQL does not
917allow the usage of OUTPUT INSERTED on tables that have triggers.
918To disable the usage of OUTPUT INSERTED on a per-table basis,
919specify ``implicit_returning=False`` for each :class:`_schema.Table`
920which has triggers::
921
922 Table(
923 "mytable",
924 metadata,
925 Column("id", Integer, primary_key=True),
926 # ...,
927 implicit_returning=False,
928 )
929
930Declarative form::
931
932 class MyClass(Base):
933 # ...
934 __table_args__ = {"implicit_returning": False}
935
936.. _mssql_rowcount_versioning:
937
938Rowcount Support / ORM Versioning
939---------------------------------
940
941The SQL Server drivers may have limited ability to return the number
942of rows updated from an UPDATE or DELETE statement.
943
944As of this writing, the PyODBC driver is not able to return a rowcount when
945OUTPUT INSERTED is used. Previous versions of SQLAlchemy therefore had
946limitations for features such as the "ORM Versioning" feature that relies upon
947accurate rowcounts in order to match version numbers with matched rows.
948
949SQLAlchemy 2.0 now retrieves the "rowcount" manually for these particular use
950cases based on counting the rows that arrived back within RETURNING; so while
951the driver still has this limitation, the ORM Versioning feature is no longer
952impacted by it. As of SQLAlchemy 2.0.5, ORM versioning has been fully
953re-enabled for the pyodbc driver.
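
A minimal sketch of the ORM versioning pattern referenced here, assuming a
declarative ``Base``::

    class Widget(Base):
        __tablename__ = "widget"

        id = Column(Integer, primary_key=True)
        version_id = Column(Integer, nullable=False)
        name = Column(String(50))

        __mapper_args__ = {"version_id_col": version_id}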
954
955.. versionchanged:: 2.0.5 ORM versioning support is restored for the pyodbc
956 driver. Previously, a warning would be emitted during ORM flush that
957 versioning was not supported.
958
959
960Enabling Snapshot Isolation
961---------------------------
962
963SQL Server has a default transaction
964isolation mode that locks entire tables, and causes even mildly concurrent
965applications to have long held locks and frequent deadlocks.
966Enabling snapshot isolation for the database as a whole is recommended
967for modern levels of concurrency support. This is accomplished via the
968following ALTER DATABASE commands executed at the SQL prompt:
969
970.. sourcecode:: sql
971
972 ALTER DATABASE MyDatabase SET ALLOW_SNAPSHOT_ISOLATION ON
973
974 ALTER DATABASE MyDatabase SET READ_COMMITTED_SNAPSHOT ON
975
976Background on SQL Server snapshot isolation is available at
977https://msdn.microsoft.com/en-us/library/ms175095.aspx.
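
Once enabled at the database level, ``SNAPSHOT`` isolation may then be
selected per engine or per connection as described at
:ref:`mssql_isolation_level`; a minimal sketch::

    engine = create_engine(
        "mssql+pyodbc://scott:tiger@some_dsn", isolation_level="SNAPSHOT"
    )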
978
979""" # noqa
980
981from __future__ import annotations
982
983import codecs
984import datetime
985import operator
986import re
987from typing import Any
988from typing import overload
989from typing import TYPE_CHECKING
990from uuid import UUID as _python_UUID
991
992from . import information_schema as ischema
993from .json import JSON
994from .json import JSONIndexType
995from .json import JSONPathType
996from ... import exc
997from ... import Identity
998from ... import schema as sa_schema
999from ... import Sequence
1000from ... import sql
1001from ... import text
1002from ... import util
1003from ...engine import cursor as _cursor
1004from ...engine import default
1005from ...engine import reflection
1006from ...engine.reflection import ReflectionDefaults
1007from ...sql import coercions
1008from ...sql import compiler
1009from ...sql import elements
1010from ...sql import expression
1011from ...sql import func
1012from ...sql import quoted_name
1013from ...sql import roles
1014from ...sql import sqltypes
1015from ...sql import try_cast as try_cast # noqa: F401
1016from ...sql import util as sql_util
1017from ...sql._typing import is_sql_compiler
1018from ...sql.compiler import InsertmanyvaluesSentinelOpts
1019from ...sql.elements import TryCast as TryCast # noqa: F401
1020from ...types import BIGINT
1021from ...types import BINARY
1022from ...types import CHAR
1023from ...types import DATE
1024from ...types import DATETIME
1025from ...types import DECIMAL
1026from ...types import FLOAT
1027from ...types import INTEGER
1028from ...types import NCHAR
1029from ...types import NUMERIC
1030from ...types import NVARCHAR
1031from ...types import SMALLINT
1032from ...types import TEXT
1033from ...types import VARCHAR
1034from ...util import update_wrapper
1035from ...util.typing import Literal
1036
1037if TYPE_CHECKING:
1038 from ...sql.ddl import DropIndex
1039 from ...sql.dml import DMLState
1040 from ...sql.selectable import TableClause
1041
1042# https://sqlserverbuilds.blogspot.com/
1043MS_2017_VERSION = (14,)
1044MS_2016_VERSION = (13,)
1045MS_2014_VERSION = (12,)
1046MS_2012_VERSION = (11,)
1047MS_2008_VERSION = (10,)
1048MS_2005_VERSION = (9,)
1049MS_2000_VERSION = (8,)
1050
1051RESERVED_WORDS = {
1052 "add",
1053 "all",
1054 "alter",
1055 "and",
1056 "any",
1057 "as",
1058 "asc",
1059 "authorization",
1060 "backup",
1061 "begin",
1062 "between",
1063 "break",
1064 "browse",
1065 "bulk",
1066 "by",
1067 "cascade",
1068 "case",
1069 "check",
1070 "checkpoint",
1071 "close",
1072 "clustered",
1073 "coalesce",
1074 "collate",
1075 "column",
1076 "commit",
1077 "compute",
1078 "constraint",
1079 "contains",
1080 "containstable",
1081 "continue",
1082 "convert",
1083 "create",
1084 "cross",
1085 "current",
1086 "current_date",
1087 "current_time",
1088 "current_timestamp",
1089 "current_user",
1090 "cursor",
1091 "database",
1092 "dbcc",
1093 "deallocate",
1094 "declare",
1095 "default",
1096 "delete",
1097 "deny",
1098 "desc",
1099 "disk",
1100 "distinct",
1101 "distributed",
1102 "double",
1103 "drop",
1104 "dump",
1105 "else",
1106 "end",
1107 "errlvl",
1108 "escape",
1109 "except",
1110 "exec",
1111 "execute",
1112 "exists",
1113 "exit",
1114 "external",
1115 "fetch",
1116 "file",
1117 "fillfactor",
1118 "for",
1119 "foreign",
1120 "freetext",
1121 "freetexttable",
1122 "from",
1123 "full",
1124 "function",
1125 "goto",
1126 "grant",
1127 "group",
1128 "having",
1129 "holdlock",
1130 "identity",
1131 "identity_insert",
1132 "identitycol",
1133 "if",
1134 "in",
1135 "index",
1136 "inner",
1137 "insert",
1138 "intersect",
1139 "into",
1140 "is",
1141 "join",
1142 "key",
1143 "kill",
1144 "left",
1145 "like",
1146 "lineno",
1147 "load",
1148 "merge",
1149 "national",
1150 "nocheck",
1151 "nonclustered",
1152 "not",
1153 "null",
1154 "nullif",
1155 "of",
1156 "off",
1157 "offsets",
1158 "on",
1159 "open",
1160 "opendatasource",
1161 "openquery",
1162 "openrowset",
1163 "openxml",
1164 "option",
1165 "or",
1166 "order",
1167 "outer",
1168 "over",
1169 "percent",
1170 "pivot",
1171 "plan",
1172 "precision",
1173 "primary",
1174 "print",
1175 "proc",
1176 "procedure",
1177 "public",
1178 "raiserror",
1179 "read",
1180 "readtext",
1181 "reconfigure",
1182 "references",
1183 "replication",
1184 "restore",
1185 "restrict",
1186 "return",
1187 "revert",
1188 "revoke",
1189 "right",
1190 "rollback",
1191 "rowcount",
1192 "rowguidcol",
1193 "rule",
1194 "save",
1195 "schema",
1196 "securityaudit",
1197 "select",
1198 "session_user",
1199 "set",
1200 "setuser",
1201 "shutdown",
1202 "some",
1203 "statistics",
1204 "system_user",
1205 "table",
1206 "tablesample",
1207 "textsize",
1208 "then",
1209 "to",
1210 "top",
1211 "tran",
1212 "transaction",
1213 "trigger",
1214 "truncate",
1215 "tsequal",
1216 "union",
1217 "unique",
1218 "unpivot",
1219 "update",
1220 "updatetext",
1221 "use",
1222 "user",
1223 "values",
1224 "varying",
1225 "view",
1226 "waitfor",
1227 "when",
1228 "where",
1229 "while",
1230 "with",
1231 "writetext",
1232}
1233
1234
1235class REAL(sqltypes.REAL):
1236 """the SQL Server REAL datatype."""
1237
1238 def __init__(self, **kw):
1239 # REAL is a synonym for FLOAT(24) on SQL server.
1240 # it is only accepted as the word "REAL" in DDL, the numeric
1241 # precision value is not allowed to be present
1242 kw.setdefault("precision", 24)
1243 super().__init__(**kw)
1244
1245
1246class DOUBLE_PRECISION(sqltypes.DOUBLE_PRECISION):
1247 """the SQL Server DOUBLE PRECISION datatype.
1248
1249 .. versionadded:: 2.0.11
1250
1251 """
1252
1253 def __init__(self, **kw):
1254 # DOUBLE PRECISION is a synonym for FLOAT(53) on SQL server.
1255 # it is only accepted as the word "DOUBLE PRECISION" in DDL,
1256 # the numeric precision value is not allowed to be present
1257 kw.setdefault("precision", 53)
1258 super().__init__(**kw)
1259
1260
1261class TINYINT(sqltypes.Integer):
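    """MSSQL TINYINT type, an 8-bit integer holding values 0 to 255."""
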
1262 __visit_name__ = "TINYINT"
1263
1264
1265# MSSQL DATE/TIME types have varied behavior, sometimes returning
1266# strings. MSDate/TIME check for everything, and always
1267# filter bind parameters into datetime objects (required by pyodbc,
1268# not sure about other dialects).
1269
1270
1271class _MSDate(sqltypes.Date):
1272 def bind_processor(self, dialect):
1273 def process(value):
1274 if type(value) == datetime.date:
1275 return datetime.datetime(value.year, value.month, value.day)
1276 else:
1277 return value
1278
1279 return process
1280
1281 _reg = re.compile(r"(\d+)-(\d+)-(\d+)")
1282
1283 def result_processor(self, dialect, coltype):
1284 def process(value):
1285 if isinstance(value, datetime.datetime):
1286 return value.date()
1287 elif isinstance(value, str):
1288 m = self._reg.match(value)
1289 if not m:
1290 raise ValueError(
1291 "could not parse %r as a date value" % (value,)
1292 )
1293 return datetime.date(*[int(x or 0) for x in m.groups()])
1294 else:
1295 return value
1296
1297 return process
1298
1299
1300class TIME(sqltypes.TIME):
1301 def __init__(self, precision=None, **kwargs):
1302 self.precision = precision
1303 super().__init__()
1304
1305 __zero_date = datetime.date(1900, 1, 1)
1306
1307 def bind_processor(self, dialect):
1308 def process(value):
1309 if isinstance(value, datetime.datetime):
1310 value = datetime.datetime.combine(
1311 self.__zero_date, value.time()
1312 )
1313 elif isinstance(value, datetime.time):
1314 """issue #5339
1315 per: https://github.com/mkleehammer/pyodbc/wiki/Tips-and-Tricks-by-Database-Platform#time-columns
1316 pass TIME value as string
1317 """ # noqa
1318 value = str(value)
1319 return value
1320
1321 return process
1322
1323 _reg = re.compile(r"(\d+):(\d+):(\d+)(?:\.(\d{0,6}))?")
1324
1325 def result_processor(self, dialect, coltype):
1326 def process(value):
1327 if isinstance(value, datetime.datetime):
1328 return value.time()
1329 elif isinstance(value, str):
1330 m = self._reg.match(value)
1331 if not m:
1332 raise ValueError(
1333 "could not parse %r as a time value" % (value,)
1334 )
1335 return datetime.time(*[int(x or 0) for x in m.groups()])
1336 else:
1337 return value
1338
1339 return process
1340
1341
1342_MSTime = TIME
1343
1344
1345class _BASETIMEIMPL(TIME):
1346 __visit_name__ = "_BASETIMEIMPL"
1347
1348
1349class _DateTimeBase:
1350 def bind_processor(self, dialect):
1351 def process(value):
1352 if type(value) == datetime.date:
1353 return datetime.datetime(value.year, value.month, value.day)
1354 else:
1355 return value
1356
1357 return process
1358
1359
1360class _MSDateTime(_DateTimeBase, sqltypes.DateTime):
1361 pass
1362
1363
1364class SMALLDATETIME(_DateTimeBase, sqltypes.DateTime):
1365 __visit_name__ = "SMALLDATETIME"
1366
1367
1368class DATETIME2(_DateTimeBase, sqltypes.DateTime):
1369 __visit_name__ = "DATETIME2"
1370
1371 def __init__(self, precision=None, **kw):
1372 super().__init__(**kw)
1373 self.precision = precision
1374
1375
1376class DATETIMEOFFSET(_DateTimeBase, sqltypes.DateTime):
1377 __visit_name__ = "DATETIMEOFFSET"
1378
1379 def __init__(self, precision=None, **kw):
1380 super().__init__(**kw)
1381 self.precision = precision
1382
1383
1384class _UnicodeLiteral:
1385 def literal_processor(self, dialect):
1386 def process(value):
1387 value = value.replace("'", "''")
1388
1389 if dialect.identifier_preparer._double_percents:
1390 value = value.replace("%", "%%")
1391
1392 return "N'%s'" % value
1393
1394 return process
1395
1396
1397class _MSUnicode(_UnicodeLiteral, sqltypes.Unicode):
1398 pass
1399
1400
1401class _MSUnicodeText(_UnicodeLiteral, sqltypes.UnicodeText):
1402 pass
1403
1404
1405class TIMESTAMP(sqltypes._Binary):
1406 """Implement the SQL Server TIMESTAMP type.
1407
1408 Note this is **completely different** than the SQL Standard
1409 TIMESTAMP type, which is not supported by SQL Server. It
1410 is a read-only datatype that does not support INSERT of values.
1411
1412 .. versionadded:: 1.2
1413
1414 .. seealso::
1415
1416 :class:`_mssql.ROWVERSION`
1417
1418 """
1419
1420 __visit_name__ = "TIMESTAMP"
1421
1422 # expected by _Binary to be present
1423 length = None
1424
1425 def __init__(self, convert_int=False):
1426 """Construct a TIMESTAMP or ROWVERSION type.
1427
1428 :param convert_int: if True, binary integer values will
1429 be converted to integers on read.
1430
1431 .. versionadded:: 1.2
1432
1433 """
1434 self.convert_int = convert_int
1435
1436 def result_processor(self, dialect, coltype):
1437 super_ = super().result_processor(dialect, coltype)
1438 if self.convert_int:
1439
1440 def process(value):
1441 if super_:
1442 value = super_(value)
1443 if value is not None:
1444 # https://stackoverflow.com/a/30403242/34549
1445 value = int(codecs.encode(value, "hex"), 16)
1446 return value
1447
1448 return process
1449 else:
1450 return super_
1451
1452
1453class ROWVERSION(TIMESTAMP):
1454 """Implement the SQL Server ROWVERSION type.
1455
1456 The ROWVERSION datatype is a SQL Server synonym for the TIMESTAMP
1457 datatype, however current SQL Server documentation suggests using
1458 ROWVERSION for new datatypes going forward.
1459
1460 The ROWVERSION datatype does **not** reflect (e.g. introspect) from the
1461 database as itself; the returned datatype will be
1462 :class:`_mssql.TIMESTAMP`.
1463
1464 This is a read-only datatype that does not support INSERT of values.
1465
1466 .. versionadded:: 1.2
1467
1468 .. seealso::
1469
1470 :class:`_mssql.TIMESTAMP`
1471
1472 """
1473
1474 __visit_name__ = "ROWVERSION"
1475
1476
1477class NTEXT(sqltypes.UnicodeText):
1478 """MSSQL NTEXT type, for variable-length unicode text up to 2^30
1479 characters."""
1480
1481 __visit_name__ = "NTEXT"
1482
1483
1484class VARBINARY(sqltypes.VARBINARY, sqltypes.LargeBinary):
1485 """The MSSQL VARBINARY type.
1486
1487 This type adds additional features to the core :class:`_types.VARBINARY`
1488 type, including "deprecate_large_types" mode where
1489 either ``VARBINARY(max)`` or IMAGE is rendered, as well as the SQL
1490 Server ``FILESTREAM`` option.
1491
1492 .. seealso::
1493
1494 :ref:`mssql_large_type_deprecation`
1495
1496 """
1497
1498 __visit_name__ = "VARBINARY"
1499
1500 def __init__(self, length=None, filestream=False):
1501 """
1502 Construct a VARBINARY type.
1503
1504 :param length: optional, a length for the column for use in
1505 DDL statements, for those binary types that accept a length,
1506 such as the MySQL BLOB type.
1507
1508 :param filestream=False: if True, renders the ``FILESTREAM`` keyword
1509 in the table definition. In this case ``length`` must be ``None``
1510 or ``'max'``.
1511
1512 .. versionadded:: 1.4.31
1513
1514 """
1515
1516 self.filestream = filestream
1517 if self.filestream and length not in (None, "max"):
1518 raise ValueError(
1519 "length must be None or 'max' when setting filestream"
1520 )
1521 super().__init__(length=length)
1522
1523
1524class IMAGE(sqltypes.LargeBinary):
1525 __visit_name__ = "IMAGE"
1526
1527
1528class XML(sqltypes.Text):
1529 """MSSQL XML type.
1530
1531 This is a placeholder type for reflection purposes that does not include
1532 any Python-side datatype support. It also does not currently support
1533 additional arguments, such as "CONTENT", "DOCUMENT",
1534 "xml_schema_collection".
1535
1536 """
1537
1538 __visit_name__ = "XML"
1539
1540
1541class BIT(sqltypes.Boolean):
1542 """MSSQL BIT type.
1543
1544 Both pyodbc and pymssql return values from BIT columns as
1545 Python <class 'bool'> so just subclass Boolean.
1546
1547 """
1548
1549 __visit_name__ = "BIT"
1550
1551
1552class MONEY(sqltypes.TypeEngine):
1553 __visit_name__ = "MONEY"
1554
1555
1556class SMALLMONEY(sqltypes.TypeEngine):
1557 __visit_name__ = "SMALLMONEY"
1558
1559
1560class MSUUid(sqltypes.Uuid):
1561 def bind_processor(self, dialect):
1562 if self.native_uuid:
1563 # this is currently assuming pyodbc; might not work for
1564 # some other mssql driver
1565 return None
1566 else:
1567 if self.as_uuid:
1568
1569 def process(value):
1570 if value is not None:
1571 value = value.hex
1572 return value
1573
1574 return process
1575 else:
1576
1577 def process(value):
1578 if value is not None:
                        value = value.replace("-", "")
1580 return value
1581
1582 return process
1583
1584 def literal_processor(self, dialect):
1585 if self.native_uuid:
1586
1587 def process(value):
                return f"""'{str(value).replace("'", "''")}'"""
1589
1590 return process
1591 else:
1592 if self.as_uuid:
1593
1594 def process(value):
1595 return f"""'{value.hex}'"""
1596
1597 return process
1598 else:
1599
1600 def process(value):
1601 return f"""'{
1602 value.replace("-", "").replace("'", "''")
1603 }'"""
1604
1605 return process
1606
1607
1608class UNIQUEIDENTIFIER(sqltypes.Uuid[sqltypes._UUID_RETURN]):
1609 __visit_name__ = "UNIQUEIDENTIFIER"
1610
1611 @overload
1612 def __init__(
1613 self: UNIQUEIDENTIFIER[_python_UUID], as_uuid: Literal[True] = ...
1614 ): ...
1615
1616 @overload
1617 def __init__(
1618 self: UNIQUEIDENTIFIER[str], as_uuid: Literal[False] = ...
1619 ): ...
1620
1621 def __init__(self, as_uuid: bool = True):
1622 """Construct a :class:`_mssql.UNIQUEIDENTIFIER` type.
1623
1624
1625 :param as_uuid=True: if True, values will be interpreted
1626 as Python uuid objects, converting to/from string via the
1627 DBAPI.
1628
        .. versionchanged:: 2.0 Added direct "uuid" support to the
1630 :class:`_mssql.UNIQUEIDENTIFIER` datatype; uuid interpretation
1631 defaults to ``True``.
1632
1633 """
1634 self.as_uuid = as_uuid
1635 self.native_uuid = True
1636
1637
1638class SQL_VARIANT(sqltypes.TypeEngine):
1639 __visit_name__ = "SQL_VARIANT"
1640
1641
1642# old names.
1643MSDateTime = _MSDateTime
1644MSDate = _MSDate
1645MSReal = REAL
1646MSTinyInteger = TINYINT
1647MSTime = TIME
1648MSSmallDateTime = SMALLDATETIME
1649MSDateTime2 = DATETIME2
1650MSDateTimeOffset = DATETIMEOFFSET
1651MSText = TEXT
1652MSNText = NTEXT
1653MSString = VARCHAR
1654MSNVarchar = NVARCHAR
1655MSChar = CHAR
1656MSNChar = NCHAR
1657MSBinary = BINARY
1658MSVarBinary = VARBINARY
1659MSImage = IMAGE
1660MSBit = BIT
1661MSMoney = MONEY
1662MSSmallMoney = SMALLMONEY
1663MSUniqueIdentifier = UNIQUEIDENTIFIER
1664MSVariant = SQL_VARIANT
1665
1666ischema_names = {
1667 "int": INTEGER,
1668 "bigint": BIGINT,
1669 "smallint": SMALLINT,
1670 "tinyint": TINYINT,
1671 "varchar": VARCHAR,
1672 "nvarchar": NVARCHAR,
1673 "char": CHAR,
1674 "nchar": NCHAR,
1675 "text": TEXT,
1676 "ntext": NTEXT,
1677 "decimal": DECIMAL,
1678 "numeric": NUMERIC,
1679 "float": FLOAT,
1680 "datetime": DATETIME,
1681 "datetime2": DATETIME2,
1682 "datetimeoffset": DATETIMEOFFSET,
1683 "date": DATE,
1684 "time": TIME,
1685 "smalldatetime": SMALLDATETIME,
1686 "binary": BINARY,
1687 "varbinary": VARBINARY,
1688 "bit": BIT,
1689 "real": REAL,
1690 "double precision": DOUBLE_PRECISION,
1691 "image": IMAGE,
1692 "xml": XML,
1693 "timestamp": TIMESTAMP,
1694 "money": MONEY,
1695 "smallmoney": SMALLMONEY,
1696 "uniqueidentifier": UNIQUEIDENTIFIER,
1697 "sql_variant": SQL_VARIANT,
1698}
1699
1700
1701class MSTypeCompiler(compiler.GenericTypeCompiler):
1702 def _extend(self, spec, type_, length=None):
1703 """Extend a string-type declaration with standard SQL
1704 COLLATE annotations.
1705
1706 """
1707
1708 if getattr(type_, "collation", None):
1709 collation = "COLLATE %s" % type_.collation
1710 else:
1711 collation = None
1712
1713 if not length:
1714 length = type_.length
1715
1716 if length:
1717 spec = spec + "(%s)" % length
1718
1719 return " ".join([c for c in (spec, collation) if c is not None])
1720
1721 def visit_double(self, type_, **kw):
1722 return self.visit_DOUBLE_PRECISION(type_, **kw)
1723
1724 def visit_FLOAT(self, type_, **kw):
1725 precision = getattr(type_, "precision", None)
1726 if precision is None:
1727 return "FLOAT"
1728 else:
1729 return "FLOAT(%(precision)s)" % {"precision": precision}
1730
1731 def visit_TINYINT(self, type_, **kw):
1732 return "TINYINT"
1733
1734 def visit_TIME(self, type_, **kw):
1735 precision = getattr(type_, "precision", None)
1736 if precision is not None:
1737 return "TIME(%s)" % precision
1738 else:
1739 return "TIME"
1740
1741 def visit_TIMESTAMP(self, type_, **kw):
1742 return "TIMESTAMP"
1743
1744 def visit_ROWVERSION(self, type_, **kw):
1745 return "ROWVERSION"
1746
1747 def visit_datetime(self, type_, **kw):
1748 if type_.timezone:
1749 return self.visit_DATETIMEOFFSET(type_, **kw)
1750 else:
1751 return self.visit_DATETIME(type_, **kw)
1752
1753 def visit_DATETIMEOFFSET(self, type_, **kw):
1754 precision = getattr(type_, "precision", None)
1755 if precision is not None:
1756 return "DATETIMEOFFSET(%s)" % type_.precision
1757 else:
1758 return "DATETIMEOFFSET"
1759
1760 def visit_DATETIME2(self, type_, **kw):
1761 precision = getattr(type_, "precision", None)
1762 if precision is not None:
1763 return "DATETIME2(%s)" % precision
1764 else:
1765 return "DATETIME2"
1766
1767 def visit_SMALLDATETIME(self, type_, **kw):
1768 return "SMALLDATETIME"
1769
1770 def visit_unicode(self, type_, **kw):
1771 return self.visit_NVARCHAR(type_, **kw)
1772
1773 def visit_text(self, type_, **kw):
1774 if self.dialect.deprecate_large_types:
1775 return self.visit_VARCHAR(type_, **kw)
1776 else:
1777 return self.visit_TEXT(type_, **kw)
1778
1779 def visit_unicode_text(self, type_, **kw):
1780 if self.dialect.deprecate_large_types:
1781 return self.visit_NVARCHAR(type_, **kw)
1782 else:
1783 return self.visit_NTEXT(type_, **kw)
1784
1785 def visit_NTEXT(self, type_, **kw):
1786 return self._extend("NTEXT", type_)
1787
1788 def visit_TEXT(self, type_, **kw):
1789 return self._extend("TEXT", type_)
1790
1791 def visit_VARCHAR(self, type_, **kw):
1792 return self._extend("VARCHAR", type_, length=type_.length or "max")
1793
1794 def visit_CHAR(self, type_, **kw):
1795 return self._extend("CHAR", type_)
1796
1797 def visit_NCHAR(self, type_, **kw):
1798 return self._extend("NCHAR", type_)
1799
1800 def visit_NVARCHAR(self, type_, **kw):
1801 return self._extend("NVARCHAR", type_, length=type_.length or "max")
1802
1803 def visit_date(self, type_, **kw):
1804 if self.dialect.server_version_info < MS_2008_VERSION:
1805 return self.visit_DATETIME(type_, **kw)
1806 else:
1807 return self.visit_DATE(type_, **kw)
1808
1809 def visit__BASETIMEIMPL(self, type_, **kw):
1810 return self.visit_time(type_, **kw)
1811
1812 def visit_time(self, type_, **kw):
1813 if self.dialect.server_version_info < MS_2008_VERSION:
1814 return self.visit_DATETIME(type_, **kw)
1815 else:
1816 return self.visit_TIME(type_, **kw)
1817
1818 def visit_large_binary(self, type_, **kw):
1819 if self.dialect.deprecate_large_types:
1820 return self.visit_VARBINARY(type_, **kw)
1821 else:
1822 return self.visit_IMAGE(type_, **kw)
1823
1824 def visit_IMAGE(self, type_, **kw):
1825 return "IMAGE"
1826
1827 def visit_XML(self, type_, **kw):
1828 return "XML"
1829
1830 def visit_VARBINARY(self, type_, **kw):
1831 text = self._extend("VARBINARY", type_, length=type_.length or "max")
1832 if getattr(type_, "filestream", False):
1833 text += " FILESTREAM"
1834 return text
1835
1836 def visit_boolean(self, type_, **kw):
1837 return self.visit_BIT(type_)
1838
1839 def visit_BIT(self, type_, **kw):
1840 return "BIT"
1841
1842 def visit_JSON(self, type_, **kw):
1843 # this is a bit of a break with SQLAlchemy's convention of
1844 # "UPPERCASE name goes to UPPERCASE type name with no modification"
1845 return self._extend("NVARCHAR", type_, length="max")
1846
1847 def visit_MONEY(self, type_, **kw):
1848 return "MONEY"
1849
1850 def visit_SMALLMONEY(self, type_, **kw):
1851 return "SMALLMONEY"
1852
1853 def visit_uuid(self, type_, **kw):
1854 if type_.native_uuid:
1855 return self.visit_UNIQUEIDENTIFIER(type_, **kw)
1856 else:
1857 return super().visit_uuid(type_, **kw)
1858
1859 def visit_UNIQUEIDENTIFIER(self, type_, **kw):
1860 return "UNIQUEIDENTIFIER"
1861
1862 def visit_SQL_VARIANT(self, type_, **kw):
1863 return "SQL_VARIANT"
1864
1865
1866class MSExecutionContext(default.DefaultExecutionContext):
1867 _enable_identity_insert = False
1868 _select_lastrowid = False
1869 _lastrowid = None
1870
1871 dialect: MSDialect
1872
1873 def _opt_encode(self, statement):
1874 if self.compiled and self.compiled.schema_translate_map:
1875 rst = self.compiled.preparer._render_schema_translates
1876 statement = rst(statement, self.compiled.schema_translate_map)
1877
1878 return statement
1879
1880 def pre_exec(self):
1881 """Activate IDENTITY_INSERT if needed."""
1882
1883 if self.isinsert:
1884 if TYPE_CHECKING:
1885 assert is_sql_compiler(self.compiled)
1886 assert isinstance(self.compiled.compile_state, DMLState)
1887 assert isinstance(
1888 self.compiled.compile_state.dml_table, TableClause
1889 )
1890
1891 tbl = self.compiled.compile_state.dml_table
1892 id_column = tbl._autoincrement_column
1893
1894 if id_column is not None and (
1895 not isinstance(id_column.default, Sequence)
1896 ):
1897 insert_has_identity = True
1898 compile_state = self.compiled.dml_compile_state
1899 self._enable_identity_insert = (
1900 id_column.key in self.compiled_parameters[0]
1901 ) or (
1902 compile_state._dict_parameters
1903 and (id_column.key in compile_state._insert_col_keys)
1904 )
1905
1906 else:
1907 insert_has_identity = False
1908 self._enable_identity_insert = False
1909
1910 self._select_lastrowid = (
1911 not self.compiled.inline
1912 and insert_has_identity
1913 and not self.compiled.effective_returning
1914 and not self._enable_identity_insert
1915 and not self.executemany
1916 )
1917
1918 if self._enable_identity_insert:
1919 self.root_connection._cursor_execute(
1920 self.cursor,
1921 self._opt_encode(
1922 "SET IDENTITY_INSERT %s ON"
1923 % self.identifier_preparer.format_table(tbl)
1924 ),
1925 (),
1926 self,
1927 )
1928
1929 def post_exec(self):
1930 """Disable IDENTITY_INSERT if enabled."""
1931
1932 conn = self.root_connection
1933
1934 if self.isinsert or self.isupdate or self.isdelete:
1935 self._rowcount = self.cursor.rowcount
1936
1937 if self._select_lastrowid:
1938 if self.dialect.use_scope_identity:
1939 conn._cursor_execute(
1940 self.cursor,
1941 "SELECT scope_identity() AS lastrowid",
1942 (),
1943 self,
1944 )
1945 else:
1946 conn._cursor_execute(
1947 self.cursor, "SELECT @@identity AS lastrowid", (), self
1948 )
1949 # fetchall() ensures the cursor is consumed without closing it
1950 row = self.cursor.fetchall()[0]
1951 self._lastrowid = int(row[0])
1952
1953 self.cursor_fetch_strategy = _cursor._NO_CURSOR_DML
1954 elif (
1955 self.compiled is not None
1956 and is_sql_compiler(self.compiled)
1957 and self.compiled.effective_returning
1958 ):
1959 self.cursor_fetch_strategy = (
1960 _cursor.FullyBufferedCursorFetchStrategy(
1961 self.cursor,
1962 self.cursor.description,
1963 self.cursor.fetchall(),
1964 )
1965 )
1966
1967 if self._enable_identity_insert:
1968 if TYPE_CHECKING:
1969 assert is_sql_compiler(self.compiled)
1970 assert isinstance(self.compiled.compile_state, DMLState)
1971 assert isinstance(
1972 self.compiled.compile_state.dml_table, TableClause
1973 )
1974 conn._cursor_execute(
1975 self.cursor,
1976 self._opt_encode(
1977 "SET IDENTITY_INSERT %s OFF"
1978 % self.identifier_preparer.format_table(
1979 self.compiled.compile_state.dml_table
1980 )
1981 ),
1982 (),
1983 self,
1984 )
1985
1986 def get_lastrowid(self):
1987 return self._lastrowid
1988
1989 def handle_dbapi_exception(self, e):
1990 if self._enable_identity_insert:
1991 try:
1992 self.cursor.execute(
1993 self._opt_encode(
1994 "SET IDENTITY_INSERT %s OFF"
1995 % self.identifier_preparer.format_table(
1996 self.compiled.compile_state.dml_table
1997 )
1998 )
1999 )
2000 except Exception:
2001 pass
2002
2003 def fire_sequence(self, seq, type_):
2004 return self._execute_scalar(
2005 (
2006 "SELECT NEXT VALUE FOR %s"
2007 % self.identifier_preparer.format_sequence(seq)
2008 ),
2009 type_,
2010 )
2011
2012 def get_insert_default(self, column):
2013 if (
2014 isinstance(column, sa_schema.Column)
2015 and column is column.table._autoincrement_column
2016 and isinstance(column.default, sa_schema.Sequence)
2017 and column.default.optional
2018 ):
2019 return None
2020 return super().get_insert_default(column)
2021
2022
2023class MSSQLCompiler(compiler.SQLCompiler):
2024 returning_precedes_values = True
2025
2026 extract_map = util.update_copy(
2027 compiler.SQLCompiler.extract_map,
2028 {
2029 "doy": "dayofyear",
2030 "dow": "weekday",
2031 "milliseconds": "millisecond",
2032 "microseconds": "microsecond",
2033 },
2034 )
2035
2036 def __init__(self, *args, **kwargs):
2037 self.tablealiases = {}
2038 super().__init__(*args, **kwargs)
2039
2040 def _format_frame_clause(self, range_, **kw):
2041 kw["literal_execute"] = True
2042 return super()._format_frame_clause(range_, **kw)
2043
2044 def _with_legacy_schema_aliasing(fn):
2045 def decorate(self, *arg, **kw):
2046 if self.dialect.legacy_schema_aliasing:
2047 return fn(self, *arg, **kw)
2048 else:
2049 super_ = getattr(super(MSSQLCompiler, self), fn.__name__)
2050 return super_(*arg, **kw)
2051
2052 return decorate
2053
2054 def visit_now_func(self, fn, **kw):
2055 return "CURRENT_TIMESTAMP"
2056
2057 def visit_current_date_func(self, fn, **kw):
2058 return "GETDATE()"
2059
2060 def visit_length_func(self, fn, **kw):
2061 return "LEN%s" % self.function_argspec(fn, **kw)
2062
2063 def visit_char_length_func(self, fn, **kw):
2064 return "LEN%s" % self.function_argspec(fn, **kw)
2065
2066 def visit_aggregate_strings_func(self, fn, **kw):
2067 expr = fn.clauses.clauses[0]._compiler_dispatch(self, **kw)
2068 kw["literal_execute"] = True
2069 delimiter = fn.clauses.clauses[1]._compiler_dispatch(self, **kw)
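        # e.g. (illustrative) func.aggregate_strings(t.c.name, ", ") renders
        # approximately as string_agg(t.name, ', '); the delimiter is
        # rendered inline because literal_execute is set above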
2070 return f"string_agg({expr}, {delimiter})"
2071
2072 def visit_concat_op_expression_clauselist(
2073 self, clauselist, operator, **kw
2074 ):
2075 return " + ".join(self.process(elem, **kw) for elem in clauselist)
2076
2077 def visit_concat_op_binary(self, binary, operator, **kw):
2078 return "%s + %s" % (
2079 self.process(binary.left, **kw),
2080 self.process(binary.right, **kw),
2081 )
2082
2083 def visit_true(self, expr, **kw):
2084 return "1"
2085
2086 def visit_false(self, expr, **kw):
2087 return "0"
2088
2089 def visit_match_op_binary(self, binary, operator, **kw):
2090 return "CONTAINS (%s, %s)" % (
2091 self.process(binary.left, **kw),
2092 self.process(binary.right, **kw),
2093 )
2094
2095 def get_select_precolumns(self, select, **kw):
        """MS-SQL puts TOP, its version of LIMIT, here"""
2097
2098 s = super().get_select_precolumns(select, **kw)
2099
2100 if select._has_row_limiting_clause and self._use_top(select):
            # ODBC drivers and possibly others
            # don't support bind params in the SELECT clause on SQL Server,
            # so we have to render a literal here.
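            # e.g. (illustrative) select(t).limit(5) is rendered as
            # "SELECT TOP 5 ..." rather than "SELECT TOP ? ..."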
2104 kw["literal_execute"] = True
2105 s += "TOP %s " % self.process(
2106 self._get_limit_or_fetch(select), **kw
2107 )
2108 if select._fetch_clause is not None:
2109 if select._fetch_clause_options["percent"]:
2110 s += "PERCENT "
2111 if select._fetch_clause_options["with_ties"]:
2112 s += "WITH TIES "
2113
2114 return s
2115
2116 def get_from_hint_text(self, table, text):
2117 return text
2118
2119 def get_crud_hint_text(self, table, text):
2120 return text
2121
2122 def _get_limit_or_fetch(self, select):
2123 if select._fetch_clause is None:
2124 return select._limit_clause
2125 else:
2126 return select._fetch_clause
2127
2128 def _use_top(self, select):
2129 return (select._offset_clause is None) and (
2130 select._simple_int_clause(select._limit_clause)
2131 or (
                # limit can use TOP when it is by itself; fetch only uses
                # TOP when it needs to, because of PERCENT and/or WITH TIES
                # TODO: why? shouldn't we always use TOP?
2135 select._simple_int_clause(select._fetch_clause)
2136 and (
2137 select._fetch_clause_options["percent"]
2138 or select._fetch_clause_options["with_ties"]
2139 )
2140 )
2141 )
2142
2143 def limit_clause(self, cs, **kwargs):
2144 return ""
2145
2146 def _check_can_use_fetch_limit(self, select):
2147 # to use ROW_NUMBER(), an ORDER BY is required.
        # OFFSET and FETCH are options of the ORDER BY clause
2149 if not select._order_by_clause.clauses:
2150 raise exc.CompileError(
2151 "MSSQL requires an order_by when "
2152 "using an OFFSET or a non-simple "
2153 "LIMIT clause"
2154 )
2155
2156 if select._fetch_clause_options is not None and (
2157 select._fetch_clause_options["percent"]
2158 or select._fetch_clause_options["with_ties"]
2159 ):
2160 raise exc.CompileError(
2161 "MSSQL needs TOP to use PERCENT and/or WITH TIES. "
2162 "Only simple fetch without offset can be used."
2163 )
2164
2165 def _row_limit_clause(self, select, **kw):
        """MSSQL 2012 and above support the OFFSET/FETCH operators;
        use them instead of a subquery with row_number.
2168
2169 """
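        # e.g. (illustrative) select(t).order_by(t.c.x).offset(10).limit(5)
        # renders on SQL Server 2012+ approximately as:
        #
        #   SELECT ... FROM t ORDER BY t.x
        #   OFFSET 10 ROWS FETCH FIRST 5 ROWS ONLY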
2170
2171 if self.dialect._supports_offset_fetch and not self._use_top(select):
2172 self._check_can_use_fetch_limit(select)
2173
2174 return self.fetch_clause(
2175 select,
2176 fetch_clause=self._get_limit_or_fetch(select),
2177 require_offset=True,
2178 **kw,
2179 )
2180
2181 else:
2182 return ""
2183
2184 def visit_try_cast(self, element, **kw):
2185 return "TRY_CAST (%s AS %s)" % (
2186 self.process(element.clause, **kw),
2187 self.process(element.typeclause, **kw),
2188 )
2189
2190 def translate_select_structure(self, select_stmt, **kwargs):
        """Look for ``LIMIT`` and ``OFFSET`` in a select statement and, if
        present, try to wrap it in a subquery with a ``row_number()``
        criterion. MSSQL 2012 and above are excluded, as they use
        OFFSET / FETCH instead.
2194
2195 """
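        # A sketch of the rewrite performed here (illustrative, assuming a
        # table "t" with columns "id" and "x"):
        #
        #   select(t).order_by(t.c.x).offset(10).limit(5)
        #
        # becomes, on versions without OFFSET/FETCH support, approximately:
        #
        #   SELECT anon_1.id, anon_1.x FROM (
        #       SELECT t.id AS id, t.x AS x,
        #              ROW_NUMBER() OVER (ORDER BY t.x) AS mssql_rn
        #       FROM t
        #   ) AS anon_1
        #   WHERE anon_1.mssql_rn > 10 AND anon_1.mssql_rn <= 15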
2196 select = select_stmt
2197
2198 if (
2199 select._has_row_limiting_clause
2200 and not self.dialect._supports_offset_fetch
2201 and not self._use_top(select)
2202 and not getattr(select, "_mssql_visit", None)
2203 ):
2204 self._check_can_use_fetch_limit(select)
2205
2206 _order_by_clauses = [
2207 sql_util.unwrap_label_reference(elem)
2208 for elem in select._order_by_clause.clauses
2209 ]
2210
2211 limit_clause = self._get_limit_or_fetch(select)
2212 offset_clause = select._offset_clause
2213
2214 select = select._generate()
2215 select._mssql_visit = True
2216 select = (
2217 select.add_columns(
2218 sql.func.ROW_NUMBER()
2219 .over(order_by=_order_by_clauses)
2220 .label("mssql_rn")
2221 )
2222 .order_by(None)
2223 .alias()
2224 )
2225
2226 mssql_rn = sql.column("mssql_rn")
2227 limitselect = sql.select(
2228 *[c for c in select.c if c.key != "mssql_rn"]
2229 )
2230 if offset_clause is not None:
2231 limitselect = limitselect.where(mssql_rn > offset_clause)
2232 if limit_clause is not None:
2233 limitselect = limitselect.where(
2234 mssql_rn <= (limit_clause + offset_clause)
2235 )
2236 else:
2237 limitselect = limitselect.where(mssql_rn <= (limit_clause))
2238 return limitselect
2239 else:
2240 return select
2241
2242 @_with_legacy_schema_aliasing
2243 def visit_table(self, table, mssql_aliased=False, iscrud=False, **kwargs):
2244 if mssql_aliased is table or iscrud:
2245 return super().visit_table(table, **kwargs)
2246
2247 # alias schema-qualified tables
2248 alias = self._schema_aliased_table(table)
2249 if alias is not None:
2250 return self.process(alias, mssql_aliased=table, **kwargs)
2251 else:
2252 return super().visit_table(table, **kwargs)
2253
2254 @_with_legacy_schema_aliasing
2255 def visit_alias(self, alias, **kw):
2256 # translate for schema-qualified table aliases
2257 kw["mssql_aliased"] = alias.element
2258 return super().visit_alias(alias, **kw)
2259
2260 @_with_legacy_schema_aliasing
2261 def visit_column(self, column, add_to_result_map=None, **kw):
2262 if (
2263 column.table is not None
2264 and (not self.isupdate and not self.isdelete)
2265 or self.is_subquery()
2266 ):
2267 # translate for schema-qualified table aliases
2268 t = self._schema_aliased_table(column.table)
2269 if t is not None:
2270 converted = elements._corresponding_column_or_error(t, column)
2271 if add_to_result_map is not None:
2272 add_to_result_map(
2273 column.name,
2274 column.name,
2275 (column, column.name, column.key),
2276 column.type,
2277 )
2278
2279 return super().visit_column(converted, **kw)
2280
2281 return super().visit_column(
2282 column, add_to_result_map=add_to_result_map, **kw
2283 )
2284
2285 def _schema_aliased_table(self, table):
2286 if getattr(table, "schema", None) is not None:
2287 if table not in self.tablealiases:
2288 self.tablealiases[table] = table.alias()
2289 return self.tablealiases[table]
2290 else:
2291 return None
2292
2293 def visit_extract(self, extract, **kw):
2294 field = self.extract_map.get(extract.field, extract.field)
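        # e.g. (illustrative) extract("dow", col) renders as
        # "DATEPART(weekday, col)" via the extract_map above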
2295 return "DATEPART(%s, %s)" % (field, self.process(extract.expr, **kw))
2296
2297 def visit_savepoint(self, savepoint_stmt, **kw):
2298 return "SAVE TRANSACTION %s" % self.preparer.format_savepoint(
2299 savepoint_stmt
2300 )
2301
2302 def visit_rollback_to_savepoint(self, savepoint_stmt, **kw):
2303 return "ROLLBACK TRANSACTION %s" % self.preparer.format_savepoint(
2304 savepoint_stmt
2305 )
2306
2307 def visit_binary(self, binary, **kwargs):
2308 """Move bind parameters to the right-hand side of an operator, where
2309 possible.
2310
2311 """
2312 if (
2313 isinstance(binary.left, expression.BindParameter)
2314 and binary.operator == operator.eq
2315 and not isinstance(binary.right, expression.BindParameter)
2316 ):
2317 return self.process(
2318 expression.BinaryExpression(
2319 binary.right, binary.left, binary.operator
2320 ),
2321 **kwargs,
2322 )
2323 return super().visit_binary(binary, **kwargs)
2324
2325 def returning_clause(
2326 self, stmt, returning_cols, *, populate_result_map, **kw
2327 ):
        # SQL Server's OUTPUT (returning) clause requires that the columns refer to
2329 # the virtual table names "inserted" or "deleted". Here, we make
2330 # a simple alias of our table with that name, and then adapt the
2331 # columns we have from the list of RETURNING columns to that new name
2332 # so that they render as "inserted.<colname>" / "deleted.<colname>".
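        # e.g. (illustrative) insert(t).returning(t.c.id) renders
        # approximately as:
        #
        #   INSERT INTO t (x) OUTPUT inserted.id VALUES (?)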
2333
2334 if stmt.is_insert or stmt.is_update:
2335 target = stmt.table.alias("inserted")
2336 elif stmt.is_delete:
2337 target = stmt.table.alias("deleted")
2338 else:
2339 assert False, "expected Insert, Update or Delete statement"
2340
2341 adapter = sql_util.ClauseAdapter(target)
2342
2343 # adapter.traverse() takes a column from our target table and returns
2344 # the one that is linked to the "inserted" / "deleted" tables. So in
2345 # order to retrieve these values back from the result (e.g. like
2346 # row[column]), tell the compiler to also add the original unadapted
2347 # column to the result map. Before #4877, these were (unknowingly)
2348 # falling back using string name matching in the result set which
2349 # necessarily used an expensive KeyError in order to match.
2350
2351 columns = [
2352 self._label_returning_column(
2353 stmt,
2354 adapter.traverse(column),
2355 populate_result_map,
2356 {"result_map_targets": (column,)},
2357 fallback_label_name=fallback_label_name,
2358 column_is_repeated=repeated,
2359 name=name,
2360 proxy_name=proxy_name,
2361 **kw,
2362 )
2363 for (
2364 name,
2365 proxy_name,
2366 fallback_label_name,
2367 column,
2368 repeated,
2369 ) in stmt._generate_columns_plus_names(
2370 True, cols=expression._select_iterables(returning_cols)
2371 )
2372 ]
2373
2374 return "OUTPUT " + ", ".join(columns)
2375
2376 def get_cte_preamble(self, recursive):
        # SQL Server finds it too inconvenient to accept
        # the entirely optional, SQL-standard-specified
        # "RECURSIVE" keyword with its "WITH",
        # so here we go
2381 return "WITH"
2382
2383 def label_select_column(self, select, column, asfrom):
2384 if isinstance(column, expression.Function):
2385 return column.label(None)
2386 else:
2387 return super().label_select_column(select, column, asfrom)
2388
2389 def for_update_clause(self, select, **kw):
2390 # "FOR UPDATE" is only allowed on "DECLARE CURSOR" which
2391 # SQLAlchemy doesn't use
2392 return ""
2393
2394 def order_by_clause(self, select, **kw):
2395 # MSSQL only allows ORDER BY in subqueries if there is a LIMIT:
2396 # "The ORDER BY clause is invalid in views, inline functions,
2397 # derived tables, subqueries, and common table expressions,
2398 # unless TOP, OFFSET or FOR XML is also specified."
2399 if (
2400 self.is_subquery()
2401 and not self._use_top(select)
2402 and (
2403 select._offset is None
2404 or not self.dialect._supports_offset_fetch
2405 )
2406 ):
2407 # avoid processing the order by clause if we won't end up
2408 # using it, because we don't want all the bind params tacked
2409 # onto the positional list if that is what the dbapi requires
2410 return ""
2411
2412 order_by = self.process(select._order_by_clause, **kw)
2413
2414 if order_by:
2415 return " ORDER BY " + order_by
2416 else:
2417 return ""
2418
2419 def update_from_clause(
2420 self, update_stmt, from_table, extra_froms, from_hints, **kw
2421 ):
2422 """Render the UPDATE..FROM clause specific to MSSQL.
2423
2424 In MSSQL, if the UPDATE statement involves an alias of the table to
2425 be updated, then the table itself must be added to the FROM list as
2426 well. Otherwise, it is optional. Here, we add it regardless.
2427
2428 """
2429 return "FROM " + ", ".join(
2430 t._compiler_dispatch(self, asfrom=True, fromhints=from_hints, **kw)
2431 for t in [from_table] + extra_froms
2432 )
2433
2434 def delete_table_clause(self, delete_stmt, from_table, extra_froms, **kw):
2435 """If we have extra froms make sure we render any alias as hint."""
2436 ashint = False
2437 if extra_froms:
2438 ashint = True
2439 return from_table._compiler_dispatch(
2440 self, asfrom=True, iscrud=True, ashint=ashint, **kw
2441 )
2442
2443 def delete_extra_from_clause(
2444 self, delete_stmt, from_table, extra_froms, from_hints, **kw
2445 ):
2446 """Render the DELETE .. FROM clause specific to MSSQL.
2447
2448 Yes, it has the FROM keyword twice.
2449
2450 """
2451 return "FROM " + ", ".join(
2452 t._compiler_dispatch(self, asfrom=True, fromhints=from_hints, **kw)
2453 for t in [from_table] + extra_froms
2454 )
2455
2456 def visit_empty_set_expr(self, type_, **kw):
2457 return "SELECT 1 WHERE 1!=1"
2458
2459 def visit_is_distinct_from_binary(self, binary, operator, **kw):
2460 return "NOT EXISTS (SELECT %s INTERSECT SELECT %s)" % (
2461 self.process(binary.left),
2462 self.process(binary.right),
2463 )
2464
2465 def visit_is_not_distinct_from_binary(self, binary, operator, **kw):
2466 return "EXISTS (SELECT %s INTERSECT SELECT %s)" % (
2467 self.process(binary.left),
2468 self.process(binary.right),
2469 )
2470
2471 def _render_json_extract_from_binary(self, binary, operator, **kw):
2472 # note we are intentionally calling upon the process() calls in the
2473 # order in which they appear in the SQL String as this is used
2474 # by positional parameter rendering
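        # e.g. (illustrative) for an Integer-typed JSON element such as
        # data["key"].as_integer(), this builds approximately:
        #
        #   CASE JSON_VALUE(data, ?) WHEN NULL THEN NULL
        #   ELSE CAST(JSON_VALUE(data, ?) AS INTEGER) END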
2475
2476 if binary.type._type_affinity is sqltypes.JSON:
2477 return "JSON_QUERY(%s, %s)" % (
2478 self.process(binary.left, **kw),
2479 self.process(binary.right, **kw),
2480 )
2481
2482 # as with other dialects, start with an explicit test for NULL
2483 case_expression = "CASE JSON_VALUE(%s, %s) WHEN NULL THEN NULL" % (
2484 self.process(binary.left, **kw),
2485 self.process(binary.right, **kw),
2486 )
2487
2488 if binary.type._type_affinity is sqltypes.Integer:
2489 type_expression = "ELSE CAST(JSON_VALUE(%s, %s) AS INTEGER)" % (
2490 self.process(binary.left, **kw),
2491 self.process(binary.right, **kw),
2492 )
2493 elif binary.type._type_affinity is sqltypes.Numeric:
2494 type_expression = "ELSE CAST(JSON_VALUE(%s, %s) AS %s)" % (
2495 self.process(binary.left, **kw),
2496 self.process(binary.right, **kw),
2497 (
2498 "FLOAT"
2499 if isinstance(binary.type, sqltypes.Float)
2500 else "NUMERIC(%s, %s)"
2501 % (binary.type.precision, binary.type.scale)
2502 ),
2503 )
2504 elif binary.type._type_affinity is sqltypes.Boolean:
2505 # the NULL handling is particularly weird with boolean, so
2506 # explicitly return numeric (BIT) constants
2507 type_expression = (
2508 "WHEN 'true' THEN 1 WHEN 'false' THEN 0 ELSE NULL"
2509 )
2510 elif binary.type._type_affinity is sqltypes.String:
2511 # TODO: does this comment (from mysql) apply to here, too?
2512 # this fails with a JSON value that's a four byte unicode
2513 # string. SQLite has the same problem at the moment
2514 type_expression = "ELSE JSON_VALUE(%s, %s)" % (
2515 self.process(binary.left, **kw),
2516 self.process(binary.right, **kw),
2517 )
2518 else:
            # other affinity... this is not expected right now
2520 type_expression = "ELSE JSON_QUERY(%s, %s)" % (
2521 self.process(binary.left, **kw),
2522 self.process(binary.right, **kw),
2523 )
2524
2525 return case_expression + " " + type_expression + " END"
2526
2527 def visit_json_getitem_op_binary(self, binary, operator, **kw):
2528 return self._render_json_extract_from_binary(binary, operator, **kw)
2529
2530 def visit_json_path_getitem_op_binary(self, binary, operator, **kw):
2531 return self._render_json_extract_from_binary(binary, operator, **kw)
2532
2533 def visit_sequence(self, seq, **kw):
2534 return "NEXT VALUE FOR %s" % self.preparer.format_sequence(seq)
2535
2536
2537class MSSQLStrictCompiler(MSSQLCompiler):
2538 """A subclass of MSSQLCompiler which disables the usage of bind
2539 parameters where not allowed natively by MS-SQL.
2540
2541 A dialect may use this compiler on a platform where native
2542 binds are used.
2543
2544 """
2545
2546 ansi_bind_rules = True
2547
2548 def visit_in_op_binary(self, binary, operator, **kw):
2549 kw["literal_execute"] = True
2550 return "%s IN %s" % (
2551 self.process(binary.left, **kw),
2552 self.process(binary.right, **kw),
2553 )
2554
2555 def visit_not_in_op_binary(self, binary, operator, **kw):
2556 kw["literal_execute"] = True
2557 return "%s NOT IN %s" % (
2558 self.process(binary.left, **kw),
2559 self.process(binary.right, **kw),
2560 )
2561
2562 def render_literal_value(self, value, type_):
2563 """
2564 For date and datetime values, convert to a string
2565 format acceptable to MSSQL. That seems to be the
2566 so-called ODBC canonical date format which looks
2567 like this:
2568
2569 yyyy-mm-dd hh:mi:ss.mmm(24h)
2570
2571 For other data types, call the base class implementation.
2572 """
2573 # datetime and date are both subclasses of datetime.date
2574 if issubclass(type(value), datetime.date):
2575 # SQL Server wants single quotes around the date string.
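            # e.g. (illustrative) datetime.date(2024, 1, 15) renders as
            # '2024-01-15' and datetime.datetime(2024, 1, 15, 12, 30, 0)
            # renders as '2024-01-15 12:30:00'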
2576 return "'" + str(value) + "'"
2577 else:
2578 return super().render_literal_value(value, type_)
2579
2580
2581class MSDDLCompiler(compiler.DDLCompiler):
2582 def get_column_specification(self, column, **kwargs):
2583 colspec = self.preparer.format_column(column)
2584
2585 # type is not accepted in a computed column
2586 if column.computed is not None:
2587 colspec += " " + self.process(column.computed)
2588 else:
2589 colspec += " " + self.dialect.type_compiler_instance.process(
2590 column.type, type_expression=column
2591 )
2592
2593 if column.nullable is not None:
2594 if (
2595 not column.nullable
2596 or column.primary_key
2597 or isinstance(column.default, sa_schema.Sequence)
2598 or column.autoincrement is True
2599 or column.identity
2600 ):
2601 colspec += " NOT NULL"
2602 elif column.computed is None:
2603 # don't specify "NULL" for computed columns
2604 colspec += " NULL"
2605
2606 if column.table is None:
2607 raise exc.CompileError(
2608 "mssql requires Table-bound columns "
2609 "in order to generate DDL"
2610 )
2611
2612 d_opt = column.dialect_options["mssql"]
2613 start = d_opt["identity_start"]
2614 increment = d_opt["identity_increment"]
2615 if start is not None or increment is not None:
2616 if column.identity:
2617 raise exc.CompileError(
2618 "Cannot specify options 'mssql_identity_start' and/or "
2619 "'mssql_identity_increment' while also using the "
2620 "'Identity' construct."
2621 )
2622 util.warn_deprecated(
2623 "The dialect options 'mssql_identity_start' and "
2624 "'mssql_identity_increment' are deprecated. "
2625 "Use the 'Identity' object instead.",
2626 "1.4",
2627 )
2628
2629 if column.identity:
2630 colspec += self.process(column.identity, **kwargs)
2631 elif (
2632 column is column.table._autoincrement_column
2633 or column.autoincrement is True
2634 ) and (
2635 not isinstance(column.default, Sequence) or column.default.optional
2636 ):
2637 colspec += self.process(Identity(start=start, increment=increment))
2638 else:
2639 default = self.get_column_default_string(column)
2640 if default is not None:
2641 colspec += " DEFAULT " + default
2642
2643 return colspec
2644
2645 def visit_create_index(self, create, include_schema=False, **kw):
2646 index = create.element
2647 self._verify_index_table(index)
2648 preparer = self.preparer
2649 text = "CREATE "
2650 if index.unique:
2651 text += "UNIQUE "
2652
2653 # handle clustering option
2654 clustered = index.dialect_options["mssql"]["clustered"]
2655 if clustered is not None:
2656 if clustered:
2657 text += "CLUSTERED "
2658 else:
2659 text += "NONCLUSTERED "
2660
2661 # handle columnstore option (has no negative value)
2662 columnstore = index.dialect_options["mssql"]["columnstore"]
2663 if columnstore:
2664 text += "COLUMNSTORE "
2665
2666 text += "INDEX %s ON %s" % (
2667 self._prepared_index_name(index, include_schema=include_schema),
2668 preparer.format_table(index.table),
2669 )
2670
        # in some cases mssql allows indexes with no columns defined
2672 if len(index.expressions) > 0:
2673 text += " (%s)" % ", ".join(
2674 self.sql_compiler.process(
2675 expr, include_table=False, literal_binds=True
2676 )
2677 for expr in index.expressions
2678 )
2679
2680 # handle other included columns
2681 if index.dialect_options["mssql"]["include"]:
2682 inclusions = [
2683 index.table.c[col] if isinstance(col, str) else col
2684 for col in index.dialect_options["mssql"]["include"]
2685 ]
2686
2687 text += " INCLUDE (%s)" % ", ".join(
2688 [preparer.quote(c.name) for c in inclusions]
2689 )
2690
2691 whereclause = index.dialect_options["mssql"]["where"]
2692
2693 if whereclause is not None:
2694 whereclause = coercions.expect(
2695 roles.DDLExpressionRole, whereclause
2696 )
2697
2698 where_compiled = self.sql_compiler.process(
2699 whereclause, include_table=False, literal_binds=True
2700 )
2701 text += " WHERE " + where_compiled
2702
2703 return text
2704
2705 def visit_drop_index(self, drop: DropIndex, **kw: Any) -> str:
2706 index_name = self._prepared_index_name(
2707 drop.element, include_schema=False
2708 )
2709 table_name = self.preparer.format_table(drop.element.table)
2710 if_exists = " IF EXISTS" if drop.if_exists else ""
2711 return f"\nDROP INDEX{if_exists} {index_name} ON {table_name}"
2712
2713 def visit_primary_key_constraint(self, constraint, **kw):
2714 if len(constraint) == 0:
2715 return ""
2716 text = ""
2717 if constraint.name is not None:
2718 text += "CONSTRAINT %s " % self.preparer.format_constraint(
2719 constraint
2720 )
2721 text += "PRIMARY KEY "
2722
2723 clustered = constraint.dialect_options["mssql"]["clustered"]
2724 if clustered is not None:
2725 if clustered:
2726 text += "CLUSTERED "
2727 else:
2728 text += "NONCLUSTERED "
2729
2730 text += "(%s)" % ", ".join(
2731 self.preparer.quote(c.name) for c in constraint
2732 )
2733 text += self.define_constraint_deferrability(constraint)
2734 return text
2735
2736 def visit_unique_constraint(self, constraint, **kw):
2737 if len(constraint) == 0:
2738 return ""
2739 text = ""
2740 if constraint.name is not None:
2741 formatted_name = self.preparer.format_constraint(constraint)
2742 if formatted_name is not None:
2743 text += "CONSTRAINT %s " % formatted_name
2744 text += "UNIQUE %s" % self.define_unique_constraint_distinct(
2745 constraint, **kw
2746 )
2747 clustered = constraint.dialect_options["mssql"]["clustered"]
2748 if clustered is not None:
2749 if clustered:
2750 text += "CLUSTERED "
2751 else:
2752 text += "NONCLUSTERED "
2753
2754 text += "(%s)" % ", ".join(
2755 self.preparer.quote(c.name) for c in constraint
2756 )
2757 text += self.define_constraint_deferrability(constraint)
2758 return text
2759
2760 def visit_computed_column(self, generated, **kw):
2761 text = "AS (%s)" % self.sql_compiler.process(
2762 generated.sqltext, include_table=False, literal_binds=True
2763 )
2764 # explicitly check for True|False since None means server default
2765 if generated.persisted is True:
2766 text += " PERSISTED"
2767 return text
2768
2769 def visit_set_table_comment(self, create, **kw):
2770 schema = self.preparer.schema_for_object(create.element)
2771 schema_name = schema if schema else self.dialect.default_schema_name
2772 return (
2773 "execute sp_addextendedproperty 'MS_Description', "
2774 "{}, 'schema', {}, 'table', {}".format(
2775 self.sql_compiler.render_literal_value(
2776 create.element.comment, sqltypes.NVARCHAR()
2777 ),
2778 self.preparer.quote_schema(schema_name),
2779 self.preparer.format_table(create.element, use_schema=False),
2780 )
2781 )
2782
2783 def visit_drop_table_comment(self, drop, **kw):
2784 schema = self.preparer.schema_for_object(drop.element)
2785 schema_name = schema if schema else self.dialect.default_schema_name
2786 return (
2787 "execute sp_dropextendedproperty 'MS_Description', 'schema', "
2788 "{}, 'table', {}".format(
2789 self.preparer.quote_schema(schema_name),
2790 self.preparer.format_table(drop.element, use_schema=False),
2791 )
2792 )
2793
2794 def visit_set_column_comment(self, create, **kw):
2795 schema = self.preparer.schema_for_object(create.element.table)
2796 schema_name = schema if schema else self.dialect.default_schema_name
2797 return (
2798 "execute sp_addextendedproperty 'MS_Description', "
2799 "{}, 'schema', {}, 'table', {}, 'column', {}".format(
2800 self.sql_compiler.render_literal_value(
2801 create.element.comment, sqltypes.NVARCHAR()
2802 ),
2803 self.preparer.quote_schema(schema_name),
2804 self.preparer.format_table(
2805 create.element.table, use_schema=False
2806 ),
2807 self.preparer.format_column(create.element),
2808 )
2809 )
2810
2811 def visit_drop_column_comment(self, drop, **kw):
2812 schema = self.preparer.schema_for_object(drop.element.table)
2813 schema_name = schema if schema else self.dialect.default_schema_name
2814 return (
2815 "execute sp_dropextendedproperty 'MS_Description', 'schema', "
2816 "{}, 'table', {}, 'column', {}".format(
2817 self.preparer.quote_schema(schema_name),
2818 self.preparer.format_table(
2819 drop.element.table, use_schema=False
2820 ),
2821 self.preparer.format_column(drop.element),
2822 )
2823 )
2824
2825 def visit_create_sequence(self, create, **kw):
2826 prefix = None
2827 if create.element.data_type is not None:
2828 data_type = create.element.data_type
2829 prefix = " AS %s" % self.type_compiler.process(data_type)
2830 return super().visit_create_sequence(create, prefix=prefix, **kw)
2831
2832 def visit_identity_column(self, identity, **kw):
2833 text = " IDENTITY"
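        # e.g. Identity() renders as " IDENTITY"; Identity(start=5,
        # increment=2) renders as " IDENTITY(5,2)"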
2834 if identity.start is not None or identity.increment is not None:
2835 start = 1 if identity.start is None else identity.start
2836 increment = 1 if identity.increment is None else identity.increment
2837 text += "(%s,%s)" % (start, increment)
2838 return text
2839
2840
2841class MSIdentifierPreparer(compiler.IdentifierPreparer):
2842 reserved_words = RESERVED_WORDS
2843
2844 def __init__(self, dialect):
2845 super().__init__(
2846 dialect,
2847 initial_quote="[",
2848 final_quote="]",
2849 quote_case_sensitive_collations=False,
2850 )
2851
2852 def _escape_identifier(self, value):
2853 return value.replace("]", "]]")
2854
2855 def _unescape_identifier(self, value):
2856 return value.replace("]]", "]")
2857
2858 def quote_schema(self, schema, force=None):
        """Prepare a quoted schema name, which may include a database name."""
2860
2861 # need to re-implement the deprecation warning entirely
2862 if force is not None:
2863 # not using the util.deprecated_params() decorator in this
2864 # case because of the additional function call overhead on this
2865 # very performance-critical spot.
2866 util.warn_deprecated(
2867 "The IdentifierPreparer.quote_schema.force parameter is "
2868 "deprecated and will be removed in a future release. This "
2869 "flag has no effect on the behavior of the "
2870 "IdentifierPreparer.quote method; please refer to "
2871 "quoted_name().",
2872 version="1.3",
2873 )
2874
2875 dbname, owner = _schema_elements(schema)
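        # illustrative: "my db.dbo" splits into ("my db", "dbo") and renders
        # as "[my db].dbo"; each token is quoted only if it requires quoting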
2876 if dbname:
2877 result = "%s.%s" % (self.quote(dbname), self.quote(owner))
2878 elif owner:
2879 result = self.quote(owner)
2880 else:
2881 result = ""
2882 return result
2883
2884
2885def _db_plus_owner_listing(fn):
2886 def wrap(dialect, connection, schema=None, **kw):
2887 dbname, owner = _owner_plus_db(dialect, schema)
2888 return _switch_db(
2889 dbname,
2890 connection,
2891 fn,
2892 dialect,
2893 connection,
2894 dbname,
2895 owner,
2896 schema,
2897 **kw,
2898 )
2899
2900 return update_wrapper(wrap, fn)
2901
2902
2903def _db_plus_owner(fn):
2904 def wrap(dialect, connection, tablename, schema=None, **kw):
2905 dbname, owner = _owner_plus_db(dialect, schema)
2906 return _switch_db(
2907 dbname,
2908 connection,
2909 fn,
2910 dialect,
2911 connection,
2912 tablename,
2913 dbname,
2914 owner,
2915 schema,
2916 **kw,
2917 )
2918
2919 return update_wrapper(wrap, fn)
2920
2921
2922def _switch_db(dbname, connection, fn, *arg, **kw):
2923 if dbname:
2924 current_db = connection.exec_driver_sql("select db_name()").scalar()
2925 if current_db != dbname:
2926 connection.exec_driver_sql(
2927 "use %s" % connection.dialect.identifier_preparer.quote(dbname)
2928 )
2929 try:
2930 return fn(*arg, **kw)
2931 finally:
2932 if dbname and current_db != dbname:
2933 connection.exec_driver_sql(
2934 "use %s"
2935 % connection.dialect.identifier_preparer.quote(current_db)
2936 )
2937
2938
2939def _owner_plus_db(dialect, schema):
2940 if not schema:
2941 return None, dialect.default_schema_name
2942 else:
2943 return _schema_elements(schema)
2944
2945
2946_memoized_schema = util.LRUCache()
2947
2948
2949def _schema_elements(schema):
2950 if isinstance(schema, quoted_name) and schema.quote:
2951 return None, schema
2952
2953 if schema in _memoized_schema:
2954 return _memoized_schema[schema]
2955
2956 # tests for this function are in:
2957 # test/dialect/mssql/test_reflection.py ->
2958 # OwnerPlusDBTest.test_owner_database_pairs
2959 # test/dialect/mssql/test_compiler.py -> test_force_schema_*
2960 # test/dialect/mssql/test_compiler.py -> test_schema_many_tokens_*
2961 #
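    # illustrative examples of the tokenization below (hand-traced,
    # consistent with the tests named above):
    #
    #   "dbo"           -> (None, "dbo")
    #   "mydb.dbo"      -> ("mydb", "dbo")
    #   "[my.db].dbo"   -> ("my.db", "dbo")
    #   "[a].[b].owner" -> ("[a].[b]", "owner")  # dbname keeps its brackets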
2962
2963 if schema.startswith("__[SCHEMA_"):
2964 return None, schema
2965
2966 push = []
2967 symbol = ""
2968 bracket = False
2969 has_brackets = False
2970 for token in re.split(r"(\[|\]|\.)", schema):
2971 if not token:
2972 continue
2973 if token == "[":
2974 bracket = True
2975 has_brackets = True
2976 elif token == "]":
2977 bracket = False
2978 elif not bracket and token == ".":
2979 if has_brackets:
2980 push.append("[%s]" % symbol)
2981 else:
2982 push.append(symbol)
2983 symbol = ""
2984 has_brackets = False
2985 else:
2986 symbol += token
2987 if symbol:
2988 push.append(symbol)
2989 if len(push) > 1:
2990 dbname, owner = ".".join(push[0:-1]), push[-1]
2991
2992 # test for internal brackets
2993 if re.match(r".*\].*\[.*", dbname[1:-1]):
2994 dbname = quoted_name(dbname, quote=False)
2995 else:
2996 dbname = dbname.lstrip("[").rstrip("]")
2997
2998 elif len(push):
2999 dbname, owner = None, push[0]
3000 else:
3001 dbname, owner = None, None
3002
3003 _memoized_schema[schema] = dbname, owner
3004 return dbname, owner
3005
3006
3007class MSDialect(default.DefaultDialect):
3008 # will assume it's at least mssql2005
3009 name = "mssql"
3010 supports_statement_cache = True
3011 supports_default_values = True
3012 supports_empty_insert = False
3013 favor_returning_over_lastrowid = True
3014
3015 returns_native_bytes = True
3016
3017 supports_comments = True
3018 supports_default_metavalue = False
3019 """dialect supports INSERT... VALUES (DEFAULT) syntax -
3020 SQL Server **does** support this, but **not** for the IDENTITY column,
3021 so we can't turn this on.
3022
3023 """
3024
3025 # supports_native_uuid is partial here, so we implement our
3026 # own impl type
3027
3028 execution_ctx_cls = MSExecutionContext
3029 use_scope_identity = True
3030 max_identifier_length = 128
3031 schema_name = "dbo"
3032
3033 insert_returning = True
3034 update_returning = True
3035 delete_returning = True
3036 update_returning_multifrom = True
3037 delete_returning_multifrom = True
3038
3039 colspecs = {
3040 sqltypes.DateTime: _MSDateTime,
3041 sqltypes.Date: _MSDate,
3042 sqltypes.JSON: JSON,
3043 sqltypes.JSON.JSONIndexType: JSONIndexType,
3044 sqltypes.JSON.JSONPathType: JSONPathType,
3045 sqltypes.Time: _BASETIMEIMPL,
3046 sqltypes.Unicode: _MSUnicode,
3047 sqltypes.UnicodeText: _MSUnicodeText,
3048 DATETIMEOFFSET: DATETIMEOFFSET,
3049 DATETIME2: DATETIME2,
3050 SMALLDATETIME: SMALLDATETIME,
3051 DATETIME: DATETIME,
3052 sqltypes.Uuid: MSUUid,
3053 }
3054
3055 engine_config_types = default.DefaultDialect.engine_config_types.union(
3056 {"legacy_schema_aliasing": util.asbool}
3057 )
3058
3059 ischema_names = ischema_names
3060
3061 supports_sequences = True
3062 sequences_optional = True
    # This is actually used for autoincrement, where an identity that
    # starts with 1 is used.
    # for sequences, T-SQL's actual default is -9223372036854775808
3066 default_sequence_base = 1
3067
3068 supports_native_boolean = False
3069 non_native_boolean_check_constraint = False
3070 supports_unicode_binds = True
3071 postfetch_lastrowid = True
3072
3073 # may be changed at server inspection time for older SQL server versions
3074 supports_multivalues_insert = True
3075
3076 use_insertmanyvalues = True
3077
3078 # note pyodbc will set this to False if fast_executemany is set,
3079 # as of SQLAlchemy 2.0.9
3080 use_insertmanyvalues_wo_returning = True
3081
3082 insertmanyvalues_implicit_sentinel = (
3083 InsertmanyvaluesSentinelOpts.AUTOINCREMENT
3084 | InsertmanyvaluesSentinelOpts.IDENTITY
3085 | InsertmanyvaluesSentinelOpts.USE_INSERT_FROM_SELECT
3086 )
3087
3088 # "The incoming request has too many parameters. The server supports a "
3089 # "maximum of 2100 parameters."
3090 # in fact you can have 2099 parameters.
3091 insertmanyvalues_max_parameters = 2099
3092
3093 _supports_offset_fetch = False
3094 _supports_nvarchar_max = False
3095
3096 legacy_schema_aliasing = False
3097
3098 server_version_info = ()
3099
3100 statement_compiler = MSSQLCompiler
3101 ddl_compiler = MSDDLCompiler
3102 type_compiler_cls = MSTypeCompiler
3103 preparer = MSIdentifierPreparer
3104
3105 construct_arguments = [
3106 (sa_schema.PrimaryKeyConstraint, {"clustered": None}),
3107 (sa_schema.UniqueConstraint, {"clustered": None}),
3108 (
3109 sa_schema.Index,
3110 {
3111 "clustered": None,
3112 "include": None,
3113 "where": None,
3114 "columnstore": None,
3115 },
3116 ),
3117 (
3118 sa_schema.Column,
3119 {"identity_start": None, "identity_increment": None},
3120 ),
3121 ]
3122
3123 def __init__(
3124 self,
3125 query_timeout=None,
3126 use_scope_identity=True,
3127 schema_name="dbo",
3128 deprecate_large_types=None,
3129 supports_comments=None,
3130 json_serializer=None,
3131 json_deserializer=None,
3132 legacy_schema_aliasing=None,
3133 ignore_no_transaction_on_rollback=False,
3134 **opts,
3135 ):
3136 self.query_timeout = int(query_timeout or 0)
3137 self.schema_name = schema_name
3138
3139 self.use_scope_identity = use_scope_identity
3140 self.deprecate_large_types = deprecate_large_types
3141 self.ignore_no_transaction_on_rollback = (
3142 ignore_no_transaction_on_rollback
3143 )
3144 self._user_defined_supports_comments = uds = supports_comments
3145 if uds is not None:
3146 self.supports_comments = uds
3147
3148 if legacy_schema_aliasing is not None:
3149 util.warn_deprecated(
3150 "The legacy_schema_aliasing parameter is "
3151 "deprecated and will be removed in a future release.",
3152 "1.4",
3153 )
3154 self.legacy_schema_aliasing = legacy_schema_aliasing
3155
3156 super().__init__(**opts)
3157
3158 self._json_serializer = json_serializer
3159 self._json_deserializer = json_deserializer
3160
3161 def do_savepoint(self, connection, name):
3162 # give the DBAPI a push
3163 connection.exec_driver_sql("IF @@TRANCOUNT = 0 BEGIN TRANSACTION")
3164 super().do_savepoint(connection, name)
3165
3166 def do_release_savepoint(self, connection, name):
3167 # SQL Server does not support RELEASE SAVEPOINT
3168 pass
3169
3170 def do_rollback(self, dbapi_connection):
3171 try:
3172 super().do_rollback(dbapi_connection)
3173 except self.dbapi.ProgrammingError as e:
3174 if self.ignore_no_transaction_on_rollback and re.match(
3175 r".*\b111214\b", str(e)
3176 ):
3177 util.warn(
3178 "ProgrammingError 111214 "
3179 "'No corresponding transaction found.' "
3180 "has been suppressed via "
3181 "ignore_no_transaction_on_rollback=True"
3182 )
3183 else:
3184 raise
3185
3186 _isolation_lookup = {
3187 "SERIALIZABLE",
3188 "READ UNCOMMITTED",
3189 "READ COMMITTED",
3190 "REPEATABLE READ",
3191 "SNAPSHOT",
3192 }
3193
3194 def get_isolation_level_values(self, dbapi_connection):
3195 return list(self._isolation_lookup)
3196
3197 def set_isolation_level(self, dbapi_connection, level):
3198 cursor = dbapi_connection.cursor()
3199 cursor.execute(f"SET TRANSACTION ISOLATION LEVEL {level}")
3200 cursor.close()
3201 if level == "SNAPSHOT":
3202 dbapi_connection.commit()
3203
3204 def get_isolation_level(self, dbapi_connection):
3205 cursor = dbapi_connection.cursor()
3206 view_name = "sys.system_views"
3207 try:
3208 cursor.execute(
3209 (
3210 "SELECT name FROM {} WHERE name IN "
3211 "('dm_exec_sessions', 'dm_pdw_nodes_exec_sessions')"
3212 ).format(view_name)
3213 )
3214 row = cursor.fetchone()
3215 if not row:
3216 raise NotImplementedError(
3217 "Can't fetch isolation level on this particular "
3218 "SQL Server version."
3219 )
3220
3221 view_name = f"sys.{row[0]}"
3222
3223 cursor.execute(
3224 """
3225 SELECT CASE transaction_isolation_level
3226 WHEN 0 THEN NULL
3227 WHEN 1 THEN 'READ UNCOMMITTED'
3228 WHEN 2 THEN 'READ COMMITTED'
3229 WHEN 3 THEN 'REPEATABLE READ'
3230 WHEN 4 THEN 'SERIALIZABLE'
3231 WHEN 5 THEN 'SNAPSHOT' END
3232 AS TRANSACTION_ISOLATION_LEVEL
3233 FROM {}
3234 where session_id = @@SPID
3235 """.format(
3236 view_name
3237 )
3238 )
3239 except self.dbapi.Error as err:
3240 raise NotImplementedError(
3241 "Can't fetch isolation level; encountered error {} when "
3242 'attempting to query the "{}" view.'.format(err, view_name)
3243 ) from err
3244 else:
3245 row = cursor.fetchone()
3246 return row[0].upper()
3247 finally:
3248 cursor.close()
3249
3250 def initialize(self, connection):
3251 super().initialize(connection)
3252 self._setup_version_attributes()
3253 self._setup_supports_nvarchar_max(connection)
3254 self._setup_supports_comments(connection)
3255
3256 def _setup_version_attributes(self):
        if self.server_version_info[0] not in range(8, 17):
3258 util.warn(
3259 "Unrecognized server version info '%s'. Some SQL Server "
3260 "features may not function properly."
3261 % ".".join(str(x) for x in self.server_version_info)
3262 )
3263
3264 if self.server_version_info >= MS_2008_VERSION:
3265 self.supports_multivalues_insert = True
3266 else:
3267 self.supports_multivalues_insert = False
3268
3269 if self.deprecate_large_types is None:
3270 self.deprecate_large_types = (
3271 self.server_version_info >= MS_2012_VERSION
3272 )
3273
3274 self._supports_offset_fetch = (
3275 self.server_version_info and self.server_version_info[0] >= 11
3276 )
3277
3278 def _setup_supports_nvarchar_max(self, connection):
3279 try:
3280 connection.scalar(
3281 sql.text("SELECT CAST('test max support' AS NVARCHAR(max))")
3282 )
3283 except exc.DBAPIError:
3284 self._supports_nvarchar_max = False
3285 else:
3286 self._supports_nvarchar_max = True
3287
3288 def _setup_supports_comments(self, connection):
3289 if self._user_defined_supports_comments is not None:
3290 return
3291
3292 try:
3293 connection.scalar(
3294 sql.text(
3295 "SELECT 1 FROM fn_listextendedproperty"
3296 "(default, default, default, default, "
3297 "default, default, default)"
3298 )
3299 )
3300 except exc.DBAPIError:
3301 self.supports_comments = False
3302 else:
3303 self.supports_comments = True
3304
3305 def _get_default_schema_name(self, connection):
3306 query = sql.text("SELECT schema_name()")
3307 default_schema_name = connection.scalar(query)
3308 if default_schema_name is not None:
3309 # guard against the case where the default_schema_name is being
3310 # fed back into a table reflection function.
3311 return quoted_name(default_schema_name, quote=True)
3312 else:
3313 return self.schema_name
3314
3315 @_db_plus_owner
3316 def has_table(self, connection, tablename, dbname, owner, schema, **kw):
3317 self._ensure_has_table_connection(connection)
3318
3319 return self._internal_has_table(connection, tablename, owner, **kw)
3320
3321 @reflection.cache
3322 @_db_plus_owner
3323 def has_sequence(
3324 self, connection, sequencename, dbname, owner, schema, **kw
3325 ):
3326 sequences = ischema.sequences
3327
3328 s = sql.select(sequences.c.sequence_name).where(
3329 sequences.c.sequence_name == sequencename
3330 )
3331
3332 if owner:
3333 s = s.where(sequences.c.sequence_schema == owner)
3334
3335 c = connection.execute(s)
3336
3337 return c.first() is not None
3338
3339 @reflection.cache
3340 @_db_plus_owner_listing
3341 def get_sequence_names(self, connection, dbname, owner, schema, **kw):
3342 sequences = ischema.sequences
3343
3344 s = sql.select(sequences.c.sequence_name)
3345 if owner:
3346 s = s.where(sequences.c.sequence_schema == owner)
3347
3348 c = connection.execute(s)
3349
3350 return [row[0] for row in c]
3351
3352 @reflection.cache
3353 def get_schema_names(self, connection, **kw):
3354 s = sql.select(ischema.schemata.c.schema_name).order_by(
3355 ischema.schemata.c.schema_name
3356 )
3357 schema_names = [r[0] for r in connection.execute(s)]
3358 return schema_names
3359
3360 @reflection.cache
3361 @_db_plus_owner_listing
3362 def get_table_names(self, connection, dbname, owner, schema, **kw):
3363 tables = ischema.tables
3364 s = (
3365 sql.select(tables.c.table_name)
3366 .where(
3367 sql.and_(
3368 tables.c.table_schema == owner,
3369 tables.c.table_type == "BASE TABLE",
3370 )
3371 )
3372 .order_by(tables.c.table_name)
3373 )
3374 table_names = [r[0] for r in connection.execute(s)]
3375 return table_names
3376
3377 @reflection.cache
3378 @_db_plus_owner_listing
3379 def get_view_names(self, connection, dbname, owner, schema, **kw):
3380 tables = ischema.tables
3381 s = (
3382 sql.select(tables.c.table_name)
3383 .where(
3384 sql.and_(
3385 tables.c.table_schema == owner,
3386 tables.c.table_type == "VIEW",
3387 )
3388 )
3389 .order_by(tables.c.table_name)
3390 )
3391 view_names = [r[0] for r in connection.execute(s)]
3392 return view_names
3393
3394 @reflection.cache
3395 def _internal_has_table(self, connection, tablename, owner, **kw):
3396 if tablename.startswith("#"): # temporary table
3397 # mssql does not support temporary views
3398 # SQL Error [4103] [S0001]: "#v": Temporary views are not allowed
3399 return bool(
3400 connection.scalar(
3401 # U filters on user tables only.
3402 text("SELECT object_id(:table_name, 'U')"),
3403 {"table_name": f"tempdb.dbo.[{tablename}]"},
3404 )
3405 )
3406 else:
3407 tables = ischema.tables
3408
3409 s = sql.select(tables.c.table_name).where(
3410 sql.and_(
3411 sql.or_(
3412 tables.c.table_type == "BASE TABLE",
3413 tables.c.table_type == "VIEW",
3414 ),
3415 tables.c.table_name == tablename,
3416 )
3417 )
3418
3419 if owner:
3420 s = s.where(tables.c.table_schema == owner)
3421
3422 c = connection.execute(s)
3423
3424 return c.first() is not None
3425
3426 def _default_or_error(self, connection, tablename, owner, method, **kw):
3427 # TODO: try to avoid having to run a separate query here
3428 if self._internal_has_table(connection, tablename, owner, **kw):
3429 return method()
3430 else:
3431 raise exc.NoSuchTableError(f"{owner}.{tablename}")
3432
3433 @reflection.cache
3434 @_db_plus_owner
3435 def get_indexes(self, connection, tablename, dbname, owner, schema, **kw):
3436 filter_definition = (
3437 "ind.filter_definition"
3438 if self.server_version_info >= MS_2008_VERSION
3439 else "NULL as filter_definition"
3440 )
3441 rp = connection.execution_options(future_result=True).execute(
3442 sql.text(
3443 f"""
3444select
3445 ind.index_id,
3446 ind.is_unique,
3447 ind.name,
3448 ind.type,
3449 {filter_definition}
3450from
3451 sys.indexes as ind
3452join sys.tables as tab on
3453 ind.object_id = tab.object_id
3454join sys.schemas as sch on
3455 sch.schema_id = tab.schema_id
3456where
3457 tab.name = :tabname
3458 and sch.name = :schname
3459 and ind.is_primary_key = 0
3460 and ind.type != 0
3461order by
3462 ind.name
3463 """
3464 )
3465 .bindparams(
3466 sql.bindparam("tabname", tablename, ischema.CoerceUnicode()),
3467 sql.bindparam("schname", owner, ischema.CoerceUnicode()),
3468 )
3469 .columns(name=sqltypes.Unicode())
3470 )
3471 indexes = {}
3472 for row in rp.mappings():
3473 indexes[row["index_id"]] = current = {
3474 "name": row["name"],
3475 "unique": row["is_unique"] == 1,
3476 "column_names": [],
3477 "include_columns": [],
3478 "dialect_options": {},
3479 }
3480
3481 do = current["dialect_options"]
3482 index_type = row["type"]
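            # sys.indexes type codes: 1 = clustered, 2 = nonclustered,
            # 5 = clustered columnstore, 6 = nonclustered columnstore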
3483 if index_type in {1, 2}:
3484 do["mssql_clustered"] = index_type == 1
3485 if index_type in {5, 6}:
3486 do["mssql_clustered"] = index_type == 5
3487 do["mssql_columnstore"] = True
3488 if row["filter_definition"] is not None:
3489 do["mssql_where"] = row["filter_definition"]
3490
3491 rp = connection.execution_options(future_result=True).execute(
3492 sql.text(
3493 """
3494select
3495 ind_col.index_id,
3496 col.name,
3497 ind_col.is_included_column
3498from
3499 sys.columns as col
3500join sys.tables as tab on
3501 tab.object_id = col.object_id
3502join sys.index_columns as ind_col on
3503 ind_col.column_id = col.column_id
3504 and ind_col.object_id = tab.object_id
3505join sys.schemas as sch on
3506 sch.schema_id = tab.schema_id
3507where
3508 tab.name = :tabname
3509 and sch.name = :schname
3510order by
3511 ind_col.index_id,
3512 ind_col.key_ordinal
3513 """
3514 )
3515 .bindparams(
3516 sql.bindparam("tabname", tablename, ischema.CoerceUnicode()),
3517 sql.bindparam("schname", owner, ischema.CoerceUnicode()),
3518 )
3519 .columns(name=sqltypes.Unicode())
3520 )
3521 for row in rp.mappings():
3522 if row["index_id"] not in indexes:
3523 continue
3524 index_def = indexes[row["index_id"]]
3525 is_colstore = index_def["dialect_options"].get("mssql_columnstore")
3526 is_clustered = index_def["dialect_options"].get("mssql_clustered")
3527 if not (is_colstore and is_clustered):
3528 # a clustered columnstore index includes all columns but does
3529 # not want them in the index definition
3530 if row["is_included_column"] and not is_colstore:
                    # a nonclustered columnstore index reports included
                    # columns, but requires that they be listed as regular
                    # columns
3533 index_def["include_columns"].append(row["name"])
3534 else:
3535 index_def["column_names"].append(row["name"])
3536 for index_info in indexes.values():
3537 # NOTE: "root level" include_columns is legacy, now part of
3538 # dialect_options (issue #7382)
3539 index_info["dialect_options"]["mssql_include"] = index_info[
3540 "include_columns"
3541 ]
3542
3543 if indexes:
3544 return list(indexes.values())
3545 else:
3546 return self._default_or_error(
3547 connection, tablename, owner, ReflectionDefaults.indexes, **kw
3548 )
3549
3550 @reflection.cache
3551 @_db_plus_owner
3552 def get_view_definition(
3553 self, connection, viewname, dbname, owner, schema, **kw
3554 ):
3555 view_def = connection.execute(
3556 sql.text(
3557 "select mod.definition "
3558 "from sys.sql_modules as mod "
3559 "join sys.views as views on mod.object_id = views.object_id "
3560 "join sys.schemas as sch on views.schema_id = sch.schema_id "
3561 "where views.name=:viewname and sch.name=:schname"
3562 ).bindparams(
3563 sql.bindparam("viewname", viewname, ischema.CoerceUnicode()),
3564 sql.bindparam("schname", owner, ischema.CoerceUnicode()),
3565 )
3566 ).scalar()
3567 if view_def:
3568 return view_def
3569 else:
3570 raise exc.NoSuchTableError(f"{owner}.{viewname}")
3571
3572 @reflection.cache
3573 def get_table_comment(self, connection, table_name, schema=None, **kw):
3574 if not self.supports_comments:
3575 raise NotImplementedError(
3576 "Can't get table comments on current SQL Server version in use"
3577 )
3578
3579 schema_name = schema if schema else self.default_schema_name
3580 COMMENT_SQL = """
3581 SELECT cast(com.value as nvarchar(max))
3582 FROM fn_listextendedproperty('MS_Description',
3583 'schema', :schema, 'table', :table, NULL, NULL
3584 ) as com;
3585 """
3586
3587 comment = connection.execute(
3588 sql.text(COMMENT_SQL).bindparams(
3589 sql.bindparam("schema", schema_name, ischema.CoerceUnicode()),
3590 sql.bindparam("table", table_name, ischema.CoerceUnicode()),
3591 )
3592 ).scalar()
3593 if comment:
3594 return {"text": comment}
3595 else:
3596 return self._default_or_error(
3597 connection,
3598 table_name,
3599 None,
3600 ReflectionDefaults.table_comment,
3601 **kw,
3602 )
3603
3604 def _temp_table_name_like_pattern(self, tablename):
3605 # LIKE uses '%' to match zero or more characters and '_' to match any
3606 # single character. We want to match literal underscores, so T-SQL
3607 # requires that we enclose them in square brackets.
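        # e.g. (illustrative) "#t" -> "#t[_][_][_]%", matching the padded
        # internal name tempdb assigns to local temporary tables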
3608 return tablename + (
3609 ("[_][_][_]%") if not tablename.startswith("##") else ""
3610 )
3611
3612 def _get_internal_temp_table_name(self, connection, tablename):
3613 # it's likely that schema is always "dbo", but since we can
3614 # get it here, let's get it.
3615 # see https://stackoverflow.com/questions/8311959/
3616 # specifying-schema-for-temporary-tables
3617
3618 try:
3619 return connection.execute(
3620 sql.text(
3621 "select table_schema, table_name "
3622 "from tempdb.information_schema.tables "
3623 "where table_name like :p1"
3624 ),
3625 {"p1": self._temp_table_name_like_pattern(tablename)},
3626 ).one()
3627 except exc.MultipleResultsFound as me:
3628 raise exc.UnreflectableTableError(
3629 "Found more than one temporary table named '%s' in tempdb "
3630 "at this time. Cannot reliably resolve that name to its "
3631 "internal table name." % tablename
3632 ) from me
3633 except exc.NoResultFound as ne:
3634 raise exc.NoSuchTableError(
3635 "Unable to find a temporary table named '%s' in tempdb."
3636 % tablename
3637 ) from ne
3638
3639 @reflection.cache
3640 @_db_plus_owner
3641 def get_columns(self, connection, tablename, dbname, owner, schema, **kw):
3642 sys_columns = ischema.sys_columns
3643 sys_types = ischema.sys_types
3644 sys_default_constraints = ischema.sys_default_constraints
3645 computed_cols = ischema.computed_columns
3646 identity_cols = ischema.identity_columns
3647 extended_properties = ischema.extended_properties
3648
3649 # to access sys tables, need an object_id.
3650 # object_id() can normally match to the unquoted name even if it
        # has special characters. However, it also accepts quoted names,
3652 # which means for the special case that the name itself has
3653 # "quotes" (e.g. brackets for SQL Server) we need to "quote" (e.g.
3654 # bracket) that name anyway. Fixed as part of #12654
3655
3656 is_temp_table = tablename.startswith("#")
3657 if is_temp_table:
3658 owner, tablename = self._get_internal_temp_table_name(
3659 connection, tablename
3660 )
3661
3662 object_id_tokens = [self.identifier_preparer.quote(tablename)]
3663 if owner:
3664 object_id_tokens.insert(0, self.identifier_preparer.quote(owner))
3665
3666 if is_temp_table:
3667 object_id_tokens.insert(0, "tempdb")
3668
3669 object_id = func.object_id(".".join(object_id_tokens))
3670
3671 whereclause = sys_columns.c.object_id == object_id
3672
3673 if self._supports_nvarchar_max:
3674 computed_definition = computed_cols.c.definition
3675 else:
3676 # tds_version 4.2 does not support NVARCHAR(MAX)
3677 computed_definition = sql.cast(
3678 computed_cols.c.definition, NVARCHAR(4000)
3679 )
3680
        s = (
            sql.select(
                sys_columns.c.name,
                sys_types.c.name,
                sys_columns.c.is_nullable,
                sys_columns.c.max_length,
                sys_columns.c.precision,
                sys_columns.c.scale,
                sys_default_constraints.c.definition,
                sys_columns.c.collation_name,
                computed_definition,
                computed_cols.c.is_persisted,
                identity_cols.c.is_identity,
                identity_cols.c.seed_value,
                identity_cols.c.increment_value,
                extended_properties.c.value.label("comment"),
            )
            .select_from(sys_columns)
            .join(
                sys_types,
                onclause=sys_columns.c.user_type_id
                == sys_types.c.user_type_id,
            )
            .outerjoin(
                sys_default_constraints,
                onclause=sql.and_(
                    sys_default_constraints.c.object_id
                    == sys_columns.c.default_object_id,
                    sys_default_constraints.c.parent_column_id
                    == sys_columns.c.column_id,
                ),
            )
            .outerjoin(
                computed_cols,
                onclause=sql.and_(
                    computed_cols.c.object_id == sys_columns.c.object_id,
                    computed_cols.c.column_id == sys_columns.c.column_id,
                ),
            )
            .outerjoin(
                identity_cols,
                onclause=sql.and_(
                    identity_cols.c.object_id == sys_columns.c.object_id,
                    identity_cols.c.column_id == sys_columns.c.column_id,
                ),
            )
            .outerjoin(
                extended_properties,
                onclause=sql.and_(
                    extended_properties.c["class"] == 1,
                    extended_properties.c.name == "MS_Description",
                    sys_columns.c.object_id == extended_properties.c.major_id,
                    sys_columns.c.column_id == extended_properties.c.minor_id,
                ),
            )
            .where(whereclause)
            .order_by(sys_columns.c.column_id)
        )

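        # for a temp table the catalog views must be read from tempdb;
        # schema_translate_map rewrites the "sys" schema of the select()
        # above to "tempdb.sys" at execution time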
        if is_temp_table:
            exec_opts = {"schema_translate_map": {"sys": "tempdb.sys"}}
        else:
            exec_opts = {"schema_translate_map": {}}
        c = connection.execution_options(**exec_opts).execute(s)

        cols = []
        for row in c.mappings():
            name = row[sys_columns.c.name]
            type_ = row[sys_types.c.name]
            nullable = row[sys_columns.c.is_nullable] == 1
            maxlen = row[sys_columns.c.max_length]
            numericprec = row[sys_columns.c.precision]
            numericscale = row[sys_columns.c.scale]
            default = row[sys_default_constraints.c.definition]
            collation = row[sys_columns.c.collation_name]
            definition = row[computed_definition]
            is_persisted = row[computed_cols.c.is_persisted]
            is_identity = row[identity_cols.c.is_identity]
            identity_start = row[identity_cols.c.seed_value]
            identity_increment = row[identity_cols.c.increment_value]
            comment = row[extended_properties.c.value]

            coltype = self.ischema_names.get(type_, None)

            kwargs = {}

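            # sys.columns.max_length is in bytes, with -1 meaning MAX;
            # nchar/nvarchar store two bytes per character, hence the
            # halving in the N-type branch below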
            if coltype in (
                MSBinary,
                MSVarBinary,
                sqltypes.LargeBinary,
            ):
                kwargs["length"] = maxlen if maxlen != -1 else None
            elif coltype in (
                MSString,
                MSChar,
                MSText,
            ):
                kwargs["length"] = maxlen if maxlen != -1 else None
                if collation:
                    kwargs["collation"] = collation
            elif coltype in (
                MSNVarchar,
                MSNChar,
                MSNText,
            ):
                kwargs["length"] = maxlen // 2 if maxlen != -1 else None
                if collation:
                    kwargs["collation"] = collation

            if coltype is None:
                util.warn(
                    "Did not recognize type '%s' of column '%s'"
                    % (type_, name)
                )
                coltype = sqltypes.NULLTYPE
            else:
                if issubclass(coltype, sqltypes.Numeric):
                    kwargs["precision"] = numericprec

                    if not issubclass(coltype, sqltypes.Float):
                        kwargs["scale"] = numericscale

                coltype = coltype(**kwargs)
            cdict = {
                "name": name,
                "type": coltype,
                "nullable": nullable,
                "default": default,
                "autoincrement": is_identity is not None,
                "comment": comment,
            }

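            # computed column: report the generation expression and
            # whether it is PERSISTED under the "computed" key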
            if definition is not None and is_persisted is not None:
                cdict["computed"] = {
                    "sqltext": definition,
                    "persisted": is_persisted,
                }

            if is_identity is not None:
                # identity_start and identity_increment are Decimal or None
                if identity_start is None or identity_increment is None:
                    cdict["identity"] = {}
                else:
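                    # coerce Decimal to int for integer types; BigInteger
                    # is a subclass of Integer, so one check covers both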
                    if isinstance(coltype, sqltypes.Integer):
                        start = int(identity_start)
                        increment = int(identity_increment)
                    else:
                        start = identity_start
                        increment = identity_increment

                    cdict["identity"] = {
                        "start": start,
                        "increment": increment,
                    }

            cols.append(cdict)

        if cols:
            return cols
        else:
            return self._default_or_error(
                connection, tablename, owner, ReflectionDefaults.columns, **kw
            )

    @reflection.cache
    @_db_plus_owner
    def get_pk_constraint(
        self, connection, tablename, dbname, owner, schema, **kw
    ):
        pkeys = []
        TC = ischema.constraints
        C = ischema.key_constraints.alias("C")

        # Primary key constraints
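        # OBJECTPROPERTY(..., 'CnstIsClustKey') returns 1 if the
        # constraint is backed by the clustered index, else 0; this
        # feeds the mssql_clustered dialect option below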
        s = (
            sql.select(
                C.c.column_name,
                TC.c.constraint_type,
                C.c.constraint_name,
                func.objectproperty(
                    func.object_id(
                        C.c.table_schema + "." + C.c.constraint_name
                    ),
                    "CnstIsClustKey",
                ).label("is_clustered"),
            )
            .where(
                sql.and_(
                    TC.c.constraint_name == C.c.constraint_name,
                    TC.c.table_schema == C.c.table_schema,
                    C.c.table_name == tablename,
                    C.c.table_schema == owner,
                ),
            )
            .order_by(TC.c.constraint_name, C.c.ordinal_position)
        )
        c = connection.execution_options(future_result=True).execute(s)
        constraint_name = None
        is_clustered = None
        for row in c.mappings():
            if "PRIMARY" in row[TC.c.constraint_type.name]:
                pkeys.append(row["COLUMN_NAME"])
                if constraint_name is None:
                    constraint_name = row[C.c.constraint_name.name]
                if is_clustered is None:
                    is_clustered = row["is_clustered"]
        if pkeys:
            return {
                "constrained_columns": pkeys,
                "name": constraint_name,
                "dialect_options": {"mssql_clustered": is_clustered},
            }
        else:
            return self._default_or_error(
                connection,
                tablename,
                owner,
                ReflectionDefaults.pk_constraint,
                **kw,
            )

    @reflection.cache
    @_db_plus_owner
    def get_foreign_keys(
        self, connection, tablename, dbname, owner, schema, **kw
    ):
        # Foreign key constraints
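        # the query resolves the referred table two ways and UNIONs the
        # results: the first branch matches FKs that refer to a declared
        # PRIMARY KEY / UNIQUE constraint via INFORMATION_SCHEMA; the
        # second matches FKs that refer to a plain unique index, which
        # INFORMATION_SCHEMA does not expose, via the sys. catalogs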
        s = (
            text(
                """\
WITH fk_info AS (
    SELECT
        ischema_ref_con.constraint_schema,
        ischema_ref_con.constraint_name,
        ischema_key_col.ordinal_position,
        ischema_key_col.table_schema,
        ischema_key_col.table_name,
        ischema_ref_con.unique_constraint_schema,
        ischema_ref_con.unique_constraint_name,
        ischema_ref_con.match_option,
        ischema_ref_con.update_rule,
        ischema_ref_con.delete_rule,
        ischema_key_col.column_name AS constrained_column
    FROM
        INFORMATION_SCHEMA.REFERENTIAL_CONSTRAINTS ischema_ref_con
        INNER JOIN
        INFORMATION_SCHEMA.KEY_COLUMN_USAGE ischema_key_col ON
            ischema_key_col.table_schema = ischema_ref_con.constraint_schema
            AND ischema_key_col.constraint_name =
            ischema_ref_con.constraint_name
    WHERE ischema_key_col.table_name = :tablename
        AND ischema_key_col.table_schema = :owner
),
constraint_info AS (
    SELECT
        ischema_key_col.constraint_schema,
        ischema_key_col.constraint_name,
        ischema_key_col.ordinal_position,
        ischema_key_col.table_schema,
        ischema_key_col.table_name,
        ischema_key_col.column_name
    FROM
        INFORMATION_SCHEMA.KEY_COLUMN_USAGE ischema_key_col
),
index_info AS (
    SELECT
        sys.schemas.name AS index_schema,
        sys.indexes.name AS index_name,
        sys.index_columns.key_ordinal AS ordinal_position,
        sys.schemas.name AS table_schema,
        sys.objects.name AS table_name,
        sys.columns.name AS column_name
    FROM
        sys.indexes
        INNER JOIN
        sys.objects ON
            sys.objects.object_id = sys.indexes.object_id
        INNER JOIN
        sys.schemas ON
            sys.schemas.schema_id = sys.objects.schema_id
        INNER JOIN
        sys.index_columns ON
            sys.index_columns.object_id = sys.objects.object_id
            AND sys.index_columns.index_id = sys.indexes.index_id
        INNER JOIN
        sys.columns ON
            sys.columns.object_id = sys.indexes.object_id
            AND sys.columns.column_id = sys.index_columns.column_id
)
SELECT
    fk_info.constraint_schema,
    fk_info.constraint_name,
    fk_info.ordinal_position,
    fk_info.constrained_column,
    constraint_info.table_schema AS referred_table_schema,
    constraint_info.table_name AS referred_table_name,
    constraint_info.column_name AS referred_column,
    fk_info.match_option,
    fk_info.update_rule,
    fk_info.delete_rule
FROM
    fk_info INNER JOIN constraint_info ON
        constraint_info.constraint_schema =
            fk_info.unique_constraint_schema
        AND constraint_info.constraint_name =
            fk_info.unique_constraint_name
        AND constraint_info.ordinal_position = fk_info.ordinal_position
UNION
SELECT
    fk_info.constraint_schema,
    fk_info.constraint_name,
    fk_info.ordinal_position,
    fk_info.constrained_column,
    index_info.table_schema AS referred_table_schema,
    index_info.table_name AS referred_table_name,
    index_info.column_name AS referred_column,
    fk_info.match_option,
    fk_info.update_rule,
    fk_info.delete_rule
FROM
    fk_info INNER JOIN index_info ON
        index_info.index_schema = fk_info.unique_constraint_schema
        AND index_info.index_name = fk_info.unique_constraint_name
        AND index_info.ordinal_position = fk_info.ordinal_position
        AND NOT (index_info.table_schema = fk_info.table_schema
            AND index_info.table_name = fk_info.table_name)
ORDER BY fk_info.constraint_schema, fk_info.constraint_name,
    fk_info.ordinal_position
"""
            )
            .bindparams(
                sql.bindparam("tablename", tablename, ischema.CoerceUnicode()),
                sql.bindparam("owner", owner, ischema.CoerceUnicode()),
            )
            .columns(
                constraint_schema=sqltypes.Unicode(),
                constraint_name=sqltypes.Unicode(),
                table_schema=sqltypes.Unicode(),
                table_name=sqltypes.Unicode(),
                constrained_column=sqltypes.Unicode(),
                referred_table_schema=sqltypes.Unicode(),
                referred_table_name=sqltypes.Unicode(),
                referred_column=sqltypes.Unicode(),
            )
        )

        # group rows by constraint ID, to handle multi-column FKs
        fkeys = util.defaultdict(
            lambda: {
                "name": None,
                "constrained_columns": [],
                "referred_schema": None,
                "referred_table": None,
                "referred_columns": [],
                "options": {},
            }
        )

        for r in connection.execute(s).all():
            (
                _,  # constraint schema
                rfknm,
                _,  # ordinal position
                scol,
                rschema,
                rtbl,
                rcol,
                # TODO: we support match=<keyword> for foreign keys, so
                # we could reflect this as well; PG has match=FULL for
                # example, but it does not appear to be a valid value
                # for SQL Server
                _,  # match rule
                fkuprule,
                fkdelrule,
            ) = r

            rec = fkeys[rfknm]
            rec["name"] = rfknm

            if fkuprule != "NO ACTION":
                rec["options"]["onupdate"] = fkuprule

            if fkdelrule != "NO ACTION":
                rec["options"]["ondelete"] = fkdelrule

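            # record referred_schema only when an explicit schema was
            # requested or the referred table lives outside the owner's
            # schema; same-schema references reflect unqualified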
            if not rec["referred_table"]:
                rec["referred_table"] = rtbl
                if schema is not None or owner != rschema:
                    if dbname:
                        rschema = dbname + "." + rschema
                    rec["referred_schema"] = rschema

            local_cols, remote_cols = (
                rec["constrained_columns"],
                rec["referred_columns"],
            )

            local_cols.append(scol)
            remote_cols.append(rcol)

        if fkeys:
            return list(fkeys.values())
        else:
            return self._default_or_error(
                connection,
                tablename,
                owner,
                ReflectionDefaults.foreign_keys,
                **kw,
            )