Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/base.py: 33%

1185 statements  

1# dialects/mssql/base.py 

2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7# mypy: ignore-errors 

8 

9""" 

10.. dialect:: mssql 

11 :name: Microsoft SQL Server 

12 :normal_support: 2012+ 

13 :best_effort: 2005+ 

14 

15.. _mssql_external_dialects: 

16 

17External Dialects 

18----------------- 

19 

20In addition to the above DBAPI layers with native SQLAlchemy support, there 

21are third-party dialects for other DBAPI layers that are compatible 

22with SQL Server. See the "External Dialects" list on the 

23:ref:`dialect_toplevel` page. 

24 

25.. _mssql_identity: 

26 

27Auto Increment Behavior / IDENTITY Columns 

28------------------------------------------ 

29 

30SQL Server provides so-called "auto incrementing" behavior using the 

31``IDENTITY`` construct, which can be placed on any single integer column in a 

32table. SQLAlchemy considers ``IDENTITY`` within its default "autoincrement" 

33behavior for an integer primary key column, described at 

34:paramref:`_schema.Column.autoincrement`. This means that by default, 

35the first integer primary key column in a :class:`_schema.Table` will be 

36considered to be the identity column - unless it is associated with a 

37:class:`.Sequence` - and will generate DDL as such:: 

38 

39 from sqlalchemy import Table, MetaData, Column, Integer 

40 

41 m = MetaData() 

42 t = Table( 

43 "t", 

44 m, 

45 Column("id", Integer, primary_key=True), 

46 Column("x", Integer), 

47 ) 

48 m.create_all(engine) 

49 

50The above example will generate DDL as: 

51 

52.. sourcecode:: sql 

53 

54 CREATE TABLE t ( 

55 id INTEGER NOT NULL IDENTITY, 

56 x INTEGER NULL, 

57 PRIMARY KEY (id) 

58 ) 

59 

60For the case where this default generation of ``IDENTITY`` is not desired, 

61specify ``False`` for the :paramref:`_schema.Column.autoincrement` flag, 

62on the first integer primary key column:: 

63 

64 m = MetaData() 

65 t = Table( 

66 "t", 

67 m, 

68 Column("id", Integer, primary_key=True, autoincrement=False), 

69 Column("x", Integer), 

70 ) 

71 m.create_all(engine) 

72 

73To add the ``IDENTITY`` keyword to a non-primary key column, specify 

74``True`` for the :paramref:`_schema.Column.autoincrement` flag on the desired 

75:class:`_schema.Column` object, and ensure that 

76:paramref:`_schema.Column.autoincrement` 

77is set to ``False`` on any integer primary key column:: 

78 

79 m = MetaData() 

80 t = Table( 

81 "t", 

82 m, 

83 Column("id", Integer, primary_key=True, autoincrement=False), 

84 Column("x", Integer, autoincrement=True), 

85 ) 

86 m.create_all(engine) 

87 
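The three configurations above can be checked without a database by compiling the ``CREATE TABLE`` statement against the dialect. The following is a minimal sketch; the ``ddl_for`` helper and the table names ``t1`` to ``t3`` are illustrative only and not part of SQLAlchemy:

```python
from sqlalchemy import Column, Integer, MetaData, Table
from sqlalchemy.dialects import mssql
from sqlalchemy.schema import CreateTable


def ddl_for(table):
    """Render CREATE TABLE for the SQL Server dialect (hypothetical helper)."""
    return str(CreateTable(table).compile(dialect=mssql.dialect()))


m = MetaData()

# default: the first integer primary key column receives IDENTITY
default_pk = Table(
    "t1", m, Column("id", Integer, primary_key=True), Column("x", Integer)
)

# autoincrement=False suppresses IDENTITY on the primary key
plain_pk = Table(
    "t2",
    m,
    Column("id", Integer, primary_key=True, autoincrement=False),
    Column("x", Integer),
)

# IDENTITY moved to a non-primary key column
identity_on_x = Table(
    "t3",
    m,
    Column("id", Integer, primary_key=True, autoincrement=False),
    Column("x", Integer, autoincrement=True),
)

for table in (default_pk, plain_pk, identity_on_x):
    print(ddl_for(table))
```

Compiling against ``mssql.dialect()`` needs neither a driver nor a server, so it is a convenient way to review DDL before running ``create_all()``.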

88.. versionchanged:: 1.4 Added :class:`_schema.Identity` construct 

89 in a :class:`_schema.Column` to specify the start and increment 

90 parameters of an IDENTITY. These replace 

91 the use of the :class:`.Sequence` object in order to specify these values. 

92 

93.. deprecated:: 1.4 

94 

95 The ``mssql_identity_start`` and ``mssql_identity_increment`` parameters 

96 to :class:`_schema.Column` are deprecated and should be replaced by

97 an :class:`_schema.Identity` object. Specifying both ways of configuring 

98 an IDENTITY will result in a compile error. 

99 These options are also no longer returned as part of the 

100 ``dialect_options`` key in :meth:`_reflection.Inspector.get_columns`. 

101 Use the information in the ``identity`` key instead. 

102 

103.. deprecated:: 1.3 

104 

105 The use of :class:`.Sequence` to specify IDENTITY characteristics is 

106 deprecated and will be removed in a future release. Please use 

107 the :class:`_schema.Identity` object parameters 

108 :paramref:`_schema.Identity.start` and 

109 :paramref:`_schema.Identity.increment`. 

110 

111.. versionchanged:: 1.4 Removed the ability to use a :class:`.Sequence` 

112 object to modify IDENTITY characteristics. :class:`.Sequence` objects 

113 now only manipulate true T-SQL SEQUENCE types. 

114 

115.. note:: 

116 

117 There can only be one IDENTITY column on the table. When using 

118 ``autoincrement=True`` to enable the IDENTITY keyword, SQLAlchemy does not 

119 guard against multiple columns specifying the option simultaneously. The 

120 SQL Server database will instead reject the ``CREATE TABLE`` statement. 

121 

122.. note:: 

123 

124 An INSERT statement which attempts to provide a value for a column that is 

125 marked with IDENTITY will be rejected by SQL Server. In order for the 

126 value to be accepted, a session-level option "SET IDENTITY_INSERT" must be 

127 enabled. The SQLAlchemy SQL Server dialect will perform this operation 

128 automatically when using a core :class:`_expression.Insert` 

129 construct; if the 

130 execution specifies a value for the IDENTITY column, the "IDENTITY_INSERT" 

131 option will be enabled for the span of that statement's invocation. However,

132 this scenario is not high performing and should not be relied upon for 

133 normal use. If a table doesn't actually require IDENTITY behavior in its 

134 integer primary key column, the keyword should be disabled when creating 

135 the table by ensuring that ``autoincrement=False`` is set. 

136 

137Controlling "Start" and "Increment" 

138^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 

139 

140Specific control over the "start" and "increment" values for 

141the ``IDENTITY`` generator is provided using the

142:paramref:`_schema.Identity.start` and :paramref:`_schema.Identity.increment` 

143parameters passed to the :class:`_schema.Identity` object:: 

144 

145 from sqlalchemy import Table, Integer, String, Column, Identity

146 

147 test = Table( 

148 "test", 

149 metadata, 

150 Column( 

151 "id", Integer, Identity(start=100, increment=10), primary_key=True

152 ), 

153 Column("name", String(20)), 

154 ) 

155 

156The CREATE TABLE for the above :class:`_schema.Table` object would be: 

157 

158.. sourcecode:: sql 

159 

160 CREATE TABLE test ( 

161 id INTEGER NOT NULL IDENTITY(100,10) PRIMARY KEY, 

162 name VARCHAR(20) NULL

163 ) 

164 
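A self-contained variant of the table above can be compiled offline to confirm where the start and increment values end up. This is a sketch that assumes SQLAlchemy 1.4 or later (for :class:`_schema.Identity`); the ``ddl`` variable is illustrative:

```python
from sqlalchemy import Column, Identity, Integer, MetaData, String, Table
from sqlalchemy.dialects import mssql
from sqlalchemy.schema import CreateTable

metadata = MetaData()
test = Table(
    "test",
    metadata,
    # positional arguments such as Identity() go before keyword arguments
    Column("id", Integer, Identity(start=100, increment=10), primary_key=True),
    Column("name", String(20)),
)

# compile against the SQL Server dialect; no connection is needed
ddl = str(CreateTable(test).compile(dialect=mssql.dialect()))
print(ddl)
```

The printed statement should contain ``IDENTITY(100,10)`` on the ``id`` column.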

165.. note:: 

166 

167 The :class:`_schema.Identity` object supports many other parameters in

168 addition to ``start`` and ``increment``. These are not supported by

169 SQL Server and will be ignored when generating the CREATE TABLE DDL.

170 

171.. versionchanged:: 1.3.19 The :class:`_schema.Identity` object is 

172 now used to affect the 

173 ``IDENTITY`` generator for a :class:`_schema.Column` under SQL Server. 

174 Previously, the :class:`.Sequence` object was used. As SQL Server now 

175 supports real sequences as a separate construct, :class:`.Sequence` will be 

176 functional in the normal way starting from SQLAlchemy version 1.4. 

177 

178 

179Using IDENTITY with Non-Integer numeric types 

180^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 

181 

182SQL Server also allows ``IDENTITY`` to be used with ``NUMERIC`` columns. To 

183implement this pattern smoothly in SQLAlchemy, the primary datatype of the 

184column should remain as ``Integer``, however the underlying implementation 

185type deployed to the SQL Server database can be specified as ``Numeric`` using 

186:meth:`.TypeEngine.with_variant`:: 

187 

188 from sqlalchemy import Column 

189 from sqlalchemy import Integer 

190 from sqlalchemy import Numeric 

191 from sqlalchemy import String 

192 from sqlalchemy.orm import declarative_base

193 

194 Base = declarative_base() 

195 

196 

197 class TestTable(Base): 

198 __tablename__ = "test" 

199 id = Column( 

200 Integer().with_variant(Numeric(10, 0), "mssql"), 

201 primary_key=True, 

202 autoincrement=True, 

203 ) 

204 name = Column(String) 

205 

206In the above example, ``Integer().with_variant()`` provides clear usage 

207information that accurately describes the intent of the code. The general 

208restriction that ``autoincrement`` only applies to ``Integer`` is established 

209at the metadata level and not at the per-dialect level. 

210 

211When using the above pattern, the primary key identifier that comes back from 

212the insertion of a row, which is also the value that would be assigned to an 

213ORM object such as ``TestTable`` above, will be an instance of ``Decimal()`` 

214and not ``int`` when using SQL Server. The numeric return type of the 

215:class:`_types.Numeric` type can be changed to return floats by passing False 

216to :paramref:`_types.Numeric.asdecimal`. To normalize the return type of the 

217above ``Numeric(10, 0)`` to return Python ints (which also support "long" 

218integer values in Python 3), use :class:`_types.TypeDecorator` as follows:: 

219 

220 from sqlalchemy import TypeDecorator 

221 

222 

223 class NumericAsInteger(TypeDecorator): 

224 "normalize floating point return values into ints" 

225 

226 impl = Numeric(10, 0, asdecimal=False) 

227 cache_ok = True 

228 

229 def process_result_value(self, value, dialect): 

230 if value is not None: 

231 value = int(value) 

232 return value 

233 

234 

235 class TestTable(Base): 

236 __tablename__ = "test" 

237 id = Column( 

238 Integer().with_variant(NumericAsInteger, "mssql"), 

239 primary_key=True, 

240 autoincrement=True, 

241 ) 

242 name = Column(String) 

243 

244.. _mssql_insert_behavior: 

245 

246INSERT behavior 

247^^^^^^^^^^^^^^^^ 

248 

249Handling of the ``IDENTITY`` column at INSERT time involves two key 

250techniques. The most common is being able to fetch the "last inserted value" 

251for a given ``IDENTITY`` column, a process which SQLAlchemy performs 

252implicitly in many cases, most importantly within the ORM. 

253 

254The process for fetching this value has several variants: 

255 

256* In the vast majority of cases, RETURNING (rendered on SQL Server as the

257 ``OUTPUT`` clause) is used with INSERT statements to get newly generated primary key values:

258 

259 .. sourcecode:: sql 

260 

261 INSERT INTO t (x) OUTPUT inserted.id VALUES (?) 

262 

263 As of SQLAlchemy 2.0, the :ref:`engine_insertmanyvalues` feature is also 

264 used by default to optimize many-row INSERT statements; for SQL Server 

265 the feature takes place for both RETURNING and non-RETURNING

266 INSERT statements. 

267 

268 .. versionchanged:: 2.0.10 The :ref:`engine_insertmanyvalues` feature for 

269 SQL Server was temporarily disabled for SQLAlchemy version 2.0.9 due to 

270 issues with row ordering. As of 2.0.10 the feature is re-enabled, with 

271 special case handling for the unit of work's requirement for RETURNING to 

272 be ordered. 

273 

274* When RETURNING is not available or has been disabled via 

275 ``implicit_returning=False``, either the ``scope_identity()`` function or 

276 the ``@@identity`` variable is used; behavior varies by backend: 

277 

278 * when using PyODBC, the phrase ``; select scope_identity()`` will be 

279 appended to the end of the INSERT statement; a second result set will be 

280 fetched in order to receive the value. Given a table as:: 

281 

282 t = Table( 

283 "t", 

284 metadata, 

285 Column("id", Integer, primary_key=True), 

286 Column("x", Integer), 

287 implicit_returning=False, 

288 ) 

289 

290 an INSERT will look like: 

291 

292 .. sourcecode:: sql 

293 

294 INSERT INTO t (x) VALUES (?); select scope_identity() 

295 

296 * Other dialects such as pymssql will call upon 

297 ``SELECT scope_identity() AS lastrowid`` subsequent to an INSERT 

298 statement. If the flag ``use_scope_identity=False`` is passed to 

299 :func:`_sa.create_engine`, 

300 the statement ``SELECT @@identity AS lastrowid`` 

301 is used instead. 

302 

303A table that contains an ``IDENTITY`` column will prohibit an INSERT statement 

304that refers to the identity column explicitly. The SQLAlchemy dialect will 

305detect when an INSERT construct, created using a core 

306:func:`_expression.insert` 

307construct (not a plain string SQL), refers to the identity column, and 

308in this case will emit ``SET IDENTITY_INSERT ON`` prior to the insert 

309statement proceeding, and ``SET IDENTITY_INSERT OFF`` subsequent to the 

310execution. Given this example:: 

311 

312 m = MetaData() 

313 t = Table( 

314 "t", m, Column("id", Integer, primary_key=True), Column("x", Integer) 

315 ) 

316 m.create_all(engine) 

317 

318 with engine.begin() as conn: 

319 conn.execute(t.insert(), [{"id": 1, "x": 1}, {"id": 2, "x": 2}])

320 

321The above column will be created with IDENTITY, however the INSERT statement 

322we emit is specifying explicit values. In the echo output we can see 

323how SQLAlchemy handles this: 

324 

325.. sourcecode:: sql 

326 

327 CREATE TABLE t ( 

328 id INTEGER NOT NULL IDENTITY,

329 x INTEGER NULL, 

330 PRIMARY KEY (id) 

331 ) 

332 

333 COMMIT 

334 SET IDENTITY_INSERT t ON 

335 INSERT INTO t (id, x) VALUES (?, ?) 

336 ((1, 1), (2, 2)) 

337 SET IDENTITY_INSERT t OFF 

338 COMMIT 

339 

340 

341 

342This is an auxiliary use case suitable for testing and bulk insert scenarios. 

343 

344SEQUENCE support 

345---------------- 

346 

347The :class:`.Sequence` object creates "real" sequences, i.e., 

348``CREATE SEQUENCE``: 

349 

350.. sourcecode:: pycon+sql 

351 

352 >>> from sqlalchemy import Sequence 

353 >>> from sqlalchemy.schema import CreateSequence 

354 >>> from sqlalchemy.dialects import mssql 

355 >>> print( 

356 ... CreateSequence(Sequence("my_seq", start=1)).compile( 

357 ... dialect=mssql.dialect() 

358 ... ) 

359 ... ) 

360 {printsql}CREATE SEQUENCE my_seq START WITH 1 

361 

362For integer primary key generation, SQL Server's ``IDENTITY`` construct should 

363generally be preferred over a sequence.

364 

365.. tip:: 

366 

367 The default start value for T-SQL is ``-2**63`` instead of 1 as 

368 in most other SQL databases. Users should explicitly set the 

369 :paramref:`.Sequence.start` to 1 if that's the expected default:: 

370 

371 seq = Sequence("my_sequence", start=1) 

372 

373.. versionadded:: 1.4 added SQL Server support for :class:`.Sequence` 

374 

375.. versionchanged:: 2.0 The SQL Server dialect will no longer implicitly 

376 render "START WITH 1" for ``CREATE SEQUENCE``, which was the behavior 

377 first implemented in version 1.4. 

378 

379MAX on VARCHAR / NVARCHAR 

380------------------------- 

381 

382SQL Server supports the special string "MAX" within the 

383:class:`_types.VARCHAR` and :class:`_types.NVARCHAR` datatypes, 

384to indicate "maximum length possible". The dialect currently handles this as 

385a length of "None" in the base type, rather than supplying a 

386dialect-specific version of these types, so that a base type 

387specified such as ``VARCHAR(None)`` can assume "unlengthed" behavior on 

388more than one backend without using dialect-specific types. 

389 

390To build a SQL Server VARCHAR or NVARCHAR with MAX length, use None:: 

391 

392 my_table = Table( 

393 "my_table", 

394 metadata, 

395 Column("my_data", VARCHAR(None)), 

396 Column("my_n_data", NVARCHAR(None)), 

397 ) 

398 
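To see the ``None`` length turn into ``max``, the table can be compiled against the dialect without connecting. A minimal sketch follows; the extra ``short_data`` column is added here only for contrast and is not part of the original example:

```python
from sqlalchemy import NVARCHAR, VARCHAR, Column, MetaData, Table
from sqlalchemy.dialects import mssql
from sqlalchemy.schema import CreateTable

metadata = MetaData()
my_table = Table(
    "my_table",
    metadata,
    Column("my_data", VARCHAR(None)),  # no length: rendered with max
    Column("my_n_data", NVARCHAR(None)),  # no length: rendered with max
    Column("short_data", VARCHAR(50)),  # explicit length is kept as given
)

ddl = str(CreateTable(my_table).compile(dialect=mssql.dialect()))
print(ddl)
```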

399Collation Support 

400----------------- 

401 

402Character collations are supported by the base string types, 

403specified by the string argument "collation":: 

404 

405 from sqlalchemy import VARCHAR 

406 

407 Column("login", VARCHAR(32, collation="Latin1_General_CI_AS")) 

408 

409When such a column is associated with a :class:`_schema.Table`, the 

410CREATE TABLE statement for this column will yield: 

411 

412.. sourcecode:: sql 

413 

414 login VARCHAR(32) COLLATE Latin1_General_CI_AS NULL 

415 

416LIMIT/OFFSET Support 

417-------------------- 

418 

419MSSQL has added support for LIMIT / OFFSET as of SQL Server 2012, via the 

420"OFFSET n ROWS" and "FETCH NEXT n ROWS" clauses. SQLAlchemy supports these 

421syntaxes automatically if SQL Server 2012 or greater is detected. 

422 

423.. versionchanged:: 1.4 support added for SQL Server "OFFSET n ROWS" and 

424 "FETCH NEXT n ROWS" syntax. 

425 

426For statements that specify only LIMIT and no OFFSET, all versions of SQL 

427Server support the TOP keyword. This syntax is used for all SQL Server 

428versions when no OFFSET clause is present. A statement such as:: 

429 

430 select(some_table).limit(5) 

431 

432will render similarly to: 

433 

434.. sourcecode:: sql 

435 

436 SELECT TOP 5 col1, col2.. FROM table 

437 

438For versions of SQL Server prior to SQL Server 2012, a statement that uses 

439LIMIT and OFFSET, or just OFFSET alone, will be rendered using the 

440``ROW_NUMBER()`` window function. A statement such as:: 

441 

442 select(some_table).order_by(some_table.c.col3).limit(5).offset(10) 

443 

444will render similarly to: 

445 

446.. sourcecode:: sql 

447 

448 SELECT anon_1.col1, anon_1.col2 FROM (SELECT col1, col2, 

449 ROW_NUMBER() OVER (ORDER BY col3) AS 

450 mssql_rn FROM table) AS

451 anon_1 WHERE mssql_rn > :param_1 AND mssql_rn <= :param_2 + :param_1 

452 
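The final WHERE clause of the ``ROW_NUMBER()`` form can be read as plain arithmetic: with the OFFSET bound to ``:param_1`` and the LIMIT bound to ``:param_2``, a row is kept when its row number is greater than the offset and at most limit plus offset. A small pure-Python sketch of that filter (the ``page`` function is illustrative and not part of SQLAlchemy):

```python
def page(rows, limit, offset):
    """Mimic WHERE mssql_rn > offset AND mssql_rn <= limit + offset."""
    # ROW_NUMBER() starts counting at 1
    numbered = enumerate(rows, start=1)
    return [row for rn, row in numbered if offset < rn <= limit + offset]


rows = list(range(100, 130))  # 30 rows, assumed to be in ORDER BY order already
print(page(rows, limit=5, offset=10))  # same rows as the slice rows[10:15]
```

Row numbers 11 through 15 pass the filter, which matches ``LIMIT 5 OFFSET 10`` on other backends.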

453Note that when OFFSET is used, with or without LIMIT and whether the older

454or newer SQL Server syntax is rendered, the statement must have an ORDER BY as

455well, else a :class:`.CompileError` is raised.

456 
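These rules can be observed offline by compiling statements against the dialect. The sketch below assumes SQLAlchemy 1.4 or later (for the ``select(table)`` calling style) and uses a made-up three-column table. Because no server version has been detected, the paged statement is expected to use the ``ROW_NUMBER()`` form:

```python
from sqlalchemy import Column, Integer, MetaData, Table, select
from sqlalchemy.dialects import mssql
from sqlalchemy.exc import CompileError

metadata = MetaData()
some_table = Table(
    "some_table",
    metadata,
    Column("col1", Integer),
    Column("col2", Integer),
    Column("col3", Integer),
)
dialect = mssql.dialect()

# LIMIT alone renders with the TOP keyword
top_sql = str(select(some_table).limit(5).compile(dialect=dialect))
print(top_sql)

# OFFSET without ORDER BY is rejected at compile time
try:
    select(some_table).limit(5).offset(10).compile(dialect=dialect)
    raised = False
except CompileError as err:
    raised = True
    print("CompileError:", err)

# adding ORDER BY lets the same statement compile
paged_sql = str(
    select(some_table)
    .order_by(some_table.c.col3)
    .limit(5)
    .offset(10)
    .compile(dialect=dialect)
)
print(paged_sql)
```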

457.. _mssql_comment_support: 

458 

459DDL Comment Support 

460-------------------- 

461 

462Comment support, which includes DDL rendering for attributes such as 

463:paramref:`_schema.Table.comment` and :paramref:`_schema.Column.comment`, as 

464well as the ability to reflect these comments, is supported assuming a 

465supported version of SQL Server is in use. If a non-supported version such as 

466Azure Synapse is detected at first-connect time (based on the presence 

467of the ``fn_listextendedproperty`` SQL function), comment support including 

468rendering and table-comment reflection is disabled, as both features rely upon 

469SQL Server stored procedures and functions that are not available on all 

470backend types. 

471 

472To force comment support to be on or off, bypassing autodetection, set the 

473parameter ``supports_comments`` within :func:`_sa.create_engine`:: 

474 

475 e = create_engine("mssql+pyodbc://u:p@dsn", supports_comments=False) 

476 

477.. versionadded:: 2.0 Added support for table and column comments for 

478 the SQL Server dialect, including DDL generation and reflection. 

479 

480.. _mssql_isolation_level: 

481 

482Transaction Isolation Level 

483--------------------------- 

484 

485All SQL Server dialects support setting of transaction isolation level 

486both via a dialect-specific parameter 

487:paramref:`_sa.create_engine.isolation_level` 

488accepted by :func:`_sa.create_engine`, 

489as well as the :paramref:`.Connection.execution_options.isolation_level` 

490argument as passed to 

491:meth:`_engine.Connection.execution_options`. 

492This feature works by issuing the 

493command ``SET TRANSACTION ISOLATION LEVEL <level>`` for 

494each new connection. 

495 

496To set isolation level using :func:`_sa.create_engine`:: 

497 

498 engine = create_engine( 

499 "mssql+pyodbc://scott:tiger@ms_2008", isolation_level="REPEATABLE READ" 

500 ) 

501 

502To set using per-connection execution options:: 

503 

504 connection = engine.connect() 

505 connection = connection.execution_options(isolation_level="READ COMMITTED") 

506 

507Valid values for ``isolation_level`` include: 

508 

509* ``AUTOCOMMIT`` - pyodbc / pymssql-specific 

510* ``READ COMMITTED`` 

511* ``READ UNCOMMITTED`` 

512* ``REPEATABLE READ`` 

513* ``SERIALIZABLE`` 

514* ``SNAPSHOT`` - specific to SQL Server 

515 

516There are also more options for isolation level configurations, such as 

517"sub-engine" objects linked to a main :class:`_engine.Engine` which each apply 

518different isolation level settings. See the discussion at 

519:ref:`dbapi_autocommit` for background. 

520 

521.. seealso:: 

522 

523 :ref:`dbapi_autocommit` 

524 

525.. _mssql_reset_on_return: 

526 

527Temporary Table / Resource Reset for Connection Pooling 

528------------------------------------------------------- 

529 

530The :class:`.QueuePool` connection pool implementation used 

531by the SQLAlchemy :class:`.Engine` object includes 

532:ref:`reset on return <pool_reset_on_return>` behavior that will invoke 

533the DBAPI ``.rollback()`` method when connections are returned to the pool. 

534While this rollback will clear out the immediate state used by the previous 

535transaction, it does not cover a wider range of session-level state, including 

536temporary tables as well as other server state such as prepared statement 

537handles and statement caches. An undocumented SQL Server procedure,

538``sp_reset_connection``, is known to be a workaround for this issue; it

539will reset most of the session state that builds up on a connection, including

540temporary tables.

541 

542To install ``sp_reset_connection`` as the means of performing reset-on-return, 

543the :meth:`.PoolEvents.reset` event hook may be used, as demonstrated in the 

544example below. The :paramref:`_sa.create_engine.pool_reset_on_return` parameter 

545is set to ``None`` so that the custom scheme can replace the default behavior 

546completely. The custom hook implementation calls ``.rollback()`` in any case, 

547as it's usually important that the DBAPI's own tracking of commit/rollback 

548will remain consistent with the state of the transaction:: 

549 

550 from sqlalchemy import create_engine 

551 from sqlalchemy import event 

552 

553 mssql_engine = create_engine( 

554 "mssql+pyodbc://scott:tiger^5HHH@mssql2017:1433/test?driver=ODBC+Driver+17+for+SQL+Server", 

555 # disable default reset-on-return scheme 

556 pool_reset_on_return=None, 

557 ) 

558 

559 

560 @event.listens_for(mssql_engine, "reset") 

561 def _reset_mssql(dbapi_connection, connection_record, reset_state): 

562 if not reset_state.terminate_only: 

563 dbapi_connection.execute("{call sys.sp_reset_connection}") 

564 

565 # so that the DBAPI itself knows that the connection has been 

566 # reset 

567 dbapi_connection.rollback() 

568 

569.. versionchanged:: 2.0.0b3 Added additional state arguments to 

570 the :meth:`.PoolEvents.reset` event and additionally ensured the event 

571 is invoked for all "reset" occurrences, so that it's appropriate 

572 as a place for custom "reset" handlers. Previous schemes which 

573 use the :meth:`.PoolEvents.checkin` handler remain usable as well. 

574 

575.. seealso:: 

576 

577 :ref:`pool_reset_on_return` - in the :ref:`pooling_toplevel` documentation 

578 

579Nullability 

580----------- 

581MSSQL has support for three levels of column nullability. The default 

582nullability allows nulls and is explicit in the CREATE TABLE 

583construct: 

584 

585.. sourcecode:: sql 

586 

587 name VARCHAR(20) NULL 

588 

589If ``nullable=None`` is specified then no specification is made. In 

590other words the database's configured default is used. This will 

591render: 

592 

593.. sourcecode:: sql 

594 

595 name VARCHAR(20) 

596 

597If ``nullable`` is ``True`` or ``False`` then the column will be 

598``NULL`` or ``NOT NULL`` respectively. 

599 
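The three nullability settings can be compared side by side by compiling one table offline. This is a minimal sketch; the table and its column names ``a``, ``b`` and ``c`` are made up for illustration:

```python
from sqlalchemy import Column, MetaData, String, Table
from sqlalchemy.dialects import mssql
from sqlalchemy.schema import CreateTable

metadata = MetaData()
t = Table(
    "t",
    metadata,
    Column("a", String(20)),  # default: explicit NULL
    Column("b", String(20), nullable=False),  # NOT NULL
    Column("c", String(20), nullable=None),  # no specification at all
)

ddl = str(CreateTable(t).compile(dialect=mssql.dialect()))
print(ddl)
```

Only column ``c`` leaves the decision to the database's configured default.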

600Date / Time Handling 

601-------------------- 

602DATE and TIME are supported. Bind parameters are converted 

603to datetime.datetime() objects as required by most MSSQL drivers, 

604and results are processed from strings if needed. 

605The DATE and TIME types are not available for MSSQL 2005 and 

606previous - if a server version below 2008 is detected, DDL 

607for these types will be issued as DATETIME. 

608 
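As a rough illustration of the bind-parameter conversion described above, the pure-Python sketch below widens ``date`` and ``time`` values to ``datetime.datetime``. The function names and the 1900-01-01 placeholder date are assumptions made for this example; this is not presented as the dialect's actual implementation:

```python
import datetime

# placeholder date attached to time-only values (an assumption of this sketch)
ZERO_DATE = datetime.date(1900, 1, 1)


def date_to_datetime(value):
    """Widen a plain date to a datetime at midnight; pass other values through."""
    if type(value) is datetime.date:
        return datetime.datetime(value.year, value.month, value.day)
    return value


def time_to_datetime(value):
    """Attach the placeholder date to a time; pass other values through."""
    if isinstance(value, datetime.time):
        return datetime.datetime.combine(ZERO_DATE, value)
    return value


print(date_to_datetime(datetime.date(2024, 2, 29)))
print(time_to_datetime(datetime.time(13, 45)))
```

The exact type check on ``datetime.date`` matters because ``datetime.datetime`` is a subclass of it and should pass through unchanged.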

609.. _mssql_large_type_deprecation: 

610 

611Large Text/Binary Type Deprecation 

612---------------------------------- 

613 

614Per 

615`SQL Server 2012/2014 Documentation <https://technet.microsoft.com/en-us/library/ms187993.aspx>`_, 

616the ``NTEXT``, ``TEXT`` and ``IMAGE`` datatypes are to be removed from SQL 

617Server in a future release. SQLAlchemy normally relates these types to the 

618:class:`.UnicodeText`, :class:`_types.Text` and

619:class:`.LargeBinary` datatypes. 

620 

621In order to accommodate this change, a new flag ``deprecate_large_types`` 

622is added to the dialect, which will be automatically set based on detection 

623of the server version in use, if not otherwise set by the user. The 

624behavior of this flag is as follows: 

625 

626* When this flag is ``True``, the :class:`.UnicodeText`, 

627 :class:`_types.Text` and

628 :class:`.LargeBinary` datatypes, when used to render DDL, will render the 

629 types ``NVARCHAR(max)``, ``VARCHAR(max)``, and ``VARBINARY(max)``, 

630 respectively. This is a new behavior as of the addition of this flag. 

631 

632* When this flag is ``False``, the :class:`.UnicodeText`, 

633 :class:`_types.Text` and

634 :class:`.LargeBinary` datatypes, when used to render DDL, will render the 

635 types ``NTEXT``, ``TEXT``, and ``IMAGE``, 

636 respectively. This is the long-standing behavior of these types. 

637 

638* The flag begins with the value ``None``, before a database connection is 

639 established. If the dialect is used to render DDL without the flag being 

640 set, it is interpreted the same as ``False``. 

641 

642* On first connection, the dialect detects if SQL Server version 2012 or 

643 greater is in use; if the flag is still at ``None``, it sets it to ``True`` 

644 or ``False`` based on whether 2012 or greater is detected. 

645 

646* The flag can be set to either ``True`` or ``False`` when the dialect 

647 is created, typically via :func:`_sa.create_engine`:: 

648 

649 eng = create_engine( 

650 "mssql+pymssql://user:pass@host/db", deprecate_large_types=True 

651 ) 

652 

653* Complete control over whether the "old" or "new" types are rendered is 

654 available in all SQLAlchemy versions by using the UPPERCASE type objects 

655 instead: :class:`_types.NVARCHAR`, :class:`_types.VARCHAR`, 

656 :class:`_types.VARBINARY`, :class:`_types.TEXT`, :class:`_mssql.NTEXT`, 

657 :class:`_mssql.IMAGE` 

658 will always remain fixed and always output exactly that 

659 type. 

660 
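The effect of the flag can be observed without a server by constructing the dialect directly with each setting and compiling the generic types against it. A minimal sketch; the variable names are illustrative:

```python
from sqlalchemy import LargeBinary, Text, UnicodeText
from sqlalchemy.dialects import mssql

generic_types = (UnicodeText(), Text(), LargeBinary())

# one dialect per flag value; no connection is made
new_style = mssql.dialect(deprecate_large_types=True)
old_style = mssql.dialect(deprecate_large_types=False)

rendered_new = [str(t.compile(dialect=new_style)) for t in generic_types]
rendered_old = [str(t.compile(dialect=old_style)) for t in generic_types]

print(rendered_new)
print(rendered_old)
```

With the flag on, the ``max`` variants are rendered; with it off, the legacy ``NTEXT``, ``TEXT`` and ``IMAGE`` names are used.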

661.. _multipart_schema_names: 

662 

663Multipart Schema Names 

664---------------------- 

665 

666SQL Server schemas sometimes require multiple parts to their "schema" 

667qualifier, that is, including the database name and owner name as separate 

668tokens, such as ``mydatabase.dbo.some_table``. These multipart names can be set 

669at once using the :paramref:`_schema.Table.schema` argument of 

670:class:`_schema.Table`:: 

671 

672 Table( 

673 "some_table", 

674 metadata, 

675 Column("q", String(50)), 

676 schema="mydatabase.dbo", 

677 ) 

678 

679When performing operations such as table or component reflection, a schema 

680argument that contains a dot will be split into separate 

681"database" and "owner" components in order to correctly query the SQL 

682Server information schema tables, as these two values are stored separately. 

683Additionally, when rendering the schema name for DDL or SQL, the two 

684components will be quoted separately for case sensitive names and other 

685special characters. Given an argument as below:: 

686 

687 Table( 

688 "some_table", 

689 metadata, 

690 Column("q", String(50)), 

691 schema="MyDataBase.dbo", 

692 ) 

693 

694The above schema would be rendered as ``[MyDataBase].dbo``, and also in 

695reflection, would be reflected using "dbo" as the owner and "MyDataBase" 

696as the database name. 

697 

698To control how the schema name is broken into database / owner, 

699specify brackets (which in SQL Server are quoting characters) in the name. 

700Below, the "owner" will be considered as ``MyDataBase.dbo`` and the 

701"database" will be None:: 

702 

703 Table( 

704 "some_table", 

705 metadata, 

706 Column("q", String(50)), 

707 schema="[MyDataBase.dbo]", 

708 ) 

709 

710To individually specify both database and owner name with special characters 

711or embedded dots, use two sets of brackets:: 

712 

713 Table( 

714 "some_table", 

715 metadata, 

716 Column("q", String(50)), 

717 schema="[MyDataBase.Period].[MyOwner.Dot]", 

718 ) 

719 

720.. versionchanged:: 1.2 the SQL Server dialect now treats brackets as 

721 identifier delimiters splitting the schema into separate database 

722 and owner tokens, to allow dots within either name itself. 

723 
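The splitting rules above can be sketched in a few lines of pure Python. The ``split_schema`` function below is a hypothetical helper written for illustration, not the dialect's internal routine. It treats brackets as grouping characters and splits on dots that fall outside them:

```python
def split_schema(schema):
    """Split a schema string into (database, owner).

    Brackets group a token so that dots inside them are kept literally.
    """
    tokens, current, in_brackets = [], "", False
    for ch in schema:
        if ch == "[" and not in_brackets:
            in_brackets = True
        elif ch == "]" and in_brackets:
            in_brackets = False
        elif ch == "." and not in_brackets:
            tokens.append(current)
            current = ""
        else:
            current += ch
    tokens.append(current)

    if len(tokens) == 1:
        return None, tokens[0]
    # the last token is the owner; anything before it names the database
    return ".".join(tokens[:-1]), tokens[-1]


examples = (
    "MyDataBase.dbo",
    "[MyDataBase.dbo]",
    "[MyDataBase.Period].[MyOwner.Dot]",
)
for name in examples:
    print(name, "->", split_schema(name))
```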

724.. _legacy_schema_rendering: 

725 

726Legacy Schema Mode 

727------------------ 

728 

729Very old versions of the MSSQL dialect introduced the behavior such that a 

730schema-qualified table would be auto-aliased when used in a 

731SELECT statement; given a table:: 

732 

733 account_table = Table( 

734 "account", 

735 metadata, 

736 Column("id", Integer, primary_key=True), 

737 Column("info", String(100)), 

738 schema="customer_schema", 

739 ) 

740 

741this legacy mode of rendering would assume that "customer_schema.account" 

742would not be accepted by all parts of the SQL statement, as illustrated 

743below: 

744 

745.. sourcecode:: pycon+sql 

746 

747 >>> eng = create_engine("mssql+pymssql://mydsn", legacy_schema_aliasing=True) 

748 >>> print(account_table.select().compile(eng)) 

749 {printsql}SELECT account_1.id, account_1.info 

750 FROM customer_schema.account AS account_1 

751 

752This mode of behavior is now off by default, as it appears to have served 

753no purpose; however in the case that legacy applications rely upon it, 

754it is available using the ``legacy_schema_aliasing`` argument to 

755:func:`_sa.create_engine` as illustrated above. 

756 

757.. deprecated:: 1.4 

758 

759 The ``legacy_schema_aliasing`` flag is now 

760 deprecated and will be removed in a future release. 

761 

762.. _mssql_indexes: 

763 

764Clustered Index Support 

765----------------------- 

766 

767The MSSQL dialect supports clustered indexes (and primary keys) via the 

768``mssql_clustered`` option. This option is available to :class:`.Index`, 

769:class:`.UniqueConstraint`, and :class:`.PrimaryKeyConstraint`.

770For indexes this option can be combined with the ``mssql_columnstore`` one 

771to create a clustered columnstore index. 

772 

773To generate a clustered index:: 

774 

775 Index("my_index", table.c.x, mssql_clustered=True) 

776 

777which renders the index as ``CREATE CLUSTERED INDEX my_index ON table (x)``. 

778 
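The rendering can be confirmed offline with :class:`.CreateIndex`. The sketch below uses a made-up table named ``my_table`` and adds an explicitly non-clustered index for contrast:

```python
from sqlalchemy import Column, Index, Integer, MetaData, Table
from sqlalchemy.dialects import mssql
from sqlalchemy.schema import CreateIndex

metadata = MetaData()
table = Table("my_table", metadata, Column("x", Integer), Column("y", Integer))

# mssql_clustered=True adds CLUSTERED, False adds NONCLUSTERED
clustered = Index("my_index", table.c.x, mssql_clustered=True)
nonclustered = Index("my_other_index", table.c.y, mssql_clustered=False)

clustered_sql = str(CreateIndex(clustered).compile(dialect=mssql.dialect()))
nonclustered_sql = str(CreateIndex(nonclustered).compile(dialect=mssql.dialect()))

print(clustered_sql)
print(nonclustered_sql)
```

Leaving ``mssql_clustered`` unset renders neither keyword, so SQL Server's own default applies.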

779To generate a clustered primary key use:: 

780 

781 Table( 

782 "my_table", 

783 metadata, 

784 Column("x", ...), 

785 Column("y", ...), 

786 PrimaryKeyConstraint("x", "y", mssql_clustered=True), 

787 ) 

788 

789which will render the table, for example, as: 

790 

791.. sourcecode:: sql 

792 

793 CREATE TABLE my_table ( 

794 x INTEGER NOT NULL, 

795 y INTEGER NOT NULL, 

796 PRIMARY KEY CLUSTERED (x, y) 

797 ) 

798 

799Similarly, we can generate a clustered unique constraint using:: 

800 

801 Table( 

802 "my_table", 

803 metadata, 

804 Column("x", ...), 

805 Column("y", ...), 

806 PrimaryKeyConstraint("x"), 

807 UniqueConstraint("y", mssql_clustered=True), 

808 ) 

809 

810To explicitly request a non-clustered primary key (for example, when 

811a separate clustered index is desired), use:: 

812 

813 Table( 

814 "my_table", 

815 metadata, 

816 Column("x", ...), 

817 Column("y", ...), 

818 PrimaryKeyConstraint("x", "y", mssql_clustered=False), 

819 ) 

820 

821which will render the table, for example, as: 

822 

823.. sourcecode:: sql 

824 

825 CREATE TABLE my_table ( 

826 x INTEGER NOT NULL, 

827 y INTEGER NOT NULL, 

828 PRIMARY KEY NONCLUSTERED (x, y) 

829 ) 

830 

831Columnstore Index Support 

832------------------------- 

833 

834The MSSQL dialect supports columnstore indexes via the ``mssql_columnstore`` 

835option. This option is available to :class:`.Index`. It can be combined with 

836the ``mssql_clustered`` option to create a clustered columnstore index. 

837 

838To generate a columnstore index:: 

839 

840 Index("my_index", table.c.x, mssql_columnstore=True) 

841 

842which renders the index as ``CREATE COLUMNSTORE INDEX my_index ON table (x)``. 

843 

844To generate a clustered columnstore index provide no columns:: 

845 

846 idx = Index("my_index", mssql_clustered=True, mssql_columnstore=True) 

847 # required to associate the index with the table 

848 table.append_constraint(idx) 

849 

850The above renders the index as 

851``CREATE CLUSTERED COLUMNSTORE INDEX my_index ON table``. 

852 

853.. versionadded:: 2.0.18 

854 

855MSSQL-Specific Index Options 

856----------------------------- 

857 

858In addition to clustering, the MSSQL dialect supports other special options 

859for :class:`.Index`. 

860 

861INCLUDE 

862^^^^^^^ 

863 

864The ``mssql_include`` option renders INCLUDE(colname) for the given string 

865names:: 

866 

867 Index("my_index", table.c.x, mssql_include=["y"]) 

868 

869would render the index as ``CREATE INDEX my_index ON table (x) INCLUDE (y)``. 

870 

871.. _mssql_index_where: 

872 

873Filtered Indexes 

874^^^^^^^^^^^^^^^^ 

875 

876The ``mssql_where`` option renders WHERE(condition) for the given 

877criterion expression:: 

878 

879 Index("my_index", table.c.x, mssql_where=table.c.x > 10) 

880 

881would render the index as ``CREATE INDEX my_index ON table (x) WHERE x > 10``. 

882 

883.. versionadded:: 1.3.4 

884 

885Index ordering 

886^^^^^^^^^^^^^^ 

887 

888Index ordering is available via functional expressions, such as:: 

889 

890 Index("my_index", table.c.x.desc()) 

891 

892would render the index as ``CREATE INDEX my_index ON table (x DESC)``. 

893 

894.. seealso:: 

895 

896 :ref:`schema_indexes_functional` 

897 

898Compatibility Levels 

899-------------------- 

900MSSQL supports the notion of setting compatibility levels at the 

901database level. This allows, for instance, running a database that 

902is compatible with SQL2000 while running on a SQL2005 database 

903server. ``server_version_info`` will always return the database 

904server version information (in this case SQL2005) and not the 

905compatibility level information. Because of this, if running under 

906a backwards compatibility mode SQLAlchemy may attempt to use T-SQL 

907statements that are unable to be parsed by the database server. 

908 

909.. _mssql_triggers: 

910 

911Triggers 

912-------- 

913 

914SQLAlchemy by default uses OUTPUT INSERTED to get at newly 

915generated primary key values via IDENTITY columns or other 

916server side defaults. MS-SQL does not 

917allow the usage of OUTPUT INSERTED on tables that have triggers. 

918To disable the usage of OUTPUT INSERTED on a per-table basis, 

919specify ``implicit_returning=False`` for each :class:`_schema.Table` 

920which has triggers:: 

921 

922 Table( 

923 "mytable", 

924 metadata, 

925 Column("id", Integer, primary_key=True), 

926 # ..., 

927 implicit_returning=False, 

928 ) 

929 

930Declarative form:: 

931 

932 class MyClass(Base): 

933 # ... 

934 __table_args__ = {"implicit_returning": False} 

935 

936.. _mssql_rowcount_versioning: 

937 

938Rowcount Support / ORM Versioning 

939--------------------------------- 

940 

941The SQL Server drivers may have limited ability to return the number 

942of rows updated from an UPDATE or DELETE statement. 

943 

944As of this writing, the PyODBC driver is not able to return a rowcount when 

945OUTPUT INSERTED is used. Previous versions of SQLAlchemy therefore had 

946limitations for features such as the "ORM Versioning" feature that relies upon 

947accurate rowcounts in order to match version numbers with matched rows. 

948 

949SQLAlchemy 2.0 now retrieves the "rowcount" manually for these particular use 

950cases based on counting the rows that arrived back within RETURNING; so while 

951the driver still has this limitation, the ORM Versioning feature is no longer 

952impacted by it. As of SQLAlchemy 2.0.5, ORM versioning has been fully 

953re-enabled for the pyodbc driver. 

954 

955.. versionchanged:: 2.0.5 ORM versioning support is restored for the pyodbc 

956 driver. Previously, a warning would be emitted during ORM flush that 

957 versioning was not supported. 

958 

959 

960Enabling Snapshot Isolation 

961--------------------------- 

962 

963SQL Server has a default transaction 

964isolation mode that locks entire tables, and causes even mildly concurrent 

965applications to have long held locks and frequent deadlocks. 

966Enabling snapshot isolation for the database as a whole is recommended 

967for modern levels of concurrency support. This is accomplished via the 

968following ALTER DATABASE commands executed at the SQL prompt: 

969 

970.. sourcecode:: sql 

971 

972 ALTER DATABASE MyDatabase SET ALLOW_SNAPSHOT_ISOLATION ON 

973 

974 ALTER DATABASE MyDatabase SET READ_COMMITTED_SNAPSHOT ON 

975 

976Background on SQL Server snapshot isolation is available at 

977https://msdn.microsoft.com/en-us/library/ms175095.aspx. 

978 

979""" # noqa 

980 

981from __future__ import annotations 

982 

983import codecs 

984import datetime 

985import operator 

986import re 

987from typing import overload 

988from typing import TYPE_CHECKING 

989from uuid import UUID as _python_UUID 

990 

991from . import information_schema as ischema 

992from .json import JSON 

993from .json import JSONIndexType 

994from .json import JSONPathType 

995from ... import exc 

996from ... import Identity 

997from ... import schema as sa_schema 

998from ... import Sequence 

999from ... import sql 

1000from ... import text 

1001from ... import util 

1002from ...engine import cursor as _cursor 

1003from ...engine import default 

1004from ...engine import reflection 

1005from ...engine.reflection import ReflectionDefaults 

1006from ...sql import coercions 

1007from ...sql import compiler 

1008from ...sql import elements 

1009from ...sql import expression 

1010from ...sql import func 

1011from ...sql import quoted_name 

1012from ...sql import roles 

1013from ...sql import sqltypes 

1014from ...sql import try_cast as try_cast # noqa: F401 

1015from ...sql import util as sql_util 

1016from ...sql._typing import is_sql_compiler 

1017from ...sql.compiler import InsertmanyvaluesSentinelOpts 

1018from ...sql.elements import TryCast as TryCast # noqa: F401 

1019from ...types import BIGINT 

1020from ...types import BINARY 

1021from ...types import CHAR 

1022from ...types import DATE 

1023from ...types import DATETIME 

1024from ...types import DECIMAL 

1025from ...types import FLOAT 

1026from ...types import INTEGER 

1027from ...types import NCHAR 

1028from ...types import NUMERIC 

1029from ...types import NVARCHAR 

1030from ...types import SMALLINT 

1031from ...types import TEXT 

1032from ...types import VARCHAR 

1033from ...util import update_wrapper 

1034from ...util.typing import Literal 

1035 

1036if TYPE_CHECKING: 

1037 from ...sql.dml import DMLState 

1038 from ...sql.selectable import TableClause 

1039 

1040# https://sqlserverbuilds.blogspot.com/ 

1041MS_2017_VERSION = (14,) 

1042MS_2016_VERSION = (13,) 

1043MS_2014_VERSION = (12,) 

1044MS_2012_VERSION = (11,) 

1045MS_2008_VERSION = (10,) 

1046MS_2005_VERSION = (9,) 

1047MS_2000_VERSION = (8,) 

1048 
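The version constants above are plain tuples that get compared against ``server_version_info``, for example in ``visit_date`` further down the listing. A minimal sketch of how that comparison behaves, using a hypothetical helper ``date_type_for`` that is not part of the dialect:

```python
MS_2008_VERSION = (10,)


def date_type_for(server_version_info):
    """Hypothetical helper: choose the DDL type name for a generic Date."""
    # Tuples compare element by element. (9, 0, 5000) < (10,) is True,
    # while (10, 50, 1600) < (10,) is False because the first elements
    # are equal and the shorter tuple sorts first.
    if server_version_info < MS_2008_VERSION:
        return "DATETIME"
    return "DATE"
```

Because only the major version is stored in each constant, any build of a given release compares as greater than or equal to that release's constant.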

1049RESERVED_WORDS = { 

1050 "add", 

1051 "all", 

1052 "alter", 

1053 "and", 

1054 "any", 

1055 "as", 

1056 "asc", 

1057 "authorization", 

1058 "backup", 

1059 "begin", 

1060 "between", 

1061 "break", 

1062 "browse", 

1063 "bulk", 

1064 "by", 

1065 "cascade", 

1066 "case", 

1067 "check", 

1068 "checkpoint", 

1069 "close", 

1070 "clustered", 

1071 "coalesce", 

1072 "collate", 

1073 "column", 

1074 "commit", 

1075 "compute", 

1076 "constraint", 

1077 "contains", 

1078 "containstable", 

1079 "continue", 

1080 "convert", 

1081 "create", 

1082 "cross", 

1083 "current", 

1084 "current_date", 

1085 "current_time", 

1086 "current_timestamp", 

1087 "current_user", 

1088 "cursor", 

1089 "database", 

1090 "dbcc", 

1091 "deallocate", 

1092 "declare", 

1093 "default", 

1094 "delete", 

1095 "deny", 

1096 "desc", 

1097 "disk", 

1098 "distinct", 

1099 "distributed", 

1100 "double", 

1101 "drop", 

1102 "dump", 

1103 "else", 

1104 "end", 

1105 "errlvl", 

1106 "escape", 

1107 "except", 

1108 "exec", 

1109 "execute", 

1110 "exists", 

1111 "exit", 

1112 "external", 

1113 "fetch", 

1114 "file", 

1115 "fillfactor", 

1116 "for", 

1117 "foreign", 

1118 "freetext", 

1119 "freetexttable", 

1120 "from", 

1121 "full", 

1122 "function", 

1123 "goto", 

1124 "grant", 

1125 "group", 

1126 "having", 

1127 "holdlock", 

1128 "identity", 

1129 "identity_insert", 

1130 "identitycol", 

1131 "if", 

1132 "in", 

1133 "index", 

1134 "inner", 

1135 "insert", 

1136 "intersect", 

1137 "into", 

1138 "is", 

1139 "join", 

1140 "key", 

1141 "kill", 

1142 "left", 

1143 "like", 

1144 "lineno", 

1145 "load", 

1146 "merge", 

1147 "national", 

1148 "nocheck", 

1149 "nonclustered", 

1150 "not", 

1151 "null", 

1152 "nullif", 

1153 "of", 

1154 "off", 

1155 "offsets", 

1156 "on", 

1157 "open", 

1158 "opendatasource", 

1159 "openquery", 

1160 "openrowset", 

1161 "openxml", 

1162 "option", 

1163 "or", 

1164 "order", 

1165 "outer", 

1166 "over", 

1167 "percent", 

1168 "pivot", 

1169 "plan", 

1170 "precision", 

1171 "primary", 

1172 "print", 

1173 "proc", 

1174 "procedure", 

1175 "public", 

1176 "raiserror", 

1177 "read", 

1178 "readtext", 

1179 "reconfigure", 

1180 "references", 

1181 "replication", 

1182 "restore", 

1183 "restrict", 

1184 "return", 

1185 "revert", 

1186 "revoke", 

1187 "right", 

1188 "rollback", 

1189 "rowcount", 

1190 "rowguidcol", 

1191 "rule", 

1192 "save", 

1193 "schema", 

1194 "securityaudit", 

1195 "select", 

1196 "session_user", 

1197 "set", 

1198 "setuser", 

1199 "shutdown", 

1200 "some", 

1201 "statistics", 

1202 "system_user", 

1203 "table", 

1204 "tablesample", 

1205 "textsize", 

1206 "then", 

1207 "to", 

1208 "top", 

1209 "tran", 

1210 "transaction", 

1211 "trigger", 

1212 "truncate", 

1213 "tsequal", 

1214 "union", 

1215 "unique", 

1216 "unpivot", 

1217 "update", 

1218 "updatetext", 

1219 "use", 

1220 "user", 

1221 "values", 

1222 "varying", 

1223 "view", 

1224 "waitfor", 

1225 "when", 

1226 "where", 

1227 "while", 

1228 "with", 

1229 "writetext", 

1230} 

1231 

1232 

1233class REAL(sqltypes.REAL): 

1234 """the SQL Server REAL datatype.""" 

1235 

1236 def __init__(self, **kw): 

1237 # REAL is a synonym for FLOAT(24) on SQL Server. 

1238 # it is only accepted as the word "REAL" in DDL, the numeric 

1239 # precision value is not allowed to be present 

1240 kw.setdefault("precision", 24) 

1241 super().__init__(**kw) 

1242 

1243 

1244class DOUBLE_PRECISION(sqltypes.DOUBLE_PRECISION): 

1245 """the SQL Server DOUBLE PRECISION datatype. 

1246 

1247 .. versionadded:: 2.0.11 

1248 

1249 """ 

1250 

1251 def __init__(self, **kw): 

1252 # DOUBLE PRECISION is a synonym for FLOAT(53) on SQL Server. 

1253 # it is only accepted as the word "DOUBLE PRECISION" in DDL, 

1254 # the numeric precision value is not allowed to be present 

1255 kw.setdefault("precision", 53) 

1256 super().__init__(**kw) 

1257 

1258 

1259class TINYINT(sqltypes.Integer): 

1260 __visit_name__ = "TINYINT" 

1261 

1262 

1263# MSSQL DATE/TIME types have varied behavior, sometimes returning 

1264# strings. MSDate/TIME check for everything, and always 

1265# filter bind parameters into datetime objects (required by pyodbc, 

1266# not sure about other dialects). 

1267 

1268 

1269class _MSDate(sqltypes.Date): 

1270 def bind_processor(self, dialect): 

1271 def process(value): 

1272 if type(value) == datetime.date: 

1273 return datetime.datetime(value.year, value.month, value.day) 

1274 else: 

1275 return value 

1276 

1277 return process 

1278 

1279 _reg = re.compile(r"(\d+)-(\d+)-(\d+)") 

1280 

1281 def result_processor(self, dialect, coltype): 

1282 def process(value): 

1283 if isinstance(value, datetime.datetime): 

1284 return value.date() 

1285 elif isinstance(value, str): 

1286 m = self._reg.match(value) 

1287 if not m: 

1288 raise ValueError( 

1289 "could not parse %r as a date value" % (value,) 

1290 ) 

1291 return datetime.date(*[int(x or 0) for x in m.groups()]) 

1292 else: 

1293 return value 

1294 

1295 return process 

1296 

1297 
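The two processors in ``_MSDate`` can be read as a pair of plain functions. The sketch below restates that logic outside of SQLAlchemy; the names ``bind_date`` and ``parse_date`` are illustrative assumptions, not dialect API:

```python
import datetime
import re

_DATE_RE = re.compile(r"(\d+)-(\d+)-(\d+)")


def bind_date(value):
    # Only exact ``date`` objects are promoted; ``datetime`` instances
    # (a subclass of ``date``) and everything else pass through unchanged.
    if type(value) == datetime.date:
        return datetime.datetime(value.year, value.month, value.day)
    return value


def parse_date(value):
    # Drivers may hand back a datetime or a string for a DATE column.
    if isinstance(value, datetime.datetime):
        return value.date()
    if isinstance(value, str):
        m = _DATE_RE.match(value)
        if not m:
            raise ValueError("could not parse %r as a date value" % (value,))
        return datetime.date(*[int(x or 0) for x in m.groups()])
    return value
```

The exact type check in ``bind_date`` is what keeps an incoming ``datetime`` from losing its time portion.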

1298class TIME(sqltypes.TIME): 

1299 def __init__(self, precision=None, **kwargs): 

1300 self.precision = precision 

1301 super().__init__() 

1302 

1303 __zero_date = datetime.date(1900, 1, 1) 

1304 

1305 def bind_processor(self, dialect): 

1306 def process(value): 

1307 if isinstance(value, datetime.datetime): 

1308 value = datetime.datetime.combine( 

1309 self.__zero_date, value.time() 

1310 ) 

1311 elif isinstance(value, datetime.time): 

1312 """issue #5339 

1313 per: https://github.com/mkleehammer/pyodbc/wiki/Tips-and-Tricks-by-Database-Platform#time-columns 

1314 pass TIME value as string 

1315 """ # noqa 

1316 value = str(value) 

1317 return value 

1318 

1319 return process 

1320 

1321 _reg = re.compile(r"(\d+):(\d+):(\d+)(?:\.(\d{0,6}))?") 

1322 

1323 def result_processor(self, dialect, coltype): 

1324 def process(value): 

1325 if isinstance(value, datetime.datetime): 

1326 return value.time() 

1327 elif isinstance(value, str): 

1328 m = self._reg.match(value) 

1329 if not m: 

1330 raise ValueError( 

1331 "could not parse %r as a time value" % (value,) 

1332 ) 

1333 return datetime.time(*[int(x or 0) for x in m.groups()]) 

1334 else: 

1335 return value 

1336 

1337 return process 

1338 

1339 

1340_MSTime = TIME 

1341 

1342 
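The ``TIME`` processors follow the same pattern as the date ones. A standalone sketch, with ``bind_time`` and ``parse_time`` as hypothetical names chosen for illustration:

```python
import datetime
import re

_TIME_RE = re.compile(r"(\d+):(\d+):(\d+)(?:\.(\d{0,6}))?")
_ZERO_DATE = datetime.date(1900, 1, 1)


def bind_time(value):
    if isinstance(value, datetime.datetime):
        # Keep only the time of day, anchored to 1900-01-01.
        return datetime.datetime.combine(_ZERO_DATE, value.time())
    if isinstance(value, datetime.time):
        # Plain time values are sent to the driver as strings.
        return str(value)
    return value


def parse_time(value):
    if isinstance(value, datetime.datetime):
        return value.time()
    if isinstance(value, str):
        m = _TIME_RE.match(value)
        if not m:
            raise ValueError("could not parse %r as a time value" % (value,))
        # A missing fractional group is None and becomes 0.
        return datetime.time(*[int(x or 0) for x in m.groups()])
    return value
```

Note that the fractional group is converted with ``int()`` and passed directly as the microsecond argument, so the sketch (like the listing) reads at most six fractional digits.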

1343class _BASETIMEIMPL(TIME): 

1344 __visit_name__ = "_BASETIMEIMPL" 

1345 

1346 

1347class _DateTimeBase: 

1348 def bind_processor(self, dialect): 

1349 def process(value): 

1350 if type(value) == datetime.date: 

1351 return datetime.datetime(value.year, value.month, value.day) 

1352 else: 

1353 return value 

1354 

1355 return process 

1356 

1357 

1358class _MSDateTime(_DateTimeBase, sqltypes.DateTime): 

1359 pass 

1360 

1361 

1362class SMALLDATETIME(_DateTimeBase, sqltypes.DateTime): 

1363 __visit_name__ = "SMALLDATETIME" 

1364 

1365 

1366class DATETIME2(_DateTimeBase, sqltypes.DateTime): 

1367 __visit_name__ = "DATETIME2" 

1368 

1369 def __init__(self, precision=None, **kw): 

1370 super().__init__(**kw) 

1371 self.precision = precision 

1372 

1373 

1374class DATETIMEOFFSET(_DateTimeBase, sqltypes.DateTime): 

1375 __visit_name__ = "DATETIMEOFFSET" 

1376 

1377 def __init__(self, precision=None, **kw): 

1378 super().__init__(**kw) 

1379 self.precision = precision 

1380 

1381 

1382class _UnicodeLiteral: 

1383 def literal_processor(self, dialect): 

1384 def process(value): 

1385 value = value.replace("'", "''") 

1386 

1387 if dialect.identifier_preparer._double_percents: 

1388 value = value.replace("%", "%%") 

1389 

1390 return "N'%s'" % value 

1391 

1392 return process 

1393 

1394 
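The literal processor in ``_UnicodeLiteral`` amounts to two string replacements plus an ``N`` prefix. A minimal sketch, where the ``double_percents`` argument stands in for ``dialect.identifier_preparer._double_percents`` and the function name is an assumption:

```python
def unicode_literal(value, double_percents=False):
    # Escape embedded single quotes by doubling them.
    value = value.replace("'", "''")
    # Some paramstyles treat "%" specially, so it is doubled on request.
    if double_percents:
        value = value.replace("%", "%%")
    # The N prefix marks the literal as a Unicode (national) string.
    return "N'%s'" % value
```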

1395class _MSUnicode(_UnicodeLiteral, sqltypes.Unicode): 

1396 pass 

1397 

1398 

1399class _MSUnicodeText(_UnicodeLiteral, sqltypes.UnicodeText): 

1400 pass 

1401 

1402 

1403class TIMESTAMP(sqltypes._Binary): 

1404 """Implement the SQL Server TIMESTAMP type. 

1405 

1406 Note this is **completely different** than the SQL Standard 

1407 TIMESTAMP type, which is not supported by SQL Server. It 

1408 is a read-only datatype that does not support INSERT of values. 

1409 

1410 .. versionadded:: 1.2 

1411 

1412 .. seealso:: 

1413 

1414 :class:`_mssql.ROWVERSION` 

1415 

1416 """ 

1417 

1418 __visit_name__ = "TIMESTAMP" 

1419 

1420 # expected by _Binary to be present 

1421 length = None 

1422 

1423 def __init__(self, convert_int=False): 

1424 """Construct a TIMESTAMP or ROWVERSION type. 

1425 

1426 :param convert_int: if True, binary integer values will 

1427 be converted to integers on read. 

1428 

1429 .. versionadded:: 1.2 

1430 

1431 """ 

1432 self.convert_int = convert_int 

1433 

1434 def result_processor(self, dialect, coltype): 

1435 super_ = super().result_processor(dialect, coltype) 

1436 if self.convert_int: 

1437 

1438 def process(value): 

1439 if super_: 

1440 value = super_(value) 

1441 if value is not None: 

1442 # https://stackoverflow.com/a/30403242/34549 

1443 value = int(codecs.encode(value, "hex"), 16) 

1444 return value 

1445 

1446 return process 

1447 else: 

1448 return super_ 

1449 

1450 
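When ``convert_int=True``, the result processor turns the raw binary value into an integer by hex-encoding it. A small sketch of just that conversion; ``rowversion_to_int`` is an illustrative name, not part of the dialect:

```python
import codecs


def rowversion_to_int(value):
    # Hex-encode the bytes, then read the hex digits as a base-16 integer.
    if value is not None:
        value = int(codecs.encode(value, "hex"), 16)
    return value
```

For non-empty input this gives the same result as ``int.from_bytes(value, "big")``.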

1451class ROWVERSION(TIMESTAMP): 

1452 """Implement the SQL Server ROWVERSION type. 

1453 

1454 The ROWVERSION datatype is a SQL Server synonym for the TIMESTAMP 

1455 datatype, however current SQL Server documentation suggests using 

1456 ROWVERSION for new datatypes going forward. 

1457 

1458 The ROWVERSION datatype does **not** reflect (e.g. introspect) from the 

1459 database as itself; the returned datatype will be 

1460 :class:`_mssql.TIMESTAMP`. 

1461 

1462 This is a read-only datatype that does not support INSERT of values. 

1463 

1464 .. versionadded:: 1.2 

1465 

1466 .. seealso:: 

1467 

1468 :class:`_mssql.TIMESTAMP` 

1469 

1470 """ 

1471 

1472 __visit_name__ = "ROWVERSION" 

1473 

1474 

1475class NTEXT(sqltypes.UnicodeText): 

1476 """MSSQL NTEXT type, for variable-length unicode text up to 2^30 

1477 characters.""" 

1478 

1479 __visit_name__ = "NTEXT" 

1480 

1481 

1482class VARBINARY(sqltypes.VARBINARY, sqltypes.LargeBinary): 

1483 """The MSSQL VARBINARY type. 

1484 

1485 This type adds additional features to the core :class:`_types.VARBINARY` 

1486 type, including "deprecate_large_types" mode where 

1487 either ``VARBINARY(max)`` or IMAGE is rendered, as well as the SQL 

1488 Server ``FILESTREAM`` option. 

1489 

1490 .. seealso:: 

1491 

1492 :ref:`mssql_large_type_deprecation` 

1493 

1494 """ 

1495 

1496 __visit_name__ = "VARBINARY" 

1497 

1498 def __init__(self, length=None, filestream=False): 

1499 """ 

1500 Construct a VARBINARY type. 

1501 

1502 :param length: optional, a length for the column for use in 

1503 DDL statements, for those binary types that accept a length, 

1504 such as the MySQL BLOB type. 

1505 

1506 :param filestream=False: if True, renders the ``FILESTREAM`` keyword 

1507 in the table definition. In this case ``length`` must be ``None`` 

1508 or ``'max'``. 

1509 

1510 .. versionadded:: 1.4.31 

1511 

1512 """ 

1513 

1514 self.filestream = filestream 

1515 if self.filestream and length not in (None, "max"): 

1516 raise ValueError( 

1517 "length must be None or 'max' when setting filestream" 

1518 ) 

1519 super().__init__(length=length) 

1520 

1521 

1522class IMAGE(sqltypes.LargeBinary): 

1523 __visit_name__ = "IMAGE" 

1524 

1525 

1526class XML(sqltypes.Text): 

1527 """MSSQL XML type. 

1528 

1529 This is a placeholder type for reflection purposes that does not include 

1530 any Python-side datatype support. It also does not currently support 

1531 additional arguments, such as "CONTENT", "DOCUMENT", 

1532 "xml_schema_collection". 

1533 

1534 """ 

1535 

1536 __visit_name__ = "XML" 

1537 

1538 

1539class BIT(sqltypes.Boolean): 

1540 """MSSQL BIT type. 

1541 

1542 Both pyodbc and pymssql return values from BIT columns as 

1543 Python <class 'bool'> so just subclass Boolean. 

1544 

1545 """ 

1546 

1547 __visit_name__ = "BIT" 

1548 

1549 

1550class MONEY(sqltypes.TypeEngine): 

1551 __visit_name__ = "MONEY" 

1552 

1553 

1554class SMALLMONEY(sqltypes.TypeEngine): 

1555 __visit_name__ = "SMALLMONEY" 

1556 

1557 

1558class MSUUid(sqltypes.Uuid): 

1559 def bind_processor(self, dialect): 

1560 if self.native_uuid: 

1561 # this is currently assuming pyodbc; might not work for 

1562 # some other mssql driver 

1563 return None 

1564 else: 

1565 if self.as_uuid: 

1566 

1567 def process(value): 

1568 if value is not None: 

1569 value = value.hex 

1570 return value 

1571 

1572 return process 

1573 else: 

1574 

1575 def process(value): 

1576 if value is not None: 

1577 value = value.replace("-", "").replace("''", "'") 

1578 return value 

1579 

1580 return process 

1581 

1582 def literal_processor(self, dialect): 

1583 if self.native_uuid: 

1584 

1585 def process(value): 

1586 return f"""'{str(value).replace("''", "'")}'""" 

1587 

1588 return process 

1589 else: 

1590 if self.as_uuid: 

1591 

1592 def process(value): 

1593 return f"""'{value.hex}'""" 

1594 

1595 return process 

1596 else: 

1597 

1598 def process(value): 

1599 return f"""'{ 

1600 value.replace("-", "").replace("'", "''") 

1601 }'""" 

1602 

1603 return process 

1604 

1605 
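When ``native_uuid`` is false, the bind processor in ``MSUUid`` sends a 32-character hex string. The sketch below is a simplified restatement that covers only the hyphen stripping; ``bind_uuid`` is a hypothetical name:

```python
import uuid


def bind_uuid(value, as_uuid=True):
    if value is None:
        return None
    if as_uuid:
        # uuid.UUID.hex is the 32-digit form without hyphens.
        return value.hex
    # String input: drop the hyphens to reach the same form.
    return value.replace("-", "")
```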

1606class UNIQUEIDENTIFIER(sqltypes.Uuid[sqltypes._UUID_RETURN]): 

1607 __visit_name__ = "UNIQUEIDENTIFIER" 

1608 

1609 @overload 

1610 def __init__( 

1611 self: UNIQUEIDENTIFIER[_python_UUID], as_uuid: Literal[True] = ... 

1612 ): ... 

1613 

1614 @overload 

1615 def __init__( 

1616 self: UNIQUEIDENTIFIER[str], as_uuid: Literal[False] = ... 

1617 ): ... 

1618 

1619 def __init__(self, as_uuid: bool = True): 

1620 """Construct a :class:`_mssql.UNIQUEIDENTIFIER` type. 

1621 

1622 

1623 :param as_uuid=True: if True, values will be interpreted 

1624 as Python uuid objects, converting to/from string via the 

1625 DBAPI. 

1626 

1627 .. versionchanged:: 2.0 Added direct "uuid" support to the 

1628 :class:`_mssql.UNIQUEIDENTIFIER` datatype; uuid interpretation 

1629 defaults to ``True``. 

1630 

1631 """ 

1632 self.as_uuid = as_uuid 

1633 self.native_uuid = True 

1634 

1635 

1636class SQL_VARIANT(sqltypes.TypeEngine): 

1637 __visit_name__ = "SQL_VARIANT" 

1638 

1639 

1640# old names. 

1641MSDateTime = _MSDateTime 

1642MSDate = _MSDate 

1643MSReal = REAL 

1644MSTinyInteger = TINYINT 

1645MSTime = TIME 

1646MSSmallDateTime = SMALLDATETIME 

1647MSDateTime2 = DATETIME2 

1648MSDateTimeOffset = DATETIMEOFFSET 

1649MSText = TEXT 

1650MSNText = NTEXT 

1651MSString = VARCHAR 

1652MSNVarchar = NVARCHAR 

1653MSChar = CHAR 

1654MSNChar = NCHAR 

1655MSBinary = BINARY 

1656MSVarBinary = VARBINARY 

1657MSImage = IMAGE 

1658MSBit = BIT 

1659MSMoney = MONEY 

1660MSSmallMoney = SMALLMONEY 

1661MSUniqueIdentifier = UNIQUEIDENTIFIER 

1662MSVariant = SQL_VARIANT 

1663 

1664ischema_names = { 

1665 "int": INTEGER, 

1666 "bigint": BIGINT, 

1667 "smallint": SMALLINT, 

1668 "tinyint": TINYINT, 

1669 "varchar": VARCHAR, 

1670 "nvarchar": NVARCHAR, 

1671 "char": CHAR, 

1672 "nchar": NCHAR, 

1673 "text": TEXT, 

1674 "ntext": NTEXT, 

1675 "decimal": DECIMAL, 

1676 "numeric": NUMERIC, 

1677 "float": FLOAT, 

1678 "datetime": DATETIME, 

1679 "datetime2": DATETIME2, 

1680 "datetimeoffset": DATETIMEOFFSET, 

1681 "date": DATE, 

1682 "time": TIME, 

1683 "smalldatetime": SMALLDATETIME, 

1684 "binary": BINARY, 

1685 "varbinary": VARBINARY, 

1686 "bit": BIT, 

1687 "real": REAL, 

1688 "double precision": DOUBLE_PRECISION, 

1689 "image": IMAGE, 

1690 "xml": XML, 

1691 "timestamp": TIMESTAMP, 

1692 "money": MONEY, 

1693 "smallmoney": SMALLMONEY, 

1694 "uniqueidentifier": UNIQUEIDENTIFIER, 

1695 "sql_variant": SQL_VARIANT, 

1696} 

1697 

1698 

1699class MSTypeCompiler(compiler.GenericTypeCompiler): 

1700 def _extend(self, spec, type_, length=None): 

1701 """Extend a string-type declaration with standard SQL 

1702 COLLATE annotations. 

1703 

1704 """ 

1705 

1706 if getattr(type_, "collation", None): 

1707 collation = "COLLATE %s" % type_.collation 

1708 else: 

1709 collation = None 

1710 

1711 if not length: 

1712 length = type_.length 

1713 

1714 if length: 

1715 spec = spec + "(%s)" % length 

1716 

1717 return " ".join([c for c in (spec, collation) if c is not None]) 

1718 

1719 def visit_double(self, type_, **kw): 

1720 return self.visit_DOUBLE_PRECISION(type_, **kw) 

1721 

1722 def visit_FLOAT(self, type_, **kw): 

1723 precision = getattr(type_, "precision", None) 

1724 if precision is None: 

1725 return "FLOAT" 

1726 else: 

1727 return "FLOAT(%(precision)s)" % {"precision": precision} 

1728 

1729 def visit_TINYINT(self, type_, **kw): 

1730 return "TINYINT" 

1731 

1732 def visit_TIME(self, type_, **kw): 

1733 precision = getattr(type_, "precision", None) 

1734 if precision is not None: 

1735 return "TIME(%s)" % precision 

1736 else: 

1737 return "TIME" 

1738 

1739 def visit_TIMESTAMP(self, type_, **kw): 

1740 return "TIMESTAMP" 

1741 

1742 def visit_ROWVERSION(self, type_, **kw): 

1743 return "ROWVERSION" 

1744 

1745 def visit_datetime(self, type_, **kw): 

1746 if type_.timezone: 

1747 return self.visit_DATETIMEOFFSET(type_, **kw) 

1748 else: 

1749 return self.visit_DATETIME(type_, **kw) 

1750 

1751 def visit_DATETIMEOFFSET(self, type_, **kw): 

1752 precision = getattr(type_, "precision", None) 

1753 if precision is not None: 

1754 return "DATETIMEOFFSET(%s)" % type_.precision 

1755 else: 

1756 return "DATETIMEOFFSET" 

1757 

1758 def visit_DATETIME2(self, type_, **kw): 

1759 precision = getattr(type_, "precision", None) 

1760 if precision is not None: 

1761 return "DATETIME2(%s)" % precision 

1762 else: 

1763 return "DATETIME2" 

1764 

1765 def visit_SMALLDATETIME(self, type_, **kw): 

1766 return "SMALLDATETIME" 

1767 

1768 def visit_unicode(self, type_, **kw): 

1769 return self.visit_NVARCHAR(type_, **kw) 

1770 

1771 def visit_text(self, type_, **kw): 

1772 if self.dialect.deprecate_large_types: 

1773 return self.visit_VARCHAR(type_, **kw) 

1774 else: 

1775 return self.visit_TEXT(type_, **kw) 

1776 

1777 def visit_unicode_text(self, type_, **kw): 

1778 if self.dialect.deprecate_large_types: 

1779 return self.visit_NVARCHAR(type_, **kw) 

1780 else: 

1781 return self.visit_NTEXT(type_, **kw) 

1782 

1783 def visit_NTEXT(self, type_, **kw): 

1784 return self._extend("NTEXT", type_) 

1785 

1786 def visit_TEXT(self, type_, **kw): 

1787 return self._extend("TEXT", type_) 

1788 

1789 def visit_VARCHAR(self, type_, **kw): 

1790 return self._extend("VARCHAR", type_, length=type_.length or "max") 

1791 

1792 def visit_CHAR(self, type_, **kw): 

1793 return self._extend("CHAR", type_) 

1794 

1795 def visit_NCHAR(self, type_, **kw): 

1796 return self._extend("NCHAR", type_) 

1797 

1798 def visit_NVARCHAR(self, type_, **kw): 

1799 return self._extend("NVARCHAR", type_, length=type_.length or "max") 

1800 

1801 def visit_date(self, type_, **kw): 

1802 if self.dialect.server_version_info < MS_2008_VERSION: 

1803 return self.visit_DATETIME(type_, **kw) 

1804 else: 

1805 return self.visit_DATE(type_, **kw) 

1806 

1807 def visit__BASETIMEIMPL(self, type_, **kw): 

1808 return self.visit_time(type_, **kw) 

1809 

1810 def visit_time(self, type_, **kw): 

1811 if self.dialect.server_version_info < MS_2008_VERSION: 

1812 return self.visit_DATETIME(type_, **kw) 

1813 else: 

1814 return self.visit_TIME(type_, **kw) 

1815 

1816 def visit_large_binary(self, type_, **kw): 

1817 if self.dialect.deprecate_large_types: 

1818 return self.visit_VARBINARY(type_, **kw) 

1819 else: 

1820 return self.visit_IMAGE(type_, **kw) 

1821 

1822 def visit_IMAGE(self, type_, **kw): 

1823 return "IMAGE" 

1824 

1825 def visit_XML(self, type_, **kw): 

1826 return "XML" 

1827 

1828 def visit_VARBINARY(self, type_, **kw): 

1829 text = self._extend("VARBINARY", type_, length=type_.length or "max") 

1830 if getattr(type_, "filestream", False): 

1831 text += " FILESTREAM" 

1832 return text 

1833 

1834 def visit_boolean(self, type_, **kw): 

1835 return self.visit_BIT(type_) 

1836 

1837 def visit_BIT(self, type_, **kw): 

1838 return "BIT" 

1839 

1840 def visit_JSON(self, type_, **kw): 

1841 # this is a bit of a break with SQLAlchemy's convention of 

1842 # "UPPERCASE name goes to UPPERCASE type name with no modification" 

1843 return self._extend("NVARCHAR", type_, length="max") 

1844 

1845 def visit_MONEY(self, type_, **kw): 

1846 return "MONEY" 

1847 

1848 def visit_SMALLMONEY(self, type_, **kw): 

1849 return "SMALLMONEY" 

1850 

1851 def visit_uuid(self, type_, **kw): 

1852 if type_.native_uuid: 

1853 return self.visit_UNIQUEIDENTIFIER(type_, **kw) 

1854 else: 

1855 return super().visit_uuid(type_, **kw) 

1856 

1857 def visit_UNIQUEIDENTIFIER(self, type_, **kw): 

1858 return "UNIQUEIDENTIFIER" 

1859 

1860 def visit_SQL_VARIANT(self, type_, **kw): 

1861 return "SQL_VARIANT" 

1862 

1863 
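Most string and binary types in ``MSTypeCompiler`` are rendered through ``_extend``. A standalone sketch of that assembly, taking the length and collation as plain arguments instead of reading them from a type object (both function names are assumptions made for illustration):

```python
def extend(spec, length=None, collation=None):
    # Optional COLLATE clause, appended after the type name.
    collation_clause = "COLLATE %s" % collation if collation else None
    if length:
        spec = spec + "(%s)" % length
    return " ".join(c for c in (spec, collation_clause) if c is not None)


def varbinary_ddl(length=None, filestream=False):
    # VARBINARY falls back to "max" when no length is given.
    text = extend("VARBINARY", length=length or "max")
    if filestream:
        text += " FILESTREAM"
    return text
```

The ``length or "max"`` fallback is why an unsized ``VARCHAR``, ``NVARCHAR`` or ``VARBINARY`` renders with ``(max)`` in the listing above.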

1864class MSExecutionContext(default.DefaultExecutionContext): 

1865 _enable_identity_insert = False 

1866 _select_lastrowid = False 

1867 _lastrowid = None 

1868 

1869 dialect: MSDialect 

1870 

1871 def _opt_encode(self, statement): 

1872 if self.compiled and self.compiled.schema_translate_map: 

1873 rst = self.compiled.preparer._render_schema_translates 

1874 statement = rst(statement, self.compiled.schema_translate_map) 

1875 

1876 return statement 

1877 

1878 def pre_exec(self): 

1879 """Activate IDENTITY_INSERT if needed.""" 

1880 

1881 if self.isinsert: 

1882 if TYPE_CHECKING: 

1883 assert is_sql_compiler(self.compiled) 

1884 assert isinstance(self.compiled.compile_state, DMLState) 

1885 assert isinstance( 

1886 self.compiled.compile_state.dml_table, TableClause 

1887 ) 

1888 

1889 tbl = self.compiled.compile_state.dml_table 

1890 id_column = tbl._autoincrement_column 

1891 

1892 if id_column is not None and ( 

1893 not isinstance(id_column.default, Sequence) 

1894 ): 

1895 insert_has_identity = True 

1896 compile_state = self.compiled.dml_compile_state 

1897 self._enable_identity_insert = ( 

1898 id_column.key in self.compiled_parameters[0] 

1899 ) or ( 

1900 compile_state._dict_parameters 

1901 and (id_column.key in compile_state._insert_col_keys) 

1902 ) 

1903 

1904 else: 

1905 insert_has_identity = False 

1906 self._enable_identity_insert = False 

1907 

1908 self._select_lastrowid = ( 

1909 not self.compiled.inline 

1910 and insert_has_identity 

1911 and not self.compiled.effective_returning 

1912 and not self._enable_identity_insert 

1913 and not self.executemany 

1914 ) 

1915 

1916 if self._enable_identity_insert: 

1917 self.root_connection._cursor_execute( 

1918 self.cursor, 

1919 self._opt_encode( 

1920 "SET IDENTITY_INSERT %s ON" 

1921 % self.identifier_preparer.format_table(tbl) 

1922 ), 

1923 (), 

1924 self, 

1925 ) 

1926 

1927 def post_exec(self): 

1928 """Disable IDENTITY_INSERT if enabled.""" 

1929 

1930 conn = self.root_connection 

1931 

1932 if self.isinsert or self.isupdate or self.isdelete: 

1933 self._rowcount = self.cursor.rowcount 

1934 

1935 if self._select_lastrowid: 

1936 if self.dialect.use_scope_identity: 

1937 conn._cursor_execute( 

1938 self.cursor, 

1939 "SELECT scope_identity() AS lastrowid", 

1940 (), 

1941 self, 

1942 ) 

1943 else: 

1944 conn._cursor_execute( 

1945 self.cursor, "SELECT @@identity AS lastrowid", (), self 

1946 ) 

1947 # fetchall() ensures the cursor is consumed without closing it 

1948 row = self.cursor.fetchall()[0] 

1949 self._lastrowid = int(row[0]) 

1950 

1951 self.cursor_fetch_strategy = _cursor._NO_CURSOR_DML 

1952 elif ( 

1953 self.compiled is not None 

1954 and is_sql_compiler(self.compiled) 

1955 and self.compiled.effective_returning 

1956 ): 

1957 self.cursor_fetch_strategy = ( 

1958 _cursor.FullyBufferedCursorFetchStrategy( 

1959 self.cursor, 

1960 self.cursor.description, 

1961 self.cursor.fetchall(), 

1962 ) 

1963 ) 

1964 

1965 if self._enable_identity_insert: 

1966 if TYPE_CHECKING: 

1967 assert is_sql_compiler(self.compiled) 

1968 assert isinstance(self.compiled.compile_state, DMLState) 

1969 assert isinstance( 

1970 self.compiled.compile_state.dml_table, TableClause 

1971 ) 

1972 conn._cursor_execute( 

1973 self.cursor, 

1974 self._opt_encode( 

1975 "SET IDENTITY_INSERT %s OFF" 

1976 % self.identifier_preparer.format_table( 

1977 self.compiled.compile_state.dml_table 

1978 ) 

1979 ), 

1980 (), 

1981 self, 

1982 ) 

1983 

1984 def get_lastrowid(self): 

1985 return self._lastrowid 

1986 

1987 def handle_dbapi_exception(self, e): 

1988 if self._enable_identity_insert: 

1989 try: 

1990 self.cursor.execute( 

1991 self._opt_encode( 

1992 "SET IDENTITY_INSERT %s OFF" 

1993 % self.identifier_preparer.format_table( 

1994 self.compiled.compile_state.dml_table 

1995 ) 

1996 ) 

1997 ) 

1998 except Exception: 

1999 pass 

2000 

2001 def fire_sequence(self, seq, type_): 

2002 return self._execute_scalar( 

2003 ( 

2004 "SELECT NEXT VALUE FOR %s" 

2005 % self.identifier_preparer.format_sequence(seq) 

2006 ), 

2007 type_, 

2008 ) 

2009 

2010 def get_insert_default(self, column): 

2011 if ( 

2012 isinstance(column, sa_schema.Column) 

2013 and column is column.table._autoincrement_column 

2014 and isinstance(column.default, sa_schema.Sequence) 

2015 and column.default.optional 

2016 ): 

2017 return None 

2018 return super().get_insert_default(column) 

2019 

2020 

2021class MSSQLCompiler(compiler.SQLCompiler): 

2022 returning_precedes_values = True 

2023 

2024 extract_map = util.update_copy( 

2025 compiler.SQLCompiler.extract_map, 

2026 { 

2027 "doy": "dayofyear", 

2028 "dow": "weekday", 

2029 "milliseconds": "millisecond", 

2030 "microseconds": "microsecond", 

2031 }, 

2032 ) 

2033 

2034 def __init__(self, *args, **kwargs): 

2035 self.tablealiases = {} 

2036 super().__init__(*args, **kwargs) 

2037 

2038 def _format_frame_clause(self, range_, **kw): 

2039 kw["literal_execute"] = True 

2040 return super()._format_frame_clause(range_, **kw) 

2041 

2042 def _with_legacy_schema_aliasing(fn): 

2043 def decorate(self, *arg, **kw): 

2044 if self.dialect.legacy_schema_aliasing: 

2045 return fn(self, *arg, **kw) 

2046 else: 

2047 super_ = getattr(super(MSSQLCompiler, self), fn.__name__) 

2048 return super_(*arg, **kw) 

2049 

2050 return decorate 

2051 

2052 def visit_now_func(self, fn, **kw): 

2053 return "CURRENT_TIMESTAMP" 

2054 

2055 def visit_current_date_func(self, fn, **kw): 

2056 return "GETDATE()" 

2057 

2058 def visit_length_func(self, fn, **kw): 

2059 return "LEN%s" % self.function_argspec(fn, **kw) 

2060 

2061 def visit_char_length_func(self, fn, **kw): 

2062 return "LEN%s" % self.function_argspec(fn, **kw) 

2063 

2064 def visit_aggregate_strings_func(self, fn, **kw): 

2065 expr = fn.clauses.clauses[0]._compiler_dispatch(self, **kw) 

2066 kw["literal_execute"] = True 

2067 delimiter = fn.clauses.clauses[1]._compiler_dispatch(self, **kw) 

2068 return f"string_agg({expr}, {delimiter})" 

2069 

2070 def visit_concat_op_expression_clauselist( 

2071 self, clauselist, operator, **kw 

2072 ): 

2073 return " + ".join(self.process(elem, **kw) for elem in clauselist) 

2074 

2075 def visit_concat_op_binary(self, binary, operator, **kw): 

2076 return "%s + %s" % ( 

2077 self.process(binary.left, **kw), 

2078 self.process(binary.right, **kw), 

2079 ) 

2080 

2081 def visit_true(self, expr, **kw): 

2082 return "1" 

2083 

2084 def visit_false(self, expr, **kw): 

2085 return "0" 

2086 

2087 def visit_match_op_binary(self, binary, operator, **kw): 

2088 return "CONTAINS (%s, %s)" % ( 

2089 self.process(binary.left, **kw), 

2090 self.process(binary.right, **kw), 

2091 ) 

2092 

2093 def get_select_precolumns(self, select, **kw): 

2094 """MS-SQL puts TOP, its version of LIMIT, here""" 

2095 

2096 s = super().get_select_precolumns(select, **kw) 

2097 

2098 if select._has_row_limiting_clause and self._use_top(select): 

2099 # ODBC drivers and possibly others 

2100 # don't support bind params in the SELECT clause on SQL Server. 

2101 # so have to use literal here. 

2102 kw["literal_execute"] = True 

2103 s += "TOP %s " % self.process( 

2104 self._get_limit_or_fetch(select), **kw 

2105 ) 

2106 if select._fetch_clause is not None: 

2107 if select._fetch_clause_options["percent"]: 

2108 s += "PERCENT " 

2109 if select._fetch_clause_options["with_ties"]: 

2110 s += "WITH TIES " 

2111 

2112 return s 

2113 

2114 def get_from_hint_text(self, table, text): 

2115 return text 

2116 

2117 def get_crud_hint_text(self, table, text): 

2118 return text 

2119 

2120 def _get_limit_or_fetch(self, select): 

2121 if select._fetch_clause is None: 

2122 return select._limit_clause 

2123 else: 

2124 return select._fetch_clause 

2125 

2126 def _use_top(self, select): 

2127 return (select._offset_clause is None) and ( 

2128 select._simple_int_clause(select._limit_clause) 

2129 or ( 

2130 # limit can use TOP when it is by itself. fetch only uses TOP 

2131 # when it needs to because of PERCENT and/or WITH TIES 

2132 # TODO: Why? shouldn't we use TOP always ? 

2133 select._simple_int_clause(select._fetch_clause) 

2134 and ( 

2135 select._fetch_clause_options["percent"] 

2136 or select._fetch_clause_options["with_ties"] 

2137 ) 

2138 ) 

2139 ) 
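
# The TOP-versus-OFFSET/FETCH decision made by _use_top() above can be
# restated on plain integers. This is a sketch only: the real method
# inspects SQL clause objects through select._simple_int_clause(), and the
# function name and parameters below are hypothetical.

```python
from typing import Optional


def use_top(
    limit: Optional[int],
    offset: Optional[int],
    fetch: Optional[int] = None,
    percent: bool = False,
    with_ties: bool = False,
) -> bool:
    """Return True when the row limit would be rendered as TOP."""
    if offset is not None:
        # Any OFFSET rules out TOP entirely.
        return False
    if limit is not None:
        # A simple integer LIMIT on its own renders as TOP.
        return True
    # FETCH falls back to TOP only when PERCENT or WITH TIES is requested.
    return fetch is not None and (percent or with_ties)
```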

2140 

2141 def limit_clause(self, cs, **kwargs): 

2142 return "" 

2143 

2144 def _check_can_use_fetch_limit(self, select): 

2145 # to use ROW_NUMBER(), an ORDER BY is required. 

2146 # OFFSET and FETCH are options of the ORDER BY clause 

2147 if not select._order_by_clause.clauses: 

2148 raise exc.CompileError( 

2149 "MSSQL requires an order_by when " 

2150 "using an OFFSET or a non-simple " 

2151 "LIMIT clause" 

2152 ) 

2153 

2154 if select._fetch_clause_options is not None and ( 

2155 select._fetch_clause_options["percent"] 

2156 or select._fetch_clause_options["with_ties"] 

2157 ): 

2158 raise exc.CompileError( 

2159 "MSSQL needs TOP to use PERCENT and/or WITH TIES. " 

2160 "Only simple fetch without offset can be used." 

2161 ) 

2162 

2163 def _row_limit_clause(self, select, **kw): 

2164 """MSSQL 2012 supports the OFFSET/FETCH operators. 

2165 Use them instead of a subquery with row_number. 

2166 

2167 """ 

2168 

2169 if self.dialect._supports_offset_fetch and not self._use_top(select): 

2170 self._check_can_use_fetch_limit(select) 

2171 

2172 return self.fetch_clause( 

2173 select, 

2174 fetch_clause=self._get_limit_or_fetch(select), 

2175 require_offset=True, 

2176 **kw, 

2177 ) 

2178 

2179 else: 

2180 return "" 

2181 

2182 def visit_try_cast(self, element, **kw): 

2183 return "TRY_CAST (%s AS %s)" % ( 

2184 self.process(element.clause, **kw), 

2185 self.process(element.typeclause, **kw), 

2186 ) 

2187 

2188 def translate_select_structure(self, select_stmt, **kwargs): 

2189 """Look for ``LIMIT`` and ``OFFSET`` in a select statement and, if 

2190 present, try to wrap it in a subquery with a ``row_number()`` criterion. 

2191 MSSQL 2012 and above are excluded. 

2192 

2193 """ 

2194 select = select_stmt 

2195 

2196 if ( 

2197 select._has_row_limiting_clause 

2198 and not self.dialect._supports_offset_fetch 

2199 and not self._use_top(select) 

2200 and not getattr(select, "_mssql_visit", None) 

2201 ): 

2202 self._check_can_use_fetch_limit(select) 

2203 

2204 _order_by_clauses = [ 

2205 sql_util.unwrap_label_reference(elem) 

2206 for elem in select._order_by_clause.clauses 

2207 ] 

2208 

2209 limit_clause = self._get_limit_or_fetch(select) 

2210 offset_clause = select._offset_clause 

2211 

2212 select = select._generate() 

2213 select._mssql_visit = True 

2214 select = ( 

2215 select.add_columns( 

2216 sql.func.ROW_NUMBER() 

2217 .over(order_by=_order_by_clauses) 

2218 .label("mssql_rn") 

2219 ) 

2220 .order_by(None) 

2221 .alias() 

2222 ) 

2223 

2224 mssql_rn = sql.column("mssql_rn") 

2225 limitselect = sql.select( 

2226 *[c for c in select.c if c.key != "mssql_rn"] 

2227 ) 

2228 if offset_clause is not None: 

2229 limitselect = limitselect.where(mssql_rn > offset_clause) 

2230 if limit_clause is not None: 

2231 limitselect = limitselect.where( 

2232 mssql_rn <= (limit_clause + offset_clause) 

2233 ) 

2234 else: 

2235 limitselect = limitselect.where(mssql_rn <= (limit_clause)) 

2236 return limitselect 

2237 else: 

2238 return select 
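
# The row_number() wrapping above filters on "mssql_rn > offset" and
# "mssql_rn <= limit + offset". A minimal sketch of the same window
# arithmetic on an already ordered Python list; row_number_window is a
# hypothetical helper, not something the dialect provides.

```python
def row_number_window(rows, limit=None, offset=None):
    """Emulate the mssql_rn filter; ROW_NUMBER() numbering starts at 1."""
    selected = []
    for rn, row in enumerate(rows, start=1):
        if offset is not None:
            if rn <= offset:
                continue  # WHERE mssql_rn > offset
            if limit is not None and rn > limit + offset:
                continue  # WHERE mssql_rn <= limit + offset
        elif limit is not None and rn > limit:
            continue  # WHERE mssql_rn <= limit
        selected.append(row)
    return selected
```

# With seven ordered rows, limit=3 and offset=2 keep row numbers 3 to 5.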

2239 

2240 @_with_legacy_schema_aliasing 

2241 def visit_table(self, table, mssql_aliased=False, iscrud=False, **kwargs): 

2242 if mssql_aliased is table or iscrud: 

2243 return super().visit_table(table, **kwargs) 

2244 

2245 # alias schema-qualified tables 

2246 alias = self._schema_aliased_table(table) 

2247 if alias is not None: 

2248 return self.process(alias, mssql_aliased=table, **kwargs) 

2249 else: 

2250 return super().visit_table(table, **kwargs) 

2251 

2252 @_with_legacy_schema_aliasing 

2253 def visit_alias(self, alias, **kw): 

2254 # translate for schema-qualified table aliases 

2255 kw["mssql_aliased"] = alias.element 

2256 return super().visit_alias(alias, **kw) 

2257 

2258 @_with_legacy_schema_aliasing 

2259 def visit_column(self, column, add_to_result_map=None, **kw): 

2260 if ( 

2261 column.table is not None 

2262 and (not self.isupdate and not self.isdelete) 

2263 or self.is_subquery() 

2264 ): 

2265 # translate for schema-qualified table aliases 

2266 t = self._schema_aliased_table(column.table) 

2267 if t is not None: 

2268 converted = elements._corresponding_column_or_error(t, column) 

2269 if add_to_result_map is not None: 

2270 add_to_result_map( 

2271 column.name, 

2272 column.name, 

2273 (column, column.name, column.key), 

2274 column.type, 

2275 ) 

2276 

2277 return super().visit_column(converted, **kw) 

2278 

2279 return super().visit_column( 

2280 column, add_to_result_map=add_to_result_map, **kw 

2281 ) 

2282 

2283 def _schema_aliased_table(self, table): 

2284 if getattr(table, "schema", None) is not None: 

2285 if table not in self.tablealiases: 

2286 self.tablealiases[table] = table.alias() 

2287 return self.tablealiases[table] 

2288 else: 

2289 return None 

2290 

2291 def visit_extract(self, extract, **kw): 

2292 field = self.extract_map.get(extract.field, extract.field) 

2293 return "DATEPART(%s, %s)" % (field, self.process(extract.expr, **kw)) 
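
# visit_extract() above translates generic field names to the names that
# DATEPART() accepts and passes unknown fields through unchanged. A sketch
# of that lookup, assuming only the four overrides shown in extract_map
# (the real map also inherits the base compiler's entries); datepart and
# MSSQL_EXTRACT_OVERRIDES are hypothetical names.

```python
MSSQL_EXTRACT_OVERRIDES = {
    "doy": "dayofyear",
    "dow": "weekday",
    "milliseconds": "millisecond",
    "microseconds": "microsecond",
}


def datepart(field: str, expr_sql: str) -> str:
    """Render DATEPART(<field>, <expr>) with the field name translated."""
    mapped = MSSQL_EXTRACT_OVERRIDES.get(field, field)
    return "DATEPART(%s, %s)" % (mapped, expr_sql)
```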

2294 

2295 def visit_savepoint(self, savepoint_stmt, **kw): 

2296 return "SAVE TRANSACTION %s" % self.preparer.format_savepoint( 

2297 savepoint_stmt 

2298 ) 

2299 

2300 def visit_rollback_to_savepoint(self, savepoint_stmt, **kw): 

2301 return "ROLLBACK TRANSACTION %s" % self.preparer.format_savepoint( 

2302 savepoint_stmt 

2303 ) 

2304 

2305 def visit_binary(self, binary, **kwargs): 

2306 """Move bind parameters to the right-hand side of an operator, where 

2307 possible. 

2308 

2309 """ 

2310 if ( 

2311 isinstance(binary.left, expression.BindParameter) 

2312 and binary.operator == operator.eq 

2313 and not isinstance(binary.right, expression.BindParameter) 

2314 ): 

2315 return self.process( 

2316 expression.BinaryExpression( 

2317 binary.right, binary.left, binary.operator 

2318 ), 

2319 **kwargs, 

2320 ) 

2321 return super().visit_binary(binary, **kwargs) 

2322 

2323 def returning_clause( 

2324 self, stmt, returning_cols, *, populate_result_map, **kw 

2325 ): 

2326 # SQL Server's returning clause requires that the columns refer to 

2327 # the virtual table names "inserted" or "deleted". Here, we make 

2328 # a simple alias of our table with that name, and then adapt the 

2329 # columns we have from the list of RETURNING columns to that new name 

2330 # so that they render as "inserted.<colname>" / "deleted.<colname>". 

2331 

2332 if stmt.is_insert or stmt.is_update: 

2333 target = stmt.table.alias("inserted") 

2334 elif stmt.is_delete: 

2335 target = stmt.table.alias("deleted") 

2336 else: 

2337 assert False, "expected Insert, Update or Delete statement" 

2338 

2339 adapter = sql_util.ClauseAdapter(target) 

2340 

2341 # adapter.traverse() takes a column from our target table and returns 

2342 # the one that is linked to the "inserted" / "deleted" tables. So in 

2343 # order to retrieve these values back from the result (e.g. like 

2344 # row[column]), tell the compiler to also add the original unadapted 

2345 # column to the result map. Before #4877, these were (unknowingly) 

2346 # falling back using string name matching in the result set which 

2347 # necessarily used an expensive KeyError in order to match. 

2348 

2349 columns = [ 

2350 self._label_returning_column( 

2351 stmt, 

2352 adapter.traverse(column), 

2353 populate_result_map, 

2354 {"result_map_targets": (column,)}, 

2355 fallback_label_name=fallback_label_name, 

2356 column_is_repeated=repeated, 

2357 name=name, 

2358 proxy_name=proxy_name, 

2359 **kw, 

2360 ) 

2361 for ( 

2362 name, 

2363 proxy_name, 

2364 fallback_label_name, 

2365 column, 

2366 repeated, 

2367 ) in stmt._generate_columns_plus_names( 

2368 True, cols=expression._select_iterables(returning_cols) 

2369 ) 

2370 ] 

2371 

2372 return "OUTPUT " + ", ".join(columns) 

2373 

2374 def get_cte_preamble(self, recursive): 

2375 # SQL Server finds it too inconvenient to accept 

2376 # an entirely optional, SQL standard specified, 

2377 # "RECURSIVE" word with their "WITH", 

2378 # so here we go 

2379 return "WITH" 

2380 

2381 def label_select_column(self, select, column, asfrom): 

2382 if isinstance(column, expression.Function): 

2383 return column.label(None) 

2384 else: 

2385 return super().label_select_column(select, column, asfrom) 

2386 

2387 def for_update_clause(self, select, **kw): 

2388 # "FOR UPDATE" is only allowed on "DECLARE CURSOR" which 

2389 # SQLAlchemy doesn't use 

2390 return "" 

2391 

2392 def order_by_clause(self, select, **kw): 

2393 # MSSQL only allows ORDER BY in subqueries if there is a LIMIT: 

2394 # "The ORDER BY clause is invalid in views, inline functions, 

2395 # derived tables, subqueries, and common table expressions, 

2396 # unless TOP, OFFSET or FOR XML is also specified." 

2397 if ( 

2398 self.is_subquery() 

2399 and not self._use_top(select) 

2400 and ( 

2401 select._offset is None 

2402 or not self.dialect._supports_offset_fetch 

2403 ) 

2404 ): 

2405 # avoid processing the order by clause if we won't end up 

2406 # using it, because we don't want all the bind params tacked 

2407 # onto the positional list if that is what the dbapi requires 

2408 return "" 

2409 

2410 order_by = self.process(select._order_by_clause, **kw) 

2411 

2412 if order_by: 

2413 return " ORDER BY " + order_by 

2414 else: 

2415 return "" 

2416 

2417 def update_from_clause( 

2418 self, update_stmt, from_table, extra_froms, from_hints, **kw 

2419 ): 

2420 """Render the UPDATE..FROM clause specific to MSSQL. 

2421 

2422 In MSSQL, if the UPDATE statement involves an alias of the table to 

2423 be updated, then the table itself must be added to the FROM list as 

2424 well. Otherwise, it is optional. Here, we add it regardless. 

2425 

2426 """ 

2427 return "FROM " + ", ".join( 

2428 t._compiler_dispatch(self, asfrom=True, fromhints=from_hints, **kw) 

2429 for t in [from_table] + extra_froms 

2430 ) 

2431 

2432 def delete_table_clause(self, delete_stmt, from_table, extra_froms, **kw): 

2433 """If we have extra froms make sure we render any alias as hint.""" 

2434 ashint = False 

2435 if extra_froms: 

2436 ashint = True 

2437 return from_table._compiler_dispatch( 

2438 self, asfrom=True, iscrud=True, ashint=ashint, **kw 

2439 ) 

2440 

2441 def delete_extra_from_clause( 

2442 self, delete_stmt, from_table, extra_froms, from_hints, **kw 

2443 ): 

2444 """Render the DELETE .. FROM clause specific to MSSQL. 

2445 

2446 Yes, it has the FROM keyword twice. 

2447 

2448 """ 

2449 return "FROM " + ", ".join( 

2450 t._compiler_dispatch(self, asfrom=True, fromhints=from_hints, **kw) 

2451 for t in [from_table] + extra_froms 

2452 ) 

2453 

2454 def visit_empty_set_expr(self, type_, **kw): 

2455 return "SELECT 1 WHERE 1!=1" 

2456 

2457 def visit_is_distinct_from_binary(self, binary, operator, **kw): 

2458 return "NOT EXISTS (SELECT %s INTERSECT SELECT %s)" % ( 

2459 self.process(binary.left), 

2460 self.process(binary.right), 

2461 ) 

2462 

2463 def visit_is_not_distinct_from_binary(self, binary, operator, **kw): 

2464 return "EXISTS (SELECT %s INTERSECT SELECT %s)" % ( 

2465 self.process(binary.left), 

2466 self.process(binary.right), 

2467 ) 
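
# The two methods above rely on INTERSECT treating NULLs as equal, which
# gives a NULL-safe comparison without IS [NOT] DISTINCT FROM. A small
# sketch of the idea, assuming SQLite (via the standard-library sqlite3
# module) as a stand-in for SQL Server; is_not_distinct_from is a
# hypothetical helper.

```python
import sqlite3


def is_not_distinct_from(left, right) -> bool:
    """NULL-safe equality via EXISTS (SELECT a INTERSECT SELECT b)."""
    conn = sqlite3.connect(":memory:")
    try:
        (flag,) = conn.execute(
            "SELECT EXISTS (SELECT ? INTERSECT SELECT ?)", (left, right)
        ).fetchone()
        return bool(flag)
    finally:
        conn.close()
```

# Unlike "a = b", which yields NULL when either side is NULL, this form
# always yields a definite true or false.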

2468 

2469 def _render_json_extract_from_binary(self, binary, operator, **kw): 

2470 # note we are intentionally calling upon the process() calls in the 

2471 # order in which they appear in the SQL String as this is used 

2472 # by positional parameter rendering 

2473 

2474 if binary.type._type_affinity is sqltypes.JSON: 

2475 return "JSON_QUERY(%s, %s)" % ( 

2476 self.process(binary.left, **kw), 

2477 self.process(binary.right, **kw), 

2478 ) 

2479 

2480 # as with other dialects, start with an explicit test for NULL 

2481 case_expression = "CASE JSON_VALUE(%s, %s) WHEN NULL THEN NULL" % ( 

2482 self.process(binary.left, **kw), 

2483 self.process(binary.right, **kw), 

2484 ) 

2485 

2486 if binary.type._type_affinity is sqltypes.Integer: 

2487 type_expression = "ELSE CAST(JSON_VALUE(%s, %s) AS INTEGER)" % ( 

2488 self.process(binary.left, **kw), 

2489 self.process(binary.right, **kw), 

2490 ) 

2491 elif binary.type._type_affinity is sqltypes.Numeric: 

2492 type_expression = "ELSE CAST(JSON_VALUE(%s, %s) AS %s)" % ( 

2493 self.process(binary.left, **kw), 

2494 self.process(binary.right, **kw), 

2495 ( 

2496 "FLOAT" 

2497 if isinstance(binary.type, sqltypes.Float) 

2498 else "NUMERIC(%s, %s)" 

2499 % (binary.type.precision, binary.type.scale) 

2500 ), 

2501 ) 

2502 elif binary.type._type_affinity is sqltypes.Boolean: 

2503 # the NULL handling is particularly weird with boolean, so 

2504 # explicitly return numeric (BIT) constants 

2505 type_expression = ( 

2506 "WHEN 'true' THEN 1 WHEN 'false' THEN 0 ELSE NULL" 

2507 ) 

2508 elif binary.type._type_affinity is sqltypes.String: 

2509 # TODO: does this comment (from mysql) apply to here, too? 

2510 # this fails with a JSON value that's a four byte unicode 

2511 # string. SQLite has the same problem at the moment 

2512 type_expression = "ELSE JSON_VALUE(%s, %s)" % ( 

2513 self.process(binary.left, **kw), 

2514 self.process(binary.right, **kw), 

2515 ) 

2516 else: 

2517 # other affinity....this is not expected right now 

2518 type_expression = "ELSE JSON_QUERY(%s, %s)" % ( 

2519 self.process(binary.left, **kw), 

2520 self.process(binary.right, **kw), 

2521 ) 

2522 

2523 return case_expression + " " + type_expression + " END" 

2524 

2525 def visit_json_getitem_op_binary(self, binary, operator, **kw): 

2526 return self._render_json_extract_from_binary(binary, operator, **kw) 

2527 

2528 def visit_json_path_getitem_op_binary(self, binary, operator, **kw): 

2529 return self._render_json_extract_from_binary(binary, operator, **kw) 

2530 

2531 def visit_sequence(self, seq, **kw): 

2532 return "NEXT VALUE FOR %s" % self.preparer.format_sequence(seq) 

2533 

2534 

2535class MSSQLStrictCompiler(MSSQLCompiler): 

2536 """A subclass of MSSQLCompiler which disables the usage of bind 

2537 parameters where not allowed natively by MS-SQL. 

2538 

2539 A dialect may use this compiler on a platform where native 

2540 binds are used. 

2541 

2542 """ 

2543 

2544 ansi_bind_rules = True 

2545 

2546 def visit_in_op_binary(self, binary, operator, **kw): 

2547 kw["literal_execute"] = True 

2548 return "%s IN %s" % ( 

2549 self.process(binary.left, **kw), 

2550 self.process(binary.right, **kw), 

2551 ) 

2552 

2553 def visit_not_in_op_binary(self, binary, operator, **kw): 

2554 kw["literal_execute"] = True 

2555 return "%s NOT IN %s" % ( 

2556 self.process(binary.left, **kw), 

2557 self.process(binary.right, **kw), 

2558 ) 

2559 

2560 def render_literal_value(self, value, type_): 

2561 """ 

2562 For date and datetime values, convert to a string 

2563 format acceptable to MSSQL. That seems to be the 

2564 so-called ODBC canonical date format which looks 

2565 like this: 

2566 

2567 yyyy-mm-dd hh:mi:ss.mmm(24h) 

2568 

2569 For other data types, call the base class implementation. 

2570 """ 

2571 # datetime and date are both subclasses of datetime.date 

2572 if issubclass(type(value), datetime.date): 

2573 # SQL Server wants single quotes around the date string. 

2574 return "'" + str(value) + "'" 

2575 else: 

2576 return super().render_literal_value(value, type_) 
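
# The date branch of render_literal_value() above relies on str() of a
# date or datetime already producing "yyyy-mm-dd hh:mi:ss[.ffffff]". A
# minimal sketch of just that branch; render_date_literal is a
# hypothetical name, and non-date values are rejected here instead of
# being delegated to a base class.

```python
import datetime


def render_date_literal(value) -> str:
    """Wrap str(date) or str(datetime) in single quotes."""
    # datetime.datetime is a subclass of datetime.date, so one check
    # covers both types.
    if not isinstance(value, datetime.date):
        raise TypeError("expected a date or datetime value")
    return "'" + str(value) + "'"
```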

2577 

2578 

2579class MSDDLCompiler(compiler.DDLCompiler): 

2580 def get_column_specification(self, column, **kwargs): 

2581 colspec = self.preparer.format_column(column) 

2582 

2583 # type is not accepted in a computed column 

2584 if column.computed is not None: 

2585 colspec += " " + self.process(column.computed) 

2586 else: 

2587 colspec += " " + self.dialect.type_compiler_instance.process( 

2588 column.type, type_expression=column 

2589 ) 

2590 

2591 if column.nullable is not None: 

2592 if ( 

2593 not column.nullable 

2594 or column.primary_key 

2595 or isinstance(column.default, sa_schema.Sequence) 

2596 or column.autoincrement is True 

2597 or column.identity 

2598 ): 

2599 colspec += " NOT NULL" 

2600 elif column.computed is None: 

2601 # don't specify "NULL" for computed columns 

2602 colspec += " NULL" 

2603 

2604 if column.table is None: 

2605 raise exc.CompileError( 

2606 "mssql requires Table-bound columns " 

2607 "in order to generate DDL" 

2608 ) 

2609 

2610 d_opt = column.dialect_options["mssql"] 

2611 start = d_opt["identity_start"] 

2612 increment = d_opt["identity_increment"] 

2613 if start is not None or increment is not None: 

2614 if column.identity: 

2615 raise exc.CompileError( 

2616 "Cannot specify options 'mssql_identity_start' and/or " 

2617 "'mssql_identity_increment' while also using the " 

2618 "'Identity' construct." 

2619 ) 

2620 util.warn_deprecated( 

2621 "The dialect options 'mssql_identity_start' and " 

2622 "'mssql_identity_increment' are deprecated. " 

2623 "Use the 'Identity' object instead.", 

2624 "1.4", 

2625 ) 

2626 

2627 if column.identity: 

2628 colspec += self.process(column.identity, **kwargs) 

2629 elif ( 

2630 column is column.table._autoincrement_column 

2631 or column.autoincrement is True 

2632 ) and ( 

2633 not isinstance(column.default, Sequence) or column.default.optional 

2634 ): 

2635 colspec += self.process(Identity(start=start, increment=increment)) 

2636 else: 

2637 default = self.get_column_default_string(column) 

2638 if default is not None: 

2639 colspec += " DEFAULT " + default 

2640 

2641 return colspec 

2642 

2643 def visit_create_index(self, create, include_schema=False, **kw): 

2644 index = create.element 

2645 self._verify_index_table(index) 

2646 preparer = self.preparer 

2647 text = "CREATE " 

2648 if index.unique: 

2649 text += "UNIQUE " 

2650 

2651 # handle clustering option 

2652 clustered = index.dialect_options["mssql"]["clustered"] 

2653 if clustered is not None: 

2654 if clustered: 

2655 text += "CLUSTERED " 

2656 else: 

2657 text += "NONCLUSTERED " 

2658 

2659 # handle columnstore option (has no negative value) 

2660 columnstore = index.dialect_options["mssql"]["columnstore"] 

2661 if columnstore: 

2662 text += "COLUMNSTORE " 

2663 

2664 text += "INDEX %s ON %s" % ( 

2665 self._prepared_index_name(index, include_schema=include_schema), 

2666 preparer.format_table(index.table), 

2667 ) 

2668 

2669 # in some cases mssql allows indexes with no columns defined 

2670 if len(index.expressions) > 0: 

2671 text += " (%s)" % ", ".join( 

2672 self.sql_compiler.process( 

2673 expr, include_table=False, literal_binds=True 

2674 ) 

2675 for expr in index.expressions 

2676 ) 

2677 

2678 # handle other included columns 

2679 if index.dialect_options["mssql"]["include"]: 

2680 inclusions = [ 

2681 index.table.c[col] if isinstance(col, str) else col 

2682 for col in index.dialect_options["mssql"]["include"] 

2683 ] 

2684 

2685 text += " INCLUDE (%s)" % ", ".join( 

2686 [preparer.quote(c.name) for c in inclusions] 

2687 ) 

2688 

2689 whereclause = index.dialect_options["mssql"]["where"] 

2690 

2691 if whereclause is not None: 

2692 whereclause = coercions.expect( 

2693 roles.DDLExpressionRole, whereclause 

2694 ) 

2695 

2696 where_compiled = self.sql_compiler.process( 

2697 whereclause, include_table=False, literal_binds=True 

2698 ) 

2699 text += " WHERE " + where_compiled 

2700 

2701 return text 

2702 

2703 def visit_drop_index(self, drop, **kw): 

2704 return "\nDROP INDEX %s ON %s" % ( 

2705 self._prepared_index_name(drop.element, include_schema=False), 

2706 self.preparer.format_table(drop.element.table), 

2707 ) 

2708 

2709 def visit_primary_key_constraint(self, constraint, **kw): 

2710 if len(constraint) == 0: 

2711 return "" 

2712 text = "" 

2713 if constraint.name is not None: 

2714 text += "CONSTRAINT %s " % self.preparer.format_constraint( 

2715 constraint 

2716 ) 

2717 text += "PRIMARY KEY " 

2718 

2719 clustered = constraint.dialect_options["mssql"]["clustered"] 

2720 if clustered is not None: 

2721 if clustered: 

2722 text += "CLUSTERED " 

2723 else: 

2724 text += "NONCLUSTERED " 

2725 

2726 text += "(%s)" % ", ".join( 

2727 self.preparer.quote(c.name) for c in constraint 

2728 ) 

2729 text += self.define_constraint_deferrability(constraint) 

2730 return text 

2731 

2732 def visit_unique_constraint(self, constraint, **kw): 

2733 if len(constraint) == 0: 

2734 return "" 

2735 text = "" 

2736 if constraint.name is not None: 

2737 formatted_name = self.preparer.format_constraint(constraint) 

2738 if formatted_name is not None: 

2739 text += "CONSTRAINT %s " % formatted_name 

2740 text += "UNIQUE %s" % self.define_unique_constraint_distinct( 

2741 constraint, **kw 

2742 ) 

2743 clustered = constraint.dialect_options["mssql"]["clustered"] 

2744 if clustered is not None: 

2745 if clustered: 

2746 text += "CLUSTERED " 

2747 else: 

2748 text += "NONCLUSTERED " 

2749 

2750 text += "(%s)" % ", ".join( 

2751 self.preparer.quote(c.name) for c in constraint 

2752 ) 

2753 text += self.define_constraint_deferrability(constraint) 

2754 return text 

2755 

2756 def visit_computed_column(self, generated, **kw): 

2757 text = "AS (%s)" % self.sql_compiler.process( 

2758 generated.sqltext, include_table=False, literal_binds=True 

2759 ) 

2760 # explicitly check for True|False since None means server default 

2761 if generated.persisted is True: 

2762 text += " PERSISTED" 

2763 return text 

2764 

2765 def visit_set_table_comment(self, create, **kw): 

2766 schema = self.preparer.schema_for_object(create.element) 

2767 schema_name = schema if schema else self.dialect.default_schema_name 

2768 return ( 

2769 "execute sp_addextendedproperty 'MS_Description', " 

2770 "{}, 'schema', {}, 'table', {}".format( 

2771 self.sql_compiler.render_literal_value( 

2772 create.element.comment, sqltypes.NVARCHAR() 

2773 ), 

2774 self.preparer.quote_schema(schema_name), 

2775 self.preparer.format_table(create.element, use_schema=False), 

2776 ) 

2777 ) 

2778 

2779 def visit_drop_table_comment(self, drop, **kw): 

2780 schema = self.preparer.schema_for_object(drop.element) 

2781 schema_name = schema if schema else self.dialect.default_schema_name 

2782 return ( 

2783 "execute sp_dropextendedproperty 'MS_Description', 'schema', " 

2784 "{}, 'table', {}".format( 

2785 self.preparer.quote_schema(schema_name), 

2786 self.preparer.format_table(drop.element, use_schema=False), 

2787 ) 

2788 ) 

2789 

2790 def visit_set_column_comment(self, create, **kw): 

2791 schema = self.preparer.schema_for_object(create.element.table) 

2792 schema_name = schema if schema else self.dialect.default_schema_name 

2793 return ( 

2794 "execute sp_addextendedproperty 'MS_Description', " 

2795 "{}, 'schema', {}, 'table', {}, 'column', {}".format( 

2796 self.sql_compiler.render_literal_value( 

2797 create.element.comment, sqltypes.NVARCHAR() 

2798 ), 

2799 self.preparer.quote_schema(schema_name), 

2800 self.preparer.format_table( 

2801 create.element.table, use_schema=False 

2802 ), 

2803 self.preparer.format_column(create.element), 

2804 ) 

2805 ) 

2806 

2807 def visit_drop_column_comment(self, drop, **kw): 

2808 schema = self.preparer.schema_for_object(drop.element.table) 

2809 schema_name = schema if schema else self.dialect.default_schema_name 

2810 return ( 

2811 "execute sp_dropextendedproperty 'MS_Description', 'schema', " 

2812 "{}, 'table', {}, 'column', {}".format( 

2813 self.preparer.quote_schema(schema_name), 

2814 self.preparer.format_table( 

2815 drop.element.table, use_schema=False 

2816 ), 

2817 self.preparer.format_column(drop.element), 

2818 ) 

2819 ) 

2820 

2821 def visit_create_sequence(self, create, **kw): 

2822 prefix = None 

2823 if create.element.data_type is not None: 

2824 data_type = create.element.data_type 

2825 prefix = " AS %s" % self.type_compiler.process(data_type) 

2826 return super().visit_create_sequence(create, prefix=prefix, **kw) 

2827 

2828 def visit_identity_column(self, identity, **kw): 

2829 text = " IDENTITY" 

2830 if identity.start is not None or identity.increment is not None: 

2831 start = 1 if identity.start is None else identity.start 

2832 increment = 1 if identity.increment is None else identity.increment 

2833 text += "(%s,%s)" % (start, increment) 

2834 return text 
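
# visit_identity_column() above emits a bare IDENTITY unless a start or an
# increment is given, in which case the missing one defaults to 1. The
# same string logic as a standalone sketch; identity_clause is a
# hypothetical function that takes plain values instead of an Identity
# object.

```python
def identity_clause(start=None, increment=None) -> str:
    """Render the IDENTITY suffix for a column specification."""
    text = " IDENTITY"
    if start is not None or increment is not None:
        start = 1 if start is None else start
        increment = 1 if increment is None else increment
        text += "(%s,%s)" % (start, increment)
    return text
```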

2835 

2836 

2837class MSIdentifierPreparer(compiler.IdentifierPreparer): 

2838 reserved_words = RESERVED_WORDS 

2839 

2840 def __init__(self, dialect): 

2841 super().__init__( 

2842 dialect, 

2843 initial_quote="[", 

2844 final_quote="]", 

2845 quote_case_sensitive_collations=False, 

2846 ) 

2847 

2848 def _escape_identifier(self, value): 

2849 return value.replace("]", "]]") 

2850 

2851 def _unescape_identifier(self, value): 

2852 return value.replace("]]", "]") 
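
# The preparer above quotes identifiers with [ and ] and escapes an
# embedded closing bracket by doubling it. A self-contained sketch of the
# round trip; quote_bracketed and unquote_bracketed are hypothetical
# helpers, not SQLAlchemy functions.

```python
def quote_bracketed(name: str) -> str:
    """Quote an identifier T-SQL style, doubling any closing bracket."""
    return "[" + name.replace("]", "]]") + "]"


def unquote_bracketed(quoted: str) -> str:
    """Reverse quote_bracketed for a well-formed bracketed identifier."""
    if not (quoted.startswith("[") and quoted.endswith("]")):
        raise ValueError("identifier is not bracket-quoted")
    return quoted[1:-1].replace("]]", "]")
```

# Only the closing bracket needs escaping; an opening bracket inside the
# name is left as-is.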

2853 

2854 def quote_schema(self, schema, force=None): 

2855 """Prepare a quoted table and schema name.""" 

2856 

2857 # need to re-implement the deprecation warning entirely 

2858 if force is not None: 

2859 # not using the util.deprecated_params() decorator in this 

2860 # case because of the additional function call overhead on this 

2861 # very performance-critical spot. 

2862 util.warn_deprecated( 

2863 "The IdentifierPreparer.quote_schema.force parameter is " 

2864 "deprecated and will be removed in a future release. This " 

2865 "flag has no effect on the behavior of the " 

2866 "IdentifierPreparer.quote method; please refer to " 

2867 "quoted_name().", 

2868 version="1.3", 

2869 ) 

2870 

2871 dbname, owner = _schema_elements(schema) 

2872 if dbname: 

2873 result = "%s.%s" % (self.quote(dbname), self.quote(owner)) 

2874 elif owner: 

2875 result = self.quote(owner) 

2876 else: 

2877 result = "" 

2878 return result 

2879 

2880 

2881def _db_plus_owner_listing(fn): 

2882 def wrap(dialect, connection, schema=None, **kw): 

2883 dbname, owner = _owner_plus_db(dialect, schema) 

2884 return _switch_db( 

2885 dbname, 

2886 connection, 

2887 fn, 

2888 dialect, 

2889 connection, 

2890 dbname, 

2891 owner, 

2892 schema, 

2893 **kw, 

2894 ) 

2895 

2896 return update_wrapper(wrap, fn) 

2897 

2898 

2899def _db_plus_owner(fn): 

2900 def wrap(dialect, connection, tablename, schema=None, **kw): 

2901 dbname, owner = _owner_plus_db(dialect, schema) 

2902 return _switch_db( 

2903 dbname, 

2904 connection, 

2905 fn, 

2906 dialect, 

2907 connection, 

2908 tablename, 

2909 dbname, 

2910 owner, 

2911 schema, 

2912 **kw, 

2913 ) 

2914 

2915 return update_wrapper(wrap, fn) 

2916 

2917 

2918def _switch_db(dbname, connection, fn, *arg, **kw): 

2919 if dbname: 

2920 current_db = connection.exec_driver_sql("select db_name()").scalar() 

2921 if current_db != dbname: 

2922 connection.exec_driver_sql( 

2923 "use %s" % connection.dialect.identifier_preparer.quote(dbname) 

2924 ) 

2925 try: 

2926 return fn(*arg, **kw) 

2927 finally: 

2928 if dbname and current_db != dbname: 

2929 connection.exec_driver_sql( 

2930 "use %s" 

2931 % connection.dialect.identifier_preparer.quote(current_db) 

2932 ) 
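# Illustrative sketch (not part of the module): the switch-and-restore pattern
# used by _switch_db above, expressed as a context manager. FakeConnection and
# switched_db are hypothetical stand-ins so the example runs without a server;
# the real function issues "select db_name()" and "use ..." through
# connection.exec_driver_sql().

```python
from contextlib import contextmanager


class FakeConnection:
    """Stand-in that records the "use" statements it would send."""

    def __init__(self, current_db):
        self.current_db = current_db
        self.log = []

    def use(self, dbname):
        self.log.append(f"use [{dbname}]")
        self.current_db = dbname


@contextmanager
def switched_db(conn, dbname):
    original = conn.current_db
    # Switch only when a database was requested and it differs from the
    # one the connection is already using.
    changed = bool(dbname) and original != dbname
    if changed:
        conn.use(dbname)
    try:
        yield conn
    finally:
        # Restore the original database even if the body raised.
        if changed:
            conn.use(original)


conn = FakeConnection("master")
with switched_db(conn, "tempdb"):
    inside = conn.current_db
after = conn.current_db
```

# Restoring in "finally" matters because the connection is typically returned
# to a pool; leaving it pointed at another database would affect later users.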

2933 

2934 

2935def _owner_plus_db(dialect, schema): 

2936 if not schema: 

2937 return None, dialect.default_schema_name 

2938 else: 

2939 return _schema_elements(schema) 

2940 

2941 

2942_memoized_schema = util.LRUCache() 

2943 

2944 

2945def _schema_elements(schema): 

2946 if isinstance(schema, quoted_name) and schema.quote: 

2947 return None, schema 

2948 

2949 if schema in _memoized_schema: 

2950 return _memoized_schema[schema] 

2951 

2952 # tests for this function are in: 

2953 # test/dialect/mssql/test_reflection.py -> 

2954 # OwnerPlusDBTest.test_owner_database_pairs 

2955 # test/dialect/mssql/test_compiler.py -> test_force_schema_* 

2956 # test/dialect/mssql/test_compiler.py -> test_schema_many_tokens_* 

2957 # 

2958 

2959 if schema.startswith("__[SCHEMA_"): 

2960 return None, schema 

2961 

2962 push = [] 

2963 symbol = "" 

2964 bracket = False 

2965 has_brackets = False 

2966 for token in re.split(r"(\[|\]|\.)", schema): 

2967 if not token: 

2968 continue 

2969 if token == "[": 

2970 bracket = True 

2971 has_brackets = True 

2972 elif token == "]": 

2973 bracket = False 

2974 elif not bracket and token == ".": 

2975 if has_brackets: 

2976 push.append("[%s]" % symbol) 

2977 else: 

2978 push.append(symbol) 

2979 symbol = "" 

2980 has_brackets = False 

2981 else: 

2982 symbol += token 

2983 if symbol: 

2984 push.append(symbol) 

2985 if len(push) > 1: 

2986 dbname, owner = ".".join(push[0:-1]), push[-1] 

2987 

2988 # test for internal brackets 

2989 if re.match(r".*\].*\[.*", dbname[1:-1]): 

2990 dbname = quoted_name(dbname, quote=False) 

2991 else: 

2992 dbname = dbname.lstrip("[").rstrip("]") 

2993 

2994 elif len(push): 

2995 dbname, owner = None, push[0] 

2996 else: 

2997 dbname, owner = None, None 

2998 

2999 _memoized_schema[schema] = dbname, owner 

3000 return dbname, owner 
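# Illustrative sketch (not part of the module): a simplified, standalone
# version of the tokenizing loop in _schema_elements above. The name
# split_schema is hypothetical. This sketch assumes plain str input and leaves
# out the LRU cache, the quoted_name handling, the "__[SCHEMA_" shortcut and
# the internal-brackets test that the real function performs.

```python
import re


def split_schema(schema):
    """Split a schema string into (dbname, owner)."""
    push = []
    symbol = ""
    bracket = False
    has_brackets = False
    # Capturing group keeps "[", "]" and "." as their own tokens.
    for token in re.split(r"(\[|\]|\.)", schema):
        if not token:
            continue
        if token == "[":
            bracket = True
            has_brackets = True
        elif token == "]":
            bracket = False
        elif not bracket and token == ".":
            # A dot outside brackets ends the current name.
            push.append("[%s]" % symbol if has_brackets else symbol)
            symbol = ""
            has_brackets = False
        else:
            # Dots inside brackets are part of the name.
            symbol += token
    if symbol:
        push.append(symbol)
    if len(push) > 1:
        # Everything before the last dot is the database name.
        dbname, owner = ".".join(push[:-1]), push[-1]
        return dbname.lstrip("[").rstrip("]"), owner
    if push:
        return None, push[0]
    return None, None


pairs = [split_schema(s) for s in ("dbo", "mydb.dbo", "[my.db].dbo")]
```

# The last token is always treated as the owner (schema), so a dotted name
# with more than two parts folds its leading parts into the database name.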

3001 

3002 

3003class MSDialect(default.DefaultDialect): 

3004 # will assume it's at least mssql2005 

3005 name = "mssql" 

3006 supports_statement_cache = True 

3007 supports_default_values = True 

3008 supports_empty_insert = False 

3009 favor_returning_over_lastrowid = True 

3010 

3011 returns_native_bytes = True 

3012 

3013 supports_comments = True 

3014 supports_default_metavalue = False 

3015 """dialect supports INSERT... VALUES (DEFAULT) syntax - 

3016 SQL Server **does** support this, but **not** for the IDENTITY column, 

3017 so we can't turn this on. 

3018 

3019 """ 

3020 

3021 # supports_native_uuid is partial here, so we implement our 

3022 # own impl type 

3023 

3024 execution_ctx_cls = MSExecutionContext 

3025 use_scope_identity = True 

3026 max_identifier_length = 128 

3027 schema_name = "dbo" 

3028 

3029 insert_returning = True 

3030 update_returning = True 

3031 delete_returning = True 

3032 update_returning_multifrom = True 

3033 delete_returning_multifrom = True 

3034 

3035 colspecs = { 

3036 sqltypes.DateTime: _MSDateTime, 

3037 sqltypes.Date: _MSDate, 

3038 sqltypes.JSON: JSON, 

3039 sqltypes.JSON.JSONIndexType: JSONIndexType, 

3040 sqltypes.JSON.JSONPathType: JSONPathType, 

3041 sqltypes.Time: _BASETIMEIMPL, 

3042 sqltypes.Unicode: _MSUnicode, 

3043 sqltypes.UnicodeText: _MSUnicodeText, 

3044 DATETIMEOFFSET: DATETIMEOFFSET, 

3045 DATETIME2: DATETIME2, 

3046 SMALLDATETIME: SMALLDATETIME, 

3047 DATETIME: DATETIME, 

3048 sqltypes.Uuid: MSUUid, 

3049 } 

3050 

3051 engine_config_types = default.DefaultDialect.engine_config_types.union( 

3052 {"legacy_schema_aliasing": util.asbool} 

3053 ) 

3054 

3055 ischema_names = ischema_names 

3056 

3057 supports_sequences = True 

3058 sequences_optional = True 

3059 # This is actually used for autoincrement, where identity is used that 

3060 # starts with 1. 

3061 # for sequences T-SQL's actual default is -9223372036854775808 

3062 default_sequence_base = 1 

3063 

3064 supports_native_boolean = False 

3065 non_native_boolean_check_constraint = False 

3066 supports_unicode_binds = True 

3067 postfetch_lastrowid = True 

3068 

3069 # may be changed at server inspection time for older SQL server versions 

3070 supports_multivalues_insert = True 

3071 

3072 use_insertmanyvalues = True 

3073 

3074 # note pyodbc will set this to False if fast_executemany is set, 

3075 # as of SQLAlchemy 2.0.9 

3076 use_insertmanyvalues_wo_returning = True 

3077 

3078 insertmanyvalues_implicit_sentinel = ( 

3079 InsertmanyvaluesSentinelOpts.AUTOINCREMENT 

3080 | InsertmanyvaluesSentinelOpts.IDENTITY 

3081 | InsertmanyvaluesSentinelOpts.USE_INSERT_FROM_SELECT 

3082 ) 

3083 

3084 # "The incoming request has too many parameters. The server supports a " 

3085 # "maximum of 2100 parameters." 

3086 # in fact you can have 2099 parameters. 

3087 insertmanyvalues_max_parameters = 2099 

3088 

3089 _supports_offset_fetch = False 

3090 _supports_nvarchar_max = False 

3091 

3092 legacy_schema_aliasing = False 

3093 

3094 server_version_info = () 

3095 

3096 statement_compiler = MSSQLCompiler 

3097 ddl_compiler = MSDDLCompiler 

3098 type_compiler_cls = MSTypeCompiler 

3099 preparer = MSIdentifierPreparer 

3100 

3101 construct_arguments = [ 

3102 (sa_schema.PrimaryKeyConstraint, {"clustered": None}), 

3103 (sa_schema.UniqueConstraint, {"clustered": None}), 

3104 ( 

3105 sa_schema.Index, 

3106 { 

3107 "clustered": None, 

3108 "include": None, 

3109 "where": None, 

3110 "columnstore": None, 

3111 }, 

3112 ), 

3113 ( 

3114 sa_schema.Column, 

3115 {"identity_start": None, "identity_increment": None}, 

3116 ), 

3117 ] 

3118 

3119 def __init__( 

3120 self, 

3121 query_timeout=None, 

3122 use_scope_identity=True, 

3123 schema_name="dbo", 

3124 deprecate_large_types=None, 

3125 supports_comments=None, 

3126 json_serializer=None, 

3127 json_deserializer=None, 

3128 legacy_schema_aliasing=None, 

3129 ignore_no_transaction_on_rollback=False, 

3130 **opts, 

3131 ): 

3132 self.query_timeout = int(query_timeout or 0) 

3133 self.schema_name = schema_name 

3134 

3135 self.use_scope_identity = use_scope_identity 

3136 self.deprecate_large_types = deprecate_large_types 

3137 self.ignore_no_transaction_on_rollback = ( 

3138 ignore_no_transaction_on_rollback 

3139 ) 

3140 self._user_defined_supports_comments = uds = supports_comments 

3141 if uds is not None: 

3142 self.supports_comments = uds 

3143 

3144 if legacy_schema_aliasing is not None: 

3145 util.warn_deprecated( 

3146 "The legacy_schema_aliasing parameter is " 

3147 "deprecated and will be removed in a future release.", 

3148 "1.4", 

3149 ) 

3150 self.legacy_schema_aliasing = legacy_schema_aliasing 

3151 

3152 super().__init__(**opts) 

3153 

3154 self._json_serializer = json_serializer 

3155 self._json_deserializer = json_deserializer 

3156 

3157 def do_savepoint(self, connection, name): 

3158 # give the DBAPI a push 

3159 connection.exec_driver_sql("IF @@TRANCOUNT = 0 BEGIN TRANSACTION") 

3160 super().do_savepoint(connection, name) 

3161 

3162 def do_release_savepoint(self, connection, name): 

3163 # SQL Server does not support RELEASE SAVEPOINT 

3164 pass 

3165 

3166 def do_rollback(self, dbapi_connection): 

3167 try: 

3168 super().do_rollback(dbapi_connection) 

3169 except self.dbapi.ProgrammingError as e: 

3170 if self.ignore_no_transaction_on_rollback and re.match( 

3171 r".*\b111214\b", str(e) 

3172 ): 

3173 util.warn( 

3174 "ProgrammingError 111214 " 

3175 "'No corresponding transaction found.' " 

3176 "has been suppressed via " 

3177 "ignore_no_transaction_on_rollback=True" 

3178 ) 

3179 else: 

3180 raise 

3181 

3182 _isolation_lookup = { 

3183 "SERIALIZABLE", 

3184 "READ UNCOMMITTED", 

3185 "READ COMMITTED", 

3186 "REPEATABLE READ", 

3187 "SNAPSHOT", 

3188 } 

3189 

3190 def get_isolation_level_values(self, dbapi_connection): 

3191 return list(self._isolation_lookup) 

3192 

3193 def set_isolation_level(self, dbapi_connection, level): 

3194 cursor = dbapi_connection.cursor() 

3195 cursor.execute(f"SET TRANSACTION ISOLATION LEVEL {level}") 

3196 cursor.close() 

3197 if level == "SNAPSHOT": 

3198 dbapi_connection.commit() 

3199 

3200 def get_isolation_level(self, dbapi_connection): 

3201 cursor = dbapi_connection.cursor() 

3202 view_name = "sys.system_views" 

3203 try: 

3204 cursor.execute( 

3205 ( 

3206 "SELECT name FROM {} WHERE name IN " 

3207 "('dm_exec_sessions', 'dm_pdw_nodes_exec_sessions')" 

3208 ).format(view_name) 

3209 ) 

3210 row = cursor.fetchone() 

3211 if not row: 

3212 raise NotImplementedError( 

3213 "Can't fetch isolation level on this particular " 

3214 "SQL Server version." 

3215 ) 

3216 

3217 view_name = f"sys.{row[0]}" 

3218 

3219 cursor.execute( 

3220 """ 

3221 SELECT CASE transaction_isolation_level 

3222 WHEN 0 THEN NULL 

3223 WHEN 1 THEN 'READ UNCOMMITTED' 

3224 WHEN 2 THEN 'READ COMMITTED' 

3225 WHEN 3 THEN 'REPEATABLE READ' 

3226 WHEN 4 THEN 'SERIALIZABLE' 

3227 WHEN 5 THEN 'SNAPSHOT' END 

3228 AS TRANSACTION_ISOLATION_LEVEL 

3229 FROM {} 

3230 where session_id = @@SPID 

3231 """.format( 

3232 view_name 

3233 ) 

3234 ) 

3235 except self.dbapi.Error as err: 

3236 raise NotImplementedError( 

3237 "Can't fetch isolation level; encountered error {} when " 

3238 'attempting to query the "{}" view.'.format(err, view_name) 

3239 ) from err 

3240 else: 

3241 row = cursor.fetchone() 

3242 return row[0].upper() 

3243 finally: 

3244 cursor.close() 

3245 

3246 def initialize(self, connection): 

3247 super().initialize(connection) 

3248 self._setup_version_attributes() 

3249 self._setup_supports_nvarchar_max(connection) 

3250 self._setup_supports_comments(connection) 

3251 

3252 def _setup_version_attributes(self): 

3253 if self.server_version_info[0] not in list(range(8, 17)): 

3254 util.warn( 

3255 "Unrecognized server version info '%s'. Some SQL Server " 

3256 "features may not function properly." 

3257 % ".".join(str(x) for x in self.server_version_info) 

3258 ) 

3259 

3260 if self.server_version_info >= MS_2008_VERSION: 

3261 self.supports_multivalues_insert = True 

3262 else: 

3263 self.supports_multivalues_insert = False 

3264 

3265 if self.deprecate_large_types is None: 

3266 self.deprecate_large_types = ( 

3267 self.server_version_info >= MS_2012_VERSION 

3268 ) 

3269 

3270 self._supports_offset_fetch = ( 

3271 self.server_version_info and self.server_version_info[0] >= 11 

3272 ) 

3273 

3274 def _setup_supports_nvarchar_max(self, connection): 

3275 try: 

3276 connection.scalar( 

3277 sql.text("SELECT CAST('test max support' AS NVARCHAR(max))") 

3278 ) 

3279 except exc.DBAPIError: 

3280 self._supports_nvarchar_max = False 

3281 else: 

3282 self._supports_nvarchar_max = True 

3283 

3284 def _setup_supports_comments(self, connection): 

3285 if self._user_defined_supports_comments is not None: 

3286 return 

3287 

3288 try: 

3289 connection.scalar( 

3290 sql.text( 

3291 "SELECT 1 FROM fn_listextendedproperty" 

3292 "(default, default, default, default, " 

3293 "default, default, default)" 

3294 ) 

3295 ) 

3296 except exc.DBAPIError: 

3297 self.supports_comments = False 

3298 else: 

3299 self.supports_comments = True 

3300 

3301 def _get_default_schema_name(self, connection): 

3302 query = sql.text("SELECT schema_name()") 

3303 default_schema_name = connection.scalar(query) 

3304 if default_schema_name is not None: 

3305 # guard against the case where the default_schema_name is being 

3306 # fed back into a table reflection function. 

3307 return quoted_name(default_schema_name, quote=True) 

3308 else: 

3309 return self.schema_name 

3310 

3311 @_db_plus_owner 

3312 def has_table(self, connection, tablename, dbname, owner, schema, **kw): 

3313 self._ensure_has_table_connection(connection) 

3314 

3315 return self._internal_has_table(connection, tablename, owner, **kw) 

3316 

3317 @reflection.cache 

3318 @_db_plus_owner 

3319 def has_sequence( 

3320 self, connection, sequencename, dbname, owner, schema, **kw 

3321 ): 

3322 sequences = ischema.sequences 

3323 

3324 s = sql.select(sequences.c.sequence_name).where( 

3325 sequences.c.sequence_name == sequencename 

3326 ) 

3327 

3328 if owner: 

3329 s = s.where(sequences.c.sequence_schema == owner) 

3330 

3331 c = connection.execute(s) 

3332 

3333 return c.first() is not None 

3334 

3335 @reflection.cache 

3336 @_db_plus_owner_listing 

3337 def get_sequence_names(self, connection, dbname, owner, schema, **kw): 

3338 sequences = ischema.sequences 

3339 

3340 s = sql.select(sequences.c.sequence_name) 

3341 if owner: 

3342 s = s.where(sequences.c.sequence_schema == owner) 

3343 

3344 c = connection.execute(s) 

3345 

3346 return [row[0] for row in c] 

3347 

3348 @reflection.cache 

3349 def get_schema_names(self, connection, **kw): 

3350 s = sql.select(ischema.schemata.c.schema_name).order_by( 

3351 ischema.schemata.c.schema_name 

3352 ) 

3353 schema_names = [r[0] for r in connection.execute(s)] 

3354 return schema_names 

3355 

3356 @reflection.cache 

3357 @_db_plus_owner_listing 

3358 def get_table_names(self, connection, dbname, owner, schema, **kw): 

3359 tables = ischema.tables 

3360 s = ( 

3361 sql.select(tables.c.table_name) 

3362 .where( 

3363 sql.and_( 

3364 tables.c.table_schema == owner, 

3365 tables.c.table_type == "BASE TABLE", 

3366 ) 

3367 ) 

3368 .order_by(tables.c.table_name) 

3369 ) 

3370 table_names = [r[0] for r in connection.execute(s)] 

3371 return table_names 

3372 

3373 @reflection.cache 

3374 @_db_plus_owner_listing 

3375 def get_view_names(self, connection, dbname, owner, schema, **kw): 

3376 tables = ischema.tables 

3377 s = ( 

3378 sql.select(tables.c.table_name) 

3379 .where( 

3380 sql.and_( 

3381 tables.c.table_schema == owner, 

3382 tables.c.table_type == "VIEW", 

3383 ) 

3384 ) 

3385 .order_by(tables.c.table_name) 

3386 ) 

3387 view_names = [r[0] for r in connection.execute(s)] 

3388 return view_names 

3389 

3390 @reflection.cache 

3391 def _internal_has_table(self, connection, tablename, owner, **kw): 

3392 if tablename.startswith("#"): # temporary table 

3393 # mssql does not support temporary views 

3394 # SQL Error [4103] [S0001]: "#v": Temporary views are not allowed 

3395 return bool( 

3396 connection.scalar( 

3397 # U filters on user tables only. 

3398 text("SELECT object_id(:table_name, 'U')"), 

3399 {"table_name": f"tempdb.dbo.[{tablename}]"}, 

3400 ) 

3401 ) 

3402 else: 

3403 tables = ischema.tables 

3404 

3405 s = sql.select(tables.c.table_name).where( 

3406 sql.and_( 

3407 sql.or_( 

3408 tables.c.table_type == "BASE TABLE", 

3409 tables.c.table_type == "VIEW", 

3410 ), 

3411 tables.c.table_name == tablename, 

3412 ) 

3413 ) 

3414 

3415 if owner: 

3416 s = s.where(tables.c.table_schema == owner) 

3417 

3418 c = connection.execute(s) 

3419 

3420 return c.first() is not None 

3421 

3422 def _default_or_error(self, connection, tablename, owner, method, **kw): 

3423 # TODO: try to avoid having to run a separate query here 

3424 if self._internal_has_table(connection, tablename, owner, **kw): 

3425 return method() 

3426 else: 

3427 raise exc.NoSuchTableError(f"{owner}.{tablename}") 

3428 

3429 @reflection.cache 

3430 @_db_plus_owner 

3431 def get_indexes(self, connection, tablename, dbname, owner, schema, **kw): 

3432 filter_definition = ( 

3433 "ind.filter_definition" 

3434 if self.server_version_info >= MS_2008_VERSION 

3435 else "NULL as filter_definition" 

3436 ) 

3437 rp = connection.execution_options(future_result=True).execute( 

3438 sql.text( 

3439 f""" 

3440select 

3441 ind.index_id, 

3442 ind.is_unique, 

3443 ind.name, 

3444 ind.type, 

3445 {filter_definition} 

3446from 

3447 sys.indexes as ind 

3448join sys.tables as tab on 

3449 ind.object_id = tab.object_id 

3450join sys.schemas as sch on 

3451 sch.schema_id = tab.schema_id 

3452where 

3453 tab.name = :tabname 

3454 and sch.name = :schname 

3455 and ind.is_primary_key = 0 

3456 and ind.type != 0 

3457order by 

3458 ind.name 

3459 """ 

3460 ) 

3461 .bindparams( 

3462 sql.bindparam("tabname", tablename, ischema.CoerceUnicode()), 

3463 sql.bindparam("schname", owner, ischema.CoerceUnicode()), 

3464 ) 

3465 .columns(name=sqltypes.Unicode()) 

3466 ) 

3467 indexes = {} 

3468 for row in rp.mappings(): 

3469 indexes[row["index_id"]] = current = { 

3470 "name": row["name"], 

3471 "unique": row["is_unique"] == 1, 

3472 "column_names": [], 

3473 "include_columns": [], 

3474 "dialect_options": {}, 

3475 } 

3476 

3477 do = current["dialect_options"] 

3478 index_type = row["type"] 

3479 if index_type in {1, 2}: 

3480 do["mssql_clustered"] = index_type == 1 

3481 if index_type in {5, 6}: 

3482 do["mssql_clustered"] = index_type == 5 

3483 do["mssql_columnstore"] = True 

3484 if row["filter_definition"] is not None: 

3485 do["mssql_where"] = row["filter_definition"] 

3486 

3487 rp = connection.execution_options(future_result=True).execute( 

3488 sql.text( 

3489 """ 

3490select 

3491 ind_col.index_id, 

3492 col.name, 

3493 ind_col.is_included_column 

3494from 

3495 sys.columns as col 

3496join sys.tables as tab on 

3497 tab.object_id = col.object_id 

3498join sys.index_columns as ind_col on 

3499 ind_col.column_id = col.column_id 

3500 and ind_col.object_id = tab.object_id 

3501join sys.schemas as sch on 

3502 sch.schema_id = tab.schema_id 

3503where 

3504 tab.name = :tabname 

3505 and sch.name = :schname 

3506 """ 

3507 ) 

3508 .bindparams( 

3509 sql.bindparam("tabname", tablename, ischema.CoerceUnicode()), 

3510 sql.bindparam("schname", owner, ischema.CoerceUnicode()), 

3511 ) 

3512 .columns(name=sqltypes.Unicode()) 

3513 ) 

3514 for row in rp.mappings(): 

3515 if row["index_id"] not in indexes: 

3516 continue 

3517 index_def = indexes[row["index_id"]] 

3518 is_colstore = index_def["dialect_options"].get("mssql_columnstore") 

3519 is_clustered = index_def["dialect_options"].get("mssql_clustered") 

3520 if not (is_colstore and is_clustered): 

3521 # a clustered columnstore index includes all columns but does 

3522 # not want them in the index definition 

3523 if row["is_included_column"] and not is_colstore: 

3524 # a nonclustered columnstore index reports its columns as included 

3525 # columns but requires that they are listed as normal columns 

3526 index_def["include_columns"].append(row["name"]) 

3527 else: 

3528 index_def["column_names"].append(row["name"]) 

3529 for index_info in indexes.values(): 

3530 # NOTE: "root level" include_columns is legacy, now part of 

3531 # dialect_options (issue #7382) 

3532 index_info["dialect_options"]["mssql_include"] = index_info[ 

3533 "include_columns" 

3534 ] 

3535 

3536 if indexes: 

3537 return list(indexes.values()) 

3538 else: 

3539 return self._default_or_error( 

3540 connection, tablename, owner, ReflectionDefaults.indexes, **kw 

3541 ) 

3542 

3543 @reflection.cache 

3544 @_db_plus_owner 

3545 def get_view_definition( 

3546 self, connection, viewname, dbname, owner, schema, **kw 

3547 ): 

3548 view_def = connection.execute( 

3549 sql.text( 

3550 "select mod.definition " 

3551 "from sys.sql_modules as mod " 

3552 "join sys.views as views on mod.object_id = views.object_id " 

3553 "join sys.schemas as sch on views.schema_id = sch.schema_id " 

3554 "where views.name=:viewname and sch.name=:schname" 

3555 ).bindparams( 

3556 sql.bindparam("viewname", viewname, ischema.CoerceUnicode()), 

3557 sql.bindparam("schname", owner, ischema.CoerceUnicode()), 

3558 ) 

3559 ).scalar() 

3560 if view_def: 

3561 return view_def 

3562 else: 

3563 raise exc.NoSuchTableError(f"{owner}.{viewname}") 

3564 

3565 @reflection.cache 

3566 def get_table_comment(self, connection, table_name, schema=None, **kw): 

3567 if not self.supports_comments: 

3568 raise NotImplementedError( 

3569 "Can't get table comments on current SQL Server version in use" 

3570 ) 

3571 

3572 schema_name = schema if schema else self.default_schema_name 

3573 COMMENT_SQL = """ 

3574 SELECT cast(com.value as nvarchar(max)) 

3575 FROM fn_listextendedproperty('MS_Description', 

3576 'schema', :schema, 'table', :table, NULL, NULL 

3577 ) as com; 

3578 """ 

3579 

3580 comment = connection.execute( 

3581 sql.text(COMMENT_SQL).bindparams( 

3582 sql.bindparam("schema", schema_name, ischema.CoerceUnicode()), 

3583 sql.bindparam("table", table_name, ischema.CoerceUnicode()), 

3584 ) 

3585 ).scalar() 

3586 if comment: 

3587 return {"text": comment} 

3588 else: 

3589 return self._default_or_error( 

3590 connection, 

3591 table_name, 

3592 None, 

3593 ReflectionDefaults.table_comment, 

3594 **kw, 

3595 ) 

3596 

3597 def _temp_table_name_like_pattern(self, tablename): 

3598 # LIKE uses '%' to match zero or more characters and '_' to match any 

3599 # single character. We want to match literal underscores, so T-SQL 

3600 # requires that we enclose them in square brackets. 

3601 return tablename + ( 

3602 ("[_][_][_]%") if not tablename.startswith("##") else "" 

3603 ) 

3604 

3605 def _get_internal_temp_table_name(self, connection, tablename): 

3606 # it's likely that schema is always "dbo", but since we can 

3607 # get it here, let's get it. 

3608 # see https://stackoverflow.com/questions/8311959/ 

3609 # specifying-schema-for-temporary-tables 

3610 

3611 try: 

3612 return connection.execute( 

3613 sql.text( 

3614 "select table_schema, table_name " 

3615 "from tempdb.information_schema.tables " 

3616 "where table_name like :p1" 

3617 ), 

3618 {"p1": self._temp_table_name_like_pattern(tablename)}, 

3619 ).one() 

3620 except exc.MultipleResultsFound as me: 

3621 raise exc.UnreflectableTableError( 

3622 "Found more than one temporary table named '%s' in tempdb " 

3623 "at this time. Cannot reliably resolve that name to its " 

3624 "internal table name." % tablename 

3625 ) from me 

3626 except exc.NoResultFound as ne: 

3627 raise exc.NoSuchTableError( 

3628 "Unable to find a temporary table named '%s' in tempdb." 

3629 % tablename 

3630 ) from ne 

3631 

3632 @reflection.cache 

3633 @_db_plus_owner 

3634 def get_columns(self, connection, tablename, dbname, owner, schema, **kw): 

3635 is_temp_table = tablename.startswith("#") 

3636 if is_temp_table: 

3637 owner, tablename = self._get_internal_temp_table_name( 

3638 connection, tablename 

3639 ) 

3640 

3641 columns = ischema.mssql_temp_table_columns 

3642 else: 

3643 columns = ischema.columns 

3644 

3645 computed_cols = ischema.computed_columns 

3646 identity_cols = ischema.identity_columns 

3647 if owner: 

3648 whereclause = sql.and_( 

3649 columns.c.table_name == tablename, 

3650 columns.c.table_schema == owner, 

3651 ) 

3652 full_name = columns.c.table_schema + "." + columns.c.table_name 

3653 else: 

3654 whereclause = columns.c.table_name == tablename 

3655 full_name = columns.c.table_name 

3656 

3657 if self._supports_nvarchar_max: 

3658 computed_definition = computed_cols.c.definition 

3659 else: 

3660 # tds_version 4.2 does not support NVARCHAR(MAX) 

3661 computed_definition = sql.cast( 

3662 computed_cols.c.definition, NVARCHAR(4000) 

3663 ) 

3664 

3665 object_id = func.object_id(full_name) 

3666 

3667 s = ( 

3668 sql.select( 

3669 columns.c.column_name, 

3670 columns.c.data_type, 

3671 columns.c.is_nullable, 

3672 columns.c.character_maximum_length, 

3673 columns.c.numeric_precision, 

3674 columns.c.numeric_scale, 

3675 columns.c.column_default, 

3676 columns.c.collation_name, 

3677 computed_definition, 

3678 computed_cols.c.is_persisted, 

3679 identity_cols.c.is_identity, 

3680 identity_cols.c.seed_value, 

3681 identity_cols.c.increment_value, 

3682 ischema.extended_properties.c.value.label("comment"), 

3683 ) 

3684 .select_from(columns) 

3685 .outerjoin( 

3686 computed_cols, 

3687 onclause=sql.and_( 

3688 computed_cols.c.object_id == object_id, 

3689 computed_cols.c.name 

3690 == columns.c.column_name.collate("DATABASE_DEFAULT"), 

3691 ), 

3692 ) 

3693 .outerjoin( 

3694 identity_cols, 

3695 onclause=sql.and_( 

3696 identity_cols.c.object_id == object_id, 

3697 identity_cols.c.name 

3698 == columns.c.column_name.collate("DATABASE_DEFAULT"), 

3699 ), 

3700 ) 

3701 .outerjoin( 

3702 ischema.extended_properties, 

3703 onclause=sql.and_( 

3704 ischema.extended_properties.c["class"] == 1, 

3705 ischema.extended_properties.c.major_id == object_id, 

3706 ischema.extended_properties.c.minor_id 

3707 == columns.c.ordinal_position, 

3708 ischema.extended_properties.c.name == "MS_Description", 

3709 ), 

3710 ) 

3711 .where(whereclause) 

3712 .order_by(columns.c.ordinal_position) 

3713 ) 

3714 

3715 c = connection.execution_options(future_result=True).execute(s) 

3716 

3717 cols = [] 

3718 for row in c.mappings(): 

3719 name = row[columns.c.column_name] 

3720 type_ = row[columns.c.data_type] 

3721 nullable = row[columns.c.is_nullable] == "YES" 

3722 charlen = row[columns.c.character_maximum_length] 

3723 numericprec = row[columns.c.numeric_precision] 

3724 numericscale = row[columns.c.numeric_scale] 

3725 default = row[columns.c.column_default] 

3726 collation = row[columns.c.collation_name] 

3727 definition = row[computed_definition] 

3728 is_persisted = row[computed_cols.c.is_persisted] 

3729 is_identity = row[identity_cols.c.is_identity] 

3730 identity_start = row[identity_cols.c.seed_value] 

3731 identity_increment = row[identity_cols.c.increment_value] 

3732 comment = row[ischema.extended_properties.c.value] 

3733 

3734 coltype = self.ischema_names.get(type_, None) 

3735 

3736 kwargs = {} 

3737 if coltype in ( 

3738 MSString, 

3739 MSChar, 

3740 MSNVarchar, 

3741 MSNChar, 

3742 MSText, 

3743 MSNText, 

3744 MSBinary, 

3745 MSVarBinary, 

3746 sqltypes.LargeBinary, 

3747 ): 

3748 if charlen == -1: 

3749 charlen = None 

3750 kwargs["length"] = charlen 

3751 if collation: 

3752 kwargs["collation"] = collation 

3753 

3754 if coltype is None: 

3755 util.warn( 

3756 "Did not recognize type '%s' of column '%s'" 

3757 % (type_, name) 

3758 ) 

3759 coltype = sqltypes.NULLTYPE 

3760 else: 

3761 if issubclass(coltype, sqltypes.Numeric): 

3762 kwargs["precision"] = numericprec 

3763 

3764 if not issubclass(coltype, sqltypes.Float): 

3765 kwargs["scale"] = numericscale 

3766 

3767 coltype = coltype(**kwargs) 

3768 cdict = { 

3769 "name": name, 

3770 "type": coltype, 

3771 "nullable": nullable, 

3772 "default": default, 

3773 "autoincrement": is_identity is not None, 

3774 "comment": comment, 

3775 } 

3776 

3777 if definition is not None and is_persisted is not None: 

3778 cdict["computed"] = { 

3779 "sqltext": definition, 

3780 "persisted": is_persisted, 

3781 } 

3782 

3783 if is_identity is not None: 

3784 # identity_start and identity_increment are Decimal or None 

3785 if identity_start is None or identity_increment is None: 

3786 cdict["identity"] = {} 

3787 else: 

3788 if isinstance(coltype, sqltypes.BigInteger): 

3789 start = int(identity_start) 

3790 increment = int(identity_increment) 

3791 elif isinstance(coltype, sqltypes.Integer): 

3792 start = int(identity_start) 

3793 increment = int(identity_increment) 

3794 else: 

3795 start = identity_start 

3796 increment = identity_increment 

3797 

3798 cdict["identity"] = { 

3799 "start": start, 

3800 "increment": increment, 

3801 } 

3802 

3803 cols.append(cdict) 

3804 

3805 if cols: 

3806 return cols 

3807 else: 

3808 return self._default_or_error( 

3809 connection, tablename, owner, ReflectionDefaults.columns, **kw 

3810 ) 

3811 

3812 @reflection.cache 

3813 @_db_plus_owner 

3814 def get_pk_constraint( 

3815 self, connection, tablename, dbname, owner, schema, **kw 

3816 ): 

3817 pkeys = [] 

3818 TC = ischema.constraints 

3819 C = ischema.key_constraints.alias("C") 

3820 

3821 # Primary key constraints 

3822 s = ( 

3823 sql.select( 

3824 C.c.column_name, 

3825 TC.c.constraint_type, 

3826 C.c.constraint_name, 

3827 func.objectproperty( 

3828 func.object_id( 

3829 C.c.table_schema + "." + C.c.constraint_name 

3830 ), 

3831 "CnstIsClustKey", 

3832 ).label("is_clustered"), 

3833 ) 

3834 .where( 

3835 sql.and_( 

3836 TC.c.constraint_name == C.c.constraint_name, 

3837 TC.c.table_schema == C.c.table_schema, 

3838 C.c.table_name == tablename, 

3839 C.c.table_schema == owner, 

3840 ), 

3841 ) 

3842 .order_by(TC.c.constraint_name, C.c.ordinal_position) 

3843 ) 

3844 c = connection.execution_options(future_result=True).execute(s) 

3845 constraint_name = None 

3846 is_clustered = None 

3847 for row in c.mappings(): 

3848 if "PRIMARY" in row[TC.c.constraint_type.name]: 

3849 pkeys.append(row["COLUMN_NAME"]) 

3850 if constraint_name is None: 

3851 constraint_name = row[C.c.constraint_name.name] 

3852 if is_clustered is None: 

3853 is_clustered = row["is_clustered"] 

3854 if pkeys: 

3855 return { 

3856 "constrained_columns": pkeys, 

3857 "name": constraint_name, 

3858 "dialect_options": {"mssql_clustered": is_clustered}, 

3859 } 

3860 else: 

3861 return self._default_or_error( 

3862 connection, 

3863 tablename, 

3864 owner, 

3865 ReflectionDefaults.pk_constraint, 

3866 **kw, 

3867 ) 

3868 

3869 @reflection.cache 

3870 @_db_plus_owner 

3871 def get_foreign_keys( 

3872 self, connection, tablename, dbname, owner, schema, **kw 

3873 ): 

3874 # Foreign key constraints 

3875 s = ( 

3876 text( 

3877 """\ 

3878WITH fk_info AS ( 

3879 SELECT 

3880 ischema_ref_con.constraint_schema, 

3881 ischema_ref_con.constraint_name, 

3882 ischema_key_col.ordinal_position, 

3883 ischema_key_col.table_schema, 

3884 ischema_key_col.table_name, 

3885 ischema_ref_con.unique_constraint_schema, 

3886 ischema_ref_con.unique_constraint_name, 

3887 ischema_ref_con.match_option, 

3888 ischema_ref_con.update_rule, 

3889 ischema_ref_con.delete_rule, 

3890 ischema_key_col.column_name AS constrained_column 

3891 FROM 

3892 INFORMATION_SCHEMA.REFERENTIAL_CONSTRAINTS ischema_ref_con 

3893 INNER JOIN 

3894 INFORMATION_SCHEMA.KEY_COLUMN_USAGE ischema_key_col ON 

3895 ischema_key_col.table_schema = ischema_ref_con.constraint_schema 

3896 AND ischema_key_col.constraint_name = 

3897 ischema_ref_con.constraint_name 

3898 WHERE ischema_key_col.table_name = :tablename 

3899 AND ischema_key_col.table_schema = :owner 

3900), 

3901constraint_info AS ( 

3902 SELECT 

3903 ischema_key_col.constraint_schema, 

3904 ischema_key_col.constraint_name, 

3905 ischema_key_col.ordinal_position, 

3906 ischema_key_col.table_schema, 

3907 ischema_key_col.table_name, 

3908 ischema_key_col.column_name 

3909 FROM 

3910 INFORMATION_SCHEMA.KEY_COLUMN_USAGE ischema_key_col 

3911), 

3912index_info AS ( 

3913 SELECT 

3914 sys.schemas.name AS index_schema, 

3915 sys.indexes.name AS index_name, 

3916 sys.index_columns.key_ordinal AS ordinal_position, 

3917 sys.schemas.name AS table_schema, 

3918 sys.objects.name AS table_name, 

3919 sys.columns.name AS column_name 

3920 FROM 

3921 sys.indexes 

3922 INNER JOIN 

3923 sys.objects ON 

3924 sys.objects.object_id = sys.indexes.object_id 

3925 INNER JOIN 

3926 sys.schemas ON 

3927 sys.schemas.schema_id = sys.objects.schema_id 

3928 INNER JOIN 

3929 sys.index_columns ON 

3930 sys.index_columns.object_id = sys.objects.object_id 

3931 AND sys.index_columns.index_id = sys.indexes.index_id 

3932 INNER JOIN 

3933 sys.columns ON 

3934 sys.columns.object_id = sys.indexes.object_id 

3935 AND sys.columns.column_id = sys.index_columns.column_id 

3936) 

3937 SELECT 

3938 fk_info.constraint_schema, 

3939 fk_info.constraint_name, 

3940 fk_info.ordinal_position, 

3941 fk_info.constrained_column, 

3942 constraint_info.table_schema AS referred_table_schema, 

3943 constraint_info.table_name AS referred_table_name, 

3944 constraint_info.column_name AS referred_column, 

3945 fk_info.match_option, 

3946 fk_info.update_rule, 

3947 fk_info.delete_rule 

3948 FROM 

3949 fk_info INNER JOIN constraint_info ON 

3950 constraint_info.constraint_schema = 

3951 fk_info.unique_constraint_schema 

3952 AND constraint_info.constraint_name = 

3953 fk_info.unique_constraint_name 

3954 AND constraint_info.ordinal_position = fk_info.ordinal_position 

3955 UNION 

3956 SELECT 

3957 fk_info.constraint_schema, 

3958 fk_info.constraint_name, 

3959 fk_info.ordinal_position, 

3960 fk_info.constrained_column, 

3961 index_info.table_schema AS referred_table_schema, 

3962 index_info.table_name AS referred_table_name, 

3963 index_info.column_name AS referred_column, 

3964 fk_info.match_option, 

3965 fk_info.update_rule, 

3966 fk_info.delete_rule 

3967 FROM 

3968 fk_info INNER JOIN index_info ON 

3969 index_info.index_schema = fk_info.unique_constraint_schema 

3970 AND index_info.index_name = fk_info.unique_constraint_name 

3971 AND index_info.ordinal_position = fk_info.ordinal_position 

3972 

3973 ORDER BY fk_info.constraint_schema, fk_info.constraint_name, 

3974 fk_info.ordinal_position 

3975""" 

3976 ) 

3977 .bindparams( 

3978 sql.bindparam("tablename", tablename, ischema.CoerceUnicode()), 

3979 sql.bindparam("owner", owner, ischema.CoerceUnicode()), 

3980 ) 

3981 .columns( 

3982 constraint_schema=sqltypes.Unicode(), 

3983 constraint_name=sqltypes.Unicode(), 

3984 table_schema=sqltypes.Unicode(), 

3985 table_name=sqltypes.Unicode(), 

3986 constrained_column=sqltypes.Unicode(), 

3987 referred_table_schema=sqltypes.Unicode(), 

3988 referred_table_name=sqltypes.Unicode(), 

3989 referred_column=sqltypes.Unicode(), 

3990 ) 

3991 ) 

3992 

3993 # group rows by constraint name, to handle multi-column FKs 

3994 fkeys = util.defaultdict( 

3995 lambda: { 

3996 "name": None, 

3997 "constrained_columns": [], 

3998 "referred_schema": None, 

3999 "referred_table": None, 

4000 "referred_columns": [], 

4001 "options": {}, 

4002 } 

4003 ) 

4004 

4005 for r in connection.execute(s).all(): 

4006 ( 

4007 _, # constraint schema 

4008 rfknm, 

4009 _, # ordinal position 

4010 scol, 

4011 rschema, 

4012 rtbl, 

4013 rcol, 

4014 # TODO: match=<keyword> is supported for foreign keys, so this 

4015 # could be reflected as well; PG has match=FULL, for example, 

4016 # but that does not appear to be a valid value for SQL Server 

4017 _, # match rule 

4018 fkuprule, 

4019 fkdelrule, 

4020 ) = r 

4021 

4022 rec = fkeys[rfknm] 

4023 rec["name"] = rfknm 

4024 

4025 if fkuprule != "NO ACTION": 

4026 rec["options"]["onupdate"] = fkuprule 

4027 

4028 if fkdelrule != "NO ACTION": 

4029 rec["options"]["ondelete"] = fkdelrule 

4030 

4031 if not rec["referred_table"]: 

4032 rec["referred_table"] = rtbl 

4033 if schema is not None or owner != rschema: 

4034 if dbname: 

4035 rschema = dbname + "." + rschema 

4036 rec["referred_schema"] = rschema 

4037 

4038 local_cols, remote_cols = ( 

4039 rec["constrained_columns"], 

4040 rec["referred_columns"], 

4041 ) 

4042 

4043 local_cols.append(scol) 

4044 remote_cols.append(rcol) 

4045 

4046 if fkeys: 

4047 return list(fkeys.values()) 

4048 else: 

4049 return self._default_or_error( 

4050 connection, 

4051 tablename, 

4052 owner, 

4053 ReflectionDefaults.foreign_keys, 

4054 **kw, 

4055 )
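
The grouping step in get_foreign_keys() above can be tried without a database. This is a minimal sketch, not part of SQLAlchemy: the helper name group_fk_rows and the sample rows are hypothetical, and plain tuples stand in for the rows that connection.execute(s) would return, in the same column order as the final SELECT.

```python
from collections import defaultdict


def group_fk_rows(rows, owner, schema=None, dbname=None):
    """Collapse per-column FK rows into one record per constraint name.

    Hypothetical standalone helper; it mirrors the loop in
    get_foreign_keys() but takes plain tuples instead of a live connection.
    """
    fkeys = defaultdict(
        lambda: {
            "name": None,
            "constrained_columns": [],
            "referred_schema": None,
            "referred_table": None,
            "referred_columns": [],
            "options": {},
        }
    )
    for (_, name, _, col, rschema, rtable, rcol, _, uprule, delrule) in rows:
        rec = fkeys[name]
        rec["name"] = name
        # "NO ACTION" is the default rule, so it is not recorded as an option.
        if uprule != "NO ACTION":
            rec["options"]["onupdate"] = uprule
        if delrule != "NO ACTION":
            rec["options"]["ondelete"] = delrule
        # The referred table is only filled in on the first row of a constraint.
        if not rec["referred_table"]:
            rec["referred_table"] = rtable
            if schema is not None or owner != rschema:
                if dbname:
                    rschema = dbname + "." + rschema
                rec["referred_schema"] = rschema
        rec["constrained_columns"].append(col)
        rec["referred_columns"].append(rcol)
    return list(fkeys.values())


# Made-up sample data: a two-column FK followed by a one-column FK.
SAMPLE_ROWS = [
    ("dbo", "fk_order_item", 1, "order_id", "dbo", "orders", "id",
     "SIMPLE", "NO ACTION", "CASCADE"),
    ("dbo", "fk_order_item", 2, "order_rev", "dbo", "orders", "rev",
     "SIMPLE", "NO ACTION", "CASCADE"),
    ("dbo", "fk_item_sku", 1, "sku", "inv", "skus", "sku",
     "SIMPLE", "NO ACTION", "NO ACTION"),
]

result = group_fk_rows(SAMPLE_ROWS, owner="dbo")
```

Because the rows arrive ordered by constraint name and ordinal position, appending to the two column lists keeps constrained and referred columns paired by position.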