Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/base.py: 32%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1200 statements  

1# dialects/mssql/base.py 

2# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7# mypy: ignore-errors 

8 

9""" 

10.. dialect:: mssql 

11 :name: Microsoft SQL Server 

12 :normal_support: 2012+ 

13 :best_effort: 2005+ 

14 

15.. _mssql_external_dialects: 

16 

17External Dialects 

18----------------- 

19 

20In addition to the above DBAPI layers with native SQLAlchemy support, there 

21are third-party dialects for other DBAPI layers that are compatible 

22with SQL Server. See the "External Dialects" list on the 

23:ref:`dialect_toplevel` page. 

24 

25.. _mssql_identity: 

26 

27Auto Increment Behavior / IDENTITY Columns 

28------------------------------------------ 

29 

30SQL Server provides so-called "auto incrementing" behavior using the 

31``IDENTITY`` construct, which can be placed on any single integer column in a 

32table. SQLAlchemy considers ``IDENTITY`` within its default "autoincrement" 

33behavior for an integer primary key column, described at 

34:paramref:`_schema.Column.autoincrement`. This means that by default, 

35the first integer primary key column in a :class:`_schema.Table` will be 

36considered to be the identity column - unless it is associated with a 

37:class:`.Sequence` - and will generate DDL as such:: 

38 

39 from sqlalchemy import Table, MetaData, Column, Integer 

40 

41 m = MetaData() 

42 t = Table( 

43 "t", 

44 m, 

45 Column("id", Integer, primary_key=True), 

46 Column("x", Integer), 

47 ) 

48 m.create_all(engine) 

49 

50The above example will generate DDL as: 

51 

52.. sourcecode:: sql 

53 

54 CREATE TABLE t ( 

55 id INTEGER NOT NULL IDENTITY, 

56 x INTEGER NULL, 

57 PRIMARY KEY (id) 

58 ) 

59 

60For the case where this default generation of ``IDENTITY`` is not desired, 

61specify ``False`` for the :paramref:`_schema.Column.autoincrement` flag, 

62on the first integer primary key column:: 

63 

64 m = MetaData() 

65 t = Table( 

66 "t", 

67 m, 

68 Column("id", Integer, primary_key=True, autoincrement=False), 

69 Column("x", Integer), 

70 ) 

71 m.create_all(engine) 

72 

73To add the ``IDENTITY`` keyword to a non-primary key column, specify 

74``True`` for the :paramref:`_schema.Column.autoincrement` flag on the desired 

75:class:`_schema.Column` object, and ensure that 

76:paramref:`_schema.Column.autoincrement` 

77is set to ``False`` on any integer primary key column:: 

78 

79 m = MetaData() 

80 t = Table( 

81 "t", 

82 m, 

83 Column("id", Integer, primary_key=True, autoincrement=False), 

84 Column("x", Integer, autoincrement=True), 

85 ) 

86 m.create_all(engine) 

87 

88.. versionchanged:: 1.4 Added :class:`_schema.Identity` construct 

89 in a :class:`_schema.Column` to specify the start and increment 

90 parameters of an IDENTITY. These replace 

91 the use of the :class:`.Sequence` object in order to specify these values. 

92 

93.. deprecated:: 1.4 

94 

95 The ``mssql_identity_start`` and ``mssql_identity_increment`` parameters 

96 to :class:`_schema.Column` are deprecated and should we replaced by 

97 an :class:`_schema.Identity` object. Specifying both ways of configuring 

98 an IDENTITY will result in a compile error. 

99 These options are also no longer returned as part of the 

100 ``dialect_options`` key in :meth:`_reflection.Inspector.get_columns`. 

101 Use the information in the ``identity`` key instead. 

102 

103.. deprecated:: 1.3 

104 

105 The use of :class:`.Sequence` to specify IDENTITY characteristics is 

106 deprecated and will be removed in a future release. Please use 

107 the :class:`_schema.Identity` object parameters 

108 :paramref:`_schema.Identity.start` and 

109 :paramref:`_schema.Identity.increment`. 

110 

111.. versionchanged:: 1.4 Removed the ability to use a :class:`.Sequence` 

112 object to modify IDENTITY characteristics. :class:`.Sequence` objects 

113 now only manipulate true T-SQL SEQUENCE types. 

114 

115.. note:: 

116 

117 There can only be one IDENTITY column on the table. When using 

118 ``autoincrement=True`` to enable the IDENTITY keyword, SQLAlchemy does not 

119 guard against multiple columns specifying the option simultaneously. The 

120 SQL Server database will instead reject the ``CREATE TABLE`` statement. 

121 

122.. note:: 

123 

124 An INSERT statement which attempts to provide a value for a column that is 

125 marked with IDENTITY will be rejected by SQL Server. In order for the 

126 value to be accepted, a session-level option "SET IDENTITY_INSERT" must be 

127 enabled. The SQLAlchemy SQL Server dialect will perform this operation 

128 automatically when using a core :class:`_expression.Insert` 

129 construct; if the 

130 execution specifies a value for the IDENTITY column, the "IDENTITY_INSERT" 

131 option will be enabled for the span of that statement's invocation.However, 

132 this scenario is not high performing and should not be relied upon for 

133 normal use. If a table doesn't actually require IDENTITY behavior in its 

134 integer primary key column, the keyword should be disabled when creating 

135 the table by ensuring that ``autoincrement=False`` is set. 

136 

137Controlling "Start" and "Increment" 

138^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 

139 

140Specific control over the "start" and "increment" values for 

141the ``IDENTITY`` generator are provided using the 

142:paramref:`_schema.Identity.start` and :paramref:`_schema.Identity.increment` 

143parameters passed to the :class:`_schema.Identity` object:: 

144 

145 from sqlalchemy import Table, Integer, Column, Identity 

146 

147 test = Table( 

148 "test", 

149 metadata, 

150 Column( 

151 "id", Integer, primary_key=True, Identity(start=100, increment=10) 

152 ), 

153 Column("name", String(20)), 

154 ) 

155 

156The CREATE TABLE for the above :class:`_schema.Table` object would be: 

157 

158.. sourcecode:: sql 

159 

160 CREATE TABLE test ( 

161 id INTEGER NOT NULL IDENTITY(100,10) PRIMARY KEY, 

162 name VARCHAR(20) NULL, 

163 ) 

164 

165.. note:: 

166 

167 The :class:`_schema.Identity` object supports many other parameter in 

168 addition to ``start`` and ``increment``. These are not supported by 

169 SQL Server and will be ignored when generating the CREATE TABLE ddl. 

170 

171.. versionchanged:: 1.3.19 The :class:`_schema.Identity` object is 

172 now used to affect the 

173 ``IDENTITY`` generator for a :class:`_schema.Column` under SQL Server. 

174 Previously, the :class:`.Sequence` object was used. As SQL Server now 

175 supports real sequences as a separate construct, :class:`.Sequence` will be 

176 functional in the normal way starting from SQLAlchemy version 1.4. 

177 

178 

179Using IDENTITY with Non-Integer numeric types 

180^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 

181 

182SQL Server also allows ``IDENTITY`` to be used with ``NUMERIC`` columns. To 

183implement this pattern smoothly in SQLAlchemy, the primary datatype of the 

184column should remain as ``Integer``, however the underlying implementation 

185type deployed to the SQL Server database can be specified as ``Numeric`` using 

186:meth:`.TypeEngine.with_variant`:: 

187 

188 from sqlalchemy import Column 

189 from sqlalchemy import Integer 

190 from sqlalchemy import Numeric 

191 from sqlalchemy import String 

192 from sqlalchemy.ext.declarative import declarative_base 

193 

194 Base = declarative_base() 

195 

196 

197 class TestTable(Base): 

198 __tablename__ = "test" 

199 id = Column( 

200 Integer().with_variant(Numeric(10, 0), "mssql"), 

201 primary_key=True, 

202 autoincrement=True, 

203 ) 

204 name = Column(String) 

205 

206In the above example, ``Integer().with_variant()`` provides clear usage 

207information that accurately describes the intent of the code. The general 

208restriction that ``autoincrement`` only applies to ``Integer`` is established 

209at the metadata level and not at the per-dialect level. 

210 

211When using the above pattern, the primary key identifier that comes back from 

212the insertion of a row, which is also the value that would be assigned to an 

213ORM object such as ``TestTable`` above, will be an instance of ``Decimal()`` 

214and not ``int`` when using SQL Server. The numeric return type of the 

215:class:`_types.Numeric` type can be changed to return floats by passing False 

216to :paramref:`_types.Numeric.asdecimal`. To normalize the return type of the 

217above ``Numeric(10, 0)`` to return Python ints (which also support "long" 

218integer values in Python 3), use :class:`_types.TypeDecorator` as follows:: 

219 

220 from sqlalchemy import TypeDecorator 

221 

222 

223 class NumericAsInteger(TypeDecorator): 

224 "normalize floating point return values into ints" 

225 

226 impl = Numeric(10, 0, asdecimal=False) 

227 cache_ok = True 

228 

229 def process_result_value(self, value, dialect): 

230 if value is not None: 

231 value = int(value) 

232 return value 

233 

234 

235 class TestTable(Base): 

236 __tablename__ = "test" 

237 id = Column( 

238 Integer().with_variant(NumericAsInteger, "mssql"), 

239 primary_key=True, 

240 autoincrement=True, 

241 ) 

242 name = Column(String) 

243 

244.. _mssql_insert_behavior: 

245 

246INSERT behavior 

247^^^^^^^^^^^^^^^^ 

248 

249Handling of the ``IDENTITY`` column at INSERT time involves two key 

250techniques. The most common is being able to fetch the "last inserted value" 

251for a given ``IDENTITY`` column, a process which SQLAlchemy performs 

252implicitly in many cases, most importantly within the ORM. 

253 

254The process for fetching this value has several variants: 

255 

256* In the vast majority of cases, RETURNING is used in conjunction with INSERT 

257 statements on SQL Server in order to get newly generated primary key values: 

258 

259 .. sourcecode:: sql 

260 

261 INSERT INTO t (x) OUTPUT inserted.id VALUES (?) 

262 

263 As of SQLAlchemy 2.0, the :ref:`engine_insertmanyvalues` feature is also 

264 used by default to optimize many-row INSERT statements; for SQL Server 

265 the feature takes place for both RETURNING and-non RETURNING 

266 INSERT statements. 

267 

268 .. versionchanged:: 2.0.10 The :ref:`engine_insertmanyvalues` feature for 

269 SQL Server was temporarily disabled for SQLAlchemy version 2.0.9 due to 

270 issues with row ordering. As of 2.0.10 the feature is re-enabled, with 

271 special case handling for the unit of work's requirement for RETURNING to 

272 be ordered. 

273 

274* When RETURNING is not available or has been disabled via 

275 ``implicit_returning=False``, either the ``scope_identity()`` function or 

276 the ``@@identity`` variable is used; behavior varies by backend: 

277 

278 * when using PyODBC, the phrase ``; select scope_identity()`` will be 

279 appended to the end of the INSERT statement; a second result set will be 

280 fetched in order to receive the value. Given a table as:: 

281 

282 t = Table( 

283 "t", 

284 metadata, 

285 Column("id", Integer, primary_key=True), 

286 Column("x", Integer), 

287 implicit_returning=False, 

288 ) 

289 

290 an INSERT will look like: 

291 

292 .. sourcecode:: sql 

293 

294 INSERT INTO t (x) VALUES (?); select scope_identity() 

295 

296 * Other dialects such as pymssql will call upon 

297 ``SELECT scope_identity() AS lastrowid`` subsequent to an INSERT 

298 statement. If the flag ``use_scope_identity=False`` is passed to 

299 :func:`_sa.create_engine`, 

300 the statement ``SELECT @@identity AS lastrowid`` 

301 is used instead. 

302 

303A table that contains an ``IDENTITY`` column will prohibit an INSERT statement 

304that refers to the identity column explicitly. The SQLAlchemy dialect will 

305detect when an INSERT construct, created using a core 

306:func:`_expression.insert` 

307construct (not a plain string SQL), refers to the identity column, and 

308in this case will emit ``SET IDENTITY_INSERT ON`` prior to the insert 

309statement proceeding, and ``SET IDENTITY_INSERT OFF`` subsequent to the 

310execution. Given this example:: 

311 

312 m = MetaData() 

313 t = Table( 

314 "t", m, Column("id", Integer, primary_key=True), Column("x", Integer) 

315 ) 

316 m.create_all(engine) 

317 

318 with engine.begin() as conn: 

319 conn.execute(t.insert(), {"id": 1, "x": 1}, {"id": 2, "x": 2}) 

320 

321The above column will be created with IDENTITY, however the INSERT statement 

322we emit is specifying explicit values. In the echo output we can see 

323how SQLAlchemy handles this: 

324 

325.. sourcecode:: sql 

326 

327 CREATE TABLE t ( 

328 id INTEGER NOT NULL IDENTITY(1,1), 

329 x INTEGER NULL, 

330 PRIMARY KEY (id) 

331 ) 

332 

333 COMMIT 

334 SET IDENTITY_INSERT t ON 

335 INSERT INTO t (id, x) VALUES (?, ?) 

336 ((1, 1), (2, 2)) 

337 SET IDENTITY_INSERT t OFF 

338 COMMIT 

339 

340 

341 

342This is an auxiliary use case suitable for testing and bulk insert scenarios. 

343 

344SEQUENCE support 

345---------------- 

346 

347The :class:`.Sequence` object creates "real" sequences, i.e., 

348``CREATE SEQUENCE``: 

349 

350.. sourcecode:: pycon+sql 

351 

352 >>> from sqlalchemy import Sequence 

353 >>> from sqlalchemy.schema import CreateSequence 

354 >>> from sqlalchemy.dialects import mssql 

355 >>> print( 

356 ... CreateSequence(Sequence("my_seq", start=1)).compile( 

357 ... dialect=mssql.dialect() 

358 ... ) 

359 ... ) 

360 {printsql}CREATE SEQUENCE my_seq START WITH 1 

361 

362For integer primary key generation, SQL Server's ``IDENTITY`` construct should 

363generally be preferred vs. sequence. 

364 

365.. tip:: 

366 

367 The default start value for T-SQL is ``-2**63`` instead of 1 as 

368 in most other SQL databases. Users should explicitly set the 

369 :paramref:`.Sequence.start` to 1 if that's the expected default:: 

370 

371 seq = Sequence("my_sequence", start=1) 

372 

373.. versionadded:: 1.4 added SQL Server support for :class:`.Sequence` 

374 

375.. versionchanged:: 2.0 The SQL Server dialect will no longer implicitly 

376 render "START WITH 1" for ``CREATE SEQUENCE``, which was the behavior 

377 first implemented in version 1.4. 

378 

379MAX on VARCHAR / NVARCHAR 

380------------------------- 

381 

382SQL Server supports the special string "MAX" within the 

383:class:`_types.VARCHAR` and :class:`_types.NVARCHAR` datatypes, 

384to indicate "maximum length possible". The dialect currently handles this as 

385a length of "None" in the base type, rather than supplying a 

386dialect-specific version of these types, so that a base type 

387specified such as ``VARCHAR(None)`` can assume "unlengthed" behavior on 

388more than one backend without using dialect-specific types. 

389 

390To build a SQL Server VARCHAR or NVARCHAR with MAX length, use None:: 

391 

392 my_table = Table( 

393 "my_table", 

394 metadata, 

395 Column("my_data", VARCHAR(None)), 

396 Column("my_n_data", NVARCHAR(None)), 

397 ) 

398 

399Collation Support 

400----------------- 

401 

402Character collations are supported by the base string types, 

403specified by the string argument "collation":: 

404 

405 from sqlalchemy import VARCHAR 

406 

407 Column("login", VARCHAR(32, collation="Latin1_General_CI_AS")) 

408 

409When such a column is associated with a :class:`_schema.Table`, the 

410CREATE TABLE statement for this column will yield: 

411 

412.. sourcecode:: sql 

413 

414 login VARCHAR(32) COLLATE Latin1_General_CI_AS NULL 

415 

416LIMIT/OFFSET Support 

417-------------------- 

418 

419MSSQL has added support for LIMIT / OFFSET as of SQL Server 2012, via the 

420"OFFSET n ROWS" and "FETCH NEXT n ROWS" clauses. SQLAlchemy supports these 

421syntaxes automatically if SQL Server 2012 or greater is detected. 

422 

423.. versionchanged:: 1.4 support added for SQL Server "OFFSET n ROWS" and 

424 "FETCH NEXT n ROWS" syntax. 

425 

426For statements that specify only LIMIT and no OFFSET, all versions of SQL 

427Server support the TOP keyword. This syntax is used for all SQL Server 

428versions when no OFFSET clause is present. A statement such as:: 

429 

430 select(some_table).limit(5) 

431 

432will render similarly to: 

433 

434.. sourcecode:: sql 

435 

436 SELECT TOP 5 col1, col2.. FROM table 

437 

438For versions of SQL Server prior to SQL Server 2012, a statement that uses 

439LIMIT and OFFSET, or just OFFSET alone, will be rendered using the 

440``ROW_NUMBER()`` window function. A statement such as:: 

441 

442 select(some_table).order_by(some_table.c.col3).limit(5).offset(10) 

443 

444will render similarly to: 

445 

446.. sourcecode:: sql 

447 

448 SELECT anon_1.col1, anon_1.col2 FROM (SELECT col1, col2, 

449 ROW_NUMBER() OVER (ORDER BY col3) AS 

450 mssql_rn FROM table WHERE t.x = :x_1) AS 

451 anon_1 WHERE mssql_rn > :param_1 AND mssql_rn <= :param_2 + :param_1 

452 

453Note that when using LIMIT and/or OFFSET, whether using the older 

454or newer SQL Server syntaxes, the statement must have an ORDER BY as well, 

455else a :class:`.CompileError` is raised. 

456 

457.. _mssql_comment_support: 

458 

459DDL Comment Support 

460-------------------- 

461 

462Comment support, which includes DDL rendering for attributes such as 

463:paramref:`_schema.Table.comment` and :paramref:`_schema.Column.comment`, as 

464well as the ability to reflect these comments, is supported assuming a 

465supported version of SQL Server is in use. If a non-supported version such as 

466Azure Synapse is detected at first-connect time (based on the presence 

467of the ``fn_listextendedproperty`` SQL function), comment support including 

468rendering and table-comment reflection is disabled, as both features rely upon 

469SQL Server stored procedures and functions that are not available on all 

470backend types. 

471 

472To force comment support to be on or off, bypassing autodetection, set the 

473parameter ``supports_comments`` within :func:`_sa.create_engine`:: 

474 

475 e = create_engine("mssql+pyodbc://u:p@dsn", supports_comments=False) 

476 

477.. versionadded:: 2.0 Added support for table and column comments for 

478 the SQL Server dialect, including DDL generation and reflection. 

479 

480.. _mssql_isolation_level: 

481 

482Transaction Isolation Level 

483--------------------------- 

484 

485All SQL Server dialects support setting of transaction isolation level 

486both via a dialect-specific parameter 

487:paramref:`_sa.create_engine.isolation_level` 

488accepted by :func:`_sa.create_engine`, 

489as well as the :paramref:`.Connection.execution_options.isolation_level` 

490argument as passed to 

491:meth:`_engine.Connection.execution_options`. 

492This feature works by issuing the 

493command ``SET TRANSACTION ISOLATION LEVEL <level>`` for 

494each new connection. 

495 

496To set isolation level using :func:`_sa.create_engine`:: 

497 

498 engine = create_engine( 

499 "mssql+pyodbc://scott:tiger@ms_2008", isolation_level="REPEATABLE READ" 

500 ) 

501 

502To set using per-connection execution options:: 

503 

504 connection = engine.connect() 

505 connection = connection.execution_options(isolation_level="READ COMMITTED") 

506 

507Valid values for ``isolation_level`` include: 

508 

509* ``AUTOCOMMIT`` - pyodbc / pymssql-specific 

510* ``READ COMMITTED`` 

511* ``READ UNCOMMITTED`` 

512* ``REPEATABLE READ`` 

513* ``SERIALIZABLE`` 

514* ``SNAPSHOT`` - specific to SQL Server 

515 

516There are also more options for isolation level configurations, such as 

517"sub-engine" objects linked to a main :class:`_engine.Engine` which each apply 

518different isolation level settings. See the discussion at 

519:ref:`dbapi_autocommit` for background. 

520 

521.. seealso:: 

522 

523 :ref:`dbapi_autocommit` 

524 

525.. _mssql_reset_on_return: 

526 

527Temporary Table / Resource Reset for Connection Pooling 

528------------------------------------------------------- 

529 

530The :class:`.QueuePool` connection pool implementation used 

531by the SQLAlchemy :class:`.Engine` object includes 

532:ref:`reset on return <pool_reset_on_return>` behavior that will invoke 

533the DBAPI ``.rollback()`` method when connections are returned to the pool. 

534While this rollback will clear out the immediate state used by the previous 

535transaction, it does not cover a wider range of session-level state, including 

536temporary tables as well as other server state such as prepared statement 

537handles and statement caches. An undocumented SQL Server procedure known 

538as ``sp_reset_connection`` is known to be a workaround for this issue which 

539will reset most of the session state that builds up on a connection, including 

540temporary tables. 

541 

542To install ``sp_reset_connection`` as the means of performing reset-on-return, 

543the :meth:`.PoolEvents.reset` event hook may be used, as demonstrated in the 

544example below. The :paramref:`_sa.create_engine.pool_reset_on_return` parameter 

545is set to ``None`` so that the custom scheme can replace the default behavior 

546completely. The custom hook implementation calls ``.rollback()`` in any case, 

547as it's usually important that the DBAPI's own tracking of commit/rollback 

548will remain consistent with the state of the transaction:: 

549 

550 from sqlalchemy import create_engine 

551 from sqlalchemy import event 

552 

553 mssql_engine = create_engine( 

554 "mssql+pyodbc://scott:tiger^5HHH@mssql2017:1433/test?driver=ODBC+Driver+17+for+SQL+Server", 

555 # disable default reset-on-return scheme 

556 pool_reset_on_return=None, 

557 ) 

558 

559 

560 @event.listens_for(mssql_engine, "reset") 

561 def _reset_mssql(dbapi_connection, connection_record, reset_state): 

562 if not reset_state.terminate_only: 

563 dbapi_connection.execute("{call sys.sp_reset_connection}") 

564 

565 # so that the DBAPI itself knows that the connection has been 

566 # reset 

567 dbapi_connection.rollback() 

568 

569.. versionchanged:: 2.0.0b3 Added additional state arguments to 

570 the :meth:`.PoolEvents.reset` event and additionally ensured the event 

571 is invoked for all "reset" occurrences, so that it's appropriate 

572 as a place for custom "reset" handlers. Previous schemes which 

573 use the :meth:`.PoolEvents.checkin` handler remain usable as well. 

574 

575.. seealso:: 

576 

577 :ref:`pool_reset_on_return` - in the :ref:`pooling_toplevel` documentation 

578 

579Nullability 

580----------- 

581MSSQL has support for three levels of column nullability. The default 

582nullability allows nulls and is explicit in the CREATE TABLE 

583construct: 

584 

585.. sourcecode:: sql 

586 

587 name VARCHAR(20) NULL 

588 

589If ``nullable=None`` is specified then no specification is made. In 

590other words the database's configured default is used. This will 

591render: 

592 

593.. sourcecode:: sql 

594 

595 name VARCHAR(20) 

596 

597If ``nullable`` is ``True`` or ``False`` then the column will be 

598``NULL`` or ``NOT NULL`` respectively. 

599 

600Date / Time Handling 

601-------------------- 

602DATE and TIME are supported. Bind parameters are converted 

603to datetime.datetime() objects as required by most MSSQL drivers, 

604and results are processed from strings if needed. 

605The DATE and TIME types are not available for MSSQL 2005 and 

606previous - if a server version below 2008 is detected, DDL 

607for these types will be issued as DATETIME. 

608 

609.. _mssql_large_type_deprecation: 

610 

611Large Text/Binary Type Deprecation 

612---------------------------------- 

613 

614Per 

615`SQL Server 2012/2014 Documentation <https://technet.microsoft.com/en-us/library/ms187993.aspx>`_, 

616the ``NTEXT``, ``TEXT`` and ``IMAGE`` datatypes are to be removed from SQL 

617Server in a future release. SQLAlchemy normally relates these types to the 

618:class:`.UnicodeText`, :class:`_expression.TextClause` and 

619:class:`.LargeBinary` datatypes. 

620 

621In order to accommodate this change, a new flag ``deprecate_large_types`` 

622is added to the dialect, which will be automatically set based on detection 

623of the server version in use, if not otherwise set by the user. The 

624behavior of this flag is as follows: 

625 

626* When this flag is ``True``, the :class:`.UnicodeText`, 

627 :class:`_expression.TextClause` and 

628 :class:`.LargeBinary` datatypes, when used to render DDL, will render the 

629 types ``NVARCHAR(max)``, ``VARCHAR(max)``, and ``VARBINARY(max)``, 

630 respectively. This is a new behavior as of the addition of this flag. 

631 

632* When this flag is ``False``, the :class:`.UnicodeText`, 

633 :class:`_expression.TextClause` and 

634 :class:`.LargeBinary` datatypes, when used to render DDL, will render the 

635 types ``NTEXT``, ``TEXT``, and ``IMAGE``, 

636 respectively. This is the long-standing behavior of these types. 

637 

638* The flag begins with the value ``None``, before a database connection is 

639 established. If the dialect is used to render DDL without the flag being 

640 set, it is interpreted the same as ``False``. 

641 

642* On first connection, the dialect detects if SQL Server version 2012 or 

643 greater is in use; if the flag is still at ``None``, it sets it to ``True`` 

644 or ``False`` based on whether 2012 or greater is detected. 

645 

646* The flag can be set to either ``True`` or ``False`` when the dialect 

647 is created, typically via :func:`_sa.create_engine`:: 

648 

649 eng = create_engine( 

650 "mssql+pymssql://user:pass@host/db", deprecate_large_types=True 

651 ) 

652 

653* Complete control over whether the "old" or "new" types are rendered is 

654 available in all SQLAlchemy versions by using the UPPERCASE type objects 

655 instead: :class:`_types.NVARCHAR`, :class:`_types.VARCHAR`, 

656 :class:`_types.VARBINARY`, :class:`_types.TEXT`, :class:`_mssql.NTEXT`, 

657 :class:`_mssql.IMAGE` 

658 will always remain fixed and always output exactly that 

659 type. 

660 

661.. _multipart_schema_names: 

662 

663Multipart Schema Names 

664---------------------- 

665 

666SQL Server schemas sometimes require multiple parts to their "schema" 

667qualifier, that is, including the database name and owner name as separate 

668tokens, such as ``mydatabase.dbo.some_table``. These multipart names can be set 

669at once using the :paramref:`_schema.Table.schema` argument of 

670:class:`_schema.Table`:: 

671 

672 Table( 

673 "some_table", 

674 metadata, 

675 Column("q", String(50)), 

676 schema="mydatabase.dbo", 

677 ) 

678 

679When performing operations such as table or component reflection, a schema 

680argument that contains a dot will be split into separate 

681"database" and "owner" components in order to correctly query the SQL 

682Server information schema tables, as these two values are stored separately. 

683Additionally, when rendering the schema name for DDL or SQL, the two 

684components will be quoted separately for case sensitive names and other 

685special characters. Given an argument as below:: 

686 

687 Table( 

688 "some_table", 

689 metadata, 

690 Column("q", String(50)), 

691 schema="MyDataBase.dbo", 

692 ) 

693 

694The above schema would be rendered as ``[MyDataBase].dbo``, and also in 

695reflection, would be reflected using "dbo" as the owner and "MyDataBase" 

696as the database name. 

697 

698To control how the schema name is broken into database / owner, 

699specify brackets (which in SQL Server are quoting characters) in the name. 

700Below, the "owner" will be considered as ``MyDataBase.dbo`` and the 

701"database" will be None:: 

702 

703 Table( 

704 "some_table", 

705 metadata, 

706 Column("q", String(50)), 

707 schema="[MyDataBase.dbo]", 

708 ) 

709 

710To individually specify both database and owner name with special characters 

711or embedded dots, use two sets of brackets:: 

712 

713 Table( 

714 "some_table", 

715 metadata, 

716 Column("q", String(50)), 

717 schema="[MyDataBase.Period].[MyOwner.Dot]", 

718 ) 

719 

720.. versionchanged:: 1.2 the SQL Server dialect now treats brackets as 

721 identifier delimiters splitting the schema into separate database 

722 and owner tokens, to allow dots within either name itself. 

723 

724.. _legacy_schema_rendering: 

725 

726Legacy Schema Mode 

727------------------ 

728 

729Very old versions of the MSSQL dialect introduced the behavior such that a 

730schema-qualified table would be auto-aliased when used in a 

731SELECT statement; given a table:: 

732 

733 account_table = Table( 

734 "account", 

735 metadata, 

736 Column("id", Integer, primary_key=True), 

737 Column("info", String(100)), 

738 schema="customer_schema", 

739 ) 

740 

741this legacy mode of rendering would assume that "customer_schema.account" 

742would not be accepted by all parts of the SQL statement, as illustrated 

743below: 

744 

745.. sourcecode:: pycon+sql 

746 

747 >>> eng = create_engine("mssql+pymssql://mydsn", legacy_schema_aliasing=True) 

748 >>> print(account_table.select().compile(eng)) 

749 {printsql}SELECT account_1.id, account_1.info 

750 FROM customer_schema.account AS account_1 

751 

752This mode of behavior is now off by default, as it appears to have served 

753no purpose; however in the case that legacy applications rely upon it, 

754it is available using the ``legacy_schema_aliasing`` argument to 

755:func:`_sa.create_engine` as illustrated above. 

756 

757.. deprecated:: 1.4 

758 

759 The ``legacy_schema_aliasing`` flag is now 

760 deprecated and will be removed in a future release. 

761 

762.. _mssql_indexes: 

763 

764Clustered Index Support 

765----------------------- 

766 

767The MSSQL dialect supports clustered indexes (and primary keys) via the 

768``mssql_clustered`` option. This option is available to :class:`.Index`, 

769:class:`.UniqueConstraint`. and :class:`.PrimaryKeyConstraint`. 

770For indexes this option can be combined with the ``mssql_columnstore`` one 

771to create a clustered columnstore index. 

772 

773To generate a clustered index:: 

774 

775 Index("my_index", table.c.x, mssql_clustered=True) 

776 

777which renders the index as ``CREATE CLUSTERED INDEX my_index ON table (x)``. 

778 

779To generate a clustered primary key use:: 

780 

781 Table( 

782 "my_table", 

783 metadata, 

784 Column("x", ...), 

785 Column("y", ...), 

786 PrimaryKeyConstraint("x", "y", mssql_clustered=True), 

787 ) 

788 

789which will render the table, for example, as: 

790 

791.. sourcecode:: sql 

792 

793 CREATE TABLE my_table ( 

794 x INTEGER NOT NULL, 

795 y INTEGER NOT NULL, 

796 PRIMARY KEY CLUSTERED (x, y) 

797 ) 

798 

799Similarly, we can generate a clustered unique constraint using:: 

800 

801 Table( 

802 "my_table", 

803 metadata, 

804 Column("x", ...), 

805 Column("y", ...), 

806 PrimaryKeyConstraint("x"), 

807 UniqueConstraint("y", mssql_clustered=True), 

808 ) 

809 

810To explicitly request a non-clustered primary key (for example, when 

811a separate clustered index is desired), use:: 

812 

813 Table( 

814 "my_table", 

815 metadata, 

816 Column("x", ...), 

817 Column("y", ...), 

818 PrimaryKeyConstraint("x", "y", mssql_clustered=False), 

819 ) 

820 

821which will render the table, for example, as: 

822 

823.. sourcecode:: sql 

824 

825 CREATE TABLE my_table ( 

826 x INTEGER NOT NULL, 

827 y INTEGER NOT NULL, 

828 PRIMARY KEY NONCLUSTERED (x, y) 

829 ) 

830 

831Columnstore Index Support 

832------------------------- 

833 

834The MSSQL dialect supports columnstore indexes via the ``mssql_columnstore`` 

835option. This option is available to :class:`.Index`. It be combined with 

836the ``mssql_clustered`` option to create a clustered columnstore index. 

837 

838To generate a columnstore index:: 

839 

840 Index("my_index", table.c.x, mssql_columnstore=True) 

841 

842which renders the index as ``CREATE COLUMNSTORE INDEX my_index ON table (x)``. 

843 

844To generate a clustered columnstore index provide no columns:: 

845 

846 idx = Index("my_index", mssql_clustered=True, mssql_columnstore=True) 

847 # required to associate the index with the table 

848 table.append_constraint(idx) 

849 

850the above renders the index as 

851``CREATE CLUSTERED COLUMNSTORE INDEX my_index ON table``. 

852 

853.. versionadded:: 2.0.18 

854 

855MSSQL-Specific Index Options 

856----------------------------- 

857 

858In addition to clustering, the MSSQL dialect supports other special options 

859for :class:`.Index`. 

860 

861INCLUDE 

862^^^^^^^ 

863 

864The ``mssql_include`` option renders INCLUDE(colname) for the given string 

865names:: 

866 

867 Index("my_index", table.c.x, mssql_include=["y"]) 

868 

869would render the index as ``CREATE INDEX my_index ON table (x) INCLUDE (y)`` 

870 

871.. _mssql_index_where: 

872 

873Filtered Indexes 

874^^^^^^^^^^^^^^^^ 

875 

876The ``mssql_where`` option renders WHERE(condition) for the given string 

877names:: 

878 

879 Index("my_index", table.c.x, mssql_where=table.c.x > 10) 

880 

881would render the index as ``CREATE INDEX my_index ON table (x) WHERE x > 10``. 

882 

883.. versionadded:: 1.3.4 

884 

885Index ordering 

886^^^^^^^^^^^^^^ 

887 

888Index ordering is available via functional expressions, such as:: 

889 

890 Index("my_index", table.c.x.desc()) 

891 

892would render the index as ``CREATE INDEX my_index ON table (x DESC)`` 

893 

894.. seealso:: 

895 

896 :ref:`schema_indexes_functional` 

897 

898Compatibility Levels 

899-------------------- 

900MSSQL supports the notion of setting compatibility levels at the 

901database level. This allows, for instance, to run a database that 

902is compatible with SQL2000 while running on a SQL2005 database 

903server. ``server_version_info`` will always return the database 

904server version information (in this case SQL2005) and not the 

905compatibility level information. Because of this, if running under 

906a backwards compatibility mode SQLAlchemy may attempt to use T-SQL 

907statements that are unable to be parsed by the database server. 

908 

909.. _mssql_triggers: 

910 

911Triggers 

912-------- 

913 

914SQLAlchemy by default uses OUTPUT INSERTED to get at newly 

915generated primary key values via IDENTITY columns or other 

916server side defaults. MS-SQL does not 

917allow the usage of OUTPUT INSERTED on tables that have triggers. 

918To disable the usage of OUTPUT INSERTED on a per-table basis, 

919specify ``implicit_returning=False`` for each :class:`_schema.Table` 

920which has triggers:: 

921 

922 Table( 

923 "mytable", 

924 metadata, 

925 Column("id", Integer, primary_key=True), 

926 # ..., 

927 implicit_returning=False, 

928 ) 

929 

930Declarative form:: 

931 

932 class MyClass(Base): 

933 # ... 

934 __table_args__ = {"implicit_returning": False} 

935 

936.. _mssql_rowcount_versioning: 

937 

938Rowcount Support / ORM Versioning 

939--------------------------------- 

940 

941The SQL Server drivers may have limited ability to return the number 

942of rows updated from an UPDATE or DELETE statement. 

943 

944As of this writing, the PyODBC driver is not able to return a rowcount when 

945OUTPUT INSERTED is used. Previous versions of SQLAlchemy therefore had 

946limitations for features such as the "ORM Versioning" feature that relies upon 

947accurate rowcounts in order to match version numbers with matched rows. 

948 

949SQLAlchemy 2.0 now retrieves the "rowcount" manually for these particular use 

950cases based on counting the rows that arrived back within RETURNING; so while 

951the driver still has this limitation, the ORM Versioning feature is no longer 

952impacted by it. As of SQLAlchemy 2.0.5, ORM versioning has been fully 

953re-enabled for the pyodbc driver. 

954 

955.. versionchanged:: 2.0.5 ORM versioning support is restored for the pyodbc 

956 driver. Previously, a warning would be emitted during ORM flush that 

957 versioning was not supported. 

958 

959 

960Enabling Snapshot Isolation 

961--------------------------- 

962 

963SQL Server has a default transaction 

964isolation mode that locks entire tables, and causes even mildly concurrent 

965applications to have long held locks and frequent deadlocks. 

966Enabling snapshot isolation for the database as a whole is recommended 

967for modern levels of concurrency support. This is accomplished via the 

968following ALTER DATABASE commands executed at the SQL prompt: 

969 

970.. sourcecode:: sql 

971 

972 ALTER DATABASE MyDatabase SET ALLOW_SNAPSHOT_ISOLATION ON 

973 

974 ALTER DATABASE MyDatabase SET READ_COMMITTED_SNAPSHOT ON 

975 

976Background on SQL Server snapshot isolation is available at 

977https://msdn.microsoft.com/en-us/library/ms175095.aspx. 

978 

979""" # noqa 

980 

981from __future__ import annotations 

982 

983import codecs 

984import datetime 

985import operator 

986import re 

987from typing import Any 

988from typing import overload 

989from typing import TYPE_CHECKING 

990from uuid import UUID as _python_UUID 

991 

992from . import information_schema as ischema 

993from .json import JSON 

994from .json import JSONIndexType 

995from .json import JSONPathType 

996from ... import exc 

997from ... import Identity 

998from ... import schema as sa_schema 

999from ... import Sequence 

1000from ... import sql 

1001from ... import text 

1002from ... import util 

1003from ...engine import cursor as _cursor 

1004from ...engine import default 

1005from ...engine import reflection 

1006from ...engine.reflection import ReflectionDefaults 

1007from ...sql import coercions 

1008from ...sql import compiler 

1009from ...sql import elements 

1010from ...sql import expression 

1011from ...sql import func 

1012from ...sql import quoted_name 

1013from ...sql import roles 

1014from ...sql import sqltypes 

1015from ...sql import try_cast as try_cast # noqa: F401 

1016from ...sql import util as sql_util 

1017from ...sql._typing import is_sql_compiler 

1018from ...sql.compiler import InsertmanyvaluesSentinelOpts 

1019from ...sql.elements import TryCast as TryCast # noqa: F401 

1020from ...types import BIGINT 

1021from ...types import BINARY 

1022from ...types import CHAR 

1023from ...types import DATE 

1024from ...types import DATETIME 

1025from ...types import DECIMAL 

1026from ...types import FLOAT 

1027from ...types import INTEGER 

1028from ...types import NCHAR 

1029from ...types import NUMERIC 

1030from ...types import NVARCHAR 

1031from ...types import SMALLINT 

1032from ...types import TEXT 

1033from ...types import VARCHAR 

1034from ...util import update_wrapper 

1035from ...util.typing import Literal 

1036 

1037if TYPE_CHECKING: 

1038 from ...sql.ddl import DropIndex 

1039 from ...sql.dml import DMLState 

1040 from ...sql.selectable import TableClause 

1041 

1042# https://sqlserverbuilds.blogspot.com/ 

1043MS_2017_VERSION = (14,) 

1044MS_2016_VERSION = (13,) 

1045MS_2014_VERSION = (12,) 

1046MS_2012_VERSION = (11,) 

1047MS_2008_VERSION = (10,) 

1048MS_2005_VERSION = (9,) 

1049MS_2000_VERSION = (8,) 

1050 

1051RESERVED_WORDS = { 

1052 "add", 

1053 "all", 

1054 "alter", 

1055 "and", 

1056 "any", 

1057 "as", 

1058 "asc", 

1059 "authorization", 

1060 "backup", 

1061 "begin", 

1062 "between", 

1063 "break", 

1064 "browse", 

1065 "bulk", 

1066 "by", 

1067 "cascade", 

1068 "case", 

1069 "check", 

1070 "checkpoint", 

1071 "close", 

1072 "clustered", 

1073 "coalesce", 

1074 "collate", 

1075 "column", 

1076 "commit", 

1077 "compute", 

1078 "constraint", 

1079 "contains", 

1080 "containstable", 

1081 "continue", 

1082 "convert", 

1083 "create", 

1084 "cross", 

1085 "current", 

1086 "current_date", 

1087 "current_time", 

1088 "current_timestamp", 

1089 "current_user", 

1090 "cursor", 

1091 "database", 

1092 "dbcc", 

1093 "deallocate", 

1094 "declare", 

1095 "default", 

1096 "delete", 

1097 "deny", 

1098 "desc", 

1099 "disk", 

1100 "distinct", 

1101 "distributed", 

1102 "double", 

1103 "drop", 

1104 "dump", 

1105 "else", 

1106 "end", 

1107 "errlvl", 

1108 "escape", 

1109 "except", 

1110 "exec", 

1111 "execute", 

1112 "exists", 

1113 "exit", 

1114 "external", 

1115 "fetch", 

1116 "file", 

1117 "fillfactor", 

1118 "for", 

1119 "foreign", 

1120 "freetext", 

1121 "freetexttable", 

1122 "from", 

1123 "full", 

1124 "function", 

1125 "goto", 

1126 "grant", 

1127 "group", 

1128 "having", 

1129 "holdlock", 

1130 "identity", 

1131 "identity_insert", 

1132 "identitycol", 

1133 "if", 

1134 "in", 

1135 "index", 

1136 "inner", 

1137 "insert", 

1138 "intersect", 

1139 "into", 

1140 "is", 

1141 "join", 

1142 "key", 

1143 "kill", 

1144 "left", 

1145 "like", 

1146 "lineno", 

1147 "load", 

1148 "merge", 

1149 "national", 

1150 "nocheck", 

1151 "nonclustered", 

1152 "not", 

1153 "null", 

1154 "nullif", 

1155 "of", 

1156 "off", 

1157 "offsets", 

1158 "on", 

1159 "open", 

1160 "opendatasource", 

1161 "openquery", 

1162 "openrowset", 

1163 "openxml", 

1164 "option", 

1165 "or", 

1166 "order", 

1167 "outer", 

1168 "over", 

1169 "percent", 

1170 "pivot", 

1171 "plan", 

1172 "precision", 

1173 "primary", 

1174 "print", 

1175 "proc", 

1176 "procedure", 

1177 "public", 

1178 "raiserror", 

1179 "read", 

1180 "readtext", 

1181 "reconfigure", 

1182 "references", 

1183 "replication", 

1184 "restore", 

1185 "restrict", 

1186 "return", 

1187 "revert", 

1188 "revoke", 

1189 "right", 

1190 "rollback", 

1191 "rowcount", 

1192 "rowguidcol", 

1193 "rule", 

1194 "save", 

1195 "schema", 

1196 "securityaudit", 

1197 "select", 

1198 "session_user", 

1199 "set", 

1200 "setuser", 

1201 "shutdown", 

1202 "some", 

1203 "statistics", 

1204 "system_user", 

1205 "table", 

1206 "tablesample", 

1207 "textsize", 

1208 "then", 

1209 "to", 

1210 "top", 

1211 "tran", 

1212 "transaction", 

1213 "trigger", 

1214 "truncate", 

1215 "tsequal", 

1216 "union", 

1217 "unique", 

1218 "unpivot", 

1219 "update", 

1220 "updatetext", 

1221 "use", 

1222 "user", 

1223 "values", 

1224 "varying", 

1225 "view", 

1226 "waitfor", 

1227 "when", 

1228 "where", 

1229 "while", 

1230 "with", 

1231 "writetext", 

1232} 

1233 

1234 

1235class REAL(sqltypes.REAL): 

1236 """the SQL Server REAL datatype.""" 

1237 

1238 def __init__(self, **kw): 

1239 # REAL is a synonym for FLOAT(24) on SQL server. 

1240 # it is only accepted as the word "REAL" in DDL, the numeric 

1241 # precision value is not allowed to be present 

1242 kw.setdefault("precision", 24) 

1243 super().__init__(**kw) 

1244 

1245 

1246class DOUBLE_PRECISION(sqltypes.DOUBLE_PRECISION): 

1247 """the SQL Server DOUBLE PRECISION datatype. 

1248 

1249 .. versionadded:: 2.0.11 

1250 

1251 """ 

1252 

1253 def __init__(self, **kw): 

1254 # DOUBLE PRECISION is a synonym for FLOAT(53) on SQL server. 

1255 # it is only accepted as the word "DOUBLE PRECISION" in DDL, 

1256 # the numeric precision value is not allowed to be present 

1257 kw.setdefault("precision", 53) 

1258 super().__init__(**kw) 

1259 

1260 

1261class TINYINT(sqltypes.Integer): 

1262 __visit_name__ = "TINYINT" 

1263 

1264 

1265# MSSQL DATE/TIME types have varied behavior, sometimes returning 

1266# strings. MSDate/TIME check for everything, and always 

1267# filter bind parameters into datetime objects (required by pyodbc, 

1268# not sure about other dialects). 

1269 

1270 

1271class _MSDate(sqltypes.Date): 

1272 def bind_processor(self, dialect): 

1273 def process(value): 

1274 if type(value) == datetime.date: 

1275 return datetime.datetime(value.year, value.month, value.day) 

1276 else: 

1277 return value 

1278 

1279 return process 

1280 

1281 _reg = re.compile(r"(\d+)-(\d+)-(\d+)") 

1282 

1283 def result_processor(self, dialect, coltype): 

1284 def process(value): 

1285 if isinstance(value, datetime.datetime): 

1286 return value.date() 

1287 elif isinstance(value, str): 

1288 m = self._reg.match(value) 

1289 if not m: 

1290 raise ValueError( 

1291 "could not parse %r as a date value" % (value,) 

1292 ) 

1293 return datetime.date(*[int(x or 0) for x in m.groups()]) 

1294 else: 

1295 return value 

1296 

1297 return process 

1298 

1299 

1300class TIME(sqltypes.TIME): 

1301 def __init__(self, precision=None, **kwargs): 

1302 self.precision = precision 

1303 super().__init__() 

1304 

1305 __zero_date = datetime.date(1900, 1, 1) 

1306 

1307 def bind_processor(self, dialect): 

1308 def process(value): 

1309 if isinstance(value, datetime.datetime): 

1310 value = datetime.datetime.combine( 

1311 self.__zero_date, value.time() 

1312 ) 

1313 elif isinstance(value, datetime.time): 

1314 """issue #5339 

1315 per: https://github.com/mkleehammer/pyodbc/wiki/Tips-and-Tricks-by-Database-Platform#time-columns 

1316 pass TIME value as string 

1317 """ # noqa 

1318 value = str(value) 

1319 return value 

1320 

1321 return process 

1322 

1323 _reg = re.compile(r"(\d+):(\d+):(\d+)(?:\.(\d{0,6}))?") 

1324 

1325 def result_processor(self, dialect, coltype): 

1326 def process(value): 

1327 if isinstance(value, datetime.datetime): 

1328 return value.time() 

1329 elif isinstance(value, str): 

1330 m = self._reg.match(value) 

1331 if not m: 

1332 raise ValueError( 

1333 "could not parse %r as a time value" % (value,) 

1334 ) 

1335 return datetime.time(*[int(x or 0) for x in m.groups()]) 

1336 else: 

1337 return value 

1338 

1339 return process 

1340 

1341 

1342_MSTime = TIME 

1343 

1344 

1345class _BASETIMEIMPL(TIME): 

1346 __visit_name__ = "_BASETIMEIMPL" 

1347 

1348 

1349class _DateTimeBase: 

1350 def bind_processor(self, dialect): 

1351 def process(value): 

1352 if type(value) == datetime.date: 

1353 return datetime.datetime(value.year, value.month, value.day) 

1354 else: 

1355 return value 

1356 

1357 return process 

1358 

1359 

1360class _MSDateTime(_DateTimeBase, sqltypes.DateTime): 

1361 pass 

1362 

1363 

1364class SMALLDATETIME(_DateTimeBase, sqltypes.DateTime): 

1365 __visit_name__ = "SMALLDATETIME" 

1366 

1367 

1368class DATETIME2(_DateTimeBase, sqltypes.DateTime): 

1369 __visit_name__ = "DATETIME2" 

1370 

1371 def __init__(self, precision=None, **kw): 

1372 super().__init__(**kw) 

1373 self.precision = precision 

1374 

1375 

1376class DATETIMEOFFSET(_DateTimeBase, sqltypes.DateTime): 

1377 __visit_name__ = "DATETIMEOFFSET" 

1378 

1379 def __init__(self, precision=None, **kw): 

1380 super().__init__(**kw) 

1381 self.precision = precision 

1382 

1383 

1384class _UnicodeLiteral: 

1385 def literal_processor(self, dialect): 

1386 def process(value): 

1387 value = value.replace("'", "''") 

1388 

1389 if dialect.identifier_preparer._double_percents: 

1390 value = value.replace("%", "%%") 

1391 

1392 return "N'%s'" % value 

1393 

1394 return process 

1395 

1396 

1397class _MSUnicode(_UnicodeLiteral, sqltypes.Unicode): 

1398 pass 

1399 

1400 

1401class _MSUnicodeText(_UnicodeLiteral, sqltypes.UnicodeText): 

1402 pass 

1403 

1404 

1405class TIMESTAMP(sqltypes._Binary): 

1406 """Implement the SQL Server TIMESTAMP type. 

1407 

1408 Note this is **completely different** than the SQL Standard 

1409 TIMESTAMP type, which is not supported by SQL Server. It 

1410 is a read-only datatype that does not support INSERT of values. 

1411 

1412 .. versionadded:: 1.2 

1413 

1414 .. seealso:: 

1415 

1416 :class:`_mssql.ROWVERSION` 

1417 

1418 """ 

1419 

1420 __visit_name__ = "TIMESTAMP" 

1421 

1422 # expected by _Binary to be present 

1423 length = None 

1424 

1425 def __init__(self, convert_int=False): 

1426 """Construct a TIMESTAMP or ROWVERSION type. 

1427 

1428 :param convert_int: if True, binary integer values will 

1429 be converted to integers on read. 

1430 

1431 .. versionadded:: 1.2 

1432 

1433 """ 

1434 self.convert_int = convert_int 

1435 

1436 def result_processor(self, dialect, coltype): 

1437 super_ = super().result_processor(dialect, coltype) 

1438 if self.convert_int: 

1439 

1440 def process(value): 

1441 if super_: 

1442 value = super_(value) 

1443 if value is not None: 

1444 # https://stackoverflow.com/a/30403242/34549 

1445 value = int(codecs.encode(value, "hex"), 16) 

1446 return value 

1447 

1448 return process 

1449 else: 

1450 return super_ 

1451 

1452 

1453class ROWVERSION(TIMESTAMP): 

1454 """Implement the SQL Server ROWVERSION type. 

1455 

1456 The ROWVERSION datatype is a SQL Server synonym for the TIMESTAMP 

1457 datatype, however current SQL Server documentation suggests using 

1458 ROWVERSION for new datatypes going forward. 

1459 

1460 The ROWVERSION datatype does **not** reflect (e.g. introspect) from the 

1461 database as itself; the returned datatype will be 

1462 :class:`_mssql.TIMESTAMP`. 

1463 

1464 This is a read-only datatype that does not support INSERT of values. 

1465 

1466 .. versionadded:: 1.2 

1467 

1468 .. seealso:: 

1469 

1470 :class:`_mssql.TIMESTAMP` 

1471 

1472 """ 

1473 

1474 __visit_name__ = "ROWVERSION" 

1475 

1476 

1477class NTEXT(sqltypes.UnicodeText): 

1478 """MSSQL NTEXT type, for variable-length unicode text up to 2^30 

1479 characters.""" 

1480 

1481 __visit_name__ = "NTEXT" 

1482 

1483 

1484class VARBINARY(sqltypes.VARBINARY, sqltypes.LargeBinary): 

1485 """The MSSQL VARBINARY type. 

1486 

1487 This type adds additional features to the core :class:`_types.VARBINARY` 

1488 type, including "deprecate_large_types" mode where 

1489 either ``VARBINARY(max)`` or IMAGE is rendered, as well as the SQL 

1490 Server ``FILESTREAM`` option. 

1491 

1492 .. seealso:: 

1493 

1494 :ref:`mssql_large_type_deprecation` 

1495 

1496 """ 

1497 

1498 __visit_name__ = "VARBINARY" 

1499 

1500 def __init__(self, length=None, filestream=False): 

1501 """ 

1502 Construct a VARBINARY type. 

1503 

1504 :param length: optional, a length for the column for use in 

1505 DDL statements, for those binary types that accept a length, 

1506 such as the MySQL BLOB type. 

1507 

1508 :param filestream=False: if True, renders the ``FILESTREAM`` keyword 

1509 in the table definition. In this case ``length`` must be ``None`` 

1510 or ``'max'``. 

1511 

1512 .. versionadded:: 1.4.31 

1513 

1514 """ 

1515 

1516 self.filestream = filestream 

1517 if self.filestream and length not in (None, "max"): 

1518 raise ValueError( 

1519 "length must be None or 'max' when setting filestream" 

1520 ) 

1521 super().__init__(length=length) 

1522 

1523 

1524class IMAGE(sqltypes.LargeBinary): 

1525 __visit_name__ = "IMAGE" 

1526 

1527 

1528class XML(sqltypes.Text): 

1529 """MSSQL XML type. 

1530 

1531 This is a placeholder type for reflection purposes that does not include 

1532 any Python-side datatype support. It also does not currently support 

1533 additional arguments, such as "CONTENT", "DOCUMENT", 

1534 "xml_schema_collection". 

1535 

1536 """ 

1537 

1538 __visit_name__ = "XML" 

1539 

1540 

1541class BIT(sqltypes.Boolean): 

1542 """MSSQL BIT type. 

1543 

1544 Both pyodbc and pymssql return values from BIT columns as 

1545 Python <class 'bool'> so just subclass Boolean. 

1546 

1547 """ 

1548 

1549 __visit_name__ = "BIT" 

1550 

1551 

1552class MONEY(sqltypes.TypeEngine): 

1553 __visit_name__ = "MONEY" 

1554 

1555 

1556class SMALLMONEY(sqltypes.TypeEngine): 

1557 __visit_name__ = "SMALLMONEY" 

1558 

1559 

1560class MSUUid(sqltypes.Uuid): 

1561 def bind_processor(self, dialect): 

1562 if self.native_uuid: 

1563 # this is currently assuming pyodbc; might not work for 

1564 # some other mssql driver 

1565 return None 

1566 else: 

1567 if self.as_uuid: 

1568 

1569 def process(value): 

1570 if value is not None: 

1571 value = value.hex 

1572 return value 

1573 

1574 return process 

1575 else: 

1576 

1577 def process(value): 

1578 if value is not None: 

1579 value = value.replace("-", "").replace("''", "'") 

1580 return value 

1581 

1582 return process 

1583 

1584 def literal_processor(self, dialect): 

1585 if self.native_uuid: 

1586 

1587 def process(value): 

1588 return f"""'{str(value).replace("''", "'")}'""" 

1589 

1590 return process 

1591 else: 

1592 if self.as_uuid: 

1593 

1594 def process(value): 

1595 return f"""'{value.hex}'""" 

1596 

1597 return process 

1598 else: 

1599 

1600 def process(value): 

1601 return f"""'{ 

1602 value.replace("-", "").replace("'", "''") 

1603 }'""" 

1604 

1605 return process 

1606 

1607 

1608class UNIQUEIDENTIFIER(sqltypes.Uuid[sqltypes._UUID_RETURN]): 

1609 __visit_name__ = "UNIQUEIDENTIFIER" 

1610 

1611 @overload 

1612 def __init__( 

1613 self: UNIQUEIDENTIFIER[_python_UUID], as_uuid: Literal[True] = ... 

1614 ): ... 

1615 

1616 @overload 

1617 def __init__( 

1618 self: UNIQUEIDENTIFIER[str], as_uuid: Literal[False] = ... 

1619 ): ... 

1620 

1621 def __init__(self, as_uuid: bool = True): 

1622 """Construct a :class:`_mssql.UNIQUEIDENTIFIER` type. 

1623 

1624 

1625 :param as_uuid=True: if True, values will be interpreted 

1626 as Python uuid objects, converting to/from string via the 

1627 DBAPI. 

1628 

1629 .. versionchanged: 2.0 Added direct "uuid" support to the 

1630 :class:`_mssql.UNIQUEIDENTIFIER` datatype; uuid interpretation 

1631 defaults to ``True``. 

1632 

1633 """ 

1634 self.as_uuid = as_uuid 

1635 self.native_uuid = True 

1636 

1637 

1638class SQL_VARIANT(sqltypes.TypeEngine): 

1639 __visit_name__ = "SQL_VARIANT" 

1640 

1641 

1642# old names. 

1643MSDateTime = _MSDateTime 

1644MSDate = _MSDate 

1645MSReal = REAL 

1646MSTinyInteger = TINYINT 

1647MSTime = TIME 

1648MSSmallDateTime = SMALLDATETIME 

1649MSDateTime2 = DATETIME2 

1650MSDateTimeOffset = DATETIMEOFFSET 

1651MSText = TEXT 

1652MSNText = NTEXT 

1653MSString = VARCHAR 

1654MSNVarchar = NVARCHAR 

1655MSChar = CHAR 

1656MSNChar = NCHAR 

1657MSBinary = BINARY 

1658MSVarBinary = VARBINARY 

1659MSImage = IMAGE 

1660MSBit = BIT 

1661MSMoney = MONEY 

1662MSSmallMoney = SMALLMONEY 

1663MSUniqueIdentifier = UNIQUEIDENTIFIER 

1664MSVariant = SQL_VARIANT 

1665 

1666ischema_names = { 

1667 "int": INTEGER, 

1668 "bigint": BIGINT, 

1669 "smallint": SMALLINT, 

1670 "tinyint": TINYINT, 

1671 "varchar": VARCHAR, 

1672 "nvarchar": NVARCHAR, 

1673 "char": CHAR, 

1674 "nchar": NCHAR, 

1675 "text": TEXT, 

1676 "ntext": NTEXT, 

1677 "decimal": DECIMAL, 

1678 "numeric": NUMERIC, 

1679 "float": FLOAT, 

1680 "datetime": DATETIME, 

1681 "datetime2": DATETIME2, 

1682 "datetimeoffset": DATETIMEOFFSET, 

1683 "date": DATE, 

1684 "time": TIME, 

1685 "smalldatetime": SMALLDATETIME, 

1686 "binary": BINARY, 

1687 "varbinary": VARBINARY, 

1688 "bit": BIT, 

1689 "real": REAL, 

1690 "double precision": DOUBLE_PRECISION, 

1691 "image": IMAGE, 

1692 "xml": XML, 

1693 "timestamp": TIMESTAMP, 

1694 "money": MONEY, 

1695 "smallmoney": SMALLMONEY, 

1696 "uniqueidentifier": UNIQUEIDENTIFIER, 

1697 "sql_variant": SQL_VARIANT, 

1698} 

1699 

1700 

1701class MSTypeCompiler(compiler.GenericTypeCompiler): 

1702 def _extend(self, spec, type_, length=None): 

1703 """Extend a string-type declaration with standard SQL 

1704 COLLATE annotations. 

1705 

1706 """ 

1707 

1708 if getattr(type_, "collation", None): 

1709 collation = "COLLATE %s" % type_.collation 

1710 else: 

1711 collation = None 

1712 

1713 if not length: 

1714 length = type_.length 

1715 

1716 if length: 

1717 spec = spec + "(%s)" % length 

1718 

1719 return " ".join([c for c in (spec, collation) if c is not None]) 

1720 

1721 def visit_double(self, type_, **kw): 

1722 return self.visit_DOUBLE_PRECISION(type_, **kw) 

1723 

1724 def visit_FLOAT(self, type_, **kw): 

1725 precision = getattr(type_, "precision", None) 

1726 if precision is None: 

1727 return "FLOAT" 

1728 else: 

1729 return "FLOAT(%(precision)s)" % {"precision": precision} 

1730 

1731 def visit_TINYINT(self, type_, **kw): 

1732 return "TINYINT" 

1733 

1734 def visit_TIME(self, type_, **kw): 

1735 precision = getattr(type_, "precision", None) 

1736 if precision is not None: 

1737 return "TIME(%s)" % precision 

1738 else: 

1739 return "TIME" 

1740 

1741 def visit_TIMESTAMP(self, type_, **kw): 

1742 return "TIMESTAMP" 

1743 

1744 def visit_ROWVERSION(self, type_, **kw): 

1745 return "ROWVERSION" 

1746 

1747 def visit_datetime(self, type_, **kw): 

1748 if type_.timezone: 

1749 return self.visit_DATETIMEOFFSET(type_, **kw) 

1750 else: 

1751 return self.visit_DATETIME(type_, **kw) 

1752 

1753 def visit_DATETIMEOFFSET(self, type_, **kw): 

1754 precision = getattr(type_, "precision", None) 

1755 if precision is not None: 

1756 return "DATETIMEOFFSET(%s)" % type_.precision 

1757 else: 

1758 return "DATETIMEOFFSET" 

1759 

1760 def visit_DATETIME2(self, type_, **kw): 

1761 precision = getattr(type_, "precision", None) 

1762 if precision is not None: 

1763 return "DATETIME2(%s)" % precision 

1764 else: 

1765 return "DATETIME2" 

1766 

1767 def visit_SMALLDATETIME(self, type_, **kw): 

1768 return "SMALLDATETIME" 

1769 

1770 def visit_unicode(self, type_, **kw): 

1771 return self.visit_NVARCHAR(type_, **kw) 

1772 

1773 def visit_text(self, type_, **kw): 

1774 if self.dialect.deprecate_large_types: 

1775 return self.visit_VARCHAR(type_, **kw) 

1776 else: 

1777 return self.visit_TEXT(type_, **kw) 

1778 

1779 def visit_unicode_text(self, type_, **kw): 

1780 if self.dialect.deprecate_large_types: 

1781 return self.visit_NVARCHAR(type_, **kw) 

1782 else: 

1783 return self.visit_NTEXT(type_, **kw) 

1784 

1785 def visit_NTEXT(self, type_, **kw): 

1786 return self._extend("NTEXT", type_) 

1787 

1788 def visit_TEXT(self, type_, **kw): 

1789 return self._extend("TEXT", type_) 

1790 

1791 def visit_VARCHAR(self, type_, **kw): 

1792 return self._extend("VARCHAR", type_, length=type_.length or "max") 

1793 

1794 def visit_CHAR(self, type_, **kw): 

1795 return self._extend("CHAR", type_) 

1796 

1797 def visit_NCHAR(self, type_, **kw): 

1798 return self._extend("NCHAR", type_) 

1799 

1800 def visit_NVARCHAR(self, type_, **kw): 

1801 return self._extend("NVARCHAR", type_, length=type_.length or "max") 

1802 

1803 def visit_date(self, type_, **kw): 

1804 if self.dialect.server_version_info < MS_2008_VERSION: 

1805 return self.visit_DATETIME(type_, **kw) 

1806 else: 

1807 return self.visit_DATE(type_, **kw) 

1808 

1809 def visit__BASETIMEIMPL(self, type_, **kw): 

1810 return self.visit_time(type_, **kw) 

1811 

1812 def visit_time(self, type_, **kw): 

1813 if self.dialect.server_version_info < MS_2008_VERSION: 

1814 return self.visit_DATETIME(type_, **kw) 

1815 else: 

1816 return self.visit_TIME(type_, **kw) 

1817 

1818 def visit_large_binary(self, type_, **kw): 

1819 if self.dialect.deprecate_large_types: 

1820 return self.visit_VARBINARY(type_, **kw) 

1821 else: 

1822 return self.visit_IMAGE(type_, **kw) 

1823 

1824 def visit_IMAGE(self, type_, **kw): 

1825 return "IMAGE" 

1826 

1827 def visit_XML(self, type_, **kw): 

1828 return "XML" 

1829 

1830 def visit_VARBINARY(self, type_, **kw): 

1831 text = self._extend("VARBINARY", type_, length=type_.length or "max") 

1832 if getattr(type_, "filestream", False): 

1833 text += " FILESTREAM" 

1834 return text 

1835 

1836 def visit_boolean(self, type_, **kw): 

1837 return self.visit_BIT(type_) 

1838 

1839 def visit_BIT(self, type_, **kw): 

1840 return "BIT" 

1841 

1842 def visit_JSON(self, type_, **kw): 

1843 # this is a bit of a break with SQLAlchemy's convention of 

1844 # "UPPERCASE name goes to UPPERCASE type name with no modification" 

1845 return self._extend("NVARCHAR", type_, length="max") 

1846 

1847 def visit_MONEY(self, type_, **kw): 

1848 return "MONEY" 

1849 

1850 def visit_SMALLMONEY(self, type_, **kw): 

1851 return "SMALLMONEY" 

1852 

1853 def visit_uuid(self, type_, **kw): 

1854 if type_.native_uuid: 

1855 return self.visit_UNIQUEIDENTIFIER(type_, **kw) 

1856 else: 

1857 return super().visit_uuid(type_, **kw) 

1858 

1859 def visit_UNIQUEIDENTIFIER(self, type_, **kw): 

1860 return "UNIQUEIDENTIFIER" 

1861 

1862 def visit_SQL_VARIANT(self, type_, **kw): 

1863 return "SQL_VARIANT" 

1864 

1865 

1866class MSExecutionContext(default.DefaultExecutionContext): 

1867 _enable_identity_insert = False 

1868 _select_lastrowid = False 

1869 _lastrowid = None 

1870 

1871 dialect: MSDialect 

1872 

1873 def _opt_encode(self, statement): 

1874 if self.compiled and self.compiled.schema_translate_map: 

1875 rst = self.compiled.preparer._render_schema_translates 

1876 statement = rst(statement, self.compiled.schema_translate_map) 

1877 

1878 return statement 

1879 

1880 def pre_exec(self): 

1881 """Activate IDENTITY_INSERT if needed.""" 

1882 

1883 if self.isinsert: 

1884 if TYPE_CHECKING: 

1885 assert is_sql_compiler(self.compiled) 

1886 assert isinstance(self.compiled.compile_state, DMLState) 

1887 assert isinstance( 

1888 self.compiled.compile_state.dml_table, TableClause 

1889 ) 

1890 

1891 tbl = self.compiled.compile_state.dml_table 

1892 id_column = tbl._autoincrement_column 

1893 

1894 if id_column is not None and ( 

1895 not isinstance(id_column.default, Sequence) 

1896 ): 

1897 insert_has_identity = True 

1898 compile_state = self.compiled.dml_compile_state 

1899 self._enable_identity_insert = ( 

1900 id_column.key in self.compiled_parameters[0] 

1901 ) or ( 

1902 compile_state._dict_parameters 

1903 and (id_column.key in compile_state._insert_col_keys) 

1904 ) 

1905 

1906 else: 

1907 insert_has_identity = False 

1908 self._enable_identity_insert = False 

1909 

1910 self._select_lastrowid = ( 

1911 not self.compiled.inline 

1912 and insert_has_identity 

1913 and not self.compiled.effective_returning 

1914 and not self._enable_identity_insert 

1915 and not self.executemany 

1916 ) 

1917 

1918 if self._enable_identity_insert: 

1919 self.root_connection._cursor_execute( 

1920 self.cursor, 

1921 self._opt_encode( 

1922 "SET IDENTITY_INSERT %s ON" 

1923 % self.identifier_preparer.format_table(tbl) 

1924 ), 

1925 (), 

1926 self, 

1927 ) 

1928 

1929 def post_exec(self): 

1930 """Disable IDENTITY_INSERT if enabled.""" 

1931 

1932 conn = self.root_connection 

1933 

1934 if self.isinsert or self.isupdate or self.isdelete: 

1935 self._rowcount = self.cursor.rowcount 

1936 

1937 if self._select_lastrowid: 

1938 if self.dialect.use_scope_identity: 

1939 conn._cursor_execute( 

1940 self.cursor, 

1941 "SELECT scope_identity() AS lastrowid", 

1942 (), 

1943 self, 

1944 ) 

1945 else: 

1946 conn._cursor_execute( 

1947 self.cursor, "SELECT @@identity AS lastrowid", (), self 

1948 ) 

1949 # fetchall() ensures the cursor is consumed without closing it 

1950 row = self.cursor.fetchall()[0] 

1951 self._lastrowid = int(row[0]) 

1952 

1953 self.cursor_fetch_strategy = _cursor._NO_CURSOR_DML 

1954 elif ( 

1955 self.compiled is not None 

1956 and is_sql_compiler(self.compiled) 

1957 and self.compiled.effective_returning 

1958 ): 

1959 self.cursor_fetch_strategy = ( 

1960 _cursor.FullyBufferedCursorFetchStrategy( 

1961 self.cursor, 

1962 self.cursor.description, 

1963 self.cursor.fetchall(), 

1964 ) 

1965 ) 

1966 

1967 if self._enable_identity_insert: 

1968 if TYPE_CHECKING: 

1969 assert is_sql_compiler(self.compiled) 

1970 assert isinstance(self.compiled.compile_state, DMLState) 

1971 assert isinstance( 

1972 self.compiled.compile_state.dml_table, TableClause 

1973 ) 

1974 conn._cursor_execute( 

1975 self.cursor, 

1976 self._opt_encode( 

1977 "SET IDENTITY_INSERT %s OFF" 

1978 % self.identifier_preparer.format_table( 

1979 self.compiled.compile_state.dml_table 

1980 ) 

1981 ), 

1982 (), 

1983 self, 

1984 ) 

1985 

1986 def get_lastrowid(self): 

1987 return self._lastrowid 

1988 

1989 def handle_dbapi_exception(self, e): 

1990 if self._enable_identity_insert: 

1991 try: 

1992 self.cursor.execute( 

1993 self._opt_encode( 

1994 "SET IDENTITY_INSERT %s OFF" 

1995 % self.identifier_preparer.format_table( 

1996 self.compiled.compile_state.dml_table 

1997 ) 

1998 ) 

1999 ) 

2000 except Exception: 

2001 pass 

2002 

2003 def fire_sequence(self, seq, type_): 

2004 return self._execute_scalar( 

2005 ( 

2006 "SELECT NEXT VALUE FOR %s" 

2007 % self.identifier_preparer.format_sequence(seq) 

2008 ), 

2009 type_, 

2010 ) 

2011 

2012 def get_insert_default(self, column): 

2013 if ( 

2014 isinstance(column, sa_schema.Column) 

2015 and column is column.table._autoincrement_column 

2016 and isinstance(column.default, sa_schema.Sequence) 

2017 and column.default.optional 

2018 ): 

2019 return None 

2020 return super().get_insert_default(column) 

2021 

2022 

2023class MSSQLCompiler(compiler.SQLCompiler): 

2024 returning_precedes_values = True 

2025 

2026 extract_map = util.update_copy( 

2027 compiler.SQLCompiler.extract_map, 

2028 { 

2029 "doy": "dayofyear", 

2030 "dow": "weekday", 

2031 "milliseconds": "millisecond", 

2032 "microseconds": "microsecond", 

2033 }, 

2034 ) 

2035 

2036 def __init__(self, *args, **kwargs): 

2037 self.tablealiases = {} 

2038 super().__init__(*args, **kwargs) 

2039 

2040 def _format_frame_clause(self, range_, **kw): 

2041 kw["literal_execute"] = True 

2042 return super()._format_frame_clause(range_, **kw) 

2043 

2044 def _with_legacy_schema_aliasing(fn): 

2045 def decorate(self, *arg, **kw): 

2046 if self.dialect.legacy_schema_aliasing: 

2047 return fn(self, *arg, **kw) 

2048 else: 

2049 super_ = getattr(super(MSSQLCompiler, self), fn.__name__) 

2050 return super_(*arg, **kw) 

2051 

2052 return decorate 

2053 

2054 def visit_now_func(self, fn, **kw): 

2055 return "CURRENT_TIMESTAMP" 

2056 

2057 def visit_current_date_func(self, fn, **kw): 

2058 return "GETDATE()" 

2059 

2060 def visit_length_func(self, fn, **kw): 

2061 return "LEN%s" % self.function_argspec(fn, **kw) 

2062 

2063 def visit_char_length_func(self, fn, **kw): 

2064 return "LEN%s" % self.function_argspec(fn, **kw) 

2065 

2066 def visit_aggregate_strings_func(self, fn, **kw): 

2067 expr = fn.clauses.clauses[0]._compiler_dispatch(self, **kw) 

2068 kw["literal_execute"] = True 

2069 delimiter = fn.clauses.clauses[1]._compiler_dispatch(self, **kw) 

2070 return f"string_agg({expr}, {delimiter})" 

2071 

2072 def visit_concat_op_expression_clauselist( 

2073 self, clauselist, operator, **kw 

2074 ): 

2075 return " + ".join(self.process(elem, **kw) for elem in clauselist) 

2076 

2077 def visit_concat_op_binary(self, binary, operator, **kw): 

2078 return "%s + %s" % ( 

2079 self.process(binary.left, **kw), 

2080 self.process(binary.right, **kw), 

2081 ) 

2082 

2083 def visit_true(self, expr, **kw): 

2084 return "1" 

2085 

2086 def visit_false(self, expr, **kw): 

2087 return "0" 

2088 

2089 def visit_match_op_binary(self, binary, operator, **kw): 

2090 return "CONTAINS (%s, %s)" % ( 

2091 self.process(binary.left, **kw), 

2092 self.process(binary.right, **kw), 

2093 ) 

2094 

2095 def get_select_precolumns(self, select, **kw): 

2096 """MS-SQL puts TOP, it's version of LIMIT here""" 

2097 

2098 s = super().get_select_precolumns(select, **kw) 

2099 

2100 if select._has_row_limiting_clause and self._use_top(select): 

2101 # ODBC drivers and possibly others 

2102 # don't support bind params in the SELECT clause on SQL Server. 

2103 # so have to use literal here. 

2104 kw["literal_execute"] = True 

2105 s += "TOP %s " % self.process( 

2106 self._get_limit_or_fetch(select), **kw 

2107 ) 

2108 if select._fetch_clause is not None: 

2109 if select._fetch_clause_options["percent"]: 

2110 s += "PERCENT " 

2111 if select._fetch_clause_options["with_ties"]: 

2112 s += "WITH TIES " 

2113 

2114 return s 

2115 

2116 def get_from_hint_text(self, table, text): 

2117 return text 

2118 

2119 def get_crud_hint_text(self, table, text): 

2120 return text 

2121 

2122 def _get_limit_or_fetch(self, select): 

2123 if select._fetch_clause is None: 

2124 return select._limit_clause 

2125 else: 

2126 return select._fetch_clause 

2127 

2128 def _use_top(self, select): 

2129 return (select._offset_clause is None) and ( 

2130 select._simple_int_clause(select._limit_clause) 

2131 or ( 

2132 # limit can use TOP with is by itself. fetch only uses TOP 

2133 # when it needs to because of PERCENT and/or WITH TIES 

2134 # TODO: Why? shouldn't we use TOP always ? 

2135 select._simple_int_clause(select._fetch_clause) 

2136 and ( 

2137 select._fetch_clause_options["percent"] 

2138 or select._fetch_clause_options["with_ties"] 

2139 ) 

2140 ) 

2141 ) 

2142 

2143 def limit_clause(self, cs, **kwargs): 

2144 return "" 

2145 

2146 def _check_can_use_fetch_limit(self, select): 

2147 # to use ROW_NUMBER(), an ORDER BY is required. 

2148 # OFFSET are FETCH are options of the ORDER BY clause 

2149 if not select._order_by_clause.clauses: 

2150 raise exc.CompileError( 

2151 "MSSQL requires an order_by when " 

2152 "using an OFFSET or a non-simple " 

2153 "LIMIT clause" 

2154 ) 

2155 

2156 if select._fetch_clause_options is not None and ( 

2157 select._fetch_clause_options["percent"] 

2158 or select._fetch_clause_options["with_ties"] 

2159 ): 

2160 raise exc.CompileError( 

2161 "MSSQL needs TOP to use PERCENT and/or WITH TIES. " 

2162 "Only simple fetch without offset can be used." 

2163 ) 

2164 

2165 def _row_limit_clause(self, select, **kw): 

2166 """MSSQL 2012 supports OFFSET/FETCH operators 

2167 Use it instead subquery with row_number 

2168 

2169 """ 

2170 

2171 if self.dialect._supports_offset_fetch and not self._use_top(select): 

2172 self._check_can_use_fetch_limit(select) 

2173 

2174 return self.fetch_clause( 

2175 select, 

2176 fetch_clause=self._get_limit_or_fetch(select), 

2177 require_offset=True, 

2178 **kw, 

2179 ) 

2180 

2181 else: 

2182 return "" 

2183 

2184 def visit_try_cast(self, element, **kw): 

2185 return "TRY_CAST (%s AS %s)" % ( 

2186 self.process(element.clause, **kw), 

2187 self.process(element.typeclause, **kw), 

2188 ) 

2189 

2190 def translate_select_structure(self, select_stmt, **kwargs): 

2191 """Look for ``LIMIT`` and OFFSET in a select statement, and if 

2192 so tries to wrap it in a subquery with ``row_number()`` criterion. 

2193 MSSQL 2012 and above are excluded 

2194 

2195 """ 

2196 select = select_stmt 

2197 

2198 if ( 

2199 select._has_row_limiting_clause 

2200 and not self.dialect._supports_offset_fetch 

2201 and not self._use_top(select) 

2202 and not getattr(select, "_mssql_visit", None) 

2203 ): 

2204 self._check_can_use_fetch_limit(select) 

2205 

2206 _order_by_clauses = [ 

2207 sql_util.unwrap_label_reference(elem) 

2208 for elem in select._order_by_clause.clauses 

2209 ] 

2210 

2211 limit_clause = self._get_limit_or_fetch(select) 

2212 offset_clause = select._offset_clause 

2213 

2214 select = select._generate() 

2215 select._mssql_visit = True 

2216 select = ( 

2217 select.add_columns( 

2218 sql.func.ROW_NUMBER() 

2219 .over(order_by=_order_by_clauses) 

2220 .label("mssql_rn") 

2221 ) 

2222 .order_by(None) 

2223 .alias() 

2224 ) 

2225 

2226 mssql_rn = sql.column("mssql_rn") 

2227 limitselect = sql.select( 

2228 *[c for c in select.c if c.key != "mssql_rn"] 

2229 ) 

2230 if offset_clause is not None: 

2231 limitselect = limitselect.where(mssql_rn > offset_clause) 

2232 if limit_clause is not None: 

2233 limitselect = limitselect.where( 

2234 mssql_rn <= (limit_clause + offset_clause) 

2235 ) 

2236 else: 

2237 limitselect = limitselect.where(mssql_rn <= (limit_clause)) 

2238 return limitselect 

2239 else: 

2240 return select 

2241 

2242 @_with_legacy_schema_aliasing 

2243 def visit_table(self, table, mssql_aliased=False, iscrud=False, **kwargs): 

2244 if mssql_aliased is table or iscrud: 

2245 return super().visit_table(table, **kwargs) 

2246 

2247 # alias schema-qualified tables 

2248 alias = self._schema_aliased_table(table) 

2249 if alias is not None: 

2250 return self.process(alias, mssql_aliased=table, **kwargs) 

2251 else: 

2252 return super().visit_table(table, **kwargs) 

2253 

2254 @_with_legacy_schema_aliasing 

2255 def visit_alias(self, alias, **kw): 

2256 # translate for schema-qualified table aliases 

2257 kw["mssql_aliased"] = alias.element 

2258 return super().visit_alias(alias, **kw) 

2259 

2260 @_with_legacy_schema_aliasing 

2261 def visit_column(self, column, add_to_result_map=None, **kw): 

2262 if ( 

2263 column.table is not None 

2264 and (not self.isupdate and not self.isdelete) 

2265 or self.is_subquery() 

2266 ): 

2267 # translate for schema-qualified table aliases 

2268 t = self._schema_aliased_table(column.table) 

2269 if t is not None: 

2270 converted = elements._corresponding_column_or_error(t, column) 

2271 if add_to_result_map is not None: 

2272 add_to_result_map( 

2273 column.name, 

2274 column.name, 

2275 (column, column.name, column.key), 

2276 column.type, 

2277 ) 

2278 

2279 return super().visit_column(converted, **kw) 

2280 

2281 return super().visit_column( 

2282 column, add_to_result_map=add_to_result_map, **kw 

2283 ) 

2284 

2285 def _schema_aliased_table(self, table): 

2286 if getattr(table, "schema", None) is not None: 

2287 if table not in self.tablealiases: 

2288 self.tablealiases[table] = table.alias() 

2289 return self.tablealiases[table] 

2290 else: 

2291 return None 

2292 

2293 def visit_extract(self, extract, **kw): 

2294 field = self.extract_map.get(extract.field, extract.field) 

2295 return "DATEPART(%s, %s)" % (field, self.process(extract.expr, **kw)) 

2296 

2297 def visit_savepoint(self, savepoint_stmt, **kw): 

2298 return "SAVE TRANSACTION %s" % self.preparer.format_savepoint( 

2299 savepoint_stmt 

2300 ) 

2301 

2302 def visit_rollback_to_savepoint(self, savepoint_stmt, **kw): 

2303 return "ROLLBACK TRANSACTION %s" % self.preparer.format_savepoint( 

2304 savepoint_stmt 

2305 ) 

2306 

2307 def visit_binary(self, binary, **kwargs): 

2308 """Move bind parameters to the right-hand side of an operator, where 

2309 possible. 

2310 

2311 """ 

2312 if ( 

2313 isinstance(binary.left, expression.BindParameter) 

2314 and binary.operator == operator.eq 

2315 and not isinstance(binary.right, expression.BindParameter) 

2316 ): 

2317 return self.process( 

2318 expression.BinaryExpression( 

2319 binary.right, binary.left, binary.operator 

2320 ), 

2321 **kwargs, 

2322 ) 

2323 return super().visit_binary(binary, **kwargs) 

2324 

2325 def returning_clause( 

2326 self, stmt, returning_cols, *, populate_result_map, **kw 

2327 ): 

2328 # SQL server returning clause requires that the columns refer to 

2329 # the virtual table names "inserted" or "deleted". Here, we make 

2330 # a simple alias of our table with that name, and then adapt the 

2331 # columns we have from the list of RETURNING columns to that new name 

2332 # so that they render as "inserted.<colname>" / "deleted.<colname>". 

2333 

2334 if stmt.is_insert or stmt.is_update: 

2335 target = stmt.table.alias("inserted") 

2336 elif stmt.is_delete: 

2337 target = stmt.table.alias("deleted") 

2338 else: 

2339 assert False, "expected Insert, Update or Delete statement" 

2340 

2341 adapter = sql_util.ClauseAdapter(target) 

2342 

2343 # adapter.traverse() takes a column from our target table and returns 

2344 # the one that is linked to the "inserted" / "deleted" tables. So in 

2345 # order to retrieve these values back from the result (e.g. like 

2346 # row[column]), tell the compiler to also add the original unadapted 

2347 # column to the result map. Before #4877, these were (unknowingly) 

2348 # falling back using string name matching in the result set which 

2349 # necessarily used an expensive KeyError in order to match. 

2350 

2351 columns = [ 

2352 self._label_returning_column( 

2353 stmt, 

2354 adapter.traverse(column), 

2355 populate_result_map, 

2356 {"result_map_targets": (column,)}, 

2357 fallback_label_name=fallback_label_name, 

2358 column_is_repeated=repeated, 

2359 name=name, 

2360 proxy_name=proxy_name, 

2361 **kw, 

2362 ) 

2363 for ( 

2364 name, 

2365 proxy_name, 

2366 fallback_label_name, 

2367 column, 

2368 repeated, 

2369 ) in stmt._generate_columns_plus_names( 

2370 True, cols=expression._select_iterables(returning_cols) 

2371 ) 

2372 ] 

2373 

2374 return "OUTPUT " + ", ".join(columns) 

2375 

2376 def get_cte_preamble(self, recursive): 

2377 # SQL Server finds it too inconvenient to accept 

2378 # an entirely optional, SQL standard specified, 

2379 # "RECURSIVE" word with their "WITH", 

2380 # so here we go 

2381 return "WITH" 

2382 

2383 def label_select_column(self, select, column, asfrom): 

2384 if isinstance(column, expression.Function): 

2385 return column.label(None) 

2386 else: 

2387 return super().label_select_column(select, column, asfrom) 

2388 

2389 def for_update_clause(self, select, **kw): 

2390 # "FOR UPDATE" is only allowed on "DECLARE CURSOR" which 

2391 # SQLAlchemy doesn't use 

2392 return "" 

2393 

2394 def order_by_clause(self, select, **kw): 

2395 # MSSQL only allows ORDER BY in subqueries if there is a LIMIT: 

2396 # "The ORDER BY clause is invalid in views, inline functions, 

2397 # derived tables, subqueries, and common table expressions, 

2398 # unless TOP, OFFSET or FOR XML is also specified." 

2399 if ( 

2400 self.is_subquery() 

2401 and not self._use_top(select) 

2402 and ( 

2403 select._offset is None 

2404 or not self.dialect._supports_offset_fetch 

2405 ) 

2406 ): 

2407 # avoid processing the order by clause if we won't end up 

2408 # using it, because we don't want all the bind params tacked 

2409 # onto the positional list if that is what the dbapi requires 

2410 return "" 

2411 

2412 order_by = self.process(select._order_by_clause, **kw) 

2413 

2414 if order_by: 

2415 return " ORDER BY " + order_by 

2416 else: 

2417 return "" 

2418 

2419 def update_from_clause( 

2420 self, update_stmt, from_table, extra_froms, from_hints, **kw 

2421 ): 

2422 """Render the UPDATE..FROM clause specific to MSSQL. 

2423 

2424 In MSSQL, if the UPDATE statement involves an alias of the table to 

2425 be updated, then the table itself must be added to the FROM list as 

2426 well. Otherwise, it is optional. Here, we add it regardless. 

2427 

2428 """ 

2429 return "FROM " + ", ".join( 

2430 t._compiler_dispatch(self, asfrom=True, fromhints=from_hints, **kw) 

2431 for t in [from_table] + extra_froms 

2432 ) 

2433 

2434 def delete_table_clause(self, delete_stmt, from_table, extra_froms, **kw): 

2435 """If we have extra froms make sure we render any alias as hint.""" 

2436 ashint = False 

2437 if extra_froms: 

2438 ashint = True 

2439 return from_table._compiler_dispatch( 

2440 self, asfrom=True, iscrud=True, ashint=ashint, **kw 

2441 ) 

2442 

2443 def delete_extra_from_clause( 

2444 self, delete_stmt, from_table, extra_froms, from_hints, **kw 

2445 ): 

2446 """Render the DELETE .. FROM clause specific to MSSQL. 

2447 

2448 Yes, it has the FROM keyword twice. 

2449 

2450 """ 

2451 return "FROM " + ", ".join( 

2452 t._compiler_dispatch(self, asfrom=True, fromhints=from_hints, **kw) 

2453 for t in [from_table] + extra_froms 

2454 ) 

2455 

2456 def visit_empty_set_expr(self, type_, **kw): 

2457 return "SELECT 1 WHERE 1!=1" 

2458 

2459 def visit_is_distinct_from_binary(self, binary, operator, **kw): 

2460 return "NOT EXISTS (SELECT %s INTERSECT SELECT %s)" % ( 

2461 self.process(binary.left), 

2462 self.process(binary.right), 

2463 ) 

2464 

2465 def visit_is_not_distinct_from_binary(self, binary, operator, **kw): 

2466 return "EXISTS (SELECT %s INTERSECT SELECT %s)" % ( 

2467 self.process(binary.left), 

2468 self.process(binary.right), 

2469 ) 

2470 

2471 def _render_json_extract_from_binary(self, binary, operator, **kw): 

2472 # note we are intentionally calling upon the process() calls in the 

2473 # order in which they appear in the SQL String as this is used 

2474 # by positional parameter rendering 

2475 

2476 if binary.type._type_affinity is sqltypes.JSON: 

2477 return "JSON_QUERY(%s, %s)" % ( 

2478 self.process(binary.left, **kw), 

2479 self.process(binary.right, **kw), 

2480 ) 

2481 

2482 # as with other dialects, start with an explicit test for NULL 

2483 case_expression = "CASE JSON_VALUE(%s, %s) WHEN NULL THEN NULL" % ( 

2484 self.process(binary.left, **kw), 

2485 self.process(binary.right, **kw), 

2486 ) 

2487 

2488 if binary.type._type_affinity is sqltypes.Integer: 

2489 type_expression = "ELSE CAST(JSON_VALUE(%s, %s) AS INTEGER)" % ( 

2490 self.process(binary.left, **kw), 

2491 self.process(binary.right, **kw), 

2492 ) 

2493 elif binary.type._type_affinity is sqltypes.Numeric: 

2494 type_expression = "ELSE CAST(JSON_VALUE(%s, %s) AS %s)" % ( 

2495 self.process(binary.left, **kw), 

2496 self.process(binary.right, **kw), 

2497 ( 

2498 "FLOAT" 

2499 if isinstance(binary.type, sqltypes.Float) 

2500 else "NUMERIC(%s, %s)" 

2501 % (binary.type.precision, binary.type.scale) 

2502 ), 

2503 ) 

2504 elif binary.type._type_affinity is sqltypes.Boolean: 

2505 # the NULL handling is particularly weird with boolean, so 

2506 # explicitly return numeric (BIT) constants 

2507 type_expression = ( 

2508 "WHEN 'true' THEN 1 WHEN 'false' THEN 0 ELSE NULL" 

2509 ) 

2510 elif binary.type._type_affinity is sqltypes.String: 

2511 # TODO: does this comment (from mysql) apply to here, too? 

2512 # this fails with a JSON value that's a four byte unicode 

2513 # string. SQLite has the same problem at the moment 

2514 type_expression = "ELSE JSON_VALUE(%s, %s)" % ( 

2515 self.process(binary.left, **kw), 

2516 self.process(binary.right, **kw), 

2517 ) 

2518 else: 

2519 # other affinity....this is not expected right now 

2520 type_expression = "ELSE JSON_QUERY(%s, %s)" % ( 

2521 self.process(binary.left, **kw), 

2522 self.process(binary.right, **kw), 

2523 ) 

2524 

2525 return case_expression + " " + type_expression + " END" 

2526 

2527 def visit_json_getitem_op_binary(self, binary, operator, **kw): 

2528 return self._render_json_extract_from_binary(binary, operator, **kw) 

2529 

2530 def visit_json_path_getitem_op_binary(self, binary, operator, **kw): 

2531 return self._render_json_extract_from_binary(binary, operator, **kw) 

2532 

2533 def visit_sequence(self, seq, **kw): 

2534 return "NEXT VALUE FOR %s" % self.preparer.format_sequence(seq) 

2535 

2536 

2537class MSSQLStrictCompiler(MSSQLCompiler): 

2538 """A subclass of MSSQLCompiler which disables the usage of bind 

2539 parameters where not allowed natively by MS-SQL. 

2540 

2541 A dialect may use this compiler on a platform where native 

2542 binds are used. 

2543 

2544 """ 

2545 

2546 ansi_bind_rules = True 

2547 

2548 def visit_in_op_binary(self, binary, operator, **kw): 

2549 kw["literal_execute"] = True 

2550 return "%s IN %s" % ( 

2551 self.process(binary.left, **kw), 

2552 self.process(binary.right, **kw), 

2553 ) 

2554 

2555 def visit_not_in_op_binary(self, binary, operator, **kw): 

2556 kw["literal_execute"] = True 

2557 return "%s NOT IN %s" % ( 

2558 self.process(binary.left, **kw), 

2559 self.process(binary.right, **kw), 

2560 ) 

2561 

2562 def render_literal_value(self, value, type_): 

2563 """ 

2564 For date and datetime values, convert to a string 

2565 format acceptable to MSSQL. That seems to be the 

2566 so-called ODBC canonical date format which looks 

2567 like this: 

2568 

2569 yyyy-mm-dd hh:mi:ss.mmm(24h) 

2570 

2571 For other data types, call the base class implementation. 

2572 """ 

2573 # datetime and date are both subclasses of datetime.date 

2574 if issubclass(type(value), datetime.date): 

2575 # SQL Server wants single quotes around the date string. 

2576 return "'" + str(value) + "'" 

2577 else: 

2578 return super().render_literal_value(value, type_) 

2579 

2580 

2581class MSDDLCompiler(compiler.DDLCompiler): 

2582 def get_column_specification(self, column, **kwargs): 

2583 colspec = self.preparer.format_column(column) 

2584 

2585 # type is not accepted in a computed column 

2586 if column.computed is not None: 

2587 colspec += " " + self.process(column.computed) 

2588 else: 

2589 colspec += " " + self.dialect.type_compiler_instance.process( 

2590 column.type, type_expression=column 

2591 ) 

2592 

2593 if column.nullable is not None: 

2594 if ( 

2595 not column.nullable 

2596 or column.primary_key 

2597 or isinstance(column.default, sa_schema.Sequence) 

2598 or column.autoincrement is True 

2599 or column.identity 

2600 ): 

2601 colspec += " NOT NULL" 

2602 elif column.computed is None: 

2603 # don't specify "NULL" for computed columns 

2604 colspec += " NULL" 

2605 

2606 if column.table is None: 

2607 raise exc.CompileError( 

2608 "mssql requires Table-bound columns " 

2609 "in order to generate DDL" 

2610 ) 

2611 

2612 d_opt = column.dialect_options["mssql"] 

2613 start = d_opt["identity_start"] 

2614 increment = d_opt["identity_increment"] 

2615 if start is not None or increment is not None: 

2616 if column.identity: 

2617 raise exc.CompileError( 

2618 "Cannot specify options 'mssql_identity_start' and/or " 

2619 "'mssql_identity_increment' while also using the " 

2620 "'Identity' construct." 

2621 ) 

2622 util.warn_deprecated( 

2623 "The dialect options 'mssql_identity_start' and " 

2624 "'mssql_identity_increment' are deprecated. " 

2625 "Use the 'Identity' object instead.", 

2626 "1.4", 

2627 ) 

2628 

2629 if column.identity: 

2630 colspec += self.process(column.identity, **kwargs) 

2631 elif ( 

2632 column is column.table._autoincrement_column 

2633 or column.autoincrement is True 

2634 ) and ( 

2635 not isinstance(column.default, Sequence) or column.default.optional 

2636 ): 

2637 colspec += self.process(Identity(start=start, increment=increment)) 

2638 else: 

2639 default = self.get_column_default_string(column) 

2640 if default is not None: 

2641 colspec += " DEFAULT " + default 

2642 

2643 return colspec 

2644 

2645 def visit_create_index(self, create, include_schema=False, **kw): 

2646 index = create.element 

2647 self._verify_index_table(index) 

2648 preparer = self.preparer 

2649 text = "CREATE " 

2650 if index.unique: 

2651 text += "UNIQUE " 

2652 

2653 # handle clustering option 

2654 clustered = index.dialect_options["mssql"]["clustered"] 

2655 if clustered is not None: 

2656 if clustered: 

2657 text += "CLUSTERED " 

2658 else: 

2659 text += "NONCLUSTERED " 

2660 

2661 # handle columnstore option (has no negative value) 

2662 columnstore = index.dialect_options["mssql"]["columnstore"] 

2663 if columnstore: 

2664 text += "COLUMNSTORE " 

2665 

2666 text += "INDEX %s ON %s" % ( 

2667 self._prepared_index_name(index, include_schema=include_schema), 

2668 preparer.format_table(index.table), 

2669 ) 

2670 

2671 # in some case mssql allows indexes with no columns defined 

2672 if len(index.expressions) > 0: 

2673 text += " (%s)" % ", ".join( 

2674 self.sql_compiler.process( 

2675 expr, include_table=False, literal_binds=True 

2676 ) 

2677 for expr in index.expressions 

2678 ) 

2679 

2680 # handle other included columns 

2681 if index.dialect_options["mssql"]["include"]: 

2682 inclusions = [ 

2683 index.table.c[col] if isinstance(col, str) else col 

2684 for col in index.dialect_options["mssql"]["include"] 

2685 ] 

2686 

2687 text += " INCLUDE (%s)" % ", ".join( 

2688 [preparer.quote(c.name) for c in inclusions] 

2689 ) 

2690 

2691 whereclause = index.dialect_options["mssql"]["where"] 

2692 

2693 if whereclause is not None: 

2694 whereclause = coercions.expect( 

2695 roles.DDLExpressionRole, whereclause 

2696 ) 

2697 

2698 where_compiled = self.sql_compiler.process( 

2699 whereclause, include_table=False, literal_binds=True 

2700 ) 

2701 text += " WHERE " + where_compiled 

2702 

2703 return text 

2704 

2705 def visit_drop_index(self, drop: DropIndex, **kw: Any) -> str: 

2706 index_name = self._prepared_index_name( 

2707 drop.element, include_schema=False 

2708 ) 

2709 table_name = self.preparer.format_table(drop.element.table) 

2710 if_exists = " IF EXISTS" if drop.if_exists else "" 

2711 return f"\nDROP INDEX{if_exists} {index_name} ON {table_name}" 

2712 

2713 def visit_primary_key_constraint(self, constraint, **kw): 

2714 if len(constraint) == 0: 

2715 return "" 

2716 text = "" 

2717 if constraint.name is not None: 

2718 text += "CONSTRAINT %s " % self.preparer.format_constraint( 

2719 constraint 

2720 ) 

2721 text += "PRIMARY KEY " 

2722 

2723 clustered = constraint.dialect_options["mssql"]["clustered"] 

2724 if clustered is not None: 

2725 if clustered: 

2726 text += "CLUSTERED " 

2727 else: 

2728 text += "NONCLUSTERED " 

2729 

2730 text += "(%s)" % ", ".join( 

2731 self.preparer.quote(c.name) for c in constraint 

2732 ) 

2733 text += self.define_constraint_deferrability(constraint) 

2734 return text 

2735 

2736 def visit_unique_constraint(self, constraint, **kw): 

2737 if len(constraint) == 0: 

2738 return "" 

2739 text = "" 

2740 if constraint.name is not None: 

2741 formatted_name = self.preparer.format_constraint(constraint) 

2742 if formatted_name is not None: 

2743 text += "CONSTRAINT %s " % formatted_name 

2744 text += "UNIQUE %s" % self.define_unique_constraint_distinct( 

2745 constraint, **kw 

2746 ) 

2747 clustered = constraint.dialect_options["mssql"]["clustered"] 

2748 if clustered is not None: 

2749 if clustered: 

2750 text += "CLUSTERED " 

2751 else: 

2752 text += "NONCLUSTERED " 

2753 

2754 text += "(%s)" % ", ".join( 

2755 self.preparer.quote(c.name) for c in constraint 

2756 ) 

2757 text += self.define_constraint_deferrability(constraint) 

2758 return text 

2759 

2760 def visit_computed_column(self, generated, **kw): 

2761 text = "AS (%s)" % self.sql_compiler.process( 

2762 generated.sqltext, include_table=False, literal_binds=True 

2763 ) 

2764 # explicitly check for True|False since None means server default 

2765 if generated.persisted is True: 

2766 text += " PERSISTED" 

2767 return text 

2768 

2769 def visit_set_table_comment(self, create, **kw): 

2770 schema = self.preparer.schema_for_object(create.element) 

2771 schema_name = schema if schema else self.dialect.default_schema_name 

2772 return ( 

2773 "execute sp_addextendedproperty 'MS_Description', " 

2774 "{}, 'schema', {}, 'table', {}".format( 

2775 self.sql_compiler.render_literal_value( 

2776 create.element.comment, sqltypes.NVARCHAR() 

2777 ), 

2778 self.preparer.quote_schema(schema_name), 

2779 self.preparer.format_table(create.element, use_schema=False), 

2780 ) 

2781 ) 

2782 

2783 def visit_drop_table_comment(self, drop, **kw): 

2784 schema = self.preparer.schema_for_object(drop.element) 

2785 schema_name = schema if schema else self.dialect.default_schema_name 

2786 return ( 

2787 "execute sp_dropextendedproperty 'MS_Description', 'schema', " 

2788 "{}, 'table', {}".format( 

2789 self.preparer.quote_schema(schema_name), 

2790 self.preparer.format_table(drop.element, use_schema=False), 

2791 ) 

2792 ) 

2793 

2794 def visit_set_column_comment(self, create, **kw): 

2795 schema = self.preparer.schema_for_object(create.element.table) 

2796 schema_name = schema if schema else self.dialect.default_schema_name 

2797 return ( 

2798 "execute sp_addextendedproperty 'MS_Description', " 

2799 "{}, 'schema', {}, 'table', {}, 'column', {}".format( 

2800 self.sql_compiler.render_literal_value( 

2801 create.element.comment, sqltypes.NVARCHAR() 

2802 ), 

2803 self.preparer.quote_schema(schema_name), 

2804 self.preparer.format_table( 

2805 create.element.table, use_schema=False 

2806 ), 

2807 self.preparer.format_column(create.element), 

2808 ) 

2809 ) 

2810 

2811 def visit_drop_column_comment(self, drop, **kw): 

2812 schema = self.preparer.schema_for_object(drop.element.table) 

2813 schema_name = schema if schema else self.dialect.default_schema_name 

2814 return ( 

2815 "execute sp_dropextendedproperty 'MS_Description', 'schema', " 

2816 "{}, 'table', {}, 'column', {}".format( 

2817 self.preparer.quote_schema(schema_name), 

2818 self.preparer.format_table( 

2819 drop.element.table, use_schema=False 

2820 ), 

2821 self.preparer.format_column(drop.element), 

2822 ) 

2823 ) 

2824 

2825 def visit_create_sequence(self, create, **kw): 

2826 prefix = None 

2827 if create.element.data_type is not None: 

2828 data_type = create.element.data_type 

2829 prefix = " AS %s" % self.type_compiler.process(data_type) 

2830 return super().visit_create_sequence(create, prefix=prefix, **kw) 

2831 

2832 def visit_identity_column(self, identity, **kw): 

2833 text = " IDENTITY" 

2834 if identity.start is not None or identity.increment is not None: 

2835 start = 1 if identity.start is None else identity.start 

2836 increment = 1 if identity.increment is None else identity.increment 

2837 text += "(%s,%s)" % (start, increment) 

2838 return text 

2839 

2840 

2841class MSIdentifierPreparer(compiler.IdentifierPreparer): 

2842 reserved_words = RESERVED_WORDS 

2843 

2844 def __init__(self, dialect): 

2845 super().__init__( 

2846 dialect, 

2847 initial_quote="[", 

2848 final_quote="]", 

2849 quote_case_sensitive_collations=False, 

2850 ) 

2851 

2852 def _escape_identifier(self, value): 

2853 return value.replace("]", "]]") 

2854 

2855 def _unescape_identifier(self, value): 

2856 return value.replace("]]", "]") 

2857 

2858 def quote_schema(self, schema, force=None): 

2859 """Prepare a quoted table and schema name.""" 

2860 

2861 # need to re-implement the deprecation warning entirely 

2862 if force is not None: 

2863 # not using the util.deprecated_params() decorator in this 

2864 # case because of the additional function call overhead on this 

2865 # very performance-critical spot. 

2866 util.warn_deprecated( 

2867 "The IdentifierPreparer.quote_schema.force parameter is " 

2868 "deprecated and will be removed in a future release. This " 

2869 "flag has no effect on the behavior of the " 

2870 "IdentifierPreparer.quote method; please refer to " 

2871 "quoted_name().", 

2872 version="1.3", 

2873 ) 

2874 

2875 dbname, owner = _schema_elements(schema) 

2876 if dbname: 

2877 result = "%s.%s" % (self.quote(dbname), self.quote(owner)) 

2878 elif owner: 

2879 result = self.quote(owner) 

2880 else: 

2881 result = "" 

2882 return result 

2883 

2884 

2885def _db_plus_owner_listing(fn): 

2886 def wrap(dialect, connection, schema=None, **kw): 

2887 dbname, owner = _owner_plus_db(dialect, schema) 

2888 return _switch_db( 

2889 dbname, 

2890 connection, 

2891 fn, 

2892 dialect, 

2893 connection, 

2894 dbname, 

2895 owner, 

2896 schema, 

2897 **kw, 

2898 ) 

2899 

2900 return update_wrapper(wrap, fn) 

2901 

2902 

2903def _db_plus_owner(fn): 

2904 def wrap(dialect, connection, tablename, schema=None, **kw): 

2905 dbname, owner = _owner_plus_db(dialect, schema) 

2906 return _switch_db( 

2907 dbname, 

2908 connection, 

2909 fn, 

2910 dialect, 

2911 connection, 

2912 tablename, 

2913 dbname, 

2914 owner, 

2915 schema, 

2916 **kw, 

2917 ) 

2918 

2919 return update_wrapper(wrap, fn) 

2920 

2921 

2922def _switch_db(dbname, connection, fn, *arg, **kw): 

2923 if dbname: 

2924 current_db = connection.exec_driver_sql("select db_name()").scalar() 

2925 if current_db != dbname: 

2926 connection.exec_driver_sql( 

2927 "use %s" % connection.dialect.identifier_preparer.quote(dbname) 

2928 ) 

2929 try: 

2930 return fn(*arg, **kw) 

2931 finally: 

2932 if dbname and current_db != dbname: 

2933 connection.exec_driver_sql( 

2934 "use %s" 

2935 % connection.dialect.identifier_preparer.quote(current_db) 

2936 ) 

2937 

2938 

2939def _owner_plus_db(dialect, schema): 

2940 if not schema: 

2941 return None, dialect.default_schema_name 

2942 else: 

2943 return _schema_elements(schema) 

2944 

2945 

2946_memoized_schema = util.LRUCache() 

2947 

2948 

2949def _schema_elements(schema): 

2950 if isinstance(schema, quoted_name) and schema.quote: 

2951 return None, schema 

2952 

2953 if schema in _memoized_schema: 

2954 return _memoized_schema[schema] 

2955 

2956 # tests for this function are in: 

2957 # test/dialect/mssql/test_reflection.py -> 

2958 # OwnerPlusDBTest.test_owner_database_pairs 

2959 # test/dialect/mssql/test_compiler.py -> test_force_schema_* 

2960 # test/dialect/mssql/test_compiler.py -> test_schema_many_tokens_* 

2961 # 

2962 

2963 if schema.startswith("__[SCHEMA_"): 

2964 return None, schema 

2965 

2966 push = [] 

2967 symbol = "" 

2968 bracket = False 

2969 has_brackets = False 

2970 for token in re.split(r"(\[|\]|\.)", schema): 

2971 if not token: 

2972 continue 

2973 if token == "[": 

2974 bracket = True 

2975 has_brackets = True 

2976 elif token == "]": 

2977 bracket = False 

2978 elif not bracket and token == ".": 

2979 if has_brackets: 

2980 push.append("[%s]" % symbol) 

2981 else: 

2982 push.append(symbol) 

2983 symbol = "" 

2984 has_brackets = False 

2985 else: 

2986 symbol += token 

2987 if symbol: 

2988 push.append(symbol) 

2989 if len(push) > 1: 

2990 dbname, owner = ".".join(push[0:-1]), push[-1] 

2991 

2992 # test for internal brackets 

2993 if re.match(r".*\].*\[.*", dbname[1:-1]): 

2994 dbname = quoted_name(dbname, quote=False) 

2995 else: 

2996 dbname = dbname.lstrip("[").rstrip("]") 

2997 

2998 elif len(push): 

2999 dbname, owner = None, push[0] 

3000 else: 

3001 dbname, owner = None, None 

3002 

3003 _memoized_schema[schema] = dbname, owner 

3004 return dbname, owner 

3005 

3006 

3007class MSDialect(default.DefaultDialect): 

3008 # will assume it's at least mssql2005 

3009 name = "mssql" 

3010 supports_statement_cache = True 

3011 supports_default_values = True 

3012 supports_empty_insert = False 

3013 favor_returning_over_lastrowid = True 

3014 

3015 returns_native_bytes = True 

3016 

3017 supports_comments = True 

3018 supports_default_metavalue = False 

3019 """dialect supports INSERT... VALUES (DEFAULT) syntax - 

3020 SQL Server **does** support this, but **not** for the IDENTITY column, 

3021 so we can't turn this on. 

3022 

3023 """ 

3024 

3025 # supports_native_uuid is partial here, so we implement our 

3026 # own impl type 

3027 

3028 execution_ctx_cls = MSExecutionContext 

3029 use_scope_identity = True 

3030 max_identifier_length = 128 

3031 schema_name = "dbo" 

3032 

3033 insert_returning = True 

3034 update_returning = True 

3035 delete_returning = True 

3036 update_returning_multifrom = True 

3037 delete_returning_multifrom = True 

3038 

3039 colspecs = { 

3040 sqltypes.DateTime: _MSDateTime, 

3041 sqltypes.Date: _MSDate, 

3042 sqltypes.JSON: JSON, 

3043 sqltypes.JSON.JSONIndexType: JSONIndexType, 

3044 sqltypes.JSON.JSONPathType: JSONPathType, 

3045 sqltypes.Time: _BASETIMEIMPL, 

3046 sqltypes.Unicode: _MSUnicode, 

3047 sqltypes.UnicodeText: _MSUnicodeText, 

3048 DATETIMEOFFSET: DATETIMEOFFSET, 

3049 DATETIME2: DATETIME2, 

3050 SMALLDATETIME: SMALLDATETIME, 

3051 DATETIME: DATETIME, 

3052 sqltypes.Uuid: MSUUid, 

3053 } 

3054 

3055 engine_config_types = default.DefaultDialect.engine_config_types.union( 

3056 {"legacy_schema_aliasing": util.asbool} 

3057 ) 

3058 

3059 ischema_names = ischema_names 

3060 

3061 supports_sequences = True 

3062 sequences_optional = True 

3063 # This is actually used for autoincrement, where itentity is used that 

3064 # starts with 1. 

3065 # for sequences T-SQL's actual default is -9223372036854775808 

3066 default_sequence_base = 1 

3067 

3068 supports_native_boolean = False 

3069 non_native_boolean_check_constraint = False 

3070 supports_unicode_binds = True 

3071 postfetch_lastrowid = True 

3072 

3073 # may be changed at server inspection time for older SQL server versions 

3074 supports_multivalues_insert = True 

3075 

3076 use_insertmanyvalues = True 

3077 

3078 # note pyodbc will set this to False if fast_executemany is set, 

3079 # as of SQLAlchemy 2.0.9 

3080 use_insertmanyvalues_wo_returning = True 

3081 

3082 insertmanyvalues_implicit_sentinel = ( 

3083 InsertmanyvaluesSentinelOpts.AUTOINCREMENT 

3084 | InsertmanyvaluesSentinelOpts.IDENTITY 

3085 | InsertmanyvaluesSentinelOpts.USE_INSERT_FROM_SELECT 

3086 ) 

3087 

3088 # "The incoming request has too many parameters. The server supports a " 

3089 # "maximum of 2100 parameters." 

3090 # in fact you can have 2099 parameters. 

3091 insertmanyvalues_max_parameters = 2099 

3092 

3093 _supports_offset_fetch = False 

3094 _supports_nvarchar_max = False 

3095 

3096 legacy_schema_aliasing = False 

3097 

3098 server_version_info = () 

3099 

3100 statement_compiler = MSSQLCompiler 

3101 ddl_compiler = MSDDLCompiler 

3102 type_compiler_cls = MSTypeCompiler 

3103 preparer = MSIdentifierPreparer 

3104 

3105 construct_arguments = [ 

3106 (sa_schema.PrimaryKeyConstraint, {"clustered": None}), 

3107 (sa_schema.UniqueConstraint, {"clustered": None}), 

3108 ( 

3109 sa_schema.Index, 

3110 { 

3111 "clustered": None, 

3112 "include": None, 

3113 "where": None, 

3114 "columnstore": None, 

3115 }, 

3116 ), 

3117 ( 

3118 sa_schema.Column, 

3119 {"identity_start": None, "identity_increment": None}, 

3120 ), 

3121 ] 

3122 

3123 def __init__( 

3124 self, 

3125 query_timeout=None, 

3126 use_scope_identity=True, 

3127 schema_name="dbo", 

3128 deprecate_large_types=None, 

3129 supports_comments=None, 

3130 json_serializer=None, 

3131 json_deserializer=None, 

3132 legacy_schema_aliasing=None, 

3133 ignore_no_transaction_on_rollback=False, 

3134 **opts, 

3135 ): 

3136 self.query_timeout = int(query_timeout or 0) 

3137 self.schema_name = schema_name 

3138 

3139 self.use_scope_identity = use_scope_identity 

3140 self.deprecate_large_types = deprecate_large_types 

3141 self.ignore_no_transaction_on_rollback = ( 

3142 ignore_no_transaction_on_rollback 

3143 ) 

3144 self._user_defined_supports_comments = uds = supports_comments 

3145 if uds is not None: 

3146 self.supports_comments = uds 

3147 

3148 if legacy_schema_aliasing is not None: 

3149 util.warn_deprecated( 

3150 "The legacy_schema_aliasing parameter is " 

3151 "deprecated and will be removed in a future release.", 

3152 "1.4", 

3153 ) 

3154 self.legacy_schema_aliasing = legacy_schema_aliasing 

3155 

3156 super().__init__(**opts) 

3157 

3158 self._json_serializer = json_serializer 

3159 self._json_deserializer = json_deserializer 

3160 

3161 def do_savepoint(self, connection, name): 

3162 # give the DBAPI a push 

3163 connection.exec_driver_sql("IF @@TRANCOUNT = 0 BEGIN TRANSACTION") 

3164 super().do_savepoint(connection, name) 

3165 

3166 def do_release_savepoint(self, connection, name): 

3167 # SQL Server does not support RELEASE SAVEPOINT 

3168 pass 

3169 

3170 def do_rollback(self, dbapi_connection): 

3171 try: 

3172 super().do_rollback(dbapi_connection) 

3173 except self.dbapi.ProgrammingError as e: 

3174 if self.ignore_no_transaction_on_rollback and re.match( 

3175 r".*\b111214\b", str(e) 

3176 ): 

3177 util.warn( 

3178 "ProgrammingError 111214 " 

3179 "'No corresponding transaction found.' " 

3180 "has been suppressed via " 

3181 "ignore_no_transaction_on_rollback=True" 

3182 ) 

3183 else: 

3184 raise 

3185 

3186 _isolation_lookup = { 

3187 "SERIALIZABLE", 

3188 "READ UNCOMMITTED", 

3189 "READ COMMITTED", 

3190 "REPEATABLE READ", 

3191 "SNAPSHOT", 

3192 } 

3193 

3194 def get_isolation_level_values(self, dbapi_connection): 

3195 return list(self._isolation_lookup) 

3196 

3197 def set_isolation_level(self, dbapi_connection, level): 

3198 cursor = dbapi_connection.cursor() 

3199 cursor.execute(f"SET TRANSACTION ISOLATION LEVEL {level}") 

3200 cursor.close() 

3201 if level == "SNAPSHOT": 

3202 dbapi_connection.commit() 

3203 

3204 def get_isolation_level(self, dbapi_connection): 

3205 cursor = dbapi_connection.cursor() 

3206 view_name = "sys.system_views" 

3207 try: 

3208 cursor.execute( 

3209 ( 

3210 "SELECT name FROM {} WHERE name IN " 

3211 "('dm_exec_sessions', 'dm_pdw_nodes_exec_sessions')" 

3212 ).format(view_name) 

3213 ) 

3214 row = cursor.fetchone() 

3215 if not row: 

3216 raise NotImplementedError( 

3217 "Can't fetch isolation level on this particular " 

3218 "SQL Server version." 

3219 ) 

3220 

3221 view_name = f"sys.{row[0]}" 

3222 

3223 cursor.execute( 

3224 """ 

3225 SELECT CASE transaction_isolation_level 

3226 WHEN 0 THEN NULL 

3227 WHEN 1 THEN 'READ UNCOMMITTED' 

3228 WHEN 2 THEN 'READ COMMITTED' 

3229 WHEN 3 THEN 'REPEATABLE READ' 

3230 WHEN 4 THEN 'SERIALIZABLE' 

3231 WHEN 5 THEN 'SNAPSHOT' END 

3232 AS TRANSACTION_ISOLATION_LEVEL 

3233 FROM {} 

3234 where session_id = @@SPID 

3235 """.format( 

3236 view_name 

3237 ) 

3238 ) 

3239 except self.dbapi.Error as err: 

3240 raise NotImplementedError( 

3241 "Can't fetch isolation level; encountered error {} when " 

3242 'attempting to query the "{}" view.'.format(err, view_name) 

3243 ) from err 

3244 else: 

3245 row = cursor.fetchone() 

3246 return row[0].upper() 

3247 finally: 

3248 cursor.close() 

3249 

3250 def initialize(self, connection): 

3251 super().initialize(connection) 

3252 self._setup_version_attributes() 

3253 self._setup_supports_nvarchar_max(connection) 

3254 self._setup_supports_comments(connection) 

3255 

3256 def _setup_version_attributes(self): 

3257 if self.server_version_info[0] not in list(range(8, 17)): 

3258 util.warn( 

3259 "Unrecognized server version info '%s'. Some SQL Server " 

3260 "features may not function properly." 

3261 % ".".join(str(x) for x in self.server_version_info) 

3262 ) 

3263 

3264 if self.server_version_info >= MS_2008_VERSION: 

3265 self.supports_multivalues_insert = True 

3266 else: 

3267 self.supports_multivalues_insert = False 

3268 

3269 if self.deprecate_large_types is None: 

3270 self.deprecate_large_types = ( 

3271 self.server_version_info >= MS_2012_VERSION 

3272 ) 

3273 

3274 self._supports_offset_fetch = ( 

3275 self.server_version_info and self.server_version_info[0] >= 11 

3276 ) 

3277 

3278 def _setup_supports_nvarchar_max(self, connection): 

3279 try: 

3280 connection.scalar( 

3281 sql.text("SELECT CAST('test max support' AS NVARCHAR(max))") 

3282 ) 

3283 except exc.DBAPIError: 

3284 self._supports_nvarchar_max = False 

3285 else: 

3286 self._supports_nvarchar_max = True 

3287 

3288 def _setup_supports_comments(self, connection): 

3289 if self._user_defined_supports_comments is not None: 

3290 return 

3291 

3292 try: 

3293 connection.scalar( 

3294 sql.text( 

3295 "SELECT 1 FROM fn_listextendedproperty" 

3296 "(default, default, default, default, " 

3297 "default, default, default)" 

3298 ) 

3299 ) 

3300 except exc.DBAPIError: 

3301 self.supports_comments = False 

3302 else: 

3303 self.supports_comments = True 

3304 

3305 def _get_default_schema_name(self, connection): 

3306 query = sql.text("SELECT schema_name()") 

3307 default_schema_name = connection.scalar(query) 

3308 if default_schema_name is not None: 

3309 # guard against the case where the default_schema_name is being 

3310 # fed back into a table reflection function. 

3311 return quoted_name(default_schema_name, quote=True) 

3312 else: 

3313 return self.schema_name 

3314 

3315 @_db_plus_owner 

3316 def has_table(self, connection, tablename, dbname, owner, schema, **kw): 

3317 self._ensure_has_table_connection(connection) 

3318 

3319 return self._internal_has_table(connection, tablename, owner, **kw) 

3320 

3321 @reflection.cache 

3322 @_db_plus_owner 

3323 def has_sequence( 

3324 self, connection, sequencename, dbname, owner, schema, **kw 

3325 ): 

3326 sequences = ischema.sequences 

3327 

3328 s = sql.select(sequences.c.sequence_name).where( 

3329 sequences.c.sequence_name == sequencename 

3330 ) 

3331 

3332 if owner: 

3333 s = s.where(sequences.c.sequence_schema == owner) 

3334 

3335 c = connection.execute(s) 

3336 

3337 return c.first() is not None 

3338 

3339 @reflection.cache 

3340 @_db_plus_owner_listing 

3341 def get_sequence_names(self, connection, dbname, owner, schema, **kw): 

3342 sequences = ischema.sequences 

3343 

3344 s = sql.select(sequences.c.sequence_name) 

3345 if owner: 

3346 s = s.where(sequences.c.sequence_schema == owner) 

3347 

3348 c = connection.execute(s) 

3349 

3350 return [row[0] for row in c] 

3351 

3352 @reflection.cache 

3353 def get_schema_names(self, connection, **kw): 

3354 s = sql.select(ischema.schemata.c.schema_name).order_by( 

3355 ischema.schemata.c.schema_name 

3356 ) 

3357 schema_names = [r[0] for r in connection.execute(s)] 

3358 return schema_names 

3359 

3360 @reflection.cache 

3361 @_db_plus_owner_listing 

3362 def get_table_names(self, connection, dbname, owner, schema, **kw): 

3363 tables = ischema.tables 

3364 s = ( 

3365 sql.select(tables.c.table_name) 

3366 .where( 

3367 sql.and_( 

3368 tables.c.table_schema == owner, 

3369 tables.c.table_type == "BASE TABLE", 

3370 ) 

3371 ) 

3372 .order_by(tables.c.table_name) 

3373 ) 

3374 table_names = [r[0] for r in connection.execute(s)] 

3375 return table_names 

3376 

3377 @reflection.cache 

3378 @_db_plus_owner_listing 

3379 def get_view_names(self, connection, dbname, owner, schema, **kw): 

3380 tables = ischema.tables 

3381 s = ( 

3382 sql.select(tables.c.table_name) 

3383 .where( 

3384 sql.and_( 

3385 tables.c.table_schema == owner, 

3386 tables.c.table_type == "VIEW", 

3387 ) 

3388 ) 

3389 .order_by(tables.c.table_name) 

3390 ) 

3391 view_names = [r[0] for r in connection.execute(s)] 

3392 return view_names 

3393 

3394 @reflection.cache 

3395 def _internal_has_table(self, connection, tablename, owner, **kw): 

3396 if tablename.startswith("#"): # temporary table 

3397 # mssql does not support temporary views 

3398 # SQL Error [4103] [S0001]: "#v": Temporary views are not allowed 

3399 return bool( 

3400 connection.scalar( 

3401 # U filters on user tables only. 

3402 text("SELECT object_id(:table_name, 'U')"), 

3403 {"table_name": f"tempdb.dbo.[{tablename}]"}, 

3404 ) 

3405 ) 

3406 else: 

3407 tables = ischema.tables 

3408 

3409 s = sql.select(tables.c.table_name).where( 

3410 sql.and_( 

3411 sql.or_( 

3412 tables.c.table_type == "BASE TABLE", 

3413 tables.c.table_type == "VIEW", 

3414 ), 

3415 tables.c.table_name == tablename, 

3416 ) 

3417 ) 

3418 

3419 if owner: 

3420 s = s.where(tables.c.table_schema == owner) 

3421 

3422 c = connection.execute(s) 

3423 

3424 return c.first() is not None 

3425 

3426 def _default_or_error(self, connection, tablename, owner, method, **kw): 

3427 # TODO: try to avoid having to run a separate query here 

3428 if self._internal_has_table(connection, tablename, owner, **kw): 

3429 return method() 

3430 else: 

3431 raise exc.NoSuchTableError(f"{owner}.{tablename}") 

3432 

3433 @reflection.cache 

3434 @_db_plus_owner 

3435 def get_indexes(self, connection, tablename, dbname, owner, schema, **kw): 

3436 filter_definition = ( 

3437 "ind.filter_definition" 

3438 if self.server_version_info >= MS_2008_VERSION 

3439 else "NULL as filter_definition" 

3440 ) 

3441 rp = connection.execution_options(future_result=True).execute( 

3442 sql.text( 

3443 f""" 

3444select 

3445 ind.index_id, 

3446 ind.is_unique, 

3447 ind.name, 

3448 ind.type, 

3449 {filter_definition} 

3450from 

3451 sys.indexes as ind 

3452join sys.tables as tab on 

3453 ind.object_id = tab.object_id 

3454join sys.schemas as sch on 

3455 sch.schema_id = tab.schema_id 

3456where 

3457 tab.name = :tabname 

3458 and sch.name = :schname 

3459 and ind.is_primary_key = 0 

3460 and ind.type != 0 

3461order by 

3462 ind.name 

3463 """ 

3464 ) 

3465 .bindparams( 

3466 sql.bindparam("tabname", tablename, ischema.CoerceUnicode()), 

3467 sql.bindparam("schname", owner, ischema.CoerceUnicode()), 

3468 ) 

3469 .columns(name=sqltypes.Unicode()) 

3470 ) 

3471 indexes = {} 

3472 for row in rp.mappings(): 

3473 indexes[row["index_id"]] = current = { 

3474 "name": row["name"], 

3475 "unique": row["is_unique"] == 1, 

3476 "column_names": [], 

3477 "include_columns": [], 

3478 "dialect_options": {}, 

3479 } 

3480 

3481 do = current["dialect_options"] 

3482 index_type = row["type"] 

3483 if index_type in {1, 2}: 

3484 do["mssql_clustered"] = index_type == 1 

3485 if index_type in {5, 6}: 

3486 do["mssql_clustered"] = index_type == 5 

3487 do["mssql_columnstore"] = True 

3488 if row["filter_definition"] is not None: 

3489 do["mssql_where"] = row["filter_definition"] 

3490 

3491 rp = connection.execution_options(future_result=True).execute( 

3492 sql.text( 

3493 """ 

3494select 

3495 ind_col.index_id, 

3496 col.name, 

3497 ind_col.is_included_column 

3498from 

3499 sys.columns as col 

3500join sys.tables as tab on 

3501 tab.object_id = col.object_id 

3502join sys.index_columns as ind_col on 

3503 ind_col.column_id = col.column_id 

3504 and ind_col.object_id = tab.object_id 

3505join sys.schemas as sch on 

3506 sch.schema_id = tab.schema_id 

3507where 

3508 tab.name = :tabname 

3509 and sch.name = :schname 

3510order by 

3511 ind_col.index_id, 

3512 ind_col.key_ordinal 

3513 """ 

3514 ) 

3515 .bindparams( 

3516 sql.bindparam("tabname", tablename, ischema.CoerceUnicode()), 

3517 sql.bindparam("schname", owner, ischema.CoerceUnicode()), 

3518 ) 

3519 .columns(name=sqltypes.Unicode()) 

3520 ) 

3521 for row in rp.mappings(): 

3522 if row["index_id"] not in indexes: 

3523 continue 

3524 index_def = indexes[row["index_id"]] 

3525 is_colstore = index_def["dialect_options"].get("mssql_columnstore") 

3526 is_clustered = index_def["dialect_options"].get("mssql_clustered") 

3527 if not (is_colstore and is_clustered): 

3528 # a clustered columnstore index includes all columns but does 

3529 # not want them in the index definition 

3530 if row["is_included_column"] and not is_colstore: 

3531 # a noncludsted columnstore index reports that includes 

3532 # columns but requires that are listed as normal columns 

3533 index_def["include_columns"].append(row["name"]) 

3534 else: 

3535 index_def["column_names"].append(row["name"]) 

3536 for index_info in indexes.values(): 

3537 # NOTE: "root level" include_columns is legacy, now part of 

3538 # dialect_options (issue #7382) 

3539 index_info["dialect_options"]["mssql_include"] = index_info[ 

3540 "include_columns" 

3541 ] 

3542 

3543 if indexes: 

3544 return list(indexes.values()) 

3545 else: 

3546 return self._default_or_error( 

3547 connection, tablename, owner, ReflectionDefaults.indexes, **kw 

3548 ) 

3549 

3550 @reflection.cache 

3551 @_db_plus_owner 

3552 def get_view_definition( 

3553 self, connection, viewname, dbname, owner, schema, **kw 

3554 ): 

3555 view_def = connection.execute( 

3556 sql.text( 

3557 "select mod.definition " 

3558 "from sys.sql_modules as mod " 

3559 "join sys.views as views on mod.object_id = views.object_id " 

3560 "join sys.schemas as sch on views.schema_id = sch.schema_id " 

3561 "where views.name=:viewname and sch.name=:schname" 

3562 ).bindparams( 

3563 sql.bindparam("viewname", viewname, ischema.CoerceUnicode()), 

3564 sql.bindparam("schname", owner, ischema.CoerceUnicode()), 

3565 ) 

3566 ).scalar() 

3567 if view_def: 

3568 return view_def 

3569 else: 

3570 raise exc.NoSuchTableError(f"{owner}.{viewname}") 

3571 

3572 @reflection.cache 

3573 def get_table_comment(self, connection, table_name, schema=None, **kw): 

3574 if not self.supports_comments: 

3575 raise NotImplementedError( 

3576 "Can't get table comments on current SQL Server version in use" 

3577 ) 

3578 

3579 schema_name = schema if schema else self.default_schema_name 

3580 COMMENT_SQL = """ 

3581 SELECT cast(com.value as nvarchar(max)) 

3582 FROM fn_listextendedproperty('MS_Description', 

3583 'schema', :schema, 'table', :table, NULL, NULL 

3584 ) as com; 

3585 """ 

3586 

3587 comment = connection.execute( 

3588 sql.text(COMMENT_SQL).bindparams( 

3589 sql.bindparam("schema", schema_name, ischema.CoerceUnicode()), 

3590 sql.bindparam("table", table_name, ischema.CoerceUnicode()), 

3591 ) 

3592 ).scalar() 

3593 if comment: 

3594 return {"text": comment} 

3595 else: 

3596 return self._default_or_error( 

3597 connection, 

3598 table_name, 

3599 None, 

3600 ReflectionDefaults.table_comment, 

3601 **kw, 

3602 ) 

3603 

3604 def _temp_table_name_like_pattern(self, tablename): 

3605 # LIKE uses '%' to match zero or more characters and '_' to match any 

3606 # single character. We want to match literal underscores, so T-SQL 

3607 # requires that we enclose them in square brackets. 

3608 return tablename + ( 

3609 ("[_][_][_]%") if not tablename.startswith("##") else "" 

3610 ) 

3611 

3612 def _get_internal_temp_table_name(self, connection, tablename): 

3613 # it's likely that schema is always "dbo", but since we can 

3614 # get it here, let's get it. 

3615 # see https://stackoverflow.com/questions/8311959/ 

3616 # specifying-schema-for-temporary-tables 

3617 

3618 try: 

3619 return connection.execute( 

3620 sql.text( 

3621 "select table_schema, table_name " 

3622 "from tempdb.information_schema.tables " 

3623 "where table_name like :p1" 

3624 ), 

3625 {"p1": self._temp_table_name_like_pattern(tablename)}, 

3626 ).one() 

3627 except exc.MultipleResultsFound as me: 

3628 raise exc.UnreflectableTableError( 

3629 "Found more than one temporary table named '%s' in tempdb " 

3630 "at this time. Cannot reliably resolve that name to its " 

3631 "internal table name." % tablename 

3632 ) from me 

3633 except exc.NoResultFound as ne: 

3634 raise exc.NoSuchTableError( 

3635 "Unable to find a temporary table named '%s' in tempdb." 

3636 % tablename 

3637 ) from ne 

3638 

3639 @reflection.cache 

3640 @_db_plus_owner 

3641 def get_columns(self, connection, tablename, dbname, owner, schema, **kw): 

3642 sys_columns = ischema.sys_columns 

3643 sys_types = ischema.sys_types 

3644 sys_default_constraints = ischema.sys_default_constraints 

3645 computed_cols = ischema.computed_columns 

3646 identity_cols = ischema.identity_columns 

3647 extended_properties = ischema.extended_properties 

3648 

3649 # to access sys tables, need an object_id. 

3650 # object_id() can normally match to the unquoted name even if it 

3651 # has special characters. however it also accepts quoted names, 

3652 # which means for the special case that the name itself has 

3653 # "quotes" (e.g. brackets for SQL Server) we need to "quote" (e.g. 

3654 # bracket) that name anyway. Fixed as part of #12654 

3655 

3656 is_temp_table = tablename.startswith("#") 

3657 if is_temp_table: 

3658 owner, tablename = self._get_internal_temp_table_name( 

3659 connection, tablename 

3660 ) 

3661 

3662 object_id_tokens = [self.identifier_preparer.quote(tablename)] 

3663 if owner: 

3664 object_id_tokens.insert(0, self.identifier_preparer.quote(owner)) 

3665 

3666 if is_temp_table: 

3667 object_id_tokens.insert(0, "tempdb") 

3668 

3669 object_id = func.object_id(".".join(object_id_tokens)) 

3670 

3671 whereclause = sys_columns.c.object_id == object_id 

3672 

3673 if self._supports_nvarchar_max: 

3674 computed_definition = computed_cols.c.definition 

3675 else: 

3676 # tds_version 4.2 does not support NVARCHAR(MAX) 

3677 computed_definition = sql.cast( 

3678 computed_cols.c.definition, NVARCHAR(4000) 

3679 ) 

3680 

3681 s = ( 

3682 sql.select( 

3683 sys_columns.c.name, 

3684 sys_types.c.name, 

3685 sys_columns.c.is_nullable, 

3686 sys_columns.c.max_length, 

3687 sys_columns.c.precision, 

3688 sys_columns.c.scale, 

3689 sys_default_constraints.c.definition, 

3690 sys_columns.c.collation_name, 

3691 computed_definition, 

3692 computed_cols.c.is_persisted, 

3693 identity_cols.c.is_identity, 

3694 identity_cols.c.seed_value, 

3695 identity_cols.c.increment_value, 

3696 extended_properties.c.value.label("comment"), 

3697 ) 

3698 .select_from(sys_columns) 

3699 .join( 

3700 sys_types, 

3701 onclause=sys_columns.c.user_type_id 

3702 == sys_types.c.user_type_id, 

3703 ) 

3704 .outerjoin( 

3705 sys_default_constraints, 

3706 sql.and_( 

3707 sys_default_constraints.c.object_id 

3708 == sys_columns.c.default_object_id, 

3709 sys_default_constraints.c.parent_column_id 

3710 == sys_columns.c.column_id, 

3711 ), 

3712 ) 

3713 .outerjoin( 

3714 computed_cols, 

3715 onclause=sql.and_( 

3716 computed_cols.c.object_id == sys_columns.c.object_id, 

3717 computed_cols.c.column_id == sys_columns.c.column_id, 

3718 ), 

3719 ) 

3720 .outerjoin( 

3721 identity_cols, 

3722 onclause=sql.and_( 

3723 identity_cols.c.object_id == sys_columns.c.object_id, 

3724 identity_cols.c.column_id == sys_columns.c.column_id, 

3725 ), 

3726 ) 

3727 .outerjoin( 

3728 extended_properties, 

3729 onclause=sql.and_( 

3730 extended_properties.c["class"] == 1, 

3731 extended_properties.c.name == "MS_Description", 

3732 sys_columns.c.object_id == extended_properties.c.major_id, 

3733 sys_columns.c.column_id == extended_properties.c.minor_id, 

3734 ), 

3735 ) 

3736 .where(whereclause) 

3737 .order_by(sys_columns.c.column_id) 

3738 ) 

3739 

3740 if is_temp_table: 

3741 exec_opts = {"schema_translate_map": {"sys": "tempdb.sys"}} 

3742 else: 

3743 exec_opts = {"schema_translate_map": {}} 

3744 c = connection.execution_options(**exec_opts).execute(s) 

3745 

3746 cols = [] 

3747 for row in c.mappings(): 

3748 name = row[sys_columns.c.name] 

3749 type_ = row[sys_types.c.name] 

3750 nullable = row[sys_columns.c.is_nullable] == 1 

3751 maxlen = row[sys_columns.c.max_length] 

3752 numericprec = row[sys_columns.c.precision] 

3753 numericscale = row[sys_columns.c.scale] 

3754 default = row[sys_default_constraints.c.definition] 

3755 collation = row[sys_columns.c.collation_name] 

3756 definition = row[computed_definition] 

3757 is_persisted = row[computed_cols.c.is_persisted] 

3758 is_identity = row[identity_cols.c.is_identity] 

3759 identity_start = row[identity_cols.c.seed_value] 

3760 identity_increment = row[identity_cols.c.increment_value] 

3761 comment = row[extended_properties.c.value] 

3762 

3763 coltype = self.ischema_names.get(type_, None) 

3764 

3765 kwargs = {} 

3766 

3767 if coltype in ( 

3768 MSBinary, 

3769 MSVarBinary, 

3770 sqltypes.LargeBinary, 

3771 ): 

3772 kwargs["length"] = maxlen if maxlen != -1 else None 

3773 elif coltype in ( 

3774 MSString, 

3775 MSChar, 

3776 MSText, 

3777 ): 

3778 kwargs["length"] = maxlen if maxlen != -1 else None 

3779 if collation: 

3780 kwargs["collation"] = collation 

3781 elif coltype in ( 

3782 MSNVarchar, 

3783 MSNChar, 

3784 MSNText, 

3785 ): 

3786 kwargs["length"] = maxlen // 2 if maxlen != -1 else None 

3787 if collation: 

3788 kwargs["collation"] = collation 

3789 

3790 if coltype is None: 

3791 util.warn( 

3792 "Did not recognize type '%s' of column '%s'" 

3793 % (type_, name) 

3794 ) 

3795 coltype = sqltypes.NULLTYPE 

3796 else: 

3797 if issubclass(coltype, sqltypes.Numeric): 

3798 kwargs["precision"] = numericprec 

3799 

3800 if not issubclass(coltype, sqltypes.Float): 

3801 kwargs["scale"] = numericscale 

3802 

3803 coltype = coltype(**kwargs) 

3804 cdict = { 

3805 "name": name, 

3806 "type": coltype, 

3807 "nullable": nullable, 

3808 "default": default, 

3809 "autoincrement": is_identity is not None, 

3810 "comment": comment, 

3811 } 

3812 

3813 if definition is not None and is_persisted is not None: 

3814 cdict["computed"] = { 

3815 "sqltext": definition, 

3816 "persisted": is_persisted, 

3817 } 

3818 

3819 if is_identity is not None: 

3820 # identity_start and identity_increment are Decimal or None 

3821 if identity_start is None or identity_increment is None: 

3822 cdict["identity"] = {} 

3823 else: 

3824 if isinstance(coltype, sqltypes.BigInteger): 

3825 start = int(identity_start) 

3826 increment = int(identity_increment) 

3827 elif isinstance(coltype, sqltypes.Integer): 

3828 start = int(identity_start) 

3829 increment = int(identity_increment) 

3830 else: 

3831 start = identity_start 

3832 increment = identity_increment 

3833 

3834 cdict["identity"] = { 

3835 "start": start, 

3836 "increment": increment, 

3837 } 

3838 

3839 cols.append(cdict) 

3840 

3841 if cols: 

3842 return cols 

3843 else: 

3844 return self._default_or_error( 

3845 connection, tablename, owner, ReflectionDefaults.columns, **kw 

3846 ) 

3847 

3848 @reflection.cache 

3849 @_db_plus_owner 

3850 def get_pk_constraint( 

3851 self, connection, tablename, dbname, owner, schema, **kw 

3852 ): 

3853 pkeys = [] 

3854 TC = ischema.constraints 

3855 C = ischema.key_constraints.alias("C") 

3856 

3857 # Primary key constraints 

3858 s = ( 

3859 sql.select( 

3860 C.c.column_name, 

3861 TC.c.constraint_type, 

3862 C.c.constraint_name, 

3863 func.objectproperty( 

3864 func.object_id( 

3865 C.c.table_schema + "." + C.c.constraint_name 

3866 ), 

3867 "CnstIsClustKey", 

3868 ).label("is_clustered"), 

3869 ) 

3870 .where( 

3871 sql.and_( 

3872 TC.c.constraint_name == C.c.constraint_name, 

3873 TC.c.table_schema == C.c.table_schema, 

3874 C.c.table_name == tablename, 

3875 C.c.table_schema == owner, 

3876 ), 

3877 ) 

3878 .order_by(TC.c.constraint_name, C.c.ordinal_position) 

3879 ) 

3880 c = connection.execution_options(future_result=True).execute(s) 

3881 constraint_name = None 

3882 is_clustered = None 

3883 for row in c.mappings(): 

3884 if "PRIMARY" in row[TC.c.constraint_type.name]: 

3885 pkeys.append(row["COLUMN_NAME"]) 

3886 if constraint_name is None: 

3887 constraint_name = row[C.c.constraint_name.name] 

3888 if is_clustered is None: 

3889 is_clustered = row["is_clustered"] 

3890 if pkeys: 

3891 return { 

3892 "constrained_columns": pkeys, 

3893 "name": constraint_name, 

3894 "dialect_options": {"mssql_clustered": is_clustered}, 

3895 } 

3896 else: 

3897 return self._default_or_error( 

3898 connection, 

3899 tablename, 

3900 owner, 

3901 ReflectionDefaults.pk_constraint, 

3902 **kw, 

3903 ) 

3904 

3905 @reflection.cache 

3906 @_db_plus_owner 

3907 def get_foreign_keys( 

3908 self, connection, tablename, dbname, owner, schema, **kw 

3909 ): 

3910 # Foreign key constraints 

3911 s = ( 

3912 text( 

3913 """\ 

3914WITH fk_info AS ( 

3915 SELECT 

3916 ischema_ref_con.constraint_schema, 

3917 ischema_ref_con.constraint_name, 

3918 ischema_key_col.ordinal_position, 

3919 ischema_key_col.table_schema, 

3920 ischema_key_col.table_name, 

3921 ischema_ref_con.unique_constraint_schema, 

3922 ischema_ref_con.unique_constraint_name, 

3923 ischema_ref_con.match_option, 

3924 ischema_ref_con.update_rule, 

3925 ischema_ref_con.delete_rule, 

3926 ischema_key_col.column_name AS constrained_column 

3927 FROM 

3928 INFORMATION_SCHEMA.REFERENTIAL_CONSTRAINTS ischema_ref_con 

3929 INNER JOIN 

3930 INFORMATION_SCHEMA.KEY_COLUMN_USAGE ischema_key_col ON 

3931 ischema_key_col.table_schema = ischema_ref_con.constraint_schema 

3932 AND ischema_key_col.constraint_name = 

3933 ischema_ref_con.constraint_name 

3934 WHERE ischema_key_col.table_name = :tablename 

3935 AND ischema_key_col.table_schema = :owner 

3936), 

3937constraint_info AS ( 

3938 SELECT 

3939 ischema_key_col.constraint_schema, 

3940 ischema_key_col.constraint_name, 

3941 ischema_key_col.ordinal_position, 

3942 ischema_key_col.table_schema, 

3943 ischema_key_col.table_name, 

3944 ischema_key_col.column_name 

3945 FROM 

3946 INFORMATION_SCHEMA.KEY_COLUMN_USAGE ischema_key_col 

3947), 

3948index_info AS ( 

3949 SELECT 

3950 sys.schemas.name AS index_schema, 

3951 sys.indexes.name AS index_name, 

3952 sys.index_columns.key_ordinal AS ordinal_position, 

3953 sys.schemas.name AS table_schema, 

3954 sys.objects.name AS table_name, 

3955 sys.columns.name AS column_name 

3956 FROM 

3957 sys.indexes 

3958 INNER JOIN 

3959 sys.objects ON 

3960 sys.objects.object_id = sys.indexes.object_id 

3961 INNER JOIN 

3962 sys.schemas ON 

3963 sys.schemas.schema_id = sys.objects.schema_id 

3964 INNER JOIN 

3965 sys.index_columns ON 

3966 sys.index_columns.object_id = sys.objects.object_id 

3967 AND sys.index_columns.index_id = sys.indexes.index_id 

3968 INNER JOIN 

3969 sys.columns ON 

3970 sys.columns.object_id = sys.indexes.object_id 

3971 AND sys.columns.column_id = sys.index_columns.column_id 

3972) 

3973 SELECT 

3974 fk_info.constraint_schema, 

3975 fk_info.constraint_name, 

3976 fk_info.ordinal_position, 

3977 fk_info.constrained_column, 

3978 constraint_info.table_schema AS referred_table_schema, 

3979 constraint_info.table_name AS referred_table_name, 

3980 constraint_info.column_name AS referred_column, 

3981 fk_info.match_option, 

3982 fk_info.update_rule, 

3983 fk_info.delete_rule 

3984 FROM 

3985 fk_info INNER JOIN constraint_info ON 

3986 constraint_info.constraint_schema = 

3987 fk_info.unique_constraint_schema 

3988 AND constraint_info.constraint_name = 

3989 fk_info.unique_constraint_name 

3990 AND constraint_info.ordinal_position = fk_info.ordinal_position 

3991 UNION 

3992 SELECT 

3993 fk_info.constraint_schema, 

3994 fk_info.constraint_name, 

3995 fk_info.ordinal_position, 

3996 fk_info.constrained_column, 

3997 index_info.table_schema AS referred_table_schema, 

3998 index_info.table_name AS referred_table_name, 

3999 index_info.column_name AS referred_column, 

4000 fk_info.match_option, 

4001 fk_info.update_rule, 

4002 fk_info.delete_rule 

4003 FROM 

4004 fk_info INNER JOIN index_info ON 

4005 index_info.index_schema = fk_info.unique_constraint_schema 

4006 AND index_info.index_name = fk_info.unique_constraint_name 

4007 AND index_info.ordinal_position = fk_info.ordinal_position 

4008 AND NOT (index_info.table_schema = fk_info.table_schema 

4009 AND index_info.table_name = fk_info.table_name) 

4010 

4011 ORDER BY fk_info.constraint_schema, fk_info.constraint_name, 

4012 fk_info.ordinal_position 

4013""" 

4014 ) 

4015 .bindparams( 

4016 sql.bindparam("tablename", tablename, ischema.CoerceUnicode()), 

4017 sql.bindparam("owner", owner, ischema.CoerceUnicode()), 

4018 ) 

4019 .columns( 

4020 constraint_schema=sqltypes.Unicode(), 

4021 constraint_name=sqltypes.Unicode(), 

4022 table_schema=sqltypes.Unicode(), 

4023 table_name=sqltypes.Unicode(), 

4024 constrained_column=sqltypes.Unicode(), 

4025 referred_table_schema=sqltypes.Unicode(), 

4026 referred_table_name=sqltypes.Unicode(), 

4027 referred_column=sqltypes.Unicode(), 

4028 ) 

4029 ) 

4030 

4031 # group rows by constraint ID, to handle multi-column FKs 

4032 fkeys = util.defaultdict( 

4033 lambda: { 

4034 "name": None, 

4035 "constrained_columns": [], 

4036 "referred_schema": None, 

4037 "referred_table": None, 

4038 "referred_columns": [], 

4039 "options": {}, 

4040 } 

4041 ) 

4042 

4043 for r in connection.execute(s).all(): 

4044 ( 

4045 _, # constraint schema 

4046 rfknm, 

4047 _, # ordinal position 

4048 scol, 

4049 rschema, 

4050 rtbl, 

4051 rcol, 

4052 # TODO: we support match=<keyword> for foreign keys so 

4053 # we can support this also, PG has match=FULL for example 

4054 # but this seems to not be a valid value for SQL Server 

4055 _, # match rule 

4056 fkuprule, 

4057 fkdelrule, 

4058 ) = r 

4059 

4060 rec = fkeys[rfknm] 

4061 rec["name"] = rfknm 

4062 

4063 if fkuprule != "NO ACTION": 

4064 rec["options"]["onupdate"] = fkuprule 

4065 

4066 if fkdelrule != "NO ACTION": 

4067 rec["options"]["ondelete"] = fkdelrule 

4068 

4069 if not rec["referred_table"]: 

4070 rec["referred_table"] = rtbl 

4071 if schema is not None or owner != rschema: 

4072 if dbname: 

4073 rschema = dbname + "." + rschema 

4074 rec["referred_schema"] = rschema 

4075 

4076 local_cols, remote_cols = ( 

4077 rec["constrained_columns"], 

4078 rec["referred_columns"], 

4079 ) 

4080 

4081 local_cols.append(scol) 

4082 remote_cols.append(rcol) 

4083 

4084 if fkeys: 

4085 return list(fkeys.values()) 

4086 else: 

4087 return self._default_or_error( 

4088 connection, 

4089 tablename, 

4090 owner, 

4091 ReflectionDefaults.foreign_keys, 

4092 **kw, 

4093 )