1# dialects/mssql/base.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: ignore-errors
8
9"""
10.. dialect:: mssql
11 :name: Microsoft SQL Server
12 :normal_support: 2012+
13 :best_effort: 2005+
14
15.. _mssql_external_dialects:
16
17External Dialects
18-----------------
19
20In addition to the above DBAPI layers with native SQLAlchemy support, there
21are third-party dialects for other DBAPI layers that are compatible
22with SQL Server. See the "External Dialects" list on the
23:ref:`dialect_toplevel` page.
24
25.. _mssql_identity:
26
27Auto Increment Behavior / IDENTITY Columns
28------------------------------------------
29
30SQL Server provides so-called "auto incrementing" behavior using the
31``IDENTITY`` construct, which can be placed on any single integer column in a
32table. SQLAlchemy considers ``IDENTITY`` within its default "autoincrement"
33behavior for an integer primary key column, described at
34:paramref:`_schema.Column.autoincrement`. This means that by default,
35the first integer primary key column in a :class:`_schema.Table` will be
36considered to be the identity column - unless it is associated with a
37:class:`.Sequence` - and will generate DDL as such::
38
39 from sqlalchemy import Table, MetaData, Column, Integer
40
41 m = MetaData()
42 t = Table(
43 "t",
44 m,
45 Column("id", Integer, primary_key=True),
46 Column("x", Integer),
47 )
48 m.create_all(engine)
49
50The above example will generate DDL as:
51
52.. sourcecode:: sql
53
54 CREATE TABLE t (
55 id INTEGER NOT NULL IDENTITY,
56 x INTEGER NULL,
57 PRIMARY KEY (id)
58 )
59
60For the case where this default generation of ``IDENTITY`` is not desired,
61specify ``False`` for the :paramref:`_schema.Column.autoincrement` flag,
62on the first integer primary key column::
63
64 m = MetaData()
65 t = Table(
66 "t",
67 m,
68 Column("id", Integer, primary_key=True, autoincrement=False),
69 Column("x", Integer),
70 )
71 m.create_all(engine)
72
73To add the ``IDENTITY`` keyword to a non-primary key column, specify
74``True`` for the :paramref:`_schema.Column.autoincrement` flag on the desired
75:class:`_schema.Column` object, and ensure that
76:paramref:`_schema.Column.autoincrement`
77is set to ``False`` on any integer primary key column::
78
79 m = MetaData()
80 t = Table(
81 "t",
82 m,
83 Column("id", Integer, primary_key=True, autoincrement=False),
84 Column("x", Integer, autoincrement=True),
85 )
86 m.create_all(engine)
87
88.. versionchanged:: 1.4 Added :class:`_schema.Identity` construct
89 in a :class:`_schema.Column` to specify the start and increment
90 parameters of an IDENTITY. These replace
91 the use of the :class:`.Sequence` object in order to specify these values.
92
93.. deprecated:: 1.4
94
95 The ``mssql_identity_start`` and ``mssql_identity_increment`` parameters
96 to :class:`_schema.Column` are deprecated and should be replaced by
97 an :class:`_schema.Identity` object. Specifying both ways of configuring
98 an IDENTITY will result in a compile error.
99 These options are also no longer returned as part of the
100 ``dialect_options`` key in :meth:`_reflection.Inspector.get_columns`.
101 Use the information in the ``identity`` key instead.
102
103.. deprecated:: 1.3
104
105 The use of :class:`.Sequence` to specify IDENTITY characteristics is
106 deprecated and will be removed in a future release. Please use
107 the :class:`_schema.Identity` object parameters
108 :paramref:`_schema.Identity.start` and
109 :paramref:`_schema.Identity.increment`.
110
111.. versionchanged:: 1.4 Removed the ability to use a :class:`.Sequence`
112 object to modify IDENTITY characteristics. :class:`.Sequence` objects
113 now only manipulate true T-SQL SEQUENCE types.
114
115.. note::
116
117 There can only be one IDENTITY column on the table. When using
118 ``autoincrement=True`` to enable the IDENTITY keyword, SQLAlchemy does not
119 guard against multiple columns specifying the option simultaneously. The
120 SQL Server database will instead reject the ``CREATE TABLE`` statement.
121
122.. note::
123
124 An INSERT statement which attempts to provide a value for a column that is
125 marked with IDENTITY will be rejected by SQL Server. In order for the
126 value to be accepted, a session-level option "SET IDENTITY_INSERT" must be
127 enabled. The SQLAlchemy SQL Server dialect will perform this operation
128 automatically when using a core :class:`_expression.Insert`
129 construct; if the
130 execution specifies a value for the IDENTITY column, the "IDENTITY_INSERT"
131 option will be enabled for the span of that statement's invocation. However,
132 this scenario is not high performing and should not be relied upon for
133 normal use. If a table doesn't actually require IDENTITY behavior in its
134 integer primary key column, the keyword should be disabled when creating
135 the table by ensuring that ``autoincrement=False`` is set.
136
137Controlling "Start" and "Increment"
138^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
139
140Specific control over the "start" and "increment" values for
141the ``IDENTITY`` generator are provided using the
142:paramref:`_schema.Identity.start` and :paramref:`_schema.Identity.increment`
143parameters passed to the :class:`_schema.Identity` object::
144
145 from sqlalchemy import Table, Integer, Column, Identity
146
147 test = Table(
148 "test",
149 metadata,
150 Column(
151 "id", Integer, Identity(start=100, increment=10), primary_key=True
152 ),
153 Column("name", String(20)),
154 )
155
156The CREATE TABLE for the above :class:`_schema.Table` object would be:
157
158.. sourcecode:: sql
159
160 CREATE TABLE test (
161 id INTEGER NOT NULL IDENTITY(100,10) PRIMARY KEY,
162 name VARCHAR(20) NULL
163 )
164
165.. note::
166
167 The :class:`_schema.Identity` object supports many other parameters in
168 addition to ``start`` and ``increment``. These are not supported by
169 SQL Server and will be ignored when generating the CREATE TABLE ddl.
170
171 .. versionchanged:: 1.4 The :class:`_schema.Identity` object is
172 now used to affect the
173 ``IDENTITY`` generator for a :class:`_schema.Column` under SQL Server.
174 Previously, the :class:`.Sequence` object was used. As SQL Server now
175 supports real sequences as a separate construct, :class:`.Sequence` will be
176 functional in the normal way starting from SQLAlchemy version 1.4.
177
178
179Using IDENTITY with Non-Integer numeric types
180^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
181
182SQL Server also allows ``IDENTITY`` to be used with ``NUMERIC`` columns. To
183implement this pattern smoothly in SQLAlchemy, the primary datatype of the
184column should remain as ``Integer``, however the underlying implementation
185type deployed to the SQL Server database can be specified as ``Numeric`` using
186:meth:`.TypeEngine.with_variant`::
187
188 from sqlalchemy import Column
189 from sqlalchemy import Integer
190 from sqlalchemy import Numeric
191 from sqlalchemy import String
192 from sqlalchemy.ext.declarative import declarative_base
193
194 Base = declarative_base()
195
196
197 class TestTable(Base):
198 __tablename__ = "test"
199 id = Column(
200 Integer().with_variant(Numeric(10, 0), "mssql"),
201 primary_key=True,
202 autoincrement=True,
203 )
204 name = Column(String)
205
206In the above example, ``Integer().with_variant()`` provides clear usage
207information that accurately describes the intent of the code. The general
208restriction that ``autoincrement`` only applies to ``Integer`` is established
209at the metadata level and not at the per-dialect level.
210
211When using the above pattern, the primary key identifier that comes back from
212the insertion of a row, which is also the value that would be assigned to an
213ORM object such as ``TestTable`` above, will be an instance of ``Decimal()``
214and not ``int`` when using SQL Server. The numeric return type of the
215:class:`_types.Numeric` type can be changed to return floats by passing False
216to :paramref:`_types.Numeric.asdecimal`. To normalize the return type of the
217above ``Numeric(10, 0)`` to return Python ints (which also support "long"
218integer values in Python 3), use :class:`_types.TypeDecorator` as follows::
219
220 from sqlalchemy import TypeDecorator
221
222
223 class NumericAsInteger(TypeDecorator):
224 "normalize floating point return values into ints"
225
226 impl = Numeric(10, 0, asdecimal=False)
227 cache_ok = True
228
229 def process_result_value(self, value, dialect):
230 if value is not None:
231 value = int(value)
232 return value
233
234
235 class TestTable(Base):
236 __tablename__ = "test"
237 id = Column(
238 Integer().with_variant(NumericAsInteger, "mssql"),
239 primary_key=True,
240 autoincrement=True,
241 )
242 name = Column(String)
243
244.. _mssql_insert_behavior:
245
246INSERT behavior
247^^^^^^^^^^^^^^^^
248
249Handling of the ``IDENTITY`` column at INSERT time involves two key
250techniques. The most common is being able to fetch the "last inserted value"
251for a given ``IDENTITY`` column, a process which SQLAlchemy performs
252implicitly in many cases, most importantly within the ORM.
253
254The process for fetching this value has several variants:
255
256* In the vast majority of cases, RETURNING is used in conjunction with INSERT
257 statements on SQL Server in order to get newly generated primary key values:
258
259 .. sourcecode:: sql
260
261 INSERT INTO t (x) OUTPUT inserted.id VALUES (?)
262
263 As of SQLAlchemy 2.0, the :ref:`engine_insertmanyvalues` feature is also
264 used by default to optimize many-row INSERT statements; for SQL Server
265 the feature takes place for both RETURNING and non-RETURNING
266 INSERT statements.
267
268 .. versionchanged:: 2.0.10 The :ref:`engine_insertmanyvalues` feature for
269 SQL Server was temporarily disabled for SQLAlchemy version 2.0.9 due to
270 issues with row ordering. As of 2.0.10 the feature is re-enabled, with
271 special case handling for the unit of work's requirement for RETURNING to
272 be ordered.
273
274* When RETURNING is not available or has been disabled via
275 ``implicit_returning=False``, either the ``scope_identity()`` function or
276 the ``@@identity`` variable is used; behavior varies by backend:
277
278 * when using PyODBC, the phrase ``; select scope_identity()`` will be
279 appended to the end of the INSERT statement; a second result set will be
280 fetched in order to receive the value. Given a table as::
281
282 t = Table(
283 "t",
284 metadata,
285 Column("id", Integer, primary_key=True),
286 Column("x", Integer),
287 implicit_returning=False,
288 )
289
290 an INSERT will look like:
291
292 .. sourcecode:: sql
293
294 INSERT INTO t (x) VALUES (?); select scope_identity()
295
296 * Other dialects such as pymssql will call upon
297 ``SELECT scope_identity() AS lastrowid`` subsequent to an INSERT
298 statement. If the flag ``use_scope_identity=False`` is passed to
299 :func:`_sa.create_engine`,
300 the statement ``SELECT @@identity AS lastrowid``
301 is used instead.
302
303A table that contains an ``IDENTITY`` column will prohibit an INSERT statement
304that refers to the identity column explicitly. The SQLAlchemy dialect will
305detect when an INSERT construct, created using a core
306:func:`_expression.insert`
307construct (not a plain string SQL), refers to the identity column, and
308in this case will emit ``SET IDENTITY_INSERT ON`` prior to the insert
309statement proceeding, and ``SET IDENTITY_INSERT OFF`` subsequent to the
310execution. Given this example::
311
312 m = MetaData()
313 t = Table(
314 "t", m, Column("id", Integer, primary_key=True), Column("x", Integer)
315 )
316 m.create_all(engine)
317
318 with engine.begin() as conn:
319 conn.execute(t.insert(), {"id": 1, "x": 1}, {"id": 2, "x": 2})
320
321The above column will be created with IDENTITY, however the INSERT statement
322we emit is specifying explicit values. In the echo output we can see
323how SQLAlchemy handles this:
324
325.. sourcecode:: sql
326
327 CREATE TABLE t (
328 id INTEGER NOT NULL IDENTITY(1,1),
329 x INTEGER NULL,
330 PRIMARY KEY (id)
331 )
332
333 COMMIT
334 SET IDENTITY_INSERT t ON
335 INSERT INTO t (id, x) VALUES (?, ?)
336 ((1, 1), (2, 2))
337 SET IDENTITY_INSERT t OFF
338 COMMIT
339
340
341
342This is an auxiliary use case suitable for testing and bulk insert scenarios.
343
344SEQUENCE support
345----------------
346
347The :class:`.Sequence` object creates "real" sequences, i.e.,
348``CREATE SEQUENCE``:
349
350.. sourcecode:: pycon+sql
351
352 >>> from sqlalchemy import Sequence
353 >>> from sqlalchemy.schema import CreateSequence
354 >>> from sqlalchemy.dialects import mssql
355 >>> print(
356 ... CreateSequence(Sequence("my_seq", start=1)).compile(
357 ... dialect=mssql.dialect()
358 ... )
359 ... )
360 {printsql}CREATE SEQUENCE my_seq START WITH 1
361
362For integer primary key generation, SQL Server's ``IDENTITY`` construct should
363generally be preferred vs. sequence.
364
365.. tip::
366
367 The default start value for T-SQL is ``-2**63`` instead of 1 as
368 in most other SQL databases. Users should explicitly set the
369 :paramref:`.Sequence.start` to 1 if that's the expected default::
370
371 seq = Sequence("my_sequence", start=1)
372
373.. versionadded:: 1.4 added SQL Server support for :class:`.Sequence`
374
375.. versionchanged:: 2.0 The SQL Server dialect will no longer implicitly
376 render "START WITH 1" for ``CREATE SEQUENCE``, which was the behavior
377 first implemented in version 1.4.
378
379MAX on VARCHAR / NVARCHAR
380-------------------------
381
382SQL Server supports the special string "MAX" within the
383:class:`_types.VARCHAR` and :class:`_types.NVARCHAR` datatypes,
384to indicate "maximum length possible". The dialect currently handles this as
385a length of "None" in the base type, rather than supplying a
386dialect-specific version of these types, so that a base type
387specified such as ``VARCHAR(None)`` can assume "unlengthed" behavior on
388more than one backend without using dialect-specific types.
389
390To build a SQL Server VARCHAR or NVARCHAR with MAX length, use None::
391
392 my_table = Table(
393 "my_table",
394 metadata,
395 Column("my_data", VARCHAR(None)),
396 Column("my_n_data", NVARCHAR(None)),
397 )
398
399Collation Support
400-----------------
401
402Character collations are supported by the base string types,
403specified by the string argument "collation"::
404
405 from sqlalchemy import VARCHAR
406
407 Column("login", VARCHAR(32, collation="Latin1_General_CI_AS"))
408
409When such a column is associated with a :class:`_schema.Table`, the
410CREATE TABLE statement for this column will yield:
411
412.. sourcecode:: sql
413
414 login VARCHAR(32) COLLATE Latin1_General_CI_AS NULL
415
416LIMIT/OFFSET Support
417--------------------
418
419MSSQL has added support for LIMIT / OFFSET as of SQL Server 2012, via the
420"OFFSET n ROWS" and "FETCH NEXT n ROWS" clauses. SQLAlchemy supports these
421syntaxes automatically if SQL Server 2012 or greater is detected.
422
423.. versionchanged:: 1.4 support added for SQL Server "OFFSET n ROWS" and
424 "FETCH NEXT n ROWS" syntax.
425
426For statements that specify only LIMIT and no OFFSET, all versions of SQL
427Server support the TOP keyword. This syntax is used for all SQL Server
428versions when no OFFSET clause is present. A statement such as::
429
430 select(some_table).limit(5)
431
432will render similarly to:
433
434.. sourcecode:: sql
435
436 SELECT TOP 5 col1, col2.. FROM table
437
438For versions of SQL Server prior to SQL Server 2012, a statement that uses
439LIMIT and OFFSET, or just OFFSET alone, will be rendered using the
440``ROW_NUMBER()`` window function. A statement such as::
441
442 select(some_table).order_by(some_table.c.col3).limit(5).offset(10)
443
444will render similarly to:
445
446.. sourcecode:: sql
447
448 SELECT anon_1.col1, anon_1.col2 FROM (SELECT col1, col2,
449 ROW_NUMBER() OVER (ORDER BY col3) AS
450 mssql_rn FROM table) AS
451 anon_1 WHERE mssql_rn > :param_1 AND mssql_rn <= :param_2 + :param_1
452
453Note that when using LIMIT and/or OFFSET, whether using the older
454or newer SQL Server syntaxes, the statement must have an ORDER BY as well,
455else a :class:`.CompileError` is raised.
456
457.. _mssql_comment_support:
458
459DDL Comment Support
460--------------------
461
462Comment support, which includes DDL rendering for attributes such as
463:paramref:`_schema.Table.comment` and :paramref:`_schema.Column.comment`, as
464well as the ability to reflect these comments, is supported assuming a
465supported version of SQL Server is in use. If a non-supported version such as
466Azure Synapse is detected at first-connect time (based on the presence
467of the ``fn_listextendedproperty`` SQL function), comment support including
468rendering and table-comment reflection is disabled, as both features rely upon
469SQL Server stored procedures and functions that are not available on all
470backend types.
471
472To force comment support to be on or off, bypassing autodetection, set the
473parameter ``supports_comments`` within :func:`_sa.create_engine`::
474
475 e = create_engine("mssql+pyodbc://u:p@dsn", supports_comments=False)
476
477.. versionadded:: 2.0 Added support for table and column comments for
478 the SQL Server dialect, including DDL generation and reflection.
479
480.. _mssql_isolation_level:
481
482Transaction Isolation Level
483---------------------------
484
485All SQL Server dialects support setting of transaction isolation level
486both via a dialect-specific parameter
487:paramref:`_sa.create_engine.isolation_level`
488accepted by :func:`_sa.create_engine`,
489as well as the :paramref:`.Connection.execution_options.isolation_level`
490argument as passed to
491:meth:`_engine.Connection.execution_options`.
492This feature works by issuing the
493command ``SET TRANSACTION ISOLATION LEVEL <level>`` for
494each new connection.
495
496To set isolation level using :func:`_sa.create_engine`::
497
498 engine = create_engine(
499 "mssql+pyodbc://scott:tiger@ms_2008", isolation_level="REPEATABLE READ"
500 )
501
502To set using per-connection execution options::
503
504 connection = engine.connect()
505 connection = connection.execution_options(isolation_level="READ COMMITTED")
506
507Valid values for ``isolation_level`` include:
508
509* ``AUTOCOMMIT`` - pyodbc / pymssql-specific
510* ``READ COMMITTED``
511* ``READ UNCOMMITTED``
512* ``REPEATABLE READ``
513* ``SERIALIZABLE``
514* ``SNAPSHOT`` - specific to SQL Server
515
516There are also more options for isolation level configurations, such as
517"sub-engine" objects linked to a main :class:`_engine.Engine` which each apply
518different isolation level settings. See the discussion at
519:ref:`dbapi_autocommit` for background.
520
521.. seealso::
522
523 :ref:`dbapi_autocommit`
524
525.. _mssql_reset_on_return:
526
527Temporary Table / Resource Reset for Connection Pooling
528-------------------------------------------------------
529
530The :class:`.QueuePool` connection pool implementation used
531by the SQLAlchemy :class:`.Engine` object includes
532:ref:`reset on return <pool_reset_on_return>` behavior that will invoke
533the DBAPI ``.rollback()`` method when connections are returned to the pool.
534While this rollback will clear out the immediate state used by the previous
535transaction, it does not cover a wider range of session-level state, including
536temporary tables as well as other server state such as prepared statement
537handles and statement caches. An undocumented SQL Server procedure known
538as ``sp_reset_connection`` is known to be a workaround for this issue which
539will reset most of the session state that builds up on a connection, including
540temporary tables.
541
542To install ``sp_reset_connection`` as the means of performing reset-on-return,
543the :meth:`.PoolEvents.reset` event hook may be used, as demonstrated in the
544example below. The :paramref:`_sa.create_engine.pool_reset_on_return` parameter
545is set to ``None`` so that the custom scheme can replace the default behavior
546completely. The custom hook implementation calls ``.rollback()`` in any case,
547as it's usually important that the DBAPI's own tracking of commit/rollback
548will remain consistent with the state of the transaction::
549
550 from sqlalchemy import create_engine
551 from sqlalchemy import event
552
553 mssql_engine = create_engine(
554 "mssql+pyodbc://scott:tiger^5HHH@mssql2017:1433/test?driver=ODBC+Driver+17+for+SQL+Server",
555 # disable default reset-on-return scheme
556 pool_reset_on_return=None,
557 )
558
559
560 @event.listens_for(mssql_engine, "reset")
561 def _reset_mssql(dbapi_connection, connection_record, reset_state):
562 if not reset_state.terminate_only:
563 dbapi_connection.execute("{call sys.sp_reset_connection}")
564
565 # so that the DBAPI itself knows that the connection has been
566 # reset
567 dbapi_connection.rollback()
568
569.. versionchanged:: 2.0.0b3 Added additional state arguments to
570 the :meth:`.PoolEvents.reset` event and additionally ensured the event
571 is invoked for all "reset" occurrences, so that it's appropriate
572 as a place for custom "reset" handlers. Previous schemes which
573 use the :meth:`.PoolEvents.checkin` handler remain usable as well.
574
575.. seealso::
576
577 :ref:`pool_reset_on_return` - in the :ref:`pooling_toplevel` documentation
578
579Nullability
580-----------
581MSSQL has support for three levels of column nullability. The default
582nullability allows nulls and is explicit in the CREATE TABLE
583construct:
584
585.. sourcecode:: sql
586
587 name VARCHAR(20) NULL
588
589If ``nullable=None`` is specified then no specification is made. In
590other words the database's configured default is used. This will
591render:
592
593.. sourcecode:: sql
594
595 name VARCHAR(20)
596
597If ``nullable`` is ``True`` or ``False`` then the column will be
598``NULL`` or ``NOT NULL`` respectively.
599
600Date / Time Handling
601--------------------
602DATE and TIME are supported. Bind parameters are converted
603to datetime.datetime() objects as required by most MSSQL drivers,
604and results are processed from strings if needed.
605The DATE and TIME types are not available for MSSQL 2005 and
606previous - if a server version below 2008 is detected, DDL
607for these types will be issued as DATETIME.
608
609.. _mssql_large_type_deprecation:
610
611Large Text/Binary Type Deprecation
612----------------------------------
613
614Per
615`SQL Server 2012/2014 Documentation <https://technet.microsoft.com/en-us/library/ms187993.aspx>`_,
616the ``NTEXT``, ``TEXT`` and ``IMAGE`` datatypes are to be removed from SQL
617Server in a future release. SQLAlchemy normally relates these types to the
618:class:`.UnicodeText`, :class:`_expression.TextClause` and
619:class:`.LargeBinary` datatypes.
620
621In order to accommodate this change, a new flag ``deprecate_large_types``
622is added to the dialect, which will be automatically set based on detection
623of the server version in use, if not otherwise set by the user. The
624behavior of this flag is as follows:
625
626* When this flag is ``True``, the :class:`.UnicodeText`,
627 :class:`_expression.TextClause` and
628 :class:`.LargeBinary` datatypes, when used to render DDL, will render the
629 types ``NVARCHAR(max)``, ``VARCHAR(max)``, and ``VARBINARY(max)``,
630 respectively. This is a new behavior as of the addition of this flag.
631
632* When this flag is ``False``, the :class:`.UnicodeText`,
633 :class:`_expression.TextClause` and
634 :class:`.LargeBinary` datatypes, when used to render DDL, will render the
635 types ``NTEXT``, ``TEXT``, and ``IMAGE``,
636 respectively. This is the long-standing behavior of these types.
637
638* The flag begins with the value ``None``, before a database connection is
639 established. If the dialect is used to render DDL without the flag being
640 set, it is interpreted the same as ``False``.
641
642* On first connection, the dialect detects if SQL Server version 2012 or
643 greater is in use; if the flag is still at ``None``, it sets it to ``True``
644 or ``False`` based on whether 2012 or greater is detected.
645
646* The flag can be set to either ``True`` or ``False`` when the dialect
647 is created, typically via :func:`_sa.create_engine`::
648
649 eng = create_engine(
650 "mssql+pymssql://user:pass@host/db", deprecate_large_types=True
651 )
652
653* Complete control over whether the "old" or "new" types are rendered is
654 available in all SQLAlchemy versions by using the UPPERCASE type objects
655 instead: :class:`_types.NVARCHAR`, :class:`_types.VARCHAR`,
656 :class:`_types.VARBINARY`, :class:`_types.TEXT`, :class:`_mssql.NTEXT`,
657 :class:`_mssql.IMAGE`
658 will always remain fixed and always output exactly that
659 type.
660
661.. _multipart_schema_names:
662
663Multipart Schema Names
664----------------------
665
666SQL Server schemas sometimes require multiple parts to their "schema"
667qualifier, that is, including the database name and owner name as separate
668tokens, such as ``mydatabase.dbo.some_table``. These multipart names can be set
669at once using the :paramref:`_schema.Table.schema` argument of
670:class:`_schema.Table`::
671
672 Table(
673 "some_table",
674 metadata,
675 Column("q", String(50)),
676 schema="mydatabase.dbo",
677 )
678
679When performing operations such as table or component reflection, a schema
680argument that contains a dot will be split into separate
681"database" and "owner" components in order to correctly query the SQL
682Server information schema tables, as these two values are stored separately.
683Additionally, when rendering the schema name for DDL or SQL, the two
684components will be quoted separately for case sensitive names and other
685special characters. Given an argument as below::
686
687 Table(
688 "some_table",
689 metadata,
690 Column("q", String(50)),
691 schema="MyDataBase.dbo",
692 )
693
694The above schema would be rendered as ``[MyDataBase].dbo``, and also in
695reflection, would be reflected using "dbo" as the owner and "MyDataBase"
696as the database name.
697
698To control how the schema name is broken into database / owner,
699specify brackets (which in SQL Server are quoting characters) in the name.
700Below, the "owner" will be considered as ``MyDataBase.dbo`` and the
701"database" will be None::
702
703 Table(
704 "some_table",
705 metadata,
706 Column("q", String(50)),
707 schema="[MyDataBase.dbo]",
708 )
709
710To individually specify both database and owner name with special characters
711or embedded dots, use two sets of brackets::
712
713 Table(
714 "some_table",
715 metadata,
716 Column("q", String(50)),
717 schema="[MyDataBase.Period].[MyOwner.Dot]",
718 )
719
720.. versionchanged:: 1.2 the SQL Server dialect now treats brackets as
721 identifier delimiters splitting the schema into separate database
722 and owner tokens, to allow dots within either name itself.
723
724.. _legacy_schema_rendering:
725
726Legacy Schema Mode
727------------------
728
729Very old versions of the MSSQL dialect introduced the behavior such that a
730schema-qualified table would be auto-aliased when used in a
731SELECT statement; given a table::
732
733 account_table = Table(
734 "account",
735 metadata,
736 Column("id", Integer, primary_key=True),
737 Column("info", String(100)),
738 schema="customer_schema",
739 )
740
741this legacy mode of rendering would assume that "customer_schema.account"
742would not be accepted by all parts of the SQL statement, as illustrated
743below:
744
745.. sourcecode:: pycon+sql
746
747 >>> eng = create_engine("mssql+pymssql://mydsn", legacy_schema_aliasing=True)
748 >>> print(account_table.select().compile(eng))
749 {printsql}SELECT account_1.id, account_1.info
750 FROM customer_schema.account AS account_1
751
752This mode of behavior is now off by default, as it appears to have served
753no purpose; however in the case that legacy applications rely upon it,
754it is available using the ``legacy_schema_aliasing`` argument to
755:func:`_sa.create_engine` as illustrated above.
756
757.. deprecated:: 1.4
758
759 The ``legacy_schema_aliasing`` flag is now
760 deprecated and will be removed in a future release.
761
762.. _mssql_indexes:
763
764Clustered Index Support
765-----------------------
766
767The MSSQL dialect supports clustered indexes (and primary keys) via the
768``mssql_clustered`` option. This option is available to :class:`.Index`,
769 :class:`.UniqueConstraint`, and :class:`.PrimaryKeyConstraint`.
770For indexes this option can be combined with the ``mssql_columnstore`` one
771to create a clustered columnstore index.
772
773To generate a clustered index::
774
775 Index("my_index", table.c.x, mssql_clustered=True)
776
777which renders the index as ``CREATE CLUSTERED INDEX my_index ON table (x)``.
778
779To generate a clustered primary key use::
780
781 Table(
782 "my_table",
783 metadata,
784 Column("x", ...),
785 Column("y", ...),
786 PrimaryKeyConstraint("x", "y", mssql_clustered=True),
787 )
788
789which will render the table, for example, as:
790
791.. sourcecode:: sql
792
793 CREATE TABLE my_table (
794 x INTEGER NOT NULL,
795 y INTEGER NOT NULL,
796 PRIMARY KEY CLUSTERED (x, y)
797 )
798
799Similarly, we can generate a clustered unique constraint using::
800
801 Table(
802 "my_table",
803 metadata,
804 Column("x", ...),
805 Column("y", ...),
806 PrimaryKeyConstraint("x"),
807 UniqueConstraint("y", mssql_clustered=True),
808 )
809
810To explicitly request a non-clustered primary key (for example, when
811a separate clustered index is desired), use::
812
813 Table(
814 "my_table",
815 metadata,
816 Column("x", ...),
817 Column("y", ...),
818 PrimaryKeyConstraint("x", "y", mssql_clustered=False),
819 )
820
821which will render the table, for example, as:
822
823.. sourcecode:: sql
824
825 CREATE TABLE my_table (
826 x INTEGER NOT NULL,
827 y INTEGER NOT NULL,
828 PRIMARY KEY NONCLUSTERED (x, y)
829 )
830
831Columnstore Index Support
832-------------------------
833
834The MSSQL dialect supports columnstore indexes via the ``mssql_columnstore``
835 option. This option is available to :class:`.Index`. It can be combined with
836the ``mssql_clustered`` option to create a clustered columnstore index.
837
838To generate a columnstore index::
839
840 Index("my_index", table.c.x, mssql_columnstore=True)
841
842which renders the index as ``CREATE COLUMNSTORE INDEX my_index ON table (x)``.
843
844 To generate a clustered columnstore index, provide no columns::
845
846 idx = Index("my_index", mssql_clustered=True, mssql_columnstore=True)
847 # required to associate the index with the table
848 table.append_constraint(idx)
849
850the above renders the index as
851``CREATE CLUSTERED COLUMNSTORE INDEX my_index ON table``.
852
853.. versionadded:: 2.0.18
854
855MSSQL-Specific Index Options
856-----------------------------
857
858In addition to clustering, the MSSQL dialect supports other special options
859for :class:`.Index`.
860
861INCLUDE
862^^^^^^^
863
864The ``mssql_include`` option renders INCLUDE(colname) for the given string
865names::
866
867 Index("my_index", table.c.x, mssql_include=["y"])
868
869would render the index as ``CREATE INDEX my_index ON table (x) INCLUDE (y)``
870
871.. _mssql_index_where:
872
873Filtered Indexes
874^^^^^^^^^^^^^^^^
875
876 The ``mssql_where`` option renders WHERE(condition) for the given
877 criterion::
878
879 Index("my_index", table.c.x, mssql_where=table.c.x > 10)
880
881would render the index as ``CREATE INDEX my_index ON table (x) WHERE x > 10``.
882
883.. versionadded:: 1.3.4
884
885Index ordering
886^^^^^^^^^^^^^^
887
888Index ordering is available via functional expressions, such as::
889
890 Index("my_index", table.c.x.desc())
891
892would render the index as ``CREATE INDEX my_index ON table (x DESC)``
893
894.. seealso::
895
896 :ref:`schema_indexes_functional`
897
898Compatibility Levels
899--------------------
900MSSQL supports the notion of setting compatibility levels at the
901database level. This allows, for instance, to run a database that
902is compatible with SQL2000 while running on a SQL2005 database
903server. ``server_version_info`` will always return the database
904server version information (in this case SQL2005) and not the
905compatibility level information. Because of this, if running under
906a backwards compatibility mode SQLAlchemy may attempt to use T-SQL
907statements that are unable to be parsed by the database server.
908
909.. _mssql_triggers:
910
911Triggers
912--------
913
914SQLAlchemy by default uses OUTPUT INSERTED to get at newly
915generated primary key values via IDENTITY columns or other
916server side defaults. MS-SQL does not
917allow the usage of OUTPUT INSERTED on tables that have triggers.
918To disable the usage of OUTPUT INSERTED on a per-table basis,
919specify ``implicit_returning=False`` for each :class:`_schema.Table`
920which has triggers::
921
922 Table(
923 "mytable",
924 metadata,
925 Column("id", Integer, primary_key=True),
926 # ...,
927 implicit_returning=False,
928 )
929
930Declarative form::
931
932 class MyClass(Base):
933 # ...
934 __table_args__ = {"implicit_returning": False}
935
936.. _mssql_rowcount_versioning:
937
938Rowcount Support / ORM Versioning
939---------------------------------
940
941The SQL Server drivers may have limited ability to return the number
942of rows updated from an UPDATE or DELETE statement.
943
944As of this writing, the PyODBC driver is not able to return a rowcount when
945OUTPUT INSERTED is used. Previous versions of SQLAlchemy therefore had
946limitations for features such as the "ORM Versioning" feature that relies upon
947accurate rowcounts in order to match version numbers with matched rows.
948
949SQLAlchemy 2.0 now retrieves the "rowcount" manually for these particular use
950cases based on counting the rows that arrived back within RETURNING; so while
951the driver still has this limitation, the ORM Versioning feature is no longer
952impacted by it. As of SQLAlchemy 2.0.5, ORM versioning has been fully
953re-enabled for the pyodbc driver.
954
955.. versionchanged:: 2.0.5 ORM versioning support is restored for the pyodbc
956 driver. Previously, a warning would be emitted during ORM flush that
957 versioning was not supported.
958
959
960Enabling Snapshot Isolation
961---------------------------
962
963SQL Server has a default transaction
964isolation mode that locks entire tables, and causes even mildly concurrent
965applications to have long held locks and frequent deadlocks.
966Enabling snapshot isolation for the database as a whole is recommended
967for modern levels of concurrency support. This is accomplished via the
968following ALTER DATABASE commands executed at the SQL prompt:
969
970.. sourcecode:: sql
971
972 ALTER DATABASE MyDatabase SET ALLOW_SNAPSHOT_ISOLATION ON
973
974 ALTER DATABASE MyDatabase SET READ_COMMITTED_SNAPSHOT ON
975
976Background on SQL Server snapshot isolation is available at
977https://msdn.microsoft.com/en-us/library/ms175095.aspx.
978
979""" # noqa
980
981from __future__ import annotations
982
983import codecs
984import datetime
985import operator
986import re
987from typing import overload
988from typing import TYPE_CHECKING
989from uuid import UUID as _python_UUID
990
991from . import information_schema as ischema
992from .json import JSON
993from .json import JSONIndexType
994from .json import JSONPathType
995from ... import exc
996from ... import Identity
997from ... import schema as sa_schema
998from ... import Sequence
999from ... import sql
1000from ... import text
1001from ... import util
1002from ...engine import cursor as _cursor
1003from ...engine import default
1004from ...engine import reflection
1005from ...engine.reflection import ReflectionDefaults
1006from ...sql import coercions
1007from ...sql import compiler
1008from ...sql import elements
1009from ...sql import expression
1010from ...sql import func
1011from ...sql import quoted_name
1012from ...sql import roles
1013from ...sql import sqltypes
1014from ...sql import try_cast as try_cast # noqa: F401
1015from ...sql import util as sql_util
1016from ...sql._typing import is_sql_compiler
1017from ...sql.compiler import InsertmanyvaluesSentinelOpts
1018from ...sql.elements import TryCast as TryCast # noqa: F401
1019from ...types import BIGINT
1020from ...types import BINARY
1021from ...types import CHAR
1022from ...types import DATE
1023from ...types import DATETIME
1024from ...types import DECIMAL
1025from ...types import FLOAT
1026from ...types import INTEGER
1027from ...types import NCHAR
1028from ...types import NUMERIC
1029from ...types import NVARCHAR
1030from ...types import SMALLINT
1031from ...types import TEXT
1032from ...types import VARCHAR
1033from ...util import update_wrapper
1034from ...util.typing import Literal
1035
1036if TYPE_CHECKING:
1037 from ...sql.dml import DMLState
1038 from ...sql.selectable import TableClause
1039
# https://sqlserverbuilds.blogspot.com/
# Major version numbers as reported by server_version_info; used to gate
# version-dependent behavior (e.g. DATE/TIME types degrade to DATETIME on
# servers older than MS_2008_VERSION -- see MSTypeCompiler.visit_date).
MS_2017_VERSION = (14,)
MS_2016_VERSION = (13,)
MS_2014_VERSION = (12,)
MS_2012_VERSION = (11,)
MS_2008_VERSION = (10,)
MS_2005_VERSION = (9,)
MS_2000_VERSION = (8,)
1048
# T-SQL reserved words; identifiers matching any of these must be quoted
# by the identifier preparer when rendered in SQL.
RESERVED_WORDS = set(
    """
    add all alter and any as asc authorization backup begin between break
    browse bulk by cascade case check checkpoint close clustered coalesce
    collate column commit compute constraint contains containstable continue
    convert create cross current current_date current_time current_timestamp
    current_user cursor database dbcc deallocate declare default delete deny
    desc disk distinct distributed double drop dump else end errlvl escape
    except exec execute exists exit external fetch file fillfactor for
    foreign freetext freetexttable from full function goto grant group
    having holdlock identity identity_insert identitycol if in index inner
    insert intersect into is join key kill left like lineno load merge
    national nocheck nonclustered not null nullif of off offsets on open
    opendatasource openquery openrowset openxml option or order outer over
    percent pivot plan precision primary print proc procedure public
    raiserror read readtext reconfigure references replication restore
    restrict return revert revoke right rollback rowcount rowguidcol rule
    save schema securityaudit select session_user set setuser shutdown some
    statistics system_user table tablesample textsize then to top tran
    transaction trigger truncate tsequal union unique unpivot update
    updatetext use user values varying view waitfor when where while with
    writetext
    """.split()
)
1231
1232
class REAL(sqltypes.REAL):
    """the SQL Server REAL datatype."""

    def __init__(self, **kw):
        # SQL Server treats REAL as a synonym for FLOAT(24), but the DDL
        # must render the bare keyword "REAL" with no numeric precision;
        # record the fixed precision of 24 unless one was supplied.
        if "precision" not in kw:
            kw["precision"] = 24
        super().__init__(**kw)
1242
1243
class DOUBLE_PRECISION(sqltypes.DOUBLE_PRECISION):
    """the SQL Server DOUBLE PRECISION datatype.

    .. versionadded:: 2.0.11

    """

    def __init__(self, **kw):
        # SQL Server treats DOUBLE PRECISION as a synonym for FLOAT(53);
        # only the bare keyword is legal in DDL, so the precision is fixed
        # at 53 unless explicitly overridden by the caller.
        if "precision" not in kw:
            kw["precision"] = 53
        super().__init__(**kw)
1257
1258
class TINYINT(sqltypes.Integer):
    """The SQL Server TINYINT datatype."""

    __visit_name__ = "TINYINT"
1261
1262
1263# MSSQL DATE/TIME types have varied behavior, sometimes returning
1264# strings. MSDate/TIME check for everything, and always
1265# filter bind parameters into datetime objects (required by pyodbc,
1266# not sure about other dialects).
1267
1268
class _MSDate(sqltypes.Date):
    """Date implementation tolerant of the varied values MSSQL drivers
    accept and return.

    Bind values are promoted to midnight ``datetime`` objects (required
    by pyodbc); results may arrive as datetimes or strings and are
    normalized back to ``datetime.date``.
    """

    # matches "YYYY-MM-DD"-style strings returned by some drivers
    _reg = re.compile(r"(\d+)-(\d+)-(\d+)")

    def bind_processor(self, dialect):
        def process(value):
            # exact type check on purpose: datetime is a date subclass
            # and must pass through unchanged
            if type(value) == datetime.date:
                return datetime.datetime(value.year, value.month, value.day)
            return value

        return process

    def result_processor(self, dialect, coltype):
        def process(value):
            if isinstance(value, datetime.datetime):
                return value.date()
            if isinstance(value, str):
                matched = self._reg.match(value)
                if matched is None:
                    raise ValueError(
                        "could not parse %r as a date value" % (value,)
                    )
                year, month, day = (int(g or 0) for g in matched.groups())
                return datetime.date(year, month, day)
            return value

        return process
1296
1297
class TIME(sqltypes.TIME):
    """The SQL Server TIME datatype, with optional fractional seconds
    precision rendered as ``TIME(n)``.

    NOTE(review): ``**kwargs`` is accepted but not forwarded to
    ``super().__init__()``, so arguments such as ``timezone`` are
    silently discarded -- confirm this is intentional.
    """

    def __init__(self, precision=None, **kwargs):
        # fractional seconds precision; see MSTypeCompiler.visit_TIME
        self.precision = precision
        super().__init__()

    # anchor date used when promoting a datetime bind value
    __zero_date = datetime.date(1900, 1, 1)

    def bind_processor(self, dialect):
        def process(value):
            # datetimes are re-anchored to 1900-01-01 so only the time
            # portion is significant
            if isinstance(value, datetime.datetime):
                value = datetime.datetime.combine(
                    self.__zero_date, value.time()
                )
            elif isinstance(value, datetime.time):
                """issue #5339
                per: https://github.com/mkleehammer/pyodbc/wiki/Tips-and-Tricks-by-Database-Platform#time-columns
                pass TIME value as string
                """  # noqa
                value = str(value)
            return value

        return process

    # matches "HH:MM:SS[.ffffff]" strings returned by some drivers
    _reg = re.compile(r"(\d+):(\d+):(\d+)(?:\.(\d{0,6}))?")

    def result_processor(self, dialect, coltype):
        def process(value):
            # results may arrive as datetime, string, or already-converted
            # time objects depending on the driver
            if isinstance(value, datetime.datetime):
                return value.time()
            elif isinstance(value, str):
                m = self._reg.match(value)
                if not m:
                    raise ValueError(
                        "could not parse %r as a time value" % (value,)
                    )
                return datetime.time(*[int(x or 0) for x in m.groups()])
            else:
                return value

        return process
1338
1339
# backwards-compatibility alias for the TIME class above
_MSTime = TIME
1341
1342
class _BASETIMEIMPL(TIME):
    # implementation marker type; the compiler routes this through
    # MSTypeCompiler.visit__BASETIMEIMPL -> visit_time so that pre-2008
    # servers degrade it to DATETIME
    __visit_name__ = "_BASETIMEIMPL"
1345
1346
1347class _DateTimeBase:
1348 def bind_processor(self, dialect):
1349 def process(value):
1350 if type(value) == datetime.date:
1351 return datetime.datetime(value.year, value.month, value.day)
1352 else:
1353 return value
1354
1355 return process
1356
1357
class _MSDateTime(_DateTimeBase, sqltypes.DateTime):
    # DateTime implementation type; inherits the date -> midnight-datetime
    # bind promotion from _DateTimeBase
    pass
1360
1361
class SMALLDATETIME(_DateTimeBase, sqltypes.DateTime):
    """The SQL Server SMALLDATETIME datatype."""

    __visit_name__ = "SMALLDATETIME"
1364
1365
class DATETIME2(_DateTimeBase, sqltypes.DateTime):
    """The SQL Server DATETIME2 datatype, supporting an optional
    fractional seconds precision rendered as ``DATETIME2(n)``."""

    __visit_name__ = "DATETIME2"

    def __init__(self, precision=None, **kw):
        # see MSTypeCompiler.visit_DATETIME2 for DDL rendering
        self.precision = precision
        super().__init__(**kw)
1372
1373
class DATETIMEOFFSET(_DateTimeBase, sqltypes.DateTime):
    """The SQL Server DATETIMEOFFSET datatype, supporting an optional
    fractional seconds precision rendered as ``DATETIMEOFFSET(n)``."""

    __visit_name__ = "DATETIMEOFFSET"

    def __init__(self, precision=None, **kw):
        # see MSTypeCompiler.visit_DATETIMEOFFSET for DDL rendering
        self.precision = precision
        super().__init__(**kw)
1380
1381
1382class _UnicodeLiteral:
1383 def literal_processor(self, dialect):
1384 def process(value):
1385 value = value.replace("'", "''")
1386
1387 if dialect.identifier_preparer._double_percents:
1388 value = value.replace("%", "%%")
1389
1390 return "N'%s'" % value
1391
1392 return process
1393
1394
class _MSUnicode(_UnicodeLiteral, sqltypes.Unicode):
    # Unicode implementation type; adds the N'' literal rendering only
    pass
1397
1398
class _MSUnicodeText(_UnicodeLiteral, sqltypes.UnicodeText):
    # UnicodeText implementation type; adds the N'' literal rendering only
    pass
1401
1402
class TIMESTAMP(sqltypes._Binary):
    """Implement the SQL Server TIMESTAMP type.

    Note this is **completely different** than the SQL Standard
    TIMESTAMP type, which is not supported by SQL Server. It
    is a read-only datatype that does not support INSERT of values.

    .. versionadded:: 1.2

    .. seealso::

        :class:`_mssql.ROWVERSION`

    """

    __visit_name__ = "TIMESTAMP"

    # _Binary expects a length attribute to be present
    length = None

    def __init__(self, convert_int=False):
        """Construct a TIMESTAMP or ROWVERSION type.

        :param convert_int: if True, binary integer values will
         be converted to integers on read.

        .. versionadded:: 1.2

        """
        self.convert_int = convert_int

    def result_processor(self, dialect, coltype):
        parent = super().result_processor(dialect, coltype)
        if not self.convert_int:
            return parent

        def process(value):
            if parent:
                value = parent(value)
            if value is None:
                return None
            # interpret the 8-byte binary value as a big-endian integer
            # https://stackoverflow.com/a/30403242/34549
            return int(codecs.encode(value, "hex"), 16)

        return process
1449
1450
class ROWVERSION(TIMESTAMP):
    """Implement the SQL Server ROWVERSION type.

    The ROWVERSION datatype is a SQL Server synonym for the TIMESTAMP
    datatype, however current SQL Server documentation suggests using
    ROWVERSION for new datatypes going forward.

    The ROWVERSION datatype does **not** reflect (e.g. introspect) from the
    database as itself; the returned datatype will be
    :class:`_mssql.TIMESTAMP`.

    This is a read-only datatype that does not support INSERT of values.

    .. versionadded:: 1.2

    .. seealso::

        :class:`_mssql.TIMESTAMP`

    """

    # behavior is inherited from TIMESTAMP; only the rendered keyword
    # differs (see MSTypeCompiler.visit_ROWVERSION)
    __visit_name__ = "ROWVERSION"
1473
1474
class NTEXT(sqltypes.UnicodeText):
    """MSSQL NTEXT type, for variable-length unicode text up to 2^30
    characters."""

    # used for UnicodeText when deprecate_large_types is False
    # (see MSTypeCompiler.visit_unicode_text)
    __visit_name__ = "NTEXT"
1480
1481
class VARBINARY(sqltypes.VARBINARY, sqltypes.LargeBinary):
    """The MSSQL VARBINARY type.

    Extends the core :class:`_types.VARBINARY` type with support for the
    dialect's "deprecate_large_types" mode, under which either
    ``VARBINARY(max)`` or IMAGE is rendered, as well as the SQL Server
    ``FILESTREAM`` storage option.

    .. seealso::

        :ref:`mssql_large_type_deprecation`

    """

    __visit_name__ = "VARBINARY"

    def __init__(self, length=None, filestream=False):
        """
        Construct a VARBINARY type.

        :param length: optional length for DDL rendering; when left unset,
         ``VARBINARY(max)`` is rendered.

        :param filestream=False: if True, renders the ``FILESTREAM`` keyword
         in the table definition. In this case ``length`` must be ``None``
         or ``'max'``.

        .. versionadded:: 1.4.31

        """

        self.filestream = filestream
        if filestream and length not in (None, "max"):
            raise ValueError(
                "length must be None or 'max' when setting filestream"
            )
        super().__init__(length=length)
1520
1521
class IMAGE(sqltypes.LargeBinary):
    """The SQL Server IMAGE datatype."""

    # used for LargeBinary when deprecate_large_types is False
    # (see MSTypeCompiler.visit_large_binary)
    __visit_name__ = "IMAGE"
1524
1525
class XML(sqltypes.Text):
    """MSSQL XML type.

    This is a placeholder type for reflection purposes that does not include
    any Python-side datatype support. It also does not currently support
    additional arguments, such as "CONTENT", "DOCUMENT",
    "xml_schema_collection".

    """

    __visit_name__ = "XML"
1537
1538
class BIT(sqltypes.Boolean):
    """MSSQL BIT type.

    Both pyodbc and pymssql return values from BIT columns as
    Python <class 'bool'> so just subclass Boolean.

    """

    # no bind/result conversion needed beyond the Boolean base
    __visit_name__ = "BIT"
1548
1549
class MONEY(sqltypes.TypeEngine):
    """The SQL Server MONEY datatype."""

    __visit_name__ = "MONEY"
1552
1553
class SMALLMONEY(sqltypes.TypeEngine):
    """The SQL Server SMALLMONEY datatype."""

    __visit_name__ = "SMALLMONEY"
1556
1557
class MSUUid(sqltypes.Uuid):
    """MSSQL-specific Uuid implementation.

    With ``native_uuid`` the driver handles UUID bind values directly;
    otherwise values are stored as 32-character hex strings (dashes
    stripped, matching ``uuid.UUID.hex``).  Literal rendering always
    doubles embedded single quotes for T-SQL escaping.
    """

    def bind_processor(self, dialect):
        if self.native_uuid:
            # this is currently assuming pyodbc; might not work for
            # some other mssql driver
            return None
        else:
            if self.as_uuid:

                def process(value):
                    # UUID object -> 32-char hex string
                    if value is not None:
                        value = value.hex
                    return value

                return process
            else:

                def process(value):
                    # string UUID -> strip dashes to match the .hex form.
                    # no quote manipulation belongs here: this is a bound
                    # parameter, not an inlined literal (previously an
                    # erroneous .replace("''", "'") was applied)
                    if value is not None:
                        value = value.replace("-", "")
                    return value

                return process

    def literal_processor(self, dialect):
        if self.native_uuid:

            def process(value):
                # escape quotes by doubling them; a no-op for actual UUID
                # objects, but required for correctness with raw strings
                # (previously the replacement ran in the wrong direction,
                # un-escaping "''" to "'")
                return f"""'{str(value).replace("'", "''")}'"""

            return process
        else:
            if self.as_uuid:

                def process(value):
                    return f"""'{value.hex}'"""

                return process
            else:

                def process(value):
                    return f"""'{
                        value.replace("-", "").replace("'", "''")
                    }'"""

                return process
1604
1605
class UNIQUEIDENTIFIER(sqltypes.Uuid[sqltypes._UUID_RETURN]):
    """The SQL Server UNIQUEIDENTIFIER datatype."""

    __visit_name__ = "UNIQUEIDENTIFIER"

    @overload
    def __init__(
        self: UNIQUEIDENTIFIER[_python_UUID], as_uuid: Literal[True] = ...
    ): ...

    @overload
    def __init__(
        self: UNIQUEIDENTIFIER[str], as_uuid: Literal[False] = ...
    ): ...

    def __init__(self, as_uuid: bool = True):
        """Construct a :class:`_mssql.UNIQUEIDENTIFIER` type.


        :param as_uuid=True: if True, values will be interpreted
         as Python uuid objects, converting to/from string via the
         DBAPI.

        .. versionchanged:: 2.0 Added direct "uuid" support to the
           :class:`_mssql.UNIQUEIDENTIFIER` datatype; uuid interpretation
           defaults to ``True``.

        """
        self.as_uuid = as_uuid
        # UNIQUEIDENTIFIER is a native database UUID type; always handled
        # natively regardless of as_uuid
        self.native_uuid = True
1634
1635
class SQL_VARIANT(sqltypes.TypeEngine):
    """The SQL Server SQL_VARIANT datatype."""

    __visit_name__ = "SQL_VARIANT"
1638
1639
# old names: backwards-compatibility aliases for the type classes above,
# retained for applications importing the legacy "MS"-prefixed names
MSDateTime = _MSDateTime
MSDate = _MSDate
MSReal = REAL
MSTinyInteger = TINYINT
MSTime = TIME
MSSmallDateTime = SMALLDATETIME
MSDateTime2 = DATETIME2
MSDateTimeOffset = DATETIMEOFFSET
MSText = TEXT
MSNText = NTEXT
MSString = VARCHAR
MSNVarchar = NVARCHAR
MSChar = CHAR
MSNChar = NCHAR
MSBinary = BINARY
MSVarBinary = VARBINARY
MSImage = IMAGE
MSBit = BIT
MSMoney = MONEY
MSSmallMoney = SMALLMONEY
MSUniqueIdentifier = UNIQUEIDENTIFIER
MSVariant = SQL_VARIANT
1663
# mapping of lower-case SQL Server type names to the dialect's type
# classes, used when reflecting column types from the database
ischema_names = {
    "int": INTEGER,
    "bigint": BIGINT,
    "smallint": SMALLINT,
    "tinyint": TINYINT,
    "varchar": VARCHAR,
    "nvarchar": NVARCHAR,
    "char": CHAR,
    "nchar": NCHAR,
    "text": TEXT,
    "ntext": NTEXT,
    "decimal": DECIMAL,
    "numeric": NUMERIC,
    "float": FLOAT,
    "datetime": DATETIME,
    "datetime2": DATETIME2,
    "datetimeoffset": DATETIMEOFFSET,
    "date": DATE,
    "time": TIME,
    "smalldatetime": SMALLDATETIME,
    "binary": BINARY,
    "varbinary": VARBINARY,
    "bit": BIT,
    "real": REAL,
    "double precision": DOUBLE_PRECISION,
    "image": IMAGE,
    "xml": XML,
    "timestamp": TIMESTAMP,
    "money": MONEY,
    "smallmoney": SMALLMONEY,
    "uniqueidentifier": UNIQUEIDENTIFIER,
    "sql_variant": SQL_VARIANT,
}
1697
1698
class MSTypeCompiler(compiler.GenericTypeCompiler):
    """DDL type renderer for SQL Server.

    Handles COLLATE annotations, implicit "max" lengths on the
    variable-length types, degradation of DATE/TIME on pre-2008 servers,
    and the large-type substitutions controlled by
    ``deprecate_large_types``.
    """

    def _extend(self, spec, type_, length=None):
        """Extend a string-type declaration with standard SQL
        COLLATE annotations.

        """

        if getattr(type_, "collation", None):
            collation = "COLLATE %s" % type_.collation
        else:
            collation = None

        if not length:
            length = type_.length

        if length:
            spec = spec + "(%s)" % length

        return " ".join([c for c in (spec, collation) if c is not None])

    def visit_double(self, type_, **kw):
        return self.visit_DOUBLE_PRECISION(type_, **kw)

    def visit_FLOAT(self, type_, **kw):
        precision = getattr(type_, "precision", None)
        if precision is None:
            return "FLOAT"
        else:
            return "FLOAT(%(precision)s)" % {"precision": precision}

    def visit_TINYINT(self, type_, **kw):
        return "TINYINT"

    def visit_TIME(self, type_, **kw):
        precision = getattr(type_, "precision", None)
        if precision is not None:
            return "TIME(%s)" % precision
        else:
            return "TIME"

    def visit_TIMESTAMP(self, type_, **kw):
        return "TIMESTAMP"

    def visit_ROWVERSION(self, type_, **kw):
        return "ROWVERSION"

    def visit_datetime(self, type_, **kw):
        # timezone-aware DateTime maps to DATETIMEOFFSET
        if type_.timezone:
            return self.visit_DATETIMEOFFSET(type_, **kw)
        else:
            return self.visit_DATETIME(type_, **kw)

    def visit_DATETIMEOFFSET(self, type_, **kw):
        precision = getattr(type_, "precision", None)
        if precision is not None:
            # use the guarded local "precision" rather than re-reading
            # type_.precision, for consistency with visit_TIME /
            # visit_DATETIME2
            return "DATETIMEOFFSET(%s)" % precision
        else:
            return "DATETIMEOFFSET"

    def visit_DATETIME2(self, type_, **kw):
        precision = getattr(type_, "precision", None)
        if precision is not None:
            return "DATETIME2(%s)" % precision
        else:
            return "DATETIME2"

    def visit_SMALLDATETIME(self, type_, **kw):
        return "SMALLDATETIME"

    def visit_unicode(self, type_, **kw):
        return self.visit_NVARCHAR(type_, **kw)

    def visit_text(self, type_, **kw):
        # Text renders VARCHAR(max) under deprecate_large_types
        if self.dialect.deprecate_large_types:
            return self.visit_VARCHAR(type_, **kw)
        else:
            return self.visit_TEXT(type_, **kw)

    def visit_unicode_text(self, type_, **kw):
        # UnicodeText renders NVARCHAR(max) under deprecate_large_types
        if self.dialect.deprecate_large_types:
            return self.visit_NVARCHAR(type_, **kw)
        else:
            return self.visit_NTEXT(type_, **kw)

    def visit_NTEXT(self, type_, **kw):
        return self._extend("NTEXT", type_)

    def visit_TEXT(self, type_, **kw):
        return self._extend("TEXT", type_)

    def visit_VARCHAR(self, type_, **kw):
        return self._extend("VARCHAR", type_, length=type_.length or "max")

    def visit_CHAR(self, type_, **kw):
        return self._extend("CHAR", type_)

    def visit_NCHAR(self, type_, **kw):
        return self._extend("NCHAR", type_)

    def visit_NVARCHAR(self, type_, **kw):
        return self._extend("NVARCHAR", type_, length=type_.length or "max")

    def visit_date(self, type_, **kw):
        # the DATE type requires SQL Server 2008 or greater
        if self.dialect.server_version_info < MS_2008_VERSION:
            return self.visit_DATETIME(type_, **kw)
        else:
            return self.visit_DATE(type_, **kw)

    def visit__BASETIMEIMPL(self, type_, **kw):
        return self.visit_time(type_, **kw)

    def visit_time(self, type_, **kw):
        # the TIME type requires SQL Server 2008 or greater
        if self.dialect.server_version_info < MS_2008_VERSION:
            return self.visit_DATETIME(type_, **kw)
        else:
            return self.visit_TIME(type_, **kw)

    def visit_large_binary(self, type_, **kw):
        # LargeBinary renders VARBINARY(max) under deprecate_large_types
        if self.dialect.deprecate_large_types:
            return self.visit_VARBINARY(type_, **kw)
        else:
            return self.visit_IMAGE(type_, **kw)

    def visit_IMAGE(self, type_, **kw):
        return "IMAGE"

    def visit_XML(self, type_, **kw):
        return "XML"

    def visit_VARBINARY(self, type_, **kw):
        text = self._extend("VARBINARY", type_, length=type_.length or "max")
        if getattr(type_, "filestream", False):
            text += " FILESTREAM"
        return text

    def visit_boolean(self, type_, **kw):
        # propagate **kw for consistency with the other visit_* methods
        return self.visit_BIT(type_, **kw)

    def visit_BIT(self, type_, **kw):
        return "BIT"

    def visit_JSON(self, type_, **kw):
        # this is a bit of a break with SQLAlchemy's convention of
        # "UPPERCASE name goes to UPPERCASE type name with no modification"
        return self._extend("NVARCHAR", type_, length="max")

    def visit_MONEY(self, type_, **kw):
        return "MONEY"

    def visit_SMALLMONEY(self, type_, **kw):
        return "SMALLMONEY"

    def visit_uuid(self, type_, **kw):
        if type_.native_uuid:
            return self.visit_UNIQUEIDENTIFIER(type_, **kw)
        else:
            return super().visit_uuid(type_, **kw)

    def visit_UNIQUEIDENTIFIER(self, type_, **kw):
        return "UNIQUEIDENTIFIER"

    def visit_SQL_VARIANT(self, type_, **kw):
        return "SQL_VARIANT"
1862
1863
class MSExecutionContext(default.DefaultExecutionContext):
    """Execution context for SQL Server.

    Manages ``SET IDENTITY_INSERT`` around INSERT statements that supply
    explicit values for an IDENTITY column, and fetches last-inserted
    identity values via ``scope_identity()`` / ``@@identity`` when
    OUTPUT/RETURNING is not in play.
    """

    # True while IDENTITY_INSERT has been switched ON for this statement
    # and must be switched OFF again in post_exec()
    _enable_identity_insert = False
    # True when a follow-up SELECT is needed to obtain lastrowid
    _select_lastrowid = False
    _lastrowid = None

    dialect: MSDialect

    def _opt_encode(self, statement):
        # apply schema-translate-map substitutions to the raw SQL strings
        # emitted below, which bypass the normal compilation step
        if self.compiled and self.compiled.schema_translate_map:
            rst = self.compiled.preparer._render_schema_translates
            statement = rst(statement, self.compiled.schema_translate_map)

        return statement

    def pre_exec(self):
        """Activate IDENTITY_INSERT if needed."""

        if self.isinsert:
            if TYPE_CHECKING:
                assert is_sql_compiler(self.compiled)
                assert isinstance(self.compiled.compile_state, DMLState)
                assert isinstance(
                    self.compiled.compile_state.dml_table, TableClause
                )

            tbl = self.compiled.compile_state.dml_table
            id_column = tbl._autoincrement_column

            # a Sequence-backed autoincrement column is not an IDENTITY
            # column, so no IDENTITY_INSERT handling applies in that case
            if id_column is not None and (
                not isinstance(id_column.default, Sequence)
            ):
                insert_has_identity = True
                compile_state = self.compiled.dml_compile_state
                # an explicit value is being supplied for the IDENTITY
                # column, either via parameters or the statement itself
                self._enable_identity_insert = (
                    id_column.key in self.compiled_parameters[0]
                ) or (
                    compile_state._dict_parameters
                    and (id_column.key in compile_state._insert_col_keys)
                )

            else:
                insert_has_identity = False
                self._enable_identity_insert = False

            # fall back to a post-INSERT SELECT for the identity value only
            # when OUTPUT/RETURNING is not being used and the server is
            # generating the value
            self._select_lastrowid = (
                not self.compiled.inline
                and insert_has_identity
                and not self.compiled.effective_returning
                and not self._enable_identity_insert
                and not self.executemany
            )

            if self._enable_identity_insert:
                self.root_connection._cursor_execute(
                    self.cursor,
                    self._opt_encode(
                        "SET IDENTITY_INSERT %s ON"
                        % self.identifier_preparer.format_table(tbl)
                    ),
                    (),
                    self,
                )

    def post_exec(self):
        """Disable IDENTITY_INSERT if enabled."""

        conn = self.root_connection

        if self.isinsert or self.isupdate or self.isdelete:
            # capture rowcount before any follow-up statements disturb it
            self._rowcount = self.cursor.rowcount

        if self._select_lastrowid:
            if self.dialect.use_scope_identity:
                conn._cursor_execute(
                    self.cursor,
                    "SELECT scope_identity() AS lastrowid",
                    (),
                    self,
                )
            else:
                conn._cursor_execute(
                    self.cursor, "SELECT @@identity AS lastrowid", (), self
                )
            # fetchall() ensures the cursor is consumed without closing it
            row = self.cursor.fetchall()[0]
            self._lastrowid = int(row[0])

            self.cursor_fetch_strategy = _cursor._NO_CURSOR_DML
        elif (
            self.compiled is not None
            and is_sql_compiler(self.compiled)
            and self.compiled.effective_returning
        ):
            # buffer the OUTPUT rows eagerly; the follow-up statements
            # below would otherwise discard them
            self.cursor_fetch_strategy = (
                _cursor.FullyBufferedCursorFetchStrategy(
                    self.cursor,
                    self.cursor.description,
                    self.cursor.fetchall(),
                )
            )

        if self._enable_identity_insert:
            if TYPE_CHECKING:
                assert is_sql_compiler(self.compiled)
                assert isinstance(self.compiled.compile_state, DMLState)
                assert isinstance(
                    self.compiled.compile_state.dml_table, TableClause
                )
            conn._cursor_execute(
                self.cursor,
                self._opt_encode(
                    "SET IDENTITY_INSERT %s OFF"
                    % self.identifier_preparer.format_table(
                        self.compiled.compile_state.dml_table
                    )
                ),
                (),
                self,
            )

    def get_lastrowid(self):
        return self._lastrowid

    def handle_dbapi_exception(self, e):
        # best-effort cleanup: ensure IDENTITY_INSERT is switched back OFF
        # even when the INSERT itself failed; errors here are suppressed
        # so the original exception propagates
        if self._enable_identity_insert:
            try:
                self.cursor.execute(
                    self._opt_encode(
                        "SET IDENTITY_INSERT %s OFF"
                        % self.identifier_preparer.format_table(
                            self.compiled.compile_state.dml_table
                        )
                    )
                )
            except Exception:
                pass

    def fire_sequence(self, seq, type_):
        # explicit sequences are fired with NEXT VALUE FOR
        return self._execute_scalar(
            (
                "SELECT NEXT VALUE FOR %s"
                % self.identifier_preparer.format_sequence(seq)
            ),
            type_,
        )

    def get_insert_default(self, column):
        # an "optional" Sequence on the autoincrement column defers to
        # the server-side IDENTITY behavior rather than firing the sequence
        if (
            isinstance(column, sa_schema.Column)
            and column is column.table._autoincrement_column
            and isinstance(column.default, sa_schema.Sequence)
            and column.default.optional
        ):
            return None
        return super().get_insert_default(column)
2019
2020
2021class MSSQLCompiler(compiler.SQLCompiler):
2022 returning_precedes_values = True
2023
2024 extract_map = util.update_copy(
2025 compiler.SQLCompiler.extract_map,
2026 {
2027 "doy": "dayofyear",
2028 "dow": "weekday",
2029 "milliseconds": "millisecond",
2030 "microseconds": "microsecond",
2031 },
2032 )
2033
2034 def __init__(self, *args, **kwargs):
2035 self.tablealiases = {}
2036 super().__init__(*args, **kwargs)
2037
2038 def _format_frame_clause(self, range_, **kw):
2039 kw["literal_execute"] = True
2040 return super()._format_frame_clause(range_, **kw)
2041
2042 def _with_legacy_schema_aliasing(fn):
2043 def decorate(self, *arg, **kw):
2044 if self.dialect.legacy_schema_aliasing:
2045 return fn(self, *arg, **kw)
2046 else:
2047 super_ = getattr(super(MSSQLCompiler, self), fn.__name__)
2048 return super_(*arg, **kw)
2049
2050 return decorate
2051
2052 def visit_now_func(self, fn, **kw):
2053 return "CURRENT_TIMESTAMP"
2054
2055 def visit_current_date_func(self, fn, **kw):
2056 return "GETDATE()"
2057
2058 def visit_length_func(self, fn, **kw):
2059 return "LEN%s" % self.function_argspec(fn, **kw)
2060
2061 def visit_char_length_func(self, fn, **kw):
2062 return "LEN%s" % self.function_argspec(fn, **kw)
2063
2064 def visit_aggregate_strings_func(self, fn, **kw):
2065 expr = fn.clauses.clauses[0]._compiler_dispatch(self, **kw)
2066 kw["literal_execute"] = True
2067 delimeter = fn.clauses.clauses[1]._compiler_dispatch(self, **kw)
2068 return f"string_agg({expr}, {delimeter})"
2069
2070 def visit_concat_op_expression_clauselist(
2071 self, clauselist, operator, **kw
2072 ):
2073 return " + ".join(self.process(elem, **kw) for elem in clauselist)
2074
2075 def visit_concat_op_binary(self, binary, operator, **kw):
2076 return "%s + %s" % (
2077 self.process(binary.left, **kw),
2078 self.process(binary.right, **kw),
2079 )
2080
2081 def visit_true(self, expr, **kw):
2082 return "1"
2083
2084 def visit_false(self, expr, **kw):
2085 return "0"
2086
2087 def visit_match_op_binary(self, binary, operator, **kw):
2088 return "CONTAINS (%s, %s)" % (
2089 self.process(binary.left, **kw),
2090 self.process(binary.right, **kw),
2091 )
2092
    def get_select_precolumns(self, select, **kw):
        """MS-SQL puts ``TOP``, its version of LIMIT, here."""

        s = super().get_select_precolumns(select, **kw)

        if select._has_row_limiting_clause and self._use_top(select):
            # ODBC drivers and possibly others
            # don't support bind params in the SELECT clause on SQL Server.
            # so have to use literal here.
            # (mutating kw is intentional: it affects the process() call
            # below for the TOP value only)
            kw["literal_execute"] = True
            s += "TOP %s " % self.process(
                self._get_limit_or_fetch(select), **kw
            )
            # PERCENT / WITH TIES are modifiers of TOP; they only exist
            # when a FETCH clause supplied them
            if select._fetch_clause is not None:
                if select._fetch_clause_options["percent"]:
                    s += "PERCENT "
                if select._fetch_clause_options["with_ties"]:
                    s += "WITH TIES "

        return s
2113
2114 def get_from_hint_text(self, table, text):
2115 return text
2116
2117 def get_crud_hint_text(self, table, text):
2118 return text
2119
2120 def _get_limit_or_fetch(self, select):
2121 if select._fetch_clause is None:
2122 return select._limit_clause
2123 else:
2124 return select._fetch_clause
2125
2126 def _use_top(self, select):
2127 return (select._offset_clause is None) and (
2128 select._simple_int_clause(select._limit_clause)
2129 or (
2130 # limit can use TOP with is by itself. fetch only uses TOP
2131 # when it needs to because of PERCENT and/or WITH TIES
2132 # TODO: Why? shouldn't we use TOP always ?
2133 select._simple_int_clause(select._fetch_clause)
2134 and (
2135 select._fetch_clause_options["percent"]
2136 or select._fetch_clause_options["with_ties"]
2137 )
2138 )
2139 )
2140
2141 def limit_clause(self, cs, **kwargs):
2142 return ""
2143
2144 def _check_can_use_fetch_limit(self, select):
2145 # to use ROW_NUMBER(), an ORDER BY is required.
2146 # OFFSET are FETCH are options of the ORDER BY clause
2147 if not select._order_by_clause.clauses:
2148 raise exc.CompileError(
2149 "MSSQL requires an order_by when "
2150 "using an OFFSET or a non-simple "
2151 "LIMIT clause"
2152 )
2153
2154 if select._fetch_clause_options is not None and (
2155 select._fetch_clause_options["percent"]
2156 or select._fetch_clause_options["with_ties"]
2157 ):
2158 raise exc.CompileError(
2159 "MSSQL needs TOP to use PERCENT and/or WITH TIES. "
2160 "Only simple fetch without offset can be used."
2161 )
2162
    def _row_limit_clause(self, select, **kw):
        """MSSQL 2012 supports OFFSET/FETCH operators.
        Use them instead of a subquery with row_number().

        """

        # render OFFSET/FETCH only when the server supports it and TOP
        # is not handling the limit already (see _use_top())
        if self.dialect._supports_offset_fetch and not self._use_top(select):
            self._check_can_use_fetch_limit(select)

            return self.fetch_clause(
                select,
                fetch_clause=self._get_limit_or_fetch(select),
                require_offset=True,
                **kw,
            )

        else:
            return ""
2181
2182 def visit_try_cast(self, element, **kw):
2183 return "TRY_CAST (%s AS %s)" % (
2184 self.process(element.clause, **kw),
2185 self.process(element.typeclause, **kw),
2186 )
2187
    def translate_select_structure(self, select_stmt, **kwargs):
        """Look for ``LIMIT`` and ``OFFSET`` in a select statement, and
        if present, rewrite it as a subquery with a ``row_number()``
        criterion.  MSSQL 2012 and above (which support OFFSET/FETCH)
        are excluded.

        """
        select = select_stmt

        # _mssql_visit guards against re-entrant translation of a
        # statement we already rewrote
        if (
            select._has_row_limiting_clause
            and not self.dialect._supports_offset_fetch
            and not self._use_top(select)
            and not getattr(select, "_mssql_visit", None)
        ):
            self._check_can_use_fetch_limit(select)

            _order_by_clauses = [
                sql_util.unwrap_label_reference(elem)
                for elem in select._order_by_clause.clauses
            ]

            limit_clause = self._get_limit_or_fetch(select)
            offset_clause = select._offset_clause

            # copy the statement, mark it as rewritten, add the
            # ROW_NUMBER() column and strip the outer ORDER BY (it is
            # carried by the OVER clause instead)
            select = select._generate()
            select._mssql_visit = True
            select = (
                select.add_columns(
                    sql.func.ROW_NUMBER()
                    .over(order_by=_order_by_clauses)
                    .label("mssql_rn")
                )
                .order_by(None)
                .alias()
            )

            # outer select returns everything except the synthetic
            # row-number column, filtered by the requested window
            mssql_rn = sql.column("mssql_rn")
            limitselect = sql.select(
                *[c for c in select.c if c.key != "mssql_rn"]
            )
            if offset_clause is not None:
                limitselect = limitselect.where(mssql_rn > offset_clause)
                if limit_clause is not None:
                    limitselect = limitselect.where(
                        mssql_rn <= (limit_clause + offset_clause)
                    )
            else:
                limitselect = limitselect.where(mssql_rn <= (limit_clause))
            return limitselect
        else:
            return select
2239
    @_with_legacy_schema_aliasing
    def visit_table(self, table, mssql_aliased=False, iscrud=False, **kwargs):
        # identity check: ``mssql_aliased`` carries the table object that
        # was already aliased by visit_alias(); if it is this table (or we
        # are rendering a CRUD target) render normally
        if mssql_aliased is table or iscrud:
            return super().visit_table(table, **kwargs)

        # alias schema-qualified tables
        alias = self._schema_aliased_table(table)
        if alias is not None:
            return self.process(alias, mssql_aliased=table, **kwargs)
        else:
            return super().visit_table(table, **kwargs)
2251
    @_with_legacy_schema_aliasing
    def visit_alias(self, alias, **kw):
        # translate for schema-qualified table aliases: mark the aliased
        # element so visit_table() does not re-alias it
        kw["mssql_aliased"] = alias.element
        return super().visit_alias(alias, **kw)
2257
    @_with_legacy_schema_aliasing
    def visit_column(self, column, add_to_result_map=None, **kw):
        # NOTE(review): due to operator precedence this condition parses
        # as ``(table is not None and not update/delete) or is_subquery()``
        # -- confirm the ``or`` arm is intended to apply even when
        # column.table is None
        if (
            column.table is not None
            and (not self.isupdate and not self.isdelete)
            or self.is_subquery()
        ):
            # translate for schema-qualified table aliases
            t = self._schema_aliased_table(column.table)
            if t is not None:
                converted = elements._corresponding_column_or_error(t, column)
                # register the *original* column in the result map so
                # result rows can still be addressed by it
                if add_to_result_map is not None:
                    add_to_result_map(
                        column.name,
                        column.name,
                        (column, column.name, column.key),
                        column.type,
                    )

                return super().visit_column(converted, **kw)

        return super().visit_column(
            column, add_to_result_map=add_to_result_map, **kw
        )
2282
2283 def _schema_aliased_table(self, table):
2284 if getattr(table, "schema", None) is not None:
2285 if table not in self.tablealiases:
2286 self.tablealiases[table] = table.alias()
2287 return self.tablealiases[table]
2288 else:
2289 return None
2290
2291 def visit_extract(self, extract, **kw):
2292 field = self.extract_map.get(extract.field, extract.field)
2293 return "DATEPART(%s, %s)" % (field, self.process(extract.expr, **kw))
2294
2295 def visit_savepoint(self, savepoint_stmt, **kw):
2296 return "SAVE TRANSACTION %s" % self.preparer.format_savepoint(
2297 savepoint_stmt
2298 )
2299
2300 def visit_rollback_to_savepoint(self, savepoint_stmt, **kw):
2301 return "ROLLBACK TRANSACTION %s" % self.preparer.format_savepoint(
2302 savepoint_stmt
2303 )
2304
    def visit_binary(self, binary, **kwargs):
        """Move bind parameters to the right-hand side of an operator,
        where possible.

        """
        # only flips ``bind == expr`` into ``expr == bind``; other
        # operators and bind-vs-bind comparisons pass through unchanged
        if (
            isinstance(binary.left, expression.BindParameter)
            and binary.operator == operator.eq
            and not isinstance(binary.right, expression.BindParameter)
        ):
            return self.process(
                expression.BinaryExpression(
                    binary.right, binary.left, binary.operator
                ),
                **kwargs,
            )
        return super().visit_binary(binary, **kwargs)
2322
    def returning_clause(
        self, stmt, returning_cols, *, populate_result_map, **kw
    ):
        """Render RETURNING as SQL Server's ``OUTPUT`` clause."""

        # SQL server returning clause requires that the columns refer to
        # the virtual table names "inserted" or "deleted". Here, we make
        # a simple alias of our table with that name, and then adapt the
        # columns we have from the list of RETURNING columns to that new name
        # so that they render as "inserted.<colname>" / "deleted.<colname>".

        if stmt.is_insert or stmt.is_update:
            target = stmt.table.alias("inserted")
        elif stmt.is_delete:
            target = stmt.table.alias("deleted")
        else:
            assert False, "expected Insert, Update or Delete statement"

        adapter = sql_util.ClauseAdapter(target)

        # adapter.traverse() takes a column from our target table and returns
        # the one that is linked to the "inserted" / "deleted" tables. So in
        # order to retrieve these values back from the result (e.g. like
        # row[column]), tell the compiler to also add the original unadapted
        # column to the result map. Before #4877, these were (unknowingly)
        # falling back using string name matching in the result set which
        # necessarily used an expensive KeyError in order to match.

        columns = [
            self._label_returning_column(
                stmt,
                adapter.traverse(column),
                populate_result_map,
                {"result_map_targets": (column,)},
                fallback_label_name=fallback_label_name,
                column_is_repeated=repeated,
                name=name,
                proxy_name=proxy_name,
                **kw,
            )
            for (
                name,
                proxy_name,
                fallback_label_name,
                column,
                repeated,
            ) in stmt._generate_columns_plus_names(
                True, cols=expression._select_iterables(returning_cols)
            )
        ]

        return "OUTPUT " + ", ".join(columns)
2373
2374 def get_cte_preamble(self, recursive):
2375 # SQL Server finds it too inconvenient to accept
2376 # an entirely optional, SQL standard specified,
2377 # "RECURSIVE" word with their "WITH",
2378 # so here we go
2379 return "WITH"
2380
2381 def label_select_column(self, select, column, asfrom):
2382 if isinstance(column, expression.Function):
2383 return column.label(None)
2384 else:
2385 return super().label_select_column(select, column, asfrom)
2386
2387 def for_update_clause(self, select, **kw):
2388 # "FOR UPDATE" is only allowed on "DECLARE CURSOR" which
2389 # SQLAlchemy doesn't use
2390 return ""
2391
    def order_by_clause(self, select, **kw):
        """Render ORDER BY, suppressing it in subqueries where SQL Server
        forbids it."""
        # MSSQL only allows ORDER BY in subqueries if there is a LIMIT:
        # "The ORDER BY clause is invalid in views, inline functions,
        # derived tables, subqueries, and common table expressions,
        # unless TOP, OFFSET or FOR XML is also specified."
        if (
            self.is_subquery()
            and not self._use_top(select)
            and (
                select._offset is None
                or not self.dialect._supports_offset_fetch
            )
        ):
            # avoid processing the order by clause if we won't end up
            # using it, because we don't want all the bind params tacked
            # onto the positional list if that is what the dbapi requires
            return ""

        order_by = self.process(select._order_by_clause, **kw)

        if order_by:
            return " ORDER BY " + order_by
        else:
            return ""
2416
2417 def update_from_clause(
2418 self, update_stmt, from_table, extra_froms, from_hints, **kw
2419 ):
2420 """Render the UPDATE..FROM clause specific to MSSQL.
2421
2422 In MSSQL, if the UPDATE statement involves an alias of the table to
2423 be updated, then the table itself must be added to the FROM list as
2424 well. Otherwise, it is optional. Here, we add it regardless.
2425
2426 """
2427 return "FROM " + ", ".join(
2428 t._compiler_dispatch(self, asfrom=True, fromhints=from_hints, **kw)
2429 for t in [from_table] + extra_froms
2430 )
2431
2432 def delete_table_clause(self, delete_stmt, from_table, extra_froms, **kw):
2433 """If we have extra froms make sure we render any alias as hint."""
2434 ashint = False
2435 if extra_froms:
2436 ashint = True
2437 return from_table._compiler_dispatch(
2438 self, asfrom=True, iscrud=True, ashint=ashint, **kw
2439 )
2440
2441 def delete_extra_from_clause(
2442 self, delete_stmt, from_table, extra_froms, from_hints, **kw
2443 ):
2444 """Render the DELETE .. FROM clause specific to MSSQL.
2445
2446 Yes, it has the FROM keyword twice.
2447
2448 """
2449 return "FROM " + ", ".join(
2450 t._compiler_dispatch(self, asfrom=True, fromhints=from_hints, **kw)
2451 for t in [from_table] + extra_froms
2452 )
2453
2454 def visit_empty_set_expr(self, type_, **kw):
2455 return "SELECT 1 WHERE 1!=1"
2456
    def visit_is_distinct_from_binary(self, binary, operator, **kw):
        # emulate IS DISTINCT FROM via INTERSECT, which treats NULLs as
        # equal for comparison purposes.
        # NOTE(review): **kw is not forwarded into process() here,
        # unlike most visit_* methods -- confirm this is intentional
        return "NOT EXISTS (SELECT %s INTERSECT SELECT %s)" % (
            self.process(binary.left),
            self.process(binary.right),
        )
2462
    def visit_is_not_distinct_from_binary(self, binary, operator, **kw):
        # emulate IS NOT DISTINCT FROM via INTERSECT (NULL-safe equality).
        # NOTE(review): **kw is not forwarded into process() here,
        # unlike most visit_* methods -- confirm this is intentional
        return "EXISTS (SELECT %s INTERSECT SELECT %s)" % (
            self.process(binary.left),
            self.process(binary.right),
        )
2468
    def _render_json_extract_from_binary(self, binary, operator, **kw):
        """Render JSON element access as JSON_QUERY / JSON_VALUE, with a
        CASE wrapper that casts scalar results to the expected type."""
        # note we are intentionally calling upon the process() calls in the
        # order in which they appear in the SQL String as this is used
        # by positional parameter rendering

        # sub-document access returns JSON text directly
        if binary.type._type_affinity is sqltypes.JSON:
            return "JSON_QUERY(%s, %s)" % (
                self.process(binary.left, **kw),
                self.process(binary.right, **kw),
            )

        # as with other dialects, start with an explicit test for NULL
        case_expression = "CASE JSON_VALUE(%s, %s) WHEN NULL THEN NULL" % (
            self.process(binary.left, **kw),
            self.process(binary.right, **kw),
        )

        if binary.type._type_affinity is sqltypes.Integer:
            type_expression = "ELSE CAST(JSON_VALUE(%s, %s) AS INTEGER)" % (
                self.process(binary.left, **kw),
                self.process(binary.right, **kw),
            )
        elif binary.type._type_affinity is sqltypes.Numeric:
            # Float renders as FLOAT; other numerics carry precision/scale
            type_expression = "ELSE CAST(JSON_VALUE(%s, %s) AS %s)" % (
                self.process(binary.left, **kw),
                self.process(binary.right, **kw),
                (
                    "FLOAT"
                    if isinstance(binary.type, sqltypes.Float)
                    else "NUMERIC(%s, %s)"
                    % (binary.type.precision, binary.type.scale)
                ),
            )
        elif binary.type._type_affinity is sqltypes.Boolean:
            # the NULL handling is particularly weird with boolean, so
            # explicitly return numeric (BIT) constants
            type_expression = (
                "WHEN 'true' THEN 1 WHEN 'false' THEN 0 ELSE NULL"
            )
        elif binary.type._type_affinity is sqltypes.String:
            # TODO: does this comment (from mysql) apply to here, too?
            # this fails with a JSON value that's a four byte unicode
            # string. SQLite has the same problem at the moment
            type_expression = "ELSE JSON_VALUE(%s, %s)" % (
                self.process(binary.left, **kw),
                self.process(binary.right, **kw),
            )
        else:
            # other affinity....this is not expected right now
            type_expression = "ELSE JSON_QUERY(%s, %s)" % (
                self.process(binary.left, **kw),
                self.process(binary.right, **kw),
            )

        return case_expression + " " + type_expression + " END"
2524
2525 def visit_json_getitem_op_binary(self, binary, operator, **kw):
2526 return self._render_json_extract_from_binary(binary, operator, **kw)
2527
2528 def visit_json_path_getitem_op_binary(self, binary, operator, **kw):
2529 return self._render_json_extract_from_binary(binary, operator, **kw)
2530
2531 def visit_sequence(self, seq, **kw):
2532 return "NEXT VALUE FOR %s" % self.preparer.format_sequence(seq)
2533
2534
class MSSQLStrictCompiler(MSSQLCompiler):
    """A subclass of MSSQLCompiler which disables the usage of bind
    parameters where not allowed natively by MS-SQL.

    A dialect may use this compiler on a platform where native
    binds are used.

    """

    ansi_bind_rules = True

    def visit_in_op_binary(self, binary, operator, **kw):
        """Render IN with both sides rendered as inline literals."""
        kw["literal_execute"] = True
        lhs = self.process(binary.left, **kw)
        rhs = self.process(binary.right, **kw)
        return "%s IN %s" % (lhs, rhs)

    def visit_not_in_op_binary(self, binary, operator, **kw):
        """Render NOT IN with both sides rendered as inline literals."""
        kw["literal_execute"] = True
        lhs = self.process(binary.left, **kw)
        rhs = self.process(binary.right, **kw)
        return "%s NOT IN %s" % (lhs, rhs)

    def render_literal_value(self, value, type_):
        """
        For date and datetime values, convert to a string
        format acceptable to MSSQL. That seems to be the
        so-called ODBC canonical date format which looks
        like this:

            yyyy-mm-dd hh:mi:ss.mmm(24h)

        For other data types, call the base class implementation.
        """
        # datetime.datetime is a subclass of datetime.date, so this
        # covers both; SQL Server wants single quotes around the string
        if isinstance(value, datetime.date):
            return "'" + str(value) + "'"
        return super().render_literal_value(value, type_)
2577
2578
class MSDDLCompiler(compiler.DDLCompiler):
    """DDL compiler for SQL Server: IDENTITY columns, clustered /
    columnstore index options, computed columns, and table/column
    comments via extended properties."""

    def get_column_specification(self, column, **kwargs):
        """Render one column's DDL: name, type (unless computed),
        NULL/NOT NULL, and IDENTITY or DEFAULT as appropriate."""
        colspec = self.preparer.format_column(column)

        # type is not accepted in a computed column
        if column.computed is not None:
            colspec += " " + self.process(column.computed)
        else:
            colspec += " " + self.dialect.type_compiler_instance.process(
                column.type, type_expression=column
            )

        if column.nullable is not None:
            if (
                not column.nullable
                or column.primary_key
                or isinstance(column.default, sa_schema.Sequence)
                or column.autoincrement is True
                or column.identity
            ):
                colspec += " NOT NULL"
            elif column.computed is None:
                # don't specify "NULL" for computed columns
                colspec += " NULL"

        if column.table is None:
            raise exc.CompileError(
                "mssql requires Table-bound columns "
                "in order to generate DDL"
            )

        # legacy per-column identity options, superseded by Identity()
        d_opt = column.dialect_options["mssql"]
        start = d_opt["identity_start"]
        increment = d_opt["identity_increment"]
        if start is not None or increment is not None:
            if column.identity:
                raise exc.CompileError(
                    "Cannot specify options 'mssql_identity_start' and/or "
                    "'mssql_identity_increment' while also using the "
                    "'Identity' construct."
                )
            util.warn_deprecated(
                "The dialect options 'mssql_identity_start' and "
                "'mssql_identity_increment' are deprecated. "
                "Use the 'Identity' object instead.",
                "1.4",
            )

        if column.identity:
            colspec += self.process(column.identity, **kwargs)
        elif (
            column is column.table._autoincrement_column
            or column.autoincrement is True
        ) and (
            not isinstance(column.default, Sequence) or column.default.optional
        ):
            colspec += self.process(Identity(start=start, increment=increment))
        else:
            default = self.get_column_default_string(column)
            if default is not None:
                colspec += " DEFAULT " + default

        return colspec

    def visit_create_index(self, create, include_schema=False, **kw):
        """Render CREATE INDEX with MSSQL options: UNIQUE, CLUSTERED /
        NONCLUSTERED, COLUMNSTORE, INCLUDE and a filtered WHERE."""
        index = create.element
        self._verify_index_table(index)
        preparer = self.preparer
        text = "CREATE "
        if index.unique:
            text += "UNIQUE "

        # handle clustering option
        clustered = index.dialect_options["mssql"]["clustered"]
        if clustered is not None:
            if clustered:
                text += "CLUSTERED "
            else:
                text += "NONCLUSTERED "

        # handle columnstore option (has no negative value)
        columnstore = index.dialect_options["mssql"]["columnstore"]
        if columnstore:
            text += "COLUMNSTORE "

        text += "INDEX %s ON %s" % (
            self._prepared_index_name(index, include_schema=include_schema),
            preparer.format_table(index.table),
        )

        # in some case mssql allows indexes with no columns defined
        if len(index.expressions) > 0:
            text += " (%s)" % ", ".join(
                self.sql_compiler.process(
                    expr, include_table=False, literal_binds=True
                )
                for expr in index.expressions
            )

        # handle other included columns
        if index.dialect_options["mssql"]["include"]:
            inclusions = [
                index.table.c[col] if isinstance(col, str) else col
                for col in index.dialect_options["mssql"]["include"]
            ]

            text += " INCLUDE (%s)" % ", ".join(
                [preparer.quote(c.name) for c in inclusions]
            )

        whereclause = index.dialect_options["mssql"]["where"]

        if whereclause is not None:
            whereclause = coercions.expect(
                roles.DDLExpressionRole, whereclause
            )

            where_compiled = self.sql_compiler.process(
                whereclause, include_table=False, literal_binds=True
            )
            text += " WHERE " + where_compiled

        return text

    def visit_drop_index(self, drop, **kw):
        """MSSQL DROP INDEX requires the owning table name."""
        return "\nDROP INDEX %s ON %s" % (
            self._prepared_index_name(drop.element, include_schema=False),
            self.preparer.format_table(drop.element.table),
        )

    def visit_primary_key_constraint(self, constraint, **kw):
        """Render PRIMARY KEY with the optional mssql_clustered option."""
        if len(constraint) == 0:
            return ""
        text = ""
        if constraint.name is not None:
            text += "CONSTRAINT %s " % self.preparer.format_constraint(
                constraint
            )
        text += "PRIMARY KEY "

        clustered = constraint.dialect_options["mssql"]["clustered"]
        if clustered is not None:
            if clustered:
                text += "CLUSTERED "
            else:
                text += "NONCLUSTERED "

        text += "(%s)" % ", ".join(
            self.preparer.quote(c.name) for c in constraint
        )
        text += self.define_constraint_deferrability(constraint)
        return text

    def visit_unique_constraint(self, constraint, **kw):
        """Render UNIQUE with the optional mssql_clustered option."""
        if len(constraint) == 0:
            return ""
        text = ""
        if constraint.name is not None:
            formatted_name = self.preparer.format_constraint(constraint)
            if formatted_name is not None:
                text += "CONSTRAINT %s " % formatted_name
        text += "UNIQUE %s" % self.define_unique_constraint_distinct(
            constraint, **kw
        )
        clustered = constraint.dialect_options["mssql"]["clustered"]
        if clustered is not None:
            if clustered:
                text += "CLUSTERED "
            else:
                text += "NONCLUSTERED "

        text += "(%s)" % ", ".join(
            self.preparer.quote(c.name) for c in constraint
        )
        text += self.define_constraint_deferrability(constraint)
        return text

    def visit_computed_column(self, generated, **kw):
        """Render a computed column as ``AS (<expr>) [PERSISTED]``."""
        text = "AS (%s)" % self.sql_compiler.process(
            generated.sqltext, include_table=False, literal_binds=True
        )
        # explicitly check for True|False since None means server default
        if generated.persisted is True:
            text += " PERSISTED"
        return text

    def visit_set_table_comment(self, create, **kw):
        """Table comments are stored via sp_addextendedproperty."""
        schema = self.preparer.schema_for_object(create.element)
        schema_name = schema if schema else self.dialect.default_schema_name
        return (
            "execute sp_addextendedproperty 'MS_Description', "
            "{}, 'schema', {}, 'table', {}".format(
                self.sql_compiler.render_literal_value(
                    create.element.comment, sqltypes.NVARCHAR()
                ),
                self.preparer.quote_schema(schema_name),
                self.preparer.format_table(create.element, use_schema=False),
            )
        )

    def visit_drop_table_comment(self, drop, **kw):
        """Table comments are removed via sp_dropextendedproperty."""
        schema = self.preparer.schema_for_object(drop.element)
        schema_name = schema if schema else self.dialect.default_schema_name
        return (
            "execute sp_dropextendedproperty 'MS_Description', 'schema', "
            "{}, 'table', {}".format(
                self.preparer.quote_schema(schema_name),
                self.preparer.format_table(drop.element, use_schema=False),
            )
        )

    def visit_set_column_comment(self, create, **kw):
        """Column comments are stored via sp_addextendedproperty."""
        schema = self.preparer.schema_for_object(create.element.table)
        schema_name = schema if schema else self.dialect.default_schema_name
        return (
            "execute sp_addextendedproperty 'MS_Description', "
            "{}, 'schema', {}, 'table', {}, 'column', {}".format(
                self.sql_compiler.render_literal_value(
                    create.element.comment, sqltypes.NVARCHAR()
                ),
                self.preparer.quote_schema(schema_name),
                self.preparer.format_table(
                    create.element.table, use_schema=False
                ),
                self.preparer.format_column(create.element),
            )
        )

    def visit_drop_column_comment(self, drop, **kw):
        """Column comments are removed via sp_dropextendedproperty."""
        schema = self.preparer.schema_for_object(drop.element.table)
        schema_name = schema if schema else self.dialect.default_schema_name
        return (
            "execute sp_dropextendedproperty 'MS_Description', 'schema', "
            "{}, 'table', {}, 'column', {}".format(
                self.preparer.quote_schema(schema_name),
                self.preparer.format_table(
                    drop.element.table, use_schema=False
                ),
                self.preparer.format_column(drop.element),
            )
        )

    def visit_create_sequence(self, create, **kw):
        """Render CREATE SEQUENCE, adding ``AS <type>`` when a data
        type was given."""
        prefix = None
        if create.element.data_type is not None:
            data_type = create.element.data_type
            prefix = " AS %s" % self.type_compiler.process(data_type)
        return super().visit_create_sequence(create, prefix=prefix, **kw)

    def visit_identity_column(self, identity, **kw):
        """Render IDENTITY, with (start, increment) only when at least
        one was specified; missing values default to 1."""
        text = " IDENTITY"
        if identity.start is not None or identity.increment is not None:
            start = 1 if identity.start is None else identity.start
            increment = 1 if identity.increment is None else identity.increment
            text += "(%s,%s)" % (start, increment)
        return text
2835
2836
class MSIdentifierPreparer(compiler.IdentifierPreparer):
    """Identifier preparer which quotes with SQL Server-style brackets
    (``[ident]``)."""

    reserved_words = RESERVED_WORDS

    def __init__(self, dialect):
        super().__init__(
            dialect,
            initial_quote="[",
            final_quote="]",
            quote_case_sensitive_collations=False,
        )

    def _escape_identifier(self, value):
        # a closing bracket inside an identifier is escaped by doubling
        return value.replace("]", "]]")

    def _unescape_identifier(self, value):
        return value.replace("]]", "]")

    def quote_schema(self, schema, force=None):
        """Prepare a quoted table and schema name."""

        # re-implement the deprecation warning rather than use the
        # util.deprecated_params() decorator: the decorator adds function
        # call overhead on this very performance-critical spot
        if force is not None:
            util.warn_deprecated(
                "The IdentifierPreparer.quote_schema.force parameter is "
                "deprecated and will be removed in a future release. This "
                "flag has no effect on the behavior of the "
                "IdentifierPreparer.quote method; please refer to "
                "quoted_name().",
                version="1.3",
            )

        dbname, owner = _schema_elements(schema)
        if dbname:
            return "%s.%s" % (self.quote(dbname), self.quote(owner))
        if owner:
            return self.quote(owner)
        return ""
2879
2880
def _db_plus_owner_listing(fn):
    """Decorate a schema-level reflection method so its ``schema``
    argument is resolved into (dbname, owner), running the method with
    the connection temporarily switched to ``dbname`` if needed."""

    def wrap(dialect, connection, schema=None, **kw):
        dbname, owner = _owner_plus_db(dialect, schema)
        fn_args = (dialect, connection, dbname, owner, schema)
        return _switch_db(dbname, connection, fn, *fn_args, **kw)

    return update_wrapper(wrap, fn)
2897
2898
def _db_plus_owner(fn):
    """Decorate a table-level reflection method so its ``schema``
    argument is resolved into (dbname, owner), running the method with
    the connection temporarily switched to ``dbname`` if needed."""

    def wrap(dialect, connection, tablename, schema=None, **kw):
        dbname, owner = _owner_plus_db(dialect, schema)
        fn_args = (dialect, connection, tablename, dbname, owner, schema)
        return _switch_db(dbname, connection, fn, *fn_args, **kw)

    return update_wrapper(wrap, fn)
2916
2917
def _switch_db(dbname, connection, fn, *arg, **kw):
    """Run ``fn(*arg, **kw)`` with the connection switched to ``dbname``
    via ``USE``, restoring the original database afterwards.

    When ``dbname`` is falsy, ``fn`` runs on the current database with
    no switching.
    """
    if dbname:
        current_db = connection.exec_driver_sql("select db_name()").scalar()
        if current_db != dbname:
            connection.exec_driver_sql(
                "use %s" % connection.dialect.identifier_preparer.quote(dbname)
            )
    try:
        return fn(*arg, **kw)
    finally:
        # ``current_db`` is only bound when dbname is truthy; the
        # short-circuit on ``dbname`` keeps this safe
        if dbname and current_db != dbname:
            connection.exec_driver_sql(
                "use %s"
                % connection.dialect.identifier_preparer.quote(current_db)
            )
2933
2934
2935def _owner_plus_db(dialect, schema):
2936 if not schema:
2937 return None, dialect.default_schema_name
2938 else:
2939 return _schema_elements(schema)
2940
2941
# LRU cache of parsed schema strings used by _schema_elements();
# schema parsing sits on a hot reflection/compilation path
_memoized_schema = util.LRUCache()
2943
2944
def _schema_elements(schema):
    """Parse a possibly bracket-quoted, possibly dotted schema string
    into a ``(dbname, owner)`` tuple; results are memoized."""
    # a quoted_name with quote=True is taken literally as the owner
    if isinstance(schema, quoted_name) and schema.quote:
        return None, schema

    if schema in _memoized_schema:
        return _memoized_schema[schema]

    # tests for this function are in:
    # test/dialect/mssql/test_reflection.py ->
    #   OwnerPlusDBTest.test_owner_database_pairs
    # test/dialect/mssql/test_compiler.py -> test_force_schema_*
    # test/dialect/mssql/test_compiler.py -> test_schema_many_tokens_*
    #

    # placeholder tokens used by the compiler are passed through as-is
    if schema.startswith("__[SCHEMA_"):
        return None, schema

    # tokenize on brackets and dots; dots inside brackets do not split
    push = []
    symbol = ""
    bracket = False
    has_brackets = False
    for token in re.split(r"(\[|\]|\.)", schema):
        if not token:
            continue
        if token == "[":
            bracket = True
            has_brackets = True
        elif token == "]":
            bracket = False
        elif not bracket and token == ".":
            if has_brackets:
                push.append("[%s]" % symbol)
            else:
                push.append(symbol)
            symbol = ""
            has_brackets = False
        else:
            symbol += token
    if symbol:
        push.append(symbol)
    if len(push) > 1:
        # everything before the last dot is the database name
        dbname, owner = ".".join(push[0:-1]), push[-1]

        # test for internal brackets
        if re.match(r".*\].*\[.*", dbname[1:-1]):
            # multiple bracketed segments: keep brackets, disable
            # further quoting
            dbname = quoted_name(dbname, quote=False)
        else:
            dbname = dbname.lstrip("[").rstrip("]")

    elif len(push):
        dbname, owner = None, push[0]
    else:
        dbname, owner = None, None

    _memoized_schema[schema] = dbname, owner
    return dbname, owner
3001
3002
class MSDialect(default.DefaultDialect):
    # will assume it's at least mssql2005
    name = "mssql"
    supports_statement_cache = True
    supports_default_values = True
    supports_empty_insert = False
    favor_returning_over_lastrowid = True

    returns_native_bytes = True

    supports_comments = True
    supports_default_metavalue = False
    """dialect supports INSERT... VALUES (DEFAULT) syntax -
    SQL Server **does** support this, but **not** for the IDENTITY column,
    so we can't turn this on.

    """

    # supports_native_uuid is partial here, so we implement our
    # own impl type

    execution_ctx_cls = MSExecutionContext
    use_scope_identity = True
    max_identifier_length = 128
    schema_name = "dbo"

    insert_returning = True
    update_returning = True
    delete_returning = True
    update_returning_multifrom = True
    delete_returning_multifrom = True

    colspecs = {
        sqltypes.DateTime: _MSDateTime,
        sqltypes.Date: _MSDate,
        sqltypes.JSON: JSON,
        sqltypes.JSON.JSONIndexType: JSONIndexType,
        sqltypes.JSON.JSONPathType: JSONPathType,
        sqltypes.Time: _BASETIMEIMPL,
        sqltypes.Unicode: _MSUnicode,
        sqltypes.UnicodeText: _MSUnicodeText,
        DATETIMEOFFSET: DATETIMEOFFSET,
        DATETIME2: DATETIME2,
        SMALLDATETIME: SMALLDATETIME,
        DATETIME: DATETIME,
        sqltypes.Uuid: MSUUid,
    }

    engine_config_types = default.DefaultDialect.engine_config_types.union(
        {"legacy_schema_aliasing": util.asbool}
    )

    ischema_names = ischema_names

    supports_sequences = True
    sequences_optional = True
    # This is actually used for autoincrement, where identity is used that
    # starts with 1.
    # for sequences T-SQL's actual default is -9223372036854775808
    default_sequence_base = 1

    supports_native_boolean = False
    non_native_boolean_check_constraint = False
    supports_unicode_binds = True
    postfetch_lastrowid = True

    # may be changed at server inspection time for older SQL server versions
    supports_multivalues_insert = True

    use_insertmanyvalues = True

    # note pyodbc will set this to False if fast_executemany is set,
    # as of SQLAlchemy 2.0.9
    use_insertmanyvalues_wo_returning = True

    insertmanyvalues_implicit_sentinel = (
        InsertmanyvaluesSentinelOpts.AUTOINCREMENT
        | InsertmanyvaluesSentinelOpts.IDENTITY
        | InsertmanyvaluesSentinelOpts.USE_INSERT_FROM_SELECT
    )

    # "The incoming request has too many parameters. The server supports a "
    # "maximum of 2100 parameters."
    # in fact you can have 2099 parameters.
    insertmanyvalues_max_parameters = 2099

    # both toggled on during server version inspection elsewhere
    _supports_offset_fetch = False
    _supports_nvarchar_max = False

    legacy_schema_aliasing = False

    server_version_info = ()

    statement_compiler = MSSQLCompiler
    ddl_compiler = MSDDLCompiler
    type_compiler_cls = MSTypeCompiler
    preparer = MSIdentifierPreparer

    construct_arguments = [
        (sa_schema.PrimaryKeyConstraint, {"clustered": None}),
        (sa_schema.UniqueConstraint, {"clustered": None}),
        (
            sa_schema.Index,
            {
                "clustered": None,
                "include": None,
                "where": None,
                "columnstore": None,
            },
        ),
        (
            sa_schema.Column,
            {"identity_start": None, "identity_increment": None},
        ),
    ]
3118
    def __init__(
        self,
        query_timeout=None,
        use_scope_identity=True,
        schema_name="dbo",
        deprecate_large_types=None,
        supports_comments=None,
        json_serializer=None,
        json_deserializer=None,
        legacy_schema_aliasing=None,
        ignore_no_transaction_on_rollback=False,
        **opts,
    ):
        """Construct the MSSQL dialect.

        ``legacy_schema_aliasing`` is deprecated; passing a non-None
        value emits a deprecation warning.  ``supports_comments``
        overrides the class-level default only when explicitly given.
        """
        # 0 means "no timeout" at the driver level
        self.query_timeout = int(query_timeout or 0)
        self.schema_name = schema_name

        self.use_scope_identity = use_scope_identity
        self.deprecate_large_types = deprecate_large_types
        self.ignore_no_transaction_on_rollback = (
            ignore_no_transaction_on_rollback
        )
        # remember whether the user set this explicitly, so server
        # inspection can distinguish "default" from "requested"
        self._user_defined_supports_comments = uds = supports_comments
        if uds is not None:
            self.supports_comments = uds

        if legacy_schema_aliasing is not None:
            util.warn_deprecated(
                "The legacy_schema_aliasing parameter is "
                "deprecated and will be removed in a future release.",
                "1.4",
            )
            self.legacy_schema_aliasing = legacy_schema_aliasing

        super().__init__(**opts)

        self._json_serializer = json_serializer
        self._json_deserializer = json_deserializer
3156
    def do_savepoint(self, connection, name):
        """Emit a savepoint, first ensuring a transaction is in progress.

        A transaction is explicitly begun when ``@@TRANCOUNT`` is zero
        before delegating to the default savepoint implementation.
        """
        # give the DBAPI a push
        connection.exec_driver_sql("IF @@TRANCOUNT = 0 BEGIN TRANSACTION")
        super().do_savepoint(connection, name)
3161
    def do_release_savepoint(self, connection, name):
        """No-op: SQL Server has no RELEASE SAVEPOINT statement."""
        # SQL Server does not support RELEASE SAVEPOINT
        pass
3165
3166 def do_rollback(self, dbapi_connection):
3167 try:
3168 super().do_rollback(dbapi_connection)
3169 except self.dbapi.ProgrammingError as e:
3170 if self.ignore_no_transaction_on_rollback and re.match(
3171 r".*\b111214\b", str(e)
3172 ):
3173 util.warn(
3174 "ProgrammingError 111214 "
3175 "'No corresponding transaction found.' "
3176 "has been suppressed via "
3177 "ignore_no_transaction_on_rollback=True"
3178 )
3179 else:
3180 raise
3181
    # set of isolation level names this dialect accepts; surfaced to the
    # engine via get_isolation_level_values()
    _isolation_lookup = {
        "SERIALIZABLE",
        "READ UNCOMMITTED",
        "READ COMMITTED",
        "REPEATABLE READ",
        "SNAPSHOT",
    }
3189
3190 def get_isolation_level_values(self, dbapi_connection):
3191 return list(self._isolation_lookup)
3192
3193 def set_isolation_level(self, dbapi_connection, level):
3194 cursor = dbapi_connection.cursor()
3195 cursor.execute(f"SET TRANSACTION ISOLATION LEVEL {level}")
3196 cursor.close()
3197 if level == "SNAPSHOT":
3198 dbapi_connection.commit()
3199
    def get_isolation_level(self, dbapi_connection):
        """Return the current session's isolation level name.

        First determines which session DMV exists on this server
        (``dm_exec_sessions`` or ``dm_pdw_nodes_exec_sessions``), then maps
        the numeric ``transaction_isolation_level`` of the current session
        (``@@SPID``) to its string name.

        :raises NotImplementedError: when neither view is present, or when
         either query fails (wrapping the DBAPI error).
        """
        cursor = dbapi_connection.cursor()
        # track which object is being queried so the error message below
        # names the view involved in any failure
        view_name = "sys.system_views"
        try:
            cursor.execute(
                (
                    "SELECT name FROM {} WHERE name IN "
                    "('dm_exec_sessions', 'dm_pdw_nodes_exec_sessions')"
                ).format(view_name)
            )
            row = cursor.fetchone()
            if not row:
                raise NotImplementedError(
                    "Can't fetch isolation level on this particular "
                    "SQL Server version."
                )

            view_name = f"sys.{row[0]}"

            cursor.execute(
                """
                    SELECT CASE transaction_isolation_level
                    WHEN 0 THEN NULL
                    WHEN 1 THEN 'READ UNCOMMITTED'
                    WHEN 2 THEN 'READ COMMITTED'
                    WHEN 3 THEN 'REPEATABLE READ'
                    WHEN 4 THEN 'SERIALIZABLE'
                    WHEN 5 THEN 'SNAPSHOT' END
                    AS TRANSACTION_ISOLATION_LEVEL
                    FROM {}
                    where session_id = @@SPID
                """.format(
                    view_name
                )
            )
        except self.dbapi.Error as err:
            raise NotImplementedError(
                "Can't fetch isolation level; encountered error {} when "
                'attempting to query the "{}" view.'.format(err, view_name)
            ) from err
        else:
            # success path: the single-row result holds the level name
            row = cursor.fetchone()
            return row[0].upper()
        finally:
            cursor.close()
3245
    def initialize(self, connection):
        """Run first-connect initialization, then probe server capabilities.

        After the base initialization (which sets server_version_info),
        derive version-dependent flags and probe for NVARCHAR(max) and
        comment support.
        """
        super().initialize(connection)
        self._setup_version_attributes()
        self._setup_supports_nvarchar_max(connection)
        self._setup_supports_comments(connection)
3251
3252 def _setup_version_attributes(self):
3253 if self.server_version_info[0] not in list(range(8, 17)):
3254 util.warn(
3255 "Unrecognized server version info '%s'. Some SQL Server "
3256 "features may not function properly."
3257 % ".".join(str(x) for x in self.server_version_info)
3258 )
3259
3260 if self.server_version_info >= MS_2008_VERSION:
3261 self.supports_multivalues_insert = True
3262 else:
3263 self.supports_multivalues_insert = False
3264
3265 if self.deprecate_large_types is None:
3266 self.deprecate_large_types = (
3267 self.server_version_info >= MS_2012_VERSION
3268 )
3269
3270 self._supports_offset_fetch = (
3271 self.server_version_info and self.server_version_info[0] >= 11
3272 )
3273
3274 def _setup_supports_nvarchar_max(self, connection):
3275 try:
3276 connection.scalar(
3277 sql.text("SELECT CAST('test max support' AS NVARCHAR(max))")
3278 )
3279 except exc.DBAPIError:
3280 self._supports_nvarchar_max = False
3281 else:
3282 self._supports_nvarchar_max = True
3283
3284 def _setup_supports_comments(self, connection):
3285 if self._user_defined_supports_comments is not None:
3286 return
3287
3288 try:
3289 connection.scalar(
3290 sql.text(
3291 "SELECT 1 FROM fn_listextendedproperty"
3292 "(default, default, default, default, "
3293 "default, default, default)"
3294 )
3295 )
3296 except exc.DBAPIError:
3297 self.supports_comments = False
3298 else:
3299 self.supports_comments = True
3300
3301 def _get_default_schema_name(self, connection):
3302 query = sql.text("SELECT schema_name()")
3303 default_schema_name = connection.scalar(query)
3304 if default_schema_name is not None:
3305 # guard against the case where the default_schema_name is being
3306 # fed back into a table reflection function.
3307 return quoted_name(default_schema_name, quote=True)
3308 else:
3309 return self.schema_name
3310
    @_db_plus_owner
    def has_table(self, connection, tablename, dbname, owner, schema, **kw):
        """Return True if the named table (or view) exists.

        ``dbname``/``owner`` are derived from ``schema`` by the
        ``_db_plus_owner`` decorator.
        """
        # has_table requires a live connection rather than an engine
        self._ensure_has_table_connection(connection)

        return self._internal_has_table(connection, tablename, owner, **kw)
3316
3317 @reflection.cache
3318 @_db_plus_owner
3319 def has_sequence(
3320 self, connection, sequencename, dbname, owner, schema, **kw
3321 ):
3322 sequences = ischema.sequences
3323
3324 s = sql.select(sequences.c.sequence_name).where(
3325 sequences.c.sequence_name == sequencename
3326 )
3327
3328 if owner:
3329 s = s.where(sequences.c.sequence_schema == owner)
3330
3331 c = connection.execute(s)
3332
3333 return c.first() is not None
3334
3335 @reflection.cache
3336 @_db_plus_owner_listing
3337 def get_sequence_names(self, connection, dbname, owner, schema, **kw):
3338 sequences = ischema.sequences
3339
3340 s = sql.select(sequences.c.sequence_name)
3341 if owner:
3342 s = s.where(sequences.c.sequence_schema == owner)
3343
3344 c = connection.execute(s)
3345
3346 return [row[0] for row in c]
3347
3348 @reflection.cache
3349 def get_schema_names(self, connection, **kw):
3350 s = sql.select(ischema.schemata.c.schema_name).order_by(
3351 ischema.schemata.c.schema_name
3352 )
3353 schema_names = [r[0] for r in connection.execute(s)]
3354 return schema_names
3355
3356 @reflection.cache
3357 @_db_plus_owner_listing
3358 def get_table_names(self, connection, dbname, owner, schema, **kw):
3359 tables = ischema.tables
3360 s = (
3361 sql.select(tables.c.table_name)
3362 .where(
3363 sql.and_(
3364 tables.c.table_schema == owner,
3365 tables.c.table_type == "BASE TABLE",
3366 )
3367 )
3368 .order_by(tables.c.table_name)
3369 )
3370 table_names = [r[0] for r in connection.execute(s)]
3371 return table_names
3372
3373 @reflection.cache
3374 @_db_plus_owner_listing
3375 def get_view_names(self, connection, dbname, owner, schema, **kw):
3376 tables = ischema.tables
3377 s = (
3378 sql.select(tables.c.table_name)
3379 .where(
3380 sql.and_(
3381 tables.c.table_schema == owner,
3382 tables.c.table_type == "VIEW",
3383 )
3384 )
3385 .order_by(tables.c.table_name)
3386 )
3387 view_names = [r[0] for r in connection.execute(s)]
3388 return view_names
3389
3390 @reflection.cache
3391 def _internal_has_table(self, connection, tablename, owner, **kw):
3392 if tablename.startswith("#"): # temporary table
3393 # mssql does not support temporary views
3394 # SQL Error [4103] [S0001]: "#v": Temporary views are not allowed
3395 return bool(
3396 connection.scalar(
3397 # U filters on user tables only.
3398 text("SELECT object_id(:table_name, 'U')"),
3399 {"table_name": f"tempdb.dbo.[{tablename}]"},
3400 )
3401 )
3402 else:
3403 tables = ischema.tables
3404
3405 s = sql.select(tables.c.table_name).where(
3406 sql.and_(
3407 sql.or_(
3408 tables.c.table_type == "BASE TABLE",
3409 tables.c.table_type == "VIEW",
3410 ),
3411 tables.c.table_name == tablename,
3412 )
3413 )
3414
3415 if owner:
3416 s = s.where(tables.c.table_schema == owner)
3417
3418 c = connection.execute(s)
3419
3420 return c.first() is not None
3421
3422 def _default_or_error(self, connection, tablename, owner, method, **kw):
3423 # TODO: try to avoid having to run a separate query here
3424 if self._internal_has_table(connection, tablename, owner, **kw):
3425 return method()
3426 else:
3427 raise exc.NoSuchTableError(f"{owner}.{tablename}")
3428
    @reflection.cache
    @_db_plus_owner
    def get_indexes(self, connection, tablename, dbname, owner, schema, **kw):
        """Reflect index information for the given table.

        Two catalog queries are issued: one collecting the indexes
        themselves (excluding primary keys and type-0/heap entries), and a
        second collecting each index's columns.  Index type codes map as:
        1=clustered, 2=nonclustered, 5=clustered columnstore,
        6=nonclustered columnstore.
        """
        # filtered indexes (filter_definition) exist on 2008+ only; select
        # a NULL placeholder on older versions
        filter_definition = (
            "ind.filter_definition"
            if self.server_version_info >= MS_2008_VERSION
            else "NULL as filter_definition"
        )
        rp = connection.execution_options(future_result=True).execute(
            sql.text(
                f"""
select
    ind.index_id,
    ind.is_unique,
    ind.name,
    ind.type,
    {filter_definition}
from
    sys.indexes as ind
join sys.tables as tab on
    ind.object_id = tab.object_id
join sys.schemas as sch on
    sch.schema_id = tab.schema_id
where
    tab.name = :tabname
    and sch.name = :schname
    and ind.is_primary_key = 0
    and ind.type != 0
order by
    ind.name
        """
            )
            .bindparams(
                sql.bindparam("tabname", tablename, ischema.CoerceUnicode()),
                sql.bindparam("schname", owner, ischema.CoerceUnicode()),
            )
            .columns(name=sqltypes.Unicode())
        )
        indexes = {}
        for row in rp.mappings():
            indexes[row["index_id"]] = current = {
                "name": row["name"],
                "unique": row["is_unique"] == 1,
                "column_names": [],
                "include_columns": [],
                "dialect_options": {},
            }

            do = current["dialect_options"]
            index_type = row["type"]
            # types 1/2: regular clustered/nonclustered index
            if index_type in {1, 2}:
                do["mssql_clustered"] = index_type == 1
            # types 5/6: clustered/nonclustered columnstore index
            if index_type in {5, 6}:
                do["mssql_clustered"] = index_type == 5
                do["mssql_columnstore"] = True
            if row["filter_definition"] is not None:
                do["mssql_where"] = row["filter_definition"]

        rp = connection.execution_options(future_result=True).execute(
            sql.text(
                """
select
    ind_col.index_id,
    col.name,
    ind_col.is_included_column
from
    sys.columns as col
join sys.tables as tab on
    tab.object_id = col.object_id
join sys.index_columns as ind_col on
    ind_col.column_id = col.column_id
    and ind_col.object_id = tab.object_id
join sys.schemas as sch on
    sch.schema_id = tab.schema_id
where
    tab.name = :tabname
    and sch.name = :schname
            """
            )
            .bindparams(
                sql.bindparam("tabname", tablename, ischema.CoerceUnicode()),
                sql.bindparam("schname", owner, ischema.CoerceUnicode()),
            )
            .columns(name=sqltypes.Unicode())
        )
        for row in rp.mappings():
            if row["index_id"] not in indexes:
                # column belongs to an index filtered out above (e.g. PK)
                continue
            index_def = indexes[row["index_id"]]
            is_colstore = index_def["dialect_options"].get("mssql_columnstore")
            is_clustered = index_def["dialect_options"].get("mssql_clustered")
            if not (is_colstore and is_clustered):
                # a clustered columnstore index includes all columns but does
                # not want them in the index definition
                if row["is_included_column"] and not is_colstore:
                    # a nonclustered columnstore index reports included
                    # columns, but requires that they are listed as normal
                    # columns
                    index_def["include_columns"].append(row["name"])
                else:
                    index_def["column_names"].append(row["name"])
        for index_info in indexes.values():
            # NOTE: "root level" include_columns is legacy, now part of
            # dialect_options (issue #7382)
            index_info["dialect_options"]["mssql_include"] = index_info[
                "include_columns"
            ]

        if indexes:
            return list(indexes.values())
        else:
            return self._default_or_error(
                connection, tablename, owner, ReflectionDefaults.indexes, **kw
            )
3542
3543 @reflection.cache
3544 @_db_plus_owner
3545 def get_view_definition(
3546 self, connection, viewname, dbname, owner, schema, **kw
3547 ):
3548 view_def = connection.execute(
3549 sql.text(
3550 "select mod.definition "
3551 "from sys.sql_modules as mod "
3552 "join sys.views as views on mod.object_id = views.object_id "
3553 "join sys.schemas as sch on views.schema_id = sch.schema_id "
3554 "where views.name=:viewname and sch.name=:schname"
3555 ).bindparams(
3556 sql.bindparam("viewname", viewname, ischema.CoerceUnicode()),
3557 sql.bindparam("schname", owner, ischema.CoerceUnicode()),
3558 )
3559 ).scalar()
3560 if view_def:
3561 return view_def
3562 else:
3563 raise exc.NoSuchTableError(f"{owner}.{viewname}")
3564
3565 @reflection.cache
3566 def get_table_comment(self, connection, table_name, schema=None, **kw):
3567 if not self.supports_comments:
3568 raise NotImplementedError(
3569 "Can't get table comments on current SQL Server version in use"
3570 )
3571
3572 schema_name = schema if schema else self.default_schema_name
3573 COMMENT_SQL = """
3574 SELECT cast(com.value as nvarchar(max))
3575 FROM fn_listextendedproperty('MS_Description',
3576 'schema', :schema, 'table', :table, NULL, NULL
3577 ) as com;
3578 """
3579
3580 comment = connection.execute(
3581 sql.text(COMMENT_SQL).bindparams(
3582 sql.bindparam("schema", schema_name, ischema.CoerceUnicode()),
3583 sql.bindparam("table", table_name, ischema.CoerceUnicode()),
3584 )
3585 ).scalar()
3586 if comment:
3587 return {"text": comment}
3588 else:
3589 return self._default_or_error(
3590 connection,
3591 table_name,
3592 None,
3593 ReflectionDefaults.table_comment,
3594 **kw,
3595 )
3596
3597 def _temp_table_name_like_pattern(self, tablename):
3598 # LIKE uses '%' to match zero or more characters and '_' to match any
3599 # single character. We want to match literal underscores, so T-SQL
3600 # requires that we enclose them in square brackets.
3601 return tablename + (
3602 ("[_][_][_]%") if not tablename.startswith("##") else ""
3603 )
3604
    def _get_internal_temp_table_name(self, connection, tablename):
        """Resolve a user-visible temp table name to its tempdb identity.

        Returns a ``(table_schema, table_name)`` row from
        tempdb.information_schema.tables.

        :raises UnreflectableTableError: if the LIKE pattern matches more
         than one tempdb entry (name is ambiguous at this moment).
        :raises NoSuchTableError: if nothing matches.
        """
        # it's likely that schema is always "dbo", but since we can
        # get it here, let's get it.
        # see https://stackoverflow.com/questions/8311959/
        # specifying-schema-for-temporary-tables

        try:
            # .one() raises for both zero and multiple rows; each case is
            # translated to a distinct SQLAlchemy exception below
            return connection.execute(
                sql.text(
                    "select table_schema, table_name "
                    "from tempdb.information_schema.tables "
                    "where table_name like :p1"
                ),
                {"p1": self._temp_table_name_like_pattern(tablename)},
            ).one()
        except exc.MultipleResultsFound as me:
            raise exc.UnreflectableTableError(
                "Found more than one temporary table named '%s' in tempdb "
                "at this time. Cannot reliably resolve that name to its "
                "internal table name." % tablename
            ) from me
        except exc.NoResultFound as ne:
            raise exc.NoSuchTableError(
                "Unable to find a temporary table named '%s' in tempdb."
                % tablename
            ) from ne
3631
3632 @reflection.cache
3633 @_db_plus_owner
3634 def get_columns(self, connection, tablename, dbname, owner, schema, **kw):
3635 is_temp_table = tablename.startswith("#")
3636 if is_temp_table:
3637 owner, tablename = self._get_internal_temp_table_name(
3638 connection, tablename
3639 )
3640
3641 columns = ischema.mssql_temp_table_columns
3642 else:
3643 columns = ischema.columns
3644
3645 computed_cols = ischema.computed_columns
3646 identity_cols = ischema.identity_columns
3647 if owner:
3648 whereclause = sql.and_(
3649 columns.c.table_name == tablename,
3650 columns.c.table_schema == owner,
3651 )
3652 full_name = columns.c.table_schema + "." + columns.c.table_name
3653 else:
3654 whereclause = columns.c.table_name == tablename
3655 full_name = columns.c.table_name
3656
3657 if self._supports_nvarchar_max:
3658 computed_definition = computed_cols.c.definition
3659 else:
3660 # tds_version 4.2 does not support NVARCHAR(MAX)
3661 computed_definition = sql.cast(
3662 computed_cols.c.definition, NVARCHAR(4000)
3663 )
3664
3665 object_id = func.object_id(full_name)
3666
3667 s = (
3668 sql.select(
3669 columns.c.column_name,
3670 columns.c.data_type,
3671 columns.c.is_nullable,
3672 columns.c.character_maximum_length,
3673 columns.c.numeric_precision,
3674 columns.c.numeric_scale,
3675 columns.c.column_default,
3676 columns.c.collation_name,
3677 computed_definition,
3678 computed_cols.c.is_persisted,
3679 identity_cols.c.is_identity,
3680 identity_cols.c.seed_value,
3681 identity_cols.c.increment_value,
3682 ischema.extended_properties.c.value.label("comment"),
3683 )
3684 .select_from(columns)
3685 .outerjoin(
3686 computed_cols,
3687 onclause=sql.and_(
3688 computed_cols.c.object_id == object_id,
3689 computed_cols.c.name
3690 == columns.c.column_name.collate("DATABASE_DEFAULT"),
3691 ),
3692 )
3693 .outerjoin(
3694 identity_cols,
3695 onclause=sql.and_(
3696 identity_cols.c.object_id == object_id,
3697 identity_cols.c.name
3698 == columns.c.column_name.collate("DATABASE_DEFAULT"),
3699 ),
3700 )
3701 .outerjoin(
3702 ischema.extended_properties,
3703 onclause=sql.and_(
3704 ischema.extended_properties.c["class"] == 1,
3705 ischema.extended_properties.c.major_id == object_id,
3706 ischema.extended_properties.c.minor_id
3707 == columns.c.ordinal_position,
3708 ischema.extended_properties.c.name == "MS_Description",
3709 ),
3710 )
3711 .where(whereclause)
3712 .order_by(columns.c.ordinal_position)
3713 )
3714
3715 c = connection.execution_options(future_result=True).execute(s)
3716
3717 cols = []
3718 for row in c.mappings():
3719 name = row[columns.c.column_name]
3720 type_ = row[columns.c.data_type]
3721 nullable = row[columns.c.is_nullable] == "YES"
3722 charlen = row[columns.c.character_maximum_length]
3723 numericprec = row[columns.c.numeric_precision]
3724 numericscale = row[columns.c.numeric_scale]
3725 default = row[columns.c.column_default]
3726 collation = row[columns.c.collation_name]
3727 definition = row[computed_definition]
3728 is_persisted = row[computed_cols.c.is_persisted]
3729 is_identity = row[identity_cols.c.is_identity]
3730 identity_start = row[identity_cols.c.seed_value]
3731 identity_increment = row[identity_cols.c.increment_value]
3732 comment = row[ischema.extended_properties.c.value]
3733
3734 coltype = self.ischema_names.get(type_, None)
3735
3736 kwargs = {}
3737 if coltype in (
3738 MSString,
3739 MSChar,
3740 MSNVarchar,
3741 MSNChar,
3742 MSText,
3743 MSNText,
3744 MSBinary,
3745 MSVarBinary,
3746 sqltypes.LargeBinary,
3747 ):
3748 if charlen == -1:
3749 charlen = None
3750 kwargs["length"] = charlen
3751 if collation:
3752 kwargs["collation"] = collation
3753
3754 if coltype is None:
3755 util.warn(
3756 "Did not recognize type '%s' of column '%s'"
3757 % (type_, name)
3758 )
3759 coltype = sqltypes.NULLTYPE
3760 else:
3761 if issubclass(coltype, sqltypes.Numeric):
3762 kwargs["precision"] = numericprec
3763
3764 if not issubclass(coltype, sqltypes.Float):
3765 kwargs["scale"] = numericscale
3766
3767 coltype = coltype(**kwargs)
3768 cdict = {
3769 "name": name,
3770 "type": coltype,
3771 "nullable": nullable,
3772 "default": default,
3773 "autoincrement": is_identity is not None,
3774 "comment": comment,
3775 }
3776
3777 if definition is not None and is_persisted is not None:
3778 cdict["computed"] = {
3779 "sqltext": definition,
3780 "persisted": is_persisted,
3781 }
3782
3783 if is_identity is not None:
3784 # identity_start and identity_increment are Decimal or None
3785 if identity_start is None or identity_increment is None:
3786 cdict["identity"] = {}
3787 else:
3788 if isinstance(coltype, sqltypes.BigInteger):
3789 start = int(identity_start)
3790 increment = int(identity_increment)
3791 elif isinstance(coltype, sqltypes.Integer):
3792 start = int(identity_start)
3793 increment = int(identity_increment)
3794 else:
3795 start = identity_start
3796 increment = identity_increment
3797
3798 cdict["identity"] = {
3799 "start": start,
3800 "increment": increment,
3801 }
3802
3803 cols.append(cdict)
3804
3805 if cols:
3806 return cols
3807 else:
3808 return self._default_or_error(
3809 connection, tablename, owner, ReflectionDefaults.columns, **kw
3810 )
3811
    @reflection.cache
    @_db_plus_owner
    def get_pk_constraint(
        self, connection, tablename, dbname, owner, schema, **kw
    ):
        """Reflect the primary key constraint for the given table.

        Joins TABLE_CONSTRAINTS to KEY_COLUMN_USAGE, and uses
        OBJECTPROPERTY(..., 'CnstIsClustKey') to determine whether the
        PK index is clustered.
        """
        pkeys = []
        TC = ischema.constraints
        C = ischema.key_constraints.alias("C")

        # Primary key constraints
        s = (
            sql.select(
                C.c.column_name,
                TC.c.constraint_type,
                C.c.constraint_name,
                func.objectproperty(
                    func.object_id(
                        C.c.table_schema + "." + C.c.constraint_name
                    ),
                    "CnstIsClustKey",
                ).label("is_clustered"),
            )
            .where(
                sql.and_(
                    TC.c.constraint_name == C.c.constraint_name,
                    TC.c.table_schema == C.c.table_schema,
                    C.c.table_name == tablename,
                    C.c.table_schema == owner,
                ),
            )
            # ordinal_position preserves the declared column order of a
            # composite key
            .order_by(TC.c.constraint_name, C.c.ordinal_position)
        )
        c = connection.execution_options(future_result=True).execute(s)
        constraint_name = None
        is_clustered = None
        # the query also returns UNIQUE constraints; keep only rows whose
        # constraint_type indicates PRIMARY KEY
        for row in c.mappings():
            if "PRIMARY" in row[TC.c.constraint_type.name]:
                pkeys.append(row["COLUMN_NAME"])
                if constraint_name is None:
                    constraint_name = row[C.c.constraint_name.name]
                if is_clustered is None:
                    is_clustered = row["is_clustered"]
        if pkeys:
            return {
                "constrained_columns": pkeys,
                "name": constraint_name,
                "dialect_options": {"mssql_clustered": is_clustered},
            }
        else:
            return self._default_or_error(
                connection,
                tablename,
                owner,
                ReflectionDefaults.pk_constraint,
                **kw,
            )
3868
    @reflection.cache
    @_db_plus_owner
    def get_foreign_keys(
        self, connection, tablename, dbname, owner, schema, **kw
    ):
        """Reflect foreign key constraints for the given table.

        The query resolves each FK's referent two ways and UNIONs the
        results: via the referenced unique/PK *constraint* (constraint_info
        CTE) and via the referenced unique *index* (index_info CTE), since
        a FK may target either.  Rows are then grouped by constraint name
        to assemble multi-column keys.
        """
        # Foreign key constraints
        s = (
            text(
                """\
WITH fk_info AS (
    SELECT
        ischema_ref_con.constraint_schema,
        ischema_ref_con.constraint_name,
        ischema_key_col.ordinal_position,
        ischema_key_col.table_schema,
        ischema_key_col.table_name,
        ischema_ref_con.unique_constraint_schema,
        ischema_ref_con.unique_constraint_name,
        ischema_ref_con.match_option,
        ischema_ref_con.update_rule,
        ischema_ref_con.delete_rule,
        ischema_key_col.column_name AS constrained_column
    FROM
        INFORMATION_SCHEMA.REFERENTIAL_CONSTRAINTS ischema_ref_con
        INNER JOIN
        INFORMATION_SCHEMA.KEY_COLUMN_USAGE ischema_key_col ON
            ischema_key_col.table_schema = ischema_ref_con.constraint_schema
            AND ischema_key_col.constraint_name =
            ischema_ref_con.constraint_name
    WHERE ischema_key_col.table_name = :tablename
        AND ischema_key_col.table_schema = :owner
),
constraint_info AS (
    SELECT
        ischema_key_col.constraint_schema,
        ischema_key_col.constraint_name,
        ischema_key_col.ordinal_position,
        ischema_key_col.table_schema,
        ischema_key_col.table_name,
        ischema_key_col.column_name
    FROM
        INFORMATION_SCHEMA.KEY_COLUMN_USAGE ischema_key_col
),
index_info AS (
    SELECT
        sys.schemas.name AS index_schema,
        sys.indexes.name AS index_name,
        sys.index_columns.key_ordinal AS ordinal_position,
        sys.schemas.name AS table_schema,
        sys.objects.name AS table_name,
        sys.columns.name AS column_name
    FROM
        sys.indexes
        INNER JOIN
        sys.objects ON
            sys.objects.object_id = sys.indexes.object_id
        INNER JOIN
        sys.schemas ON
            sys.schemas.schema_id = sys.objects.schema_id
        INNER JOIN
        sys.index_columns ON
            sys.index_columns.object_id = sys.objects.object_id
            AND sys.index_columns.index_id = sys.indexes.index_id
        INNER JOIN
        sys.columns ON
            sys.columns.object_id = sys.indexes.object_id
            AND sys.columns.column_id = sys.index_columns.column_id
)
    SELECT
        fk_info.constraint_schema,
        fk_info.constraint_name,
        fk_info.ordinal_position,
        fk_info.constrained_column,
        constraint_info.table_schema AS referred_table_schema,
        constraint_info.table_name AS referred_table_name,
        constraint_info.column_name AS referred_column,
        fk_info.match_option,
        fk_info.update_rule,
        fk_info.delete_rule
    FROM
        fk_info INNER JOIN constraint_info ON
            constraint_info.constraint_schema =
                fk_info.unique_constraint_schema
            AND constraint_info.constraint_name =
                fk_info.unique_constraint_name
            AND constraint_info.ordinal_position = fk_info.ordinal_position
    UNION
    SELECT
        fk_info.constraint_schema,
        fk_info.constraint_name,
        fk_info.ordinal_position,
        fk_info.constrained_column,
        index_info.table_schema AS referred_table_schema,
        index_info.table_name AS referred_table_name,
        index_info.column_name AS referred_column,
        fk_info.match_option,
        fk_info.update_rule,
        fk_info.delete_rule
    FROM
        fk_info INNER JOIN index_info ON
            index_info.index_schema = fk_info.unique_constraint_schema
            AND index_info.index_name = fk_info.unique_constraint_name
            AND index_info.ordinal_position = fk_info.ordinal_position

    ORDER BY fk_info.constraint_schema, fk_info.constraint_name,
        fk_info.ordinal_position
"""
            )
            .bindparams(
                sql.bindparam("tablename", tablename, ischema.CoerceUnicode()),
                sql.bindparam("owner", owner, ischema.CoerceUnicode()),
            )
            .columns(
                constraint_schema=sqltypes.Unicode(),
                constraint_name=sqltypes.Unicode(),
                table_schema=sqltypes.Unicode(),
                table_name=sqltypes.Unicode(),
                constrained_column=sqltypes.Unicode(),
                referred_table_schema=sqltypes.Unicode(),
                referred_table_name=sqltypes.Unicode(),
                referred_column=sqltypes.Unicode(),
            )
        )

        # group rows by constraint ID, to handle multi-column FKs
        fkeys = util.defaultdict(
            lambda: {
                "name": None,
                "constrained_columns": [],
                "referred_schema": None,
                "referred_table": None,
                "referred_columns": [],
                "options": {},
            }
        )

        for r in connection.execute(s).all():
            (
                _,  # constraint schema
                rfknm,
                _,  # ordinal position
                scol,
                rschema,
                rtbl,
                rcol,
                # TODO: we support match=<keyword> for foreign keys so
                # we can support this also, PG has match=FULL for example
                # but this seems to not be a valid value for SQL Server
                _,  # match rule
                fkuprule,
                fkdelrule,
            ) = r

            rec = fkeys[rfknm]
            rec["name"] = rfknm

            # "NO ACTION" is the server default; only record non-default
            # ON UPDATE / ON DELETE rules
            if fkuprule != "NO ACTION":
                rec["options"]["onupdate"] = fkuprule

            if fkdelrule != "NO ACTION":
                rec["options"]["ondelete"] = fkdelrule

            if not rec["referred_table"]:
                rec["referred_table"] = rtbl
                # only emit referred_schema when it differs from the
                # requested owner or a schema was explicitly given
                if schema is not None or owner != rschema:
                    if dbname:
                        rschema = dbname + "." + rschema
                    rec["referred_schema"] = rschema

            local_cols, remote_cols = (
                rec["constrained_columns"],
                rec["referred_columns"],
            )

            local_cols.append(scol)
            remote_cols.append(rcol)

        if fkeys:
            return list(fkeys.values())
        else:
            return self._default_or_error(
                connection,
                tablename,
                owner,
                ReflectionDefaults.foreign_keys,
                **kw,
            )