Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/sqlalchemy/dialects/mssql/base.py: 33%

1056 statements  

coverage.py v7.0.1, created at 2022-12-25 06:11 +0000

1# mssql/base.py 

2# Copyright (C) 2005-2022 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7""" 

8.. dialect:: mssql 

9 :name: Microsoft SQL Server 

10 :full_support: 2017 

11 :normal_support: 2012+ 

12 :best_effort: 2005+ 

13 

14.. _mssql_external_dialects: 

15 

16External Dialects 

17----------------- 

18 

19In addition to the above DBAPI layers with native SQLAlchemy support, there 

20are third-party dialects for other DBAPI layers that are compatible 

21with SQL Server. See the "External Dialects" list on the 

22:ref:`dialect_toplevel` page. 

23 

24.. _mssql_identity: 

25 

26Auto Increment Behavior / IDENTITY Columns 

27------------------------------------------ 

28 

29SQL Server provides so-called "auto incrementing" behavior using the 

30``IDENTITY`` construct, which can be placed on any single integer column in a 

31table. SQLAlchemy considers ``IDENTITY`` within its default "autoincrement" 

32behavior for an integer primary key column, described at 

33:paramref:`_schema.Column.autoincrement`. This means that by default, 

34the first integer primary key column in a :class:`_schema.Table` will be 

35considered to be the identity column - unless it is associated with a 

36:class:`.Sequence` - and will generate DDL as such:: 

37 

38 from sqlalchemy import Table, MetaData, Column, Integer 

39 

40 m = MetaData() 

41 t = Table('t', m, 

42 Column('id', Integer, primary_key=True), 

43 Column('x', Integer)) 

44 m.create_all(engine) 

45 

46The above example will generate DDL as: 

47 

48.. sourcecode:: sql 

49 

50 CREATE TABLE t ( 

51 id INTEGER NOT NULL IDENTITY, 

52 x INTEGER NULL, 

53 PRIMARY KEY (id) 

54 ) 

55 

56For the case where this default generation of ``IDENTITY`` is not desired, 

57specify ``False`` for the :paramref:`_schema.Column.autoincrement` flag, 

58on the first integer primary key column:: 

59 

60 m = MetaData() 

61 t = Table('t', m, 

62 Column('id', Integer, primary_key=True, autoincrement=False), 

63 Column('x', Integer)) 

64 m.create_all(engine) 

65 

66To add the ``IDENTITY`` keyword to a non-primary key column, specify 

67``True`` for the :paramref:`_schema.Column.autoincrement` flag on the desired 

68:class:`_schema.Column` object, and ensure that 

69:paramref:`_schema.Column.autoincrement` 

70is set to ``False`` on any integer primary key column:: 

71 

72 m = MetaData() 

73 t = Table('t', m, 

74 Column('id', Integer, primary_key=True, autoincrement=False), 

75 Column('x', Integer, autoincrement=True)) 

76 m.create_all(engine) 

77 
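The three ``autoincrement`` configurations above can be checked without a database by compiling the ``CREATE TABLE`` statement against the SQL Server dialect. The following is a minimal sketch, assuming SQLAlchemy 1.4 or later is installed; the table names ``t1`` to ``t3`` and the helper ``ddl_for`` are illustrative and not part of the library, and the exact whitespace of the rendered DDL may vary between versions:

```python
from sqlalchemy import Column, Integer, MetaData, Table
from sqlalchemy.dialects import mssql
from sqlalchemy.schema import CreateTable


def ddl_for(table):
    """Render CREATE TABLE for the SQL Server dialect (hypothetical helper)."""
    return str(CreateTable(table).compile(dialect=mssql.dialect()))


m = MetaData()

# default: the first integer primary key column receives IDENTITY
t1 = Table("t1", m, Column("id", Integer, primary_key=True), Column("x", Integer))

# autoincrement=False suppresses IDENTITY on the primary key
t2 = Table(
    "t2", m,
    Column("id", Integer, primary_key=True, autoincrement=False),
    Column("x", Integer),
)

# IDENTITY moved to a non-primary key column
t3 = Table(
    "t3", m,
    Column("id", Integer, primary_key=True, autoincrement=False),
    Column("x", Integer, autoincrement=True),
)

for table in (t1, t2, t3):
    print(ddl_for(table))
```

Compiling against ``mssql.dialect()`` needs no DBAPI driver or server, which makes it a convenient way to inspect DDL before running ``create_all()``.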

78.. versionchanged:: 1.4 Added :class:`_schema.Identity` construct 

79 in a :class:`_schema.Column` to specify the start and increment 

80 parameters of an IDENTITY. These replace 

81 the use of the :class:`.Sequence` object in order to specify these values. 

82 

83.. deprecated:: 1.4 

84 

85 The ``mssql_identity_start`` and ``mssql_identity_increment`` parameters 

86 to :class:`_schema.Column` are deprecated and should be replaced by

87 an :class:`_schema.Identity` object. Specifying both ways of configuring 

88 an IDENTITY will result in a compile error. 

89 These options are also no longer returned as part of the 

90 ``dialect_options`` key in :meth:`_reflection.Inspector.get_columns`. 

91 Use the information in the ``identity`` key instead. 

92 

93.. deprecated:: 1.3 

94 

95 The use of :class:`.Sequence` to specify IDENTITY characteristics is 

96 deprecated and will be removed in a future release. Please use 

97 the :class:`_schema.Identity` object parameters 

98 :paramref:`_schema.Identity.start` and 

99 :paramref:`_schema.Identity.increment`. 

100 

101.. versionchanged:: 1.4 Removed the ability to use a :class:`.Sequence` 

102 object to modify IDENTITY characteristics. :class:`.Sequence` objects 

103 now only manipulate true T-SQL SEQUENCE types. 

104 

105.. note:: 

106 

107 There can only be one IDENTITY column on the table. When using 

108 ``autoincrement=True`` to enable the IDENTITY keyword, SQLAlchemy does not 

109 guard against multiple columns specifying the option simultaneously. The 

110 SQL Server database will instead reject the ``CREATE TABLE`` statement. 

111 

112.. note:: 

113 

114 An INSERT statement which attempts to provide a value for a column that is 

115 marked with IDENTITY will be rejected by SQL Server. In order for the 

116 value to be accepted, a session-level option "SET IDENTITY_INSERT" must be 

117 enabled. The SQLAlchemy SQL Server dialect will perform this operation 

118 automatically when using a core :class:`_expression.Insert` 

119 construct; if the 

120 execution specifies a value for the IDENTITY column, the "IDENTITY_INSERT" 

121 option will be enabled for the span of that statement's invocation. However,

122 this scenario is not high performing and should not be relied upon for 

123 normal use. If a table doesn't actually require IDENTITY behavior in its 

124 integer primary key column, the keyword should be disabled when creating 

125 the table by ensuring that ``autoincrement=False`` is set. 

126 

127Controlling "Start" and "Increment" 

128^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 

129 

130Specific control over the "start" and "increment" values for 

131the ``IDENTITY`` generator is provided using the

132:paramref:`_schema.Identity.start` and :paramref:`_schema.Identity.increment` 

133parameters passed to the :class:`_schema.Identity` object:: 

134 

135 from sqlalchemy import Table, Integer, String, Column, Identity

136 

137 test = Table( 

138 'test', metadata, 

139 Column( 

140 'id', 

141 Integer, 

142 primary_key=True, 

143 Identity(start=100, increment=10) 

144 ), 

145 Column('name', String(20)) 

146 ) 

147 

148The CREATE TABLE for the above :class:`_schema.Table` object would be: 

149 

150.. sourcecode:: sql 

151 

152 CREATE TABLE test ( 

153 id INTEGER NOT NULL IDENTITY(100,10) PRIMARY KEY, 

154 name VARCHAR(20) NULL

155 ) 

156 

157.. note:: 

158 

159 The :class:`_schema.Identity` object supports many other parameters in

160 addition to ``start`` and ``increment``. These are not supported by 

161 SQL Server and will be ignored when generating the CREATE TABLE DDL.

162 

163.. versionchanged:: 1.3.19 The :class:`_schema.Identity` object is 

164 now used to affect the 

165 ``IDENTITY`` generator for a :class:`_schema.Column` under SQL Server. 

166 Previously, the :class:`.Sequence` object was used. As SQL Server now 

167 supports real sequences as a separate construct, :class:`.Sequence` will be 

168 functional in the normal way starting from SQLAlchemy version 1.4. 

169 

170 

171Using IDENTITY with Non-Integer numeric types 

172^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 

173 

174SQL Server also allows ``IDENTITY`` to be used with ``NUMERIC`` columns. To 

175implement this pattern smoothly in SQLAlchemy, the primary datatype of the 

176column should remain as ``Integer``, however the underlying implementation 

177type deployed to the SQL Server database can be specified as ``Numeric`` using 

178:meth:`.TypeEngine.with_variant`:: 

179 

180 from sqlalchemy import Column 

181 from sqlalchemy import Integer 

182 from sqlalchemy import Numeric 

183 from sqlalchemy import String 

184 from sqlalchemy.ext.declarative import declarative_base 

185 

186 Base = declarative_base() 

187 

188 class TestTable(Base): 

189 __tablename__ = "test" 

190 id = Column( 

191 Integer().with_variant(Numeric(10, 0), "mssql"), 

192 primary_key=True, 

193 autoincrement=True, 

194 ) 

195 name = Column(String) 

196 

197In the above example, ``Integer().with_variant()`` provides clear usage 

198information that accurately describes the intent of the code. The general 

199restriction that ``autoincrement`` only applies to ``Integer`` is established 

200at the metadata level and not at the per-dialect level. 

201 

202When using the above pattern, the primary key identifier that comes back from 

203the insertion of a row, which is also the value that would be assigned to an 

204ORM object such as ``TestTable`` above, will be an instance of ``Decimal()`` 

205and not ``int`` when using SQL Server. The numeric return type of the 

206:class:`_types.Numeric` type can be changed to return floats by passing False 

207to :paramref:`_types.Numeric.asdecimal`. To normalize the return type of the 

208above ``Numeric(10, 0)`` to return Python ints (which also support "long" 

209integer values in Python 3), use :class:`_types.TypeDecorator` as follows:: 

210 

211 from sqlalchemy import TypeDecorator 

212 

213 class NumericAsInteger(TypeDecorator): 

214 '''normalize floating point return values into ints''' 

215 

216 impl = Numeric(10, 0, asdecimal=False) 

217 cache_ok = True 

218 

219 def process_result_value(self, value, dialect): 

220 if value is not None: 

221 value = int(value) 

222 return value 

223 

224 class TestTable(Base): 

225 __tablename__ = "test" 

226 id = Column( 

227 Integer().with_variant(NumericAsInteger, "mssql"), 

228 primary_key=True, 

229 autoincrement=True, 

230 ) 

231 name = Column(String) 

232 

233 

234INSERT behavior 

235^^^^^^^^^^^^^^^^ 

236 

237Handling of the ``IDENTITY`` column at INSERT time involves two key 

238techniques. The most common is being able to fetch the "last inserted value" 

239for a given ``IDENTITY`` column, a process which SQLAlchemy performs 

240implicitly in many cases, most importantly within the ORM. 

241 

242The process for fetching this value has several variants: 

243 

244* In the vast majority of cases, RETURNING is used in conjunction with INSERT 

245 statements on SQL Server in order to get newly generated primary key values: 

246 

247 .. sourcecode:: sql 

248 

249 INSERT INTO t (x) OUTPUT inserted.id VALUES (?) 

250 

251* When RETURNING is not available or has been disabled via 

252 ``implicit_returning=False``, either the ``scope_identity()`` function or 

253 the ``@@identity`` variable is used; behavior varies by backend: 

254 

255 * when using PyODBC, the phrase ``; select scope_identity()`` will be 

256 appended to the end of the INSERT statement; a second result set will be 

257 fetched in order to receive the value. Given a table as:: 

258 

259 t = Table('t', m, Column('id', Integer, primary_key=True), 

260 Column('x', Integer), 

261 implicit_returning=False) 

262 

263 an INSERT will look like: 

264 

265 .. sourcecode:: sql 

266 

267 INSERT INTO t (x) VALUES (?); select scope_identity() 

268 

269 * Other dialects such as pymssql will call upon 

270 ``SELECT scope_identity() AS lastrowid`` subsequent to an INSERT 

271 statement. If the flag ``use_scope_identity=False`` is passed to 

272 :func:`_sa.create_engine`, 

273 the statement ``SELECT @@identity AS lastrowid`` 

274 is used instead. 

275 
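The variants listed above amount to a small decision procedure. The sketch below restates it in plain Python; ``last_inserted_id_sql`` is a hypothetical helper and not part of SQLAlchemy, the returned strings are abbreviated descriptions rather than the exact SQL emitted, and applying ``use_scope_identity=False`` to PyODBC as well is an assumption of this sketch:

```python
def last_inserted_id_sql(implicit_returning=True, driver="pyodbc",
                         use_scope_identity=True):
    """Describe how a new IDENTITY value would be fetched.

    Hypothetical helper that mirrors the rules in the text; it is not
    SQLAlchemy API.
    """
    if implicit_returning:
        # the value arrives with the INSERT itself via the OUTPUT clause
        return "INSERT ... OUTPUT inserted.id VALUES (?)"
    if not use_scope_identity:
        # assumption: the flag is honored the same way for every driver
        return "INSERT ...; then SELECT @@identity AS lastrowid"
    if driver == "pyodbc":
        # appended to the INSERT; the value is read from a second result set
        return "INSERT ... VALUES (?); select scope_identity()"
    # other drivers such as pymssql issue a separate statement
    return "INSERT ...; then SELECT scope_identity() AS lastrowid"


print(last_inserted_id_sql())
print(last_inserted_id_sql(implicit_returning=False))
print(last_inserted_id_sql(implicit_returning=False, driver="pymssql"))
```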

276A table that contains an ``IDENTITY`` column will prohibit an INSERT statement 

277that refers to the identity column explicitly. The SQLAlchemy dialect will 

278detect when an INSERT construct, created using a core 

279:func:`_expression.insert` 

280construct (not a plain string SQL), refers to the identity column, and 

281in this case will emit ``SET IDENTITY_INSERT ON`` prior to the insert 

282statement proceeding, and ``SET IDENTITY_INSERT OFF`` subsequent to the 

283execution. Given this example:: 

284 

285 m = MetaData() 

286 t = Table('t', m, Column('id', Integer, primary_key=True), 

287 Column('x', Integer)) 

288 m.create_all(engine) 

289 

290 with engine.begin() as conn: 

291 conn.execute(t.insert(), [{'id': 1, 'x': 1}, {'id': 2, 'x': 2}])

292 

293The above column will be created with IDENTITY, however the INSERT statement 

294we emit is specifying explicit values. In the echo output we can see 

295how SQLAlchemy handles this: 

296 

297.. sourcecode:: sql 

298 

299 CREATE TABLE t ( 

300 id INTEGER NOT NULL IDENTITY(1,1), 

301 x INTEGER NULL, 

302 PRIMARY KEY (id) 

303 ) 

304 

305 COMMIT 

306 SET IDENTITY_INSERT t ON 

307 INSERT INTO t (id, x) VALUES (?, ?) 

308 ((1, 1), (2, 2)) 

309 SET IDENTITY_INSERT t OFF 

310 COMMIT 

311 

312 

313 

314This is an auxiliary use case suitable for testing and bulk insert scenarios. 
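The bracketing rule can be summarized in a few lines of plain Python. This is a sketch only: ``statements_for_insert`` is a hypothetical helper, not SQLAlchemy API, and it models just the statement sequence seen in the echo output, not transaction handling:

```python
def statements_for_insert(table, identity_column, columns):
    """List the statements emitted for an INSERT that names ``columns``.

    Hypothetical helper illustrating the rule in the text.
    """
    placeholders = ", ".join("?" for _ in columns)
    insert = "INSERT INTO %s (%s) VALUES (%s)" % (
        table, ", ".join(columns), placeholders
    )
    if identity_column in columns:
        # an explicit value for the IDENTITY column: bracket the INSERT
        return [
            "SET IDENTITY_INSERT %s ON" % table,
            insert,
            "SET IDENTITY_INSERT %s OFF" % table,
        ]
    # the IDENTITY column is left to the server: a plain INSERT suffices
    return [insert]


for statement in statements_for_insert("t", "id", ["id", "x"]):
    print(statement)
```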

315 

316SEQUENCE support 

317---------------- 

318 

319The :class:`.Sequence` object now creates "real" sequences, i.e., 

320``CREATE SEQUENCE``. To provide compatibility with other dialects, 

321:class:`.Sequence` defaults to a start value of 1, even though the 

322T-SQL default is -9223372036854775808.

323 

324.. versionadded:: 1.4.0 

325 

326MAX on VARCHAR / NVARCHAR 

327------------------------- 

328 

329SQL Server supports the special string "MAX" within the 

330:class:`_types.VARCHAR` and :class:`_types.NVARCHAR` datatypes, 

331to indicate "maximum length possible". The dialect currently handles this as 

332a length of "None" in the base type, rather than supplying a 

333dialect-specific version of these types, so that a base type 

334specified such as ``VARCHAR(None)`` can assume "unlengthed" behavior on 

335more than one backend without using dialect-specific types. 

336 

337To build a SQL Server VARCHAR or NVARCHAR with MAX length, use None:: 

338 

339 my_table = Table( 

340 'my_table', metadata, 

341 Column('my_data', VARCHAR(None)), 

342 Column('my_n_data', NVARCHAR(None)) 

343 ) 

344 
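To see the "MAX" rendering without connecting to a server, the types can be compiled directly against the SQL Server dialect. A minimal sketch, assuming SQLAlchemy is installed; the variable names are illustrative:

```python
from sqlalchemy import NVARCHAR, VARCHAR
from sqlalchemy.dialects import mssql

dialect = mssql.dialect()

# a length of None is rendered as "max" by the SQL Server type compiler
varchar_ddl = str(VARCHAR(None).compile(dialect=dialect))
nvarchar_ddl = str(NVARCHAR(None).compile(dialect=dialect))

# an explicit length is rendered as given
sized_ddl = str(VARCHAR(32).compile(dialect=dialect))

print(varchar_ddl, nvarchar_ddl, sized_ddl)
```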

345 

346Collation Support 

347----------------- 

348 

349Character collations are supported by the base string types, 

350specified by the string argument "collation":: 

351 

352 from sqlalchemy import VARCHAR 

353 Column('login', VARCHAR(32, collation='Latin1_General_CI_AS')) 

354 

355When such a column is associated with a :class:`_schema.Table`, the 

356CREATE TABLE statement for this column will yield:: 

357 

358 login VARCHAR(32) COLLATE Latin1_General_CI_AS NULL 

359 

360LIMIT/OFFSET Support 

361-------------------- 

362 

363MSSQL has added support for LIMIT / OFFSET as of SQL Server 2012, via the 

364"OFFSET n ROWS" and "FETCH NEXT n ROWS" clauses. SQLAlchemy supports these 

365syntaxes automatically if SQL Server 2012 or greater is detected. 

366 

367.. versionchanged:: 1.4 support added for SQL Server "OFFSET n ROWS" and 

368 "FETCH NEXT n ROWS" syntax. 

369 

370For statements that specify only LIMIT and no OFFSET, all versions of SQL 

371Server support the TOP keyword. This syntax is used for all SQL Server 

372versions when no OFFSET clause is present. A statement such as:: 

373 

374 select(some_table).limit(5) 

375 

376will render similarly to:: 

377 

378 SELECT TOP 5 col1, col2.. FROM table 

379 

380For versions of SQL Server prior to SQL Server 2012, a statement that uses 

381LIMIT and OFFSET, or just OFFSET alone, will be rendered using the 

382``ROW_NUMBER()`` window function. A statement such as:: 

383 

384 select(some_table).order_by(some_table.c.col3).limit(5).offset(10) 

385 

386will render similarly to:: 

387 

388 SELECT anon_1.col1, anon_1.col2 FROM (SELECT col1, col2, 

389 ROW_NUMBER() OVER (ORDER BY col3) AS 

390 mssql_rn FROM table WHERE t.x = :x_1) AS 

391 anon_1 WHERE mssql_rn > :param_1 AND mssql_rn <= :param_2 + :param_1 

392 

393Note that when using LIMIT and/or OFFSET, whether using the older 

394or newer SQL Server syntaxes, the statement must have an ORDER BY as well, 

395else a :class:`.CompileError` is raised. 
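The ``ROW_NUMBER()`` form selects the window ``offset < mssql_rn <= limit + offset``. The sketch below emulates that predicate on an in-memory list to show which rows survive and why a deterministic ordering is needed; ``paginate_with_row_number`` and the sample rows are hypothetical and exist only for illustration:

```python
def paginate_with_row_number(rows, order_key, limit, offset):
    """Emulate the pre-2012 ROW_NUMBER() pagination on a list of dicts."""
    # ROW_NUMBER() OVER (ORDER BY ...) numbers the ordered rows from 1
    numbered = enumerate(sorted(rows, key=order_key), start=1)
    # WHERE mssql_rn > :offset AND mssql_rn <= :limit + :offset
    return [row for rn, row in numbered if offset < rn <= limit + offset]


rows = [{"col3": n} for n in range(30, 0, -1)]
page = paginate_with_row_number(rows, lambda r: r["col3"], limit=5, offset=10)
print([r["col3"] for r in page])  # prints [11, 12, 13, 14, 15]
```

Without the sort step the row numbers would depend on the incoming order, which is the in-memory analogue of why an ORDER BY is required.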

396 

397.. _mssql_isolation_level: 

398 

399Transaction Isolation Level 

400--------------------------- 

401 

402All SQL Server dialects support setting of transaction isolation level 

403both via a dialect-specific parameter 

404:paramref:`_sa.create_engine.isolation_level` 

405accepted by :func:`_sa.create_engine`, 

406as well as the :paramref:`.Connection.execution_options.isolation_level` 

407argument as passed to 

408:meth:`_engine.Connection.execution_options`. 

409This feature works by issuing the 

410command ``SET TRANSACTION ISOLATION LEVEL <level>`` for 

411each new connection. 

412 

413To set isolation level using :func:`_sa.create_engine`:: 

414 

415 engine = create_engine( 

416 "mssql+pyodbc://scott:tiger@ms_2008", 

417 isolation_level="REPEATABLE READ" 

418 ) 

419 

420To set using per-connection execution options:: 

421 

422 connection = engine.connect() 

423 connection = connection.execution_options( 

424 isolation_level="READ COMMITTED" 

425 ) 

426 

427Valid values for ``isolation_level`` include: 

428 

429* ``AUTOCOMMIT`` - pyodbc / pymssql-specific 

430* ``READ COMMITTED`` 

431* ``READ UNCOMMITTED`` 

432* ``REPEATABLE READ`` 

433* ``SERIALIZABLE`` 

434* ``SNAPSHOT`` - specific to SQL Server 

435 

436There are also more options for isolation level configurations, such as 

437"sub-engine" objects linked to a main :class:`_engine.Engine` which each apply 

438different isolation level settings. See the discussion at 

439:ref:`dbapi_autocommit` for background. 

440 

441.. seealso:: 

442 

443 :ref:`dbapi_autocommit` 

444 

445.. _mssql_reset_on_return: 

446 

447Temporary Table / Resource Reset for Connection Pooling 

448------------------------------------------------------- 

449 

450The :class:`.QueuePool` connection pool implementation used 

451by the SQLAlchemy :class:`_sa.Engine` object includes 

452:ref:`reset on return <pool_reset_on_return>` behavior that will invoke 

453the DBAPI ``.rollback()`` method when connections are returned to the pool. 

454While this rollback will clear out the immediate state used by the previous 

455transaction, it does not cover a wider range of session-level state, including 

456temporary tables as well as other server state such as prepared statement 

457handles and statement caches. An undocumented SQL Server procedure named

458``sp_reset_connection`` is a known workaround for this issue; it resets

459most of the session state that builds up on a connection, including

460temporary tables.

461 

462To install ``sp_reset_connection`` as the means of performing reset-on-return, 

463the :meth:`.PoolEvents.reset` event hook may be used, as demonstrated in the 

464example below (**requires SQLAlchemy 1.4.43 or greater**). The 

465:paramref:`_sa.create_engine.pool_reset_on_return` parameter is set to ``None`` 

466so that the custom scheme can replace the default behavior completely. The 

467custom hook implementation calls ``.rollback()`` in any case, as it's usually 

468important that the DBAPI's own tracking of commit/rollback will remain 

469consistent with the state of the transaction:: 

470 

471 from sqlalchemy import create_engine 

472 from sqlalchemy import event 

473 

474 mssql_engine = create_engine( 

475 "mssql+pyodbc://scott:tiger^5HHH@mssql2017:1433/test?driver=ODBC+Driver+17+for+SQL+Server", 

476 

477 # disable default reset-on-return scheme 

478 pool_reset_on_return=None, 

479 ) 

480 

481 

482 @event.listens_for(mssql_engine, "reset") 

483 def _reset_mssql(dbapi_connection, connection_record, reset_state): 

484 dbapi_connection.execute("{call sys.sp_reset_connection}") 

485 

486 # so that the DBAPI itself knows that the connection has been 

487 # reset 

488 dbapi_connection.rollback() 

489 

490.. versionchanged:: 1.4.43 Ensured the :meth:`.PoolEvents.reset` event 

491 is invoked for all "reset" occurrences, so that it's appropriate 

492 as a place for custom "reset" handlers. Previous schemes which 

493 use the :meth:`.PoolEvents.checkin` handler remain usable as well. 

494 

495.. seealso:: 

496 

497 :ref:`pool_reset_on_return` - in the :ref:`pooling_toplevel` documentation 

498 

499Nullability 

500----------- 

501MSSQL has support for three levels of column nullability. The default 

502nullability allows nulls and is explicit in the CREATE TABLE 

503construct:: 

504 

505 name VARCHAR(20) NULL 

506 

507If ``nullable=None`` is specified then no specification is made. In 

508other words the database's configured default is used. This will 

509render:: 

510 

511 name VARCHAR(20) 

512 

513If ``nullable`` is ``True`` or ``False`` then the column will be 

514``NULL`` or ``NOT NULL`` respectively. 
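The three nullability levels map onto the three possible values of ``nullable``. A minimal sketch of that mapping in plain Python; ``render_nullability`` and ``column_ddl`` are hypothetical helpers, not the dialect's implementation:

```python
def render_nullability(nullable):
    """Return the nullability suffix for a column definition."""
    if nullable is None:
        # no specification: the database's configured default applies
        return ""
    return " NULL" if nullable else " NOT NULL"


def column_ddl(name, type_, nullable=True):
    """Build a column definition; nullable defaults to True as described."""
    return "%s %s%s" % (name, type_, render_nullability(nullable))


print(column_ddl("name", "VARCHAR(20)"))
print(column_ddl("name", "VARCHAR(20)", nullable=None))
print(column_ddl("name", "VARCHAR(20)", nullable=False))
```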

515 

516Date / Time Handling 

517-------------------- 

518DATE and TIME are supported. Bind parameters are converted 

519to datetime.datetime() objects as required by most MSSQL drivers, 

520and results are processed from strings if needed. 

521The DATE and TIME types are not available for MSSQL 2005 and 

522previous - if a server version below 2008 is detected, DDL 

523for these types will be issued as DATETIME. 

524 

525.. _mssql_large_type_deprecation: 

526 

527Large Text/Binary Type Deprecation 

528---------------------------------- 

529 

530Per 

531`SQL Server 2012/2014 Documentation <https://technet.microsoft.com/en-us/library/ms187993.aspx>`_, 

532the ``NTEXT``, ``TEXT`` and ``IMAGE`` datatypes are to be removed from SQL 

533Server in a future release. SQLAlchemy normally relates these types to the 

534:class:`.UnicodeText`, :class:`_types.Text` and

535:class:`.LargeBinary` datatypes. 

536 

537In order to accommodate this change, a new flag ``deprecate_large_types`` 

538is added to the dialect, which will be automatically set based on detection 

539of the server version in use, if not otherwise set by the user. The 

540behavior of this flag is as follows: 

541 

542* When this flag is ``True``, the :class:`.UnicodeText`, 

543 :class:`_types.Text` and

544 :class:`.LargeBinary` datatypes, when used to render DDL, will render the 

545 types ``NVARCHAR(max)``, ``VARCHAR(max)``, and ``VARBINARY(max)``, 

546 respectively. This is a new behavior as of the addition of this flag. 

547 

548* When this flag is ``False``, the :class:`.UnicodeText`, 

549 :class:`_types.Text` and

550 :class:`.LargeBinary` datatypes, when used to render DDL, will render the 

551 types ``NTEXT``, ``TEXT``, and ``IMAGE``, 

552 respectively. This is the long-standing behavior of these types. 

553 

554* The flag begins with the value ``None``, before a database connection is 

555 established. If the dialect is used to render DDL without the flag being 

556 set, it is interpreted the same as ``False``. 

557 

558* On first connection, the dialect detects if SQL Server version 2012 or 

559 greater is in use; if the flag is still at ``None``, it sets it to ``True`` 

560 or ``False`` based on whether 2012 or greater is detected. 

561 

562* The flag can be set to either ``True`` or ``False`` when the dialect 

563 is created, typically via :func:`_sa.create_engine`:: 

564 

565 eng = create_engine("mssql+pymssql://user:pass@host/db", 

566 deprecate_large_types=True) 

567 

568* Complete control over whether the "old" or "new" types are rendered is 

569 available in all SQLAlchemy versions by using the UPPERCASE type objects 

570 instead: :class:`_types.NVARCHAR`, :class:`_types.VARCHAR`, 

571 :class:`_types.VARBINARY`, :class:`_types.TEXT`, :class:`_mssql.NTEXT`, 

572 :class:`_mssql.IMAGE` 

573 will always remain fixed and always output exactly that 

574 type. 

575 
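The flag resolution described in the list above can be sketched as follows. The function names are hypothetical and the code is not the dialect's implementation; it assumes the server version is available as a tuple and relies on SQL Server 2012 reporting major version 11:

```python
# legacy type name -> replacement rendered when the flag is True
MAX_TYPES = {
    "NTEXT": "NVARCHAR(max)",
    "TEXT": "VARCHAR(max)",
    "IMAGE": "VARBINARY(max)",
}

SQL_SERVER_2012 = (11,)


def resolve_flag(flag, server_version=None):
    """Return the effective value of deprecate_large_types."""
    if flag is not None:
        # an explicit True/False from the user always wins
        return flag
    if server_version is None:
        # no connection yet: None is interpreted the same as False
        return False
    # set on first connection from the detected server version
    return server_version >= SQL_SERVER_2012


def rendered_type(legacy_name, flag=None, server_version=None):
    """Return the DDL type name for one of the three legacy types."""
    if resolve_flag(flag, server_version):
        return MAX_TYPES[legacy_name]
    return legacy_name


print(rendered_type("NTEXT"))
print(rendered_type("NTEXT", server_version=(11, 0, 2100)))
print(rendered_type("IMAGE", flag=True))
```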

576.. versionadded:: 1.0.0 

577 

578.. _multipart_schema_names: 

579 

580Multipart Schema Names 

581---------------------- 

582 

583SQL Server schemas sometimes require multiple parts to their "schema" 

584qualifier, that is, including the database name and owner name as separate 

585tokens, such as ``mydatabase.dbo.some_table``. These multipart names can be set 

586at once using the :paramref:`_schema.Table.schema` argument of 

587:class:`_schema.Table`:: 

588 

589 Table( 

590 "some_table", metadata, 

591 Column("q", String(50)), 

592 schema="mydatabase.dbo" 

593 ) 

594 

595When performing operations such as table or component reflection, a schema 

596argument that contains a dot will be split into separate 

597"database" and "owner" components in order to correctly query the SQL 

598Server information schema tables, as these two values are stored separately. 

599Additionally, when rendering the schema name for DDL or SQL, the two 

600components will be quoted separately for case sensitive names and other 

601special characters. Given an argument as below:: 

602 

603 Table( 

604 "some_table", metadata, 

605 Column("q", String(50)), 

606 schema="MyDataBase.dbo" 

607 ) 

608 

609The above schema would be rendered as ``[MyDataBase].dbo``, and also in 

610reflection, would be reflected using "dbo" as the owner and "MyDataBase" 

611as the database name. 

612 

613To control how the schema name is broken into database / owner, 

614specify brackets (which in SQL Server are quoting characters) in the name. 

615Below, the "owner" will be considered as ``MyDataBase.dbo`` and the 

616"database" will be None:: 

617 

618 Table( 

619 "some_table", metadata, 

620 Column("q", String(50)), 

621 schema="[MyDataBase.dbo]" 

622 ) 

623 

624To individually specify both database and owner name with special characters 

625or embedded dots, use two sets of brackets:: 

626 

627 Table( 

628 "some_table", metadata, 

629 Column("q", String(50)), 

630 schema="[MyDataBase.Period].[MyOwner.Dot]" 

631 ) 

632 

633 

634.. versionchanged:: 1.2 the SQL Server dialect now treats brackets as 

635 identifier delimiters splitting the schema into separate database 

636 and owner tokens, to allow dots within either name itself. 
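The splitting rules for dotted and bracketed schema names can be illustrated with a short, simplified sketch. ``split_schema`` is a hypothetical function written for this example; the dialect's own parser is more involved, and treating every token but the last as the database name is an assumption of this sketch:

```python
import re

# a bracketed token may contain dots; a plain token ends at a dot
_TOKEN = re.compile(r"\[([^\]]+)\]|([^.\[\]]+)")


def split_schema(schema):
    """Split a schema string into (database, owner)."""
    parts = [bracketed or plain for bracketed, plain in _TOKEN.findall(schema)]
    if not parts:
        return None, None
    if len(parts) == 1:
        # a single token is the owner; no database is implied
        return None, parts[0]
    # assumption: everything before the last token names the database
    return ".".join(parts[:-1]), parts[-1]


for name in ("mydatabase.dbo", "[MyDataBase.dbo]",
             "[MyDataBase.Period].[MyOwner.Dot]"):
    print(name, "->", split_schema(name))
```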

637 

638.. _legacy_schema_rendering: 

639 

640Legacy Schema Mode 

641------------------ 

642 

643Very old versions of the MSSQL dialect introduced the behavior such that a 

644schema-qualified table would be auto-aliased when used in a 

645SELECT statement; given a table:: 

646 

647 account_table = Table( 

648 'account', metadata, 

649 Column('id', Integer, primary_key=True), 

650 Column('info', String(100)), 

651 schema="customer_schema" 

652 ) 

653 

654this legacy mode of rendering would assume that "customer_schema.account" 

655would not be accepted by all parts of the SQL statement, as illustrated 

656below:: 

657 

658 >>> eng = create_engine("mssql+pymssql://mydsn", legacy_schema_aliasing=True) 

659 >>> print(account_table.select().compile(eng)) 

660 SELECT account_1.id, account_1.info 

661 FROM customer_schema.account AS account_1 

662 

663This mode of behavior is now off by default, as it appears to have served 

664no purpose; however in the case that legacy applications rely upon it, 

665it is available using the ``legacy_schema_aliasing`` argument to 

666:func:`_sa.create_engine` as illustrated above. 

667 

668.. versionchanged:: 1.1 the ``legacy_schema_aliasing`` flag introduced 

669 in version 1.0.5 to allow disabling of legacy mode for schemas now 

670 defaults to False. 

671 

672.. deprecated:: 1.4 

673 

674 The ``legacy_schema_aliasing`` flag is now 

675 deprecated and will be removed in a future release. 

676 

677.. _mssql_indexes: 

678 

679Clustered Index Support 

680----------------------- 

681 

682The MSSQL dialect supports clustered indexes (and primary keys) via the 

683``mssql_clustered`` option. This option is available to :class:`.Index`, 

684:class:`.UniqueConstraint`, and :class:`.PrimaryKeyConstraint`.

685 

686To generate a clustered index:: 

687 

688 Index("my_index", table.c.x, mssql_clustered=True) 

689 

690which renders the index as ``CREATE CLUSTERED INDEX my_index ON table (x)``. 

691 

692To generate a clustered primary key use:: 

693 

694 Table('my_table', metadata, 

695 Column('x', ...), 

696 Column('y', ...), 

697 PrimaryKeyConstraint("x", "y", mssql_clustered=True)) 

698 

699which will render the table, for example, as:: 

700 

701 CREATE TABLE my_table (x INTEGER NOT NULL, y INTEGER NOT NULL, 

702 PRIMARY KEY CLUSTERED (x, y)) 

703 

704Similarly, we can generate a clustered unique constraint using:: 

705 

706 Table('my_table', metadata, 

707 Column('x', ...), 

708 Column('y', ...), 

709 PrimaryKeyConstraint("x"), 

710 UniqueConstraint("y", mssql_clustered=True), 

711 ) 

712 

713To explicitly request a non-clustered primary key (for example, when 

714a separate clustered index is desired), use:: 

715 

716 Table('my_table', metadata, 

717 Column('x', ...), 

718 Column('y', ...), 

719 PrimaryKeyConstraint("x", "y", mssql_clustered=False)) 

720 

721which will render the table, for example, as:: 

722 

723 CREATE TABLE my_table (x INTEGER NOT NULL, y INTEGER NOT NULL, 

724 PRIMARY KEY NONCLUSTERED (x, y)) 

725 

726.. versionchanged:: 1.1 the ``mssql_clustered`` option now defaults 

727 to None, rather than False. ``mssql_clustered=False`` now explicitly 

728 renders the NONCLUSTERED clause, whereas None omits the CLUSTERED 

729 clause entirely, allowing SQL Server defaults to take effect. 

730 

731 

732MSSQL-Specific Index Options 

733----------------------------- 

734 

735In addition to clustering, the MSSQL dialect supports other special options 

736for :class:`.Index`. 

737 

738INCLUDE 

739^^^^^^^ 

740 

741The ``mssql_include`` option renders INCLUDE(colname) for the given string 

742names:: 

743 

744 Index("my_index", table.c.x, mssql_include=['y']) 

745 

746would render the index as ``CREATE INDEX my_index ON table (x) INCLUDE (y)`` 

747 

748.. _mssql_index_where: 

749 

750Filtered Indexes 

751^^^^^^^^^^^^^^^^ 

752 

753The ``mssql_where`` option renders ``WHERE <condition>`` for the given SQL

754expression::

755 

756 Index("my_index", table.c.x, mssql_where=table.c.x > 10) 

757 

758would render the index as ``CREATE INDEX my_index ON table (x) WHERE x > 10``. 

759 

760.. versionadded:: 1.3.4 

761 

762Index ordering 

763^^^^^^^^^^^^^^ 

764 

765Index ordering is available via functional expressions, such as:: 

766 

767 Index("my_index", table.c.x.desc()) 

768 

769would render the index as ``CREATE INDEX my_index ON table (x DESC)`` 

770 

771.. seealso:: 

772 

773 :ref:`schema_indexes_functional` 

774 
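The index options in this and the preceding section can be inspected by compiling ``CREATE INDEX`` statements against the SQL Server dialect, with no database connection. A minimal sketch, assuming SQLAlchemy 1.3.4 or later is installed (for ``mssql_where``); the table ``t``, the index names, and the helper ``index_ddl`` are illustrative:

```python
from sqlalchemy import Column, Index, Integer, MetaData, Table
from sqlalchemy.dialects import mssql
from sqlalchemy.schema import CreateIndex

table = Table("t", MetaData(), Column("x", Integer), Column("y", Integer))


def index_ddl(index):
    """Render CREATE INDEX for the SQL Server dialect (hypothetical helper)."""
    return str(CreateIndex(index).compile(dialect=mssql.dialect()))


# each Index attaches itself to ``table`` through the column it references
clustered = index_ddl(Index("ix_clustered", table.c.x, mssql_clustered=True))
included = index_ddl(Index("ix_include", table.c.x, mssql_include=["y"]))
filtered = index_ddl(Index("ix_where", table.c.x, mssql_where=table.c.x > 10))
ordered = index_ddl(Index("ix_desc", table.c.x.desc()))

for ddl in (clustered, included, filtered, ordered):
    print(ddl)
```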

775Compatibility Levels 

776-------------------- 

777MSSQL supports the notion of setting compatibility levels at the 

778database level. This makes it possible, for instance, to run a database that

779is compatible with SQL2000 while running on a SQL2005 database 

780server. ``server_version_info`` will always return the database 

781server version information (in this case SQL2005) and not the 

782compatibility level information. Because of this, if running under 

783a backwards compatibility mode SQLAlchemy may attempt to use T-SQL 

784statements that are unable to be parsed by the database server. 

785 

786Triggers 

787-------- 

788 

789SQLAlchemy by default uses OUTPUT INSERTED to get at newly 

790generated primary key values via IDENTITY columns or other 

791server side defaults. MS-SQL does not 

792allow the usage of OUTPUT INSERTED on tables that have triggers. 

793To disable the usage of OUTPUT INSERTED on a per-table basis, 

794specify ``implicit_returning=False`` for each :class:`_schema.Table` 

795which has triggers:: 

796 

797 Table('mytable', metadata, 

798 Column('id', Integer, primary_key=True), 

799 # ..., 

800 implicit_returning=False 

801 ) 

802 

803Declarative form:: 

804 

805 class MyClass(Base): 

806 # ... 

807 __table_args__ = {'implicit_returning':False} 

808 

809 

810This option can also be specified engine-wide using the 

811``implicit_returning=False`` argument on :func:`_sa.create_engine`. 

812 

.. _mssql_rowcount_versioning:

Rowcount Support / ORM Versioning
---------------------------------

The SQL Server drivers may have limited ability to return the number
of rows updated from an UPDATE or DELETE statement.

As of this writing, the PyODBC driver is not able to return a rowcount when
OUTPUT INSERTED is used. This affects the SQLAlchemy ORM's versioning feature
in many cases where server-side value generators are in use: the versioning
operations can succeed, but the ORM cannot always check that an UPDATE
or DELETE statement matched the number of rows expected, which is how it
verifies that the version identifier matched. When this condition occurs, a
warning will be emitted but the operation will proceed.

The use of OUTPUT INSERTED can be disabled by setting the
:paramref:`_schema.Table.implicit_returning` flag to ``False`` on a particular
:class:`_schema.Table`, which in declarative looks like::

    class MyTable(Base):
        __tablename__ = 'mytable'
        id = Column(Integer, primary_key=True)
        stuff = Column(String(10))
        timestamp = Column(TIMESTAMP(), default=text('DEFAULT'))
        __mapper_args__ = {
            'version_id_col': timestamp,
            'version_id_generator': False,
        }
        __table_args__ = {
            'implicit_returning': False
        }

Enabling Snapshot Isolation
---------------------------

SQL Server has a default transaction
isolation mode that locks entire tables, and causes even mildly concurrent
applications to have long-held locks and frequent deadlocks.
Enabling snapshot isolation for the database as a whole is recommended
for modern levels of concurrency support. This is accomplished via the
following ALTER DATABASE commands executed at the SQL prompt::

    ALTER DATABASE MyDatabase SET ALLOW_SNAPSHOT_ISOLATION ON

    ALTER DATABASE MyDatabase SET READ_COMMITTED_SNAPSHOT ON

Background on SQL Server snapshot isolation is available at
https://msdn.microsoft.com/en-us/library/ms175095.aspx.

""" # noqa

864 

865import codecs 

866import datetime 

867import operator 

868import re 

869 

870from . import information_schema as ischema 

871from .json import JSON 

872from .json import JSONIndexType 

873from .json import JSONPathType 

874from ... import exc 

875from ... import Identity 

876from ... import schema as sa_schema 

877from ... import Sequence 

878from ... import sql 

879from ... import text 

880from ... import types as sqltypes 

881from ... import util 

882from ...engine import cursor as _cursor 

883from ...engine import default 

884from ...engine import reflection 

885from ...sql import coercions 

886from ...sql import compiler 

887from ...sql import elements 

888from ...sql import expression 

889from ...sql import func 

890from ...sql import quoted_name 

891from ...sql import roles 

892from ...sql import util as sql_util 

893from ...types import BIGINT 

894from ...types import BINARY 

895from ...types import CHAR 

896from ...types import DATE 

897from ...types import DATETIME 

898from ...types import DECIMAL 

899from ...types import FLOAT 

900from ...types import INTEGER 

901from ...types import NCHAR 

902from ...types import NUMERIC 

903from ...types import NVARCHAR 

904from ...types import SMALLINT 

905from ...types import TEXT 

906from ...types import VARCHAR 

907from ...util import compat 

908from ...util import update_wrapper 

909from ...util.langhelpers import public_factory 

910 

911 

912# https://sqlserverbuilds.blogspot.com/ 

913MS_2017_VERSION = (14,) 

914MS_2016_VERSION = (13,) 

915MS_2014_VERSION = (12,) 

916MS_2012_VERSION = (11,) 

917MS_2008_VERSION = (10,) 

918MS_2005_VERSION = (9,) 

919MS_2000_VERSION = (8,) 

920 

921RESERVED_WORDS = set( 

922 [ 

923 "add", 

924 "all", 

925 "alter", 

926 "and", 

927 "any", 

928 "as", 

929 "asc", 

930 "authorization", 

931 "backup", 

932 "begin", 

933 "between", 

934 "break", 

935 "browse", 

936 "bulk", 

937 "by", 

938 "cascade", 

939 "case", 

940 "check", 

941 "checkpoint", 

942 "close", 

943 "clustered", 

944 "coalesce", 

945 "collate", 

946 "column", 

947 "commit", 

948 "compute", 

949 "constraint", 

950 "contains", 

951 "containstable", 

952 "continue", 

953 "convert", 

954 "create", 

955 "cross", 

956 "current", 

957 "current_date", 

958 "current_time", 

959 "current_timestamp", 

960 "current_user", 

961 "cursor", 

962 "database", 

963 "dbcc", 

964 "deallocate", 

965 "declare", 

966 "default", 

967 "delete", 

968 "deny", 

969 "desc", 

970 "disk", 

971 "distinct", 

972 "distributed", 

973 "double", 

974 "drop", 

975 "dump", 

976 "else", 

977 "end", 

978 "errlvl", 

979 "escape", 

980 "except", 

981 "exec", 

982 "execute", 

983 "exists", 

984 "exit", 

985 "external", 

986 "fetch", 

987 "file", 

988 "fillfactor", 

989 "for", 

990 "foreign", 

991 "freetext", 

992 "freetexttable", 

993 "from", 

994 "full", 

995 "function", 

996 "goto", 

997 "grant", 

998 "group", 

999 "having", 

1000 "holdlock", 

1001 "identity", 

1002 "identity_insert", 

1003 "identitycol", 

1004 "if", 

1005 "in", 

1006 "index", 

1007 "inner", 

1008 "insert", 

1009 "intersect", 

1010 "into", 

1011 "is", 

1012 "join", 

1013 "key", 

1014 "kill", 

1015 "left", 

1016 "like", 

1017 "lineno", 

1018 "load", 

1019 "merge", 

1020 "national", 

1021 "nocheck", 

1022 "nonclustered", 

1023 "not", 

1024 "null", 

1025 "nullif", 

1026 "of", 

1027 "off", 

1028 "offsets", 

1029 "on", 

1030 "open", 

1031 "opendatasource", 

1032 "openquery", 

1033 "openrowset", 

1034 "openxml", 

1035 "option", 

1036 "or", 

1037 "order", 

1038 "outer", 

1039 "over", 

1040 "percent", 

1041 "pivot", 

1042 "plan", 

1043 "precision", 

1044 "primary", 

1045 "print", 

1046 "proc", 

1047 "procedure", 

1048 "public", 

1049 "raiserror", 

1050 "read", 

1051 "readtext", 

1052 "reconfigure", 

1053 "references", 

1054 "replication", 

1055 "restore", 

1056 "restrict", 

1057 "return", 

1058 "revert", 

1059 "revoke", 

1060 "right", 

1061 "rollback", 

1062 "rowcount", 

1063 "rowguidcol", 

1064 "rule", 

1065 "save", 

1066 "schema", 

1067 "securityaudit", 

1068 "select", 

1069 "session_user", 

1070 "set", 

1071 "setuser", 

1072 "shutdown", 

1073 "some", 

1074 "statistics", 

1075 "system_user", 

1076 "table", 

1077 "tablesample", 

1078 "textsize", 

1079 "then", 

1080 "to", 

1081 "top", 

1082 "tran", 

1083 "transaction", 

1084 "trigger", 

1085 "truncate", 

1086 "tsequal", 

1087 "union", 

1088 "unique", 

1089 "unpivot", 

1090 "update", 

1091 "updatetext", 

1092 "use", 

1093 "user", 

1094 "values", 

1095 "varying", 

1096 "view", 

1097 "waitfor", 

1098 "when", 

1099 "where", 

1100 "while", 

1101 "with", 

1102 "writetext", 

1103 ] 

1104) 

1105 

1106 

class REAL(sqltypes.REAL):
    __visit_name__ = "REAL"

    def __init__(self, **kw):
        # REAL is a synonym for FLOAT(24) on SQL Server.
        # It is only accepted as the word "REAL" in DDL; the numeric
        # precision value is not allowed to be present.
        kw.setdefault("precision", 24)
        super(REAL, self).__init__(**kw)

1116 

1117 

1118class TINYINT(sqltypes.Integer): 

1119 __visit_name__ = "TINYINT" 

1120 

1121 

# MSSQL DATE/TIME types have varied behavior, sometimes returning
# strings. MSDate/TIME check for everything, and always
# filter bind parameters into datetime objects (required by pyodbc,
# not sure about other dialects).

1126 

1127 

class _MSDate(sqltypes.Date):
    def bind_processor(self, dialect):
        def process(value):
            # exact type check: datetime.datetime is a subclass of
            # datetime.date and must pass through unchanged
            if type(value) == datetime.date:
                return datetime.datetime(value.year, value.month, value.day)
            else:
                return value

        return process

    _reg = re.compile(r"(\d+)-(\d+)-(\d+)")

    def result_processor(self, dialect, coltype):
        def process(value):
            if isinstance(value, datetime.datetime):
                return value.date()
            elif isinstance(value, util.string_types):
                m = self._reg.match(value)
                if not m:
                    raise ValueError(
                        "could not parse %r as a date value" % (value,)
                    )
                return datetime.date(*[int(x or 0) for x in m.groups()])
            else:
                return value

        return process
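The result-side conversion above can be tried in isolation. The following is a minimal standalone sketch, not the dialect's API: `parse_date_result` and `_DATE_RE` are hypothetical names, and `str` stands in for `util.string_types` on the assumption of Python 3.

```python
import datetime
import re

# Same pattern as _MSDate._reg: year, month and day separated by hyphens.
_DATE_RE = re.compile(r"(\d+)-(\d+)-(\d+)")


def parse_date_result(value):
    """Hypothetical standalone version of the result processing step."""
    if isinstance(value, datetime.datetime):
        # Some drivers hand back a full datetime; keep only the date part.
        return value.date()
    if isinstance(value, str):
        m = _DATE_RE.match(value)
        if not m:
            raise ValueError("could not parse %r as a date value" % (value,))
        return datetime.date(*[int(x or 0) for x in m.groups()])
    # None and plain date objects pass through unchanged.
    return value


print(parse_date_result("2022-12-25"))
print(parse_date_result(datetime.datetime(2022, 12, 25, 6, 11)))
```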

1155 

1156 

class TIME(sqltypes.TIME):
    def __init__(self, precision=None, **kwargs):
        self.precision = precision
        super(TIME, self).__init__()

    __zero_date = datetime.date(1900, 1, 1)

    def bind_processor(self, dialect):
        def process(value):
            if isinstance(value, datetime.datetime):
                value = datetime.datetime.combine(
                    self.__zero_date, value.time()
                )
            elif isinstance(value, datetime.time):
                """issue #5339
                per: https://github.com/mkleehammer/pyodbc/wiki/Tips-and-Tricks-by-Database-Platform#time-columns
                pass TIME value as string
                """  # noqa
                value = str(value)
            return value

        return process

    _reg = re.compile(r"(\d+):(\d+):(\d+)(?:\.(\d{0,6}))?")

    def result_processor(self, dialect, coltype):
        def process(value):
            if isinstance(value, datetime.datetime):
                return value.time()
            elif isinstance(value, util.string_types):
                m = self._reg.match(value)
                if not m:
                    raise ValueError(
                        "could not parse %r as a time value" % (value,)
                    )
                return datetime.time(*[int(x or 0) for x in m.groups()])
            else:
                return value

        return process
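Both directions of the TIME conversion can be sketched without a database. This is an illustrative reimplementation under stated assumptions, not the dialect's API: `bind_time_value`, `parse_time_result`, `_ZERO_DATE` and `_TIME_RE` are hypothetical names, and `str` stands in for `util.string_types`.

```python
import datetime
import re

_ZERO_DATE = datetime.date(1900, 1, 1)

# Same pattern as TIME._reg: HH:MM:SS with an optional fractional part.
_TIME_RE = re.compile(r"(\d+):(\d+):(\d+)(?:\.(\d{0,6}))?")


def bind_time_value(value):
    """Hypothetical standalone version of the bind-side conversion."""
    if isinstance(value, datetime.datetime):
        # Keep the time of day, anchored to the fixed 1900-01-01 date.
        return datetime.datetime.combine(_ZERO_DATE, value.time())
    if isinstance(value, datetime.time):
        # Plain time objects are sent to the driver as strings.
        return str(value)
    return value


def parse_time_result(value):
    """Hypothetical standalone version of the result-side conversion."""
    if isinstance(value, datetime.datetime):
        return value.time()
    if isinstance(value, str):
        m = _TIME_RE.match(value)
        if not m:
            raise ValueError("could not parse %r as a time value" % (value,))
        # A missing fractional group is None and becomes 0 microseconds.
        return datetime.time(*[int(x or 0) for x in m.groups()])
    return value


print(bind_time_value(datetime.time(6, 11, 30)))
print(parse_time_result("13:45:30.123456"))
```

Note that the captured fractional digits are used directly as the microsecond value, so the example passes a full six-digit fraction.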

1197 

1198 

1199_MSTime = TIME 

1200 

1201 

1202class _BASETIMEIMPL(TIME): 

1203 __visit_name__ = "_BASETIMEIMPL" 

1204 

1205 

1206class _DateTimeBase(object): 

1207 def bind_processor(self, dialect): 

1208 def process(value): 

1209 if type(value) == datetime.date: 

1210 return datetime.datetime(value.year, value.month, value.day) 

1211 else: 

1212 return value 

1213 

1214 return process 

1215 

1216 

1217class _MSDateTime(_DateTimeBase, sqltypes.DateTime): 

1218 pass 

1219 

1220 

1221class SMALLDATETIME(_DateTimeBase, sqltypes.DateTime): 

1222 __visit_name__ = "SMALLDATETIME" 

1223 

1224 

1225class DATETIME2(_DateTimeBase, sqltypes.DateTime): 

1226 __visit_name__ = "DATETIME2" 

1227 

1228 def __init__(self, precision=None, **kw): 

1229 super(DATETIME2, self).__init__(**kw) 

1230 self.precision = precision 

1231 

1232 

1233class DATETIMEOFFSET(_DateTimeBase, sqltypes.DateTime): 

1234 __visit_name__ = "DATETIMEOFFSET" 

1235 

1236 def __init__(self, precision=None, **kw): 

1237 super(DATETIMEOFFSET, self).__init__(**kw) 

1238 self.precision = precision 

1239 

1240 

class _UnicodeLiteral(object):
    def literal_processor(self, dialect):
        def process(value):

            value = value.replace("'", "''")

            if dialect.identifier_preparer._double_percents:
                value = value.replace("%", "%%")

            return "N'%s'" % value

        return process
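The literal rendering above amounts to two string replacements and an `N` prefix. A minimal sketch follows, with the dialect lookup replaced by a plain boolean argument. `render_unicode_literal` and its `double_percents` parameter are hypothetical, and the remark about when percent signs need doubling is an assumption about the driver's parameter style.

```python
def render_unicode_literal(value, double_percents=False):
    """Hypothetical standalone version of the literal rendering step."""
    # Single quotes inside the string are escaped by doubling them.
    value = value.replace("'", "''")

    if double_percents:
        # Assumed to be needed when the driver treats "%" as a
        # parameter marker in the statement text.
        value = value.replace("%", "%%")

    # The N prefix marks the literal as a Unicode string in T-SQL.
    return "N'%s'" % value


print(render_unicode_literal("O'Brien"))
print(render_unicode_literal("100%", double_percents=True))
```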

1253 

1254 

1255class _MSUnicode(_UnicodeLiteral, sqltypes.Unicode): 

1256 pass 

1257 

1258 

1259class _MSUnicodeText(_UnicodeLiteral, sqltypes.UnicodeText): 

1260 pass 

1261 

1262 

class TIMESTAMP(sqltypes._Binary):
    """Implement the SQL Server TIMESTAMP type.

    Note this is **completely different** from the SQL Standard
    TIMESTAMP type, which is not supported by SQL Server. It
    is a read-only datatype that does not support INSERT of values.

    .. versionadded:: 1.2

    .. seealso::

        :class:`_mssql.ROWVERSION`

    """

    __visit_name__ = "TIMESTAMP"

    # expected by _Binary to be present
    length = None

    def __init__(self, convert_int=False):
        """Construct a TIMESTAMP or ROWVERSION type.

        :param convert_int: if True, binary integer values will
         be converted to integers on read.

        .. versionadded:: 1.2

        """
        self.convert_int = convert_int

    def result_processor(self, dialect, coltype):
        super_ = super(TIMESTAMP, self).result_processor(dialect, coltype)
        if self.convert_int:

            def process(value):
                value = super_(value)
                if value is not None:
                    # https://stackoverflow.com/a/30403242/34549
                    value = int(codecs.encode(value, "hex"), 16)
                return value

            return process
        else:
            return super_
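The `convert_int` branch turns the raw bytes of a row version into a Python integer by hex-encoding them. A small sketch of that step on its own is shown here. `rowversion_to_int` and the sample bytes are hypothetical, and the comparison with `int.from_bytes` is added only to show that the hex round trip reads the bytes as a big-endian integer.

```python
import codecs


def rowversion_to_int(value):
    """Hypothetical standalone version of the convert_int step."""
    if value is None:
        return None
    # Hex-encode the bytes, then parse the hex digits as a base-16 integer.
    return int(codecs.encode(value, "hex"), 16)


# Illustrative 8-byte value, not taken from a real database.
raw = b"\x00\x00\x00\x00\x00\x00\x07\xd1"

print(rowversion_to_int(raw))
print(rowversion_to_int(raw) == int.from_bytes(raw, "big"))
```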

1308 

1309 

1310class ROWVERSION(TIMESTAMP): 

1311 """Implement the SQL Server ROWVERSION type. 

1312 

1313 The ROWVERSION datatype is a SQL Server synonym for the TIMESTAMP 

1314 datatype, however current SQL Server documentation suggests using 

1315 ROWVERSION for new datatypes going forward. 

1316 

1317 The ROWVERSION datatype does **not** reflect (e.g. introspect) from the 

1318 database as itself; the returned datatype will be 

1319 :class:`_mssql.TIMESTAMP`. 

1320 

1321 This is a read-only datatype that does not support INSERT of values. 

1322 

1323 .. versionadded:: 1.2 

1324 

1325 .. seealso:: 

1326 

1327 :class:`_mssql.TIMESTAMP` 

1328 

1329 """ 

1330 

1331 __visit_name__ = "ROWVERSION" 

1332 

1333 

1334class NTEXT(sqltypes.UnicodeText): 

1335 

1336 """MSSQL NTEXT type, for variable-length unicode text up to 2^30 

1337 characters.""" 

1338 

1339 __visit_name__ = "NTEXT" 

1340 

1341 

1342class VARBINARY(sqltypes.VARBINARY, sqltypes.LargeBinary): 

1343 """The MSSQL VARBINARY type. 

1344 

1345 This type adds additional features to the core :class:`_types.VARBINARY` 

1346 type, including "deprecate_large_types" mode where 

1347 either ``VARBINARY(max)`` or IMAGE is rendered, as well as the SQL 

1348 Server ``FILESTREAM`` option. 

1349 

1350 .. versionadded:: 1.0.0 

1351 

1352 .. seealso:: 

1353 

1354 :ref:`mssql_large_type_deprecation` 

1355 

1356 """ 

1357 

1358 __visit_name__ = "VARBINARY" 

1359 

1360 def __init__(self, length=None, filestream=False): 

1361 """ 

1362 Construct a VARBINARY type. 

1363 

1364 :param length: optional, a length for the column for use in 

1365 DDL statements, for those binary types that accept a length, 

1366 such as the MySQL BLOB type. 

1367 

1368 :param filestream=False: if True, renders the ``FILESTREAM`` keyword 

1369 in the table definition. In this case ``length`` must be ``None`` 

1370 or ``'max'``. 

1371 

1372 .. versionadded:: 1.4.31 

1373 

1374 """ 

1375 

1376 self.filestream = filestream 

1377 if self.filestream and length not in (None, "max"): 

1378 raise ValueError( 

1379 "length must be None or 'max' when setting filestream" 

1380 ) 

1381 super(VARBINARY, self).__init__(length=length) 

1382 

1383 

1384class IMAGE(sqltypes.LargeBinary): 

1385 __visit_name__ = "IMAGE" 

1386 

1387 

1388class XML(sqltypes.Text): 

1389 """MSSQL XML type. 

1390 

1391 This is a placeholder type for reflection purposes that does not include 

1392 any Python-side datatype support. It also does not currently support 

1393 additional arguments, such as "CONTENT", "DOCUMENT", 

1394 "xml_schema_collection". 

1395 

1396 .. versionadded:: 1.1.11 

1397 

1398 """ 

1399 

1400 __visit_name__ = "XML" 

1401 

1402 

1403class BIT(sqltypes.Boolean): 

1404 """MSSQL BIT type. 

1405 

1406 Both pyodbc and pymssql return values from BIT columns as 

1407 Python <class 'bool'> so just subclass Boolean. 

1408 

1409 """ 

1410 

1411 __visit_name__ = "BIT" 

1412 

1413 

1414class MONEY(sqltypes.TypeEngine): 

1415 __visit_name__ = "MONEY" 

1416 

1417 

1418class SMALLMONEY(sqltypes.TypeEngine): 

1419 __visit_name__ = "SMALLMONEY" 

1420 

1421 

1422class UNIQUEIDENTIFIER(sqltypes.TypeEngine): 

1423 __visit_name__ = "UNIQUEIDENTIFIER" 

1424 

1425 

1426class SQL_VARIANT(sqltypes.TypeEngine): 

1427 __visit_name__ = "SQL_VARIANT" 

1428 

1429 

1430class TryCast(sql.elements.Cast): 

1431 """Represent a SQL Server TRY_CAST expression.""" 

1432 

1433 __visit_name__ = "try_cast" 

1434 

1435 stringify_dialect = "mssql" 

1436 inherit_cache = True 

1437 

1438 def __init__(self, *arg, **kw): 

1439 """Create a TRY_CAST expression. 

1440 

1441 :class:`.TryCast` is a subclass of SQLAlchemy's :class:`.Cast` 

1442 construct, and works in the same way, except that the SQL expression 

1443 rendered is "TRY_CAST" rather than "CAST":: 

1444 

1445 from sqlalchemy import select 

1446 from sqlalchemy import Numeric 

1447 from sqlalchemy.dialects.mssql import try_cast 

1448 

1449 stmt = select( 

1450 try_cast(product_table.c.unit_price, Numeric(10, 4)) 

1451 ) 

1452 

1453 The above would render:: 

1454 

1455 SELECT TRY_CAST (product_table.unit_price AS NUMERIC(10, 4)) 

1456 FROM product_table 

1457 

1458 .. versionadded:: 1.3.7 

1459 

1460 """ 

1461 super(TryCast, self).__init__(*arg, **kw) 

1462 

1463 

1464try_cast = public_factory(TryCast, ".dialects.mssql.try_cast") 

1465 

1466# old names. 

1467MSDateTime = _MSDateTime 

1468MSDate = _MSDate 

1469MSReal = REAL 

1470MSTinyInteger = TINYINT 

1471MSTime = TIME 

1472MSSmallDateTime = SMALLDATETIME 

1473MSDateTime2 = DATETIME2 

1474MSDateTimeOffset = DATETIMEOFFSET 

1475MSText = TEXT 

1476MSNText = NTEXT 

1477MSString = VARCHAR 

1478MSNVarchar = NVARCHAR 

1479MSChar = CHAR 

1480MSNChar = NCHAR 

1481MSBinary = BINARY 

1482MSVarBinary = VARBINARY 

1483MSImage = IMAGE 

1484MSBit = BIT 

1485MSMoney = MONEY 

1486MSSmallMoney = SMALLMONEY 

1487MSUniqueIdentifier = UNIQUEIDENTIFIER 

1488MSVariant = SQL_VARIANT 

1489 

1490ischema_names = { 

1491 "int": INTEGER, 

1492 "bigint": BIGINT, 

1493 "smallint": SMALLINT, 

1494 "tinyint": TINYINT, 

1495 "varchar": VARCHAR, 

1496 "nvarchar": NVARCHAR, 

1497 "char": CHAR, 

1498 "nchar": NCHAR, 

1499 "text": TEXT, 

1500 "ntext": NTEXT, 

1501 "decimal": DECIMAL, 

1502 "numeric": NUMERIC, 

1503 "float": FLOAT, 

1504 "datetime": DATETIME, 

1505 "datetime2": DATETIME2, 

1506 "datetimeoffset": DATETIMEOFFSET, 

1507 "date": DATE, 

1508 "time": TIME, 

1509 "smalldatetime": SMALLDATETIME, 

1510 "binary": BINARY, 

1511 "varbinary": VARBINARY, 

1512 "bit": BIT, 

1513 "real": REAL, 

1514 "image": IMAGE, 

1515 "xml": XML, 

1516 "timestamp": TIMESTAMP, 

1517 "money": MONEY, 

1518 "smallmoney": SMALLMONEY, 

1519 "uniqueidentifier": UNIQUEIDENTIFIER, 

1520 "sql_variant": SQL_VARIANT, 

1521} 

1522 

1523 

1524class MSTypeCompiler(compiler.GenericTypeCompiler): 

1525 def _extend(self, spec, type_, length=None): 

1526 """Extend a string-type declaration with standard SQL 

1527 COLLATE annotations. 

1528 

1529 """ 

1530 

1531 if getattr(type_, "collation", None): 

1532 collation = "COLLATE %s" % type_.collation 

1533 else: 

1534 collation = None 

1535 

1536 if not length: 

1537 length = type_.length 

1538 

1539 if length: 

1540 spec = spec + "(%s)" % length 

1541 

1542 return " ".join([c for c in (spec, collation) if c is not None]) 

1543 

1544 def visit_FLOAT(self, type_, **kw): 

1545 precision = getattr(type_, "precision", None) 

1546 if precision is None: 

1547 return "FLOAT" 

1548 else: 

1549 return "FLOAT(%(precision)s)" % {"precision": precision} 

1550 

1551 def visit_TINYINT(self, type_, **kw): 

1552 return "TINYINT" 

1553 

1554 def visit_TIME(self, type_, **kw): 

1555 precision = getattr(type_, "precision", None) 

1556 if precision is not None: 

1557 return "TIME(%s)" % precision 

1558 else: 

1559 return "TIME" 

1560 

1561 def visit_TIMESTAMP(self, type_, **kw): 

1562 return "TIMESTAMP" 

1563 

1564 def visit_ROWVERSION(self, type_, **kw): 

1565 return "ROWVERSION" 

1566 

1567 def visit_datetime(self, type_, **kw): 

1568 if type_.timezone: 

1569 return self.visit_DATETIMEOFFSET(type_, **kw) 

1570 else: 

1571 return self.visit_DATETIME(type_, **kw) 

1572 

1573 def visit_DATETIMEOFFSET(self, type_, **kw): 

1574 precision = getattr(type_, "precision", None) 

1575 if precision is not None: 

1576 return "DATETIMEOFFSET(%s)" % type_.precision 

1577 else: 

1578 return "DATETIMEOFFSET" 

1579 

1580 def visit_DATETIME2(self, type_, **kw): 

1581 precision = getattr(type_, "precision", None) 

1582 if precision is not None: 

1583 return "DATETIME2(%s)" % precision 

1584 else: 

1585 return "DATETIME2" 

1586 

1587 def visit_SMALLDATETIME(self, type_, **kw): 

1588 return "SMALLDATETIME" 

1589 

1590 def visit_unicode(self, type_, **kw): 

1591 return self.visit_NVARCHAR(type_, **kw) 

1592 

1593 def visit_text(self, type_, **kw): 

1594 if self.dialect.deprecate_large_types: 

1595 return self.visit_VARCHAR(type_, **kw) 

1596 else: 

1597 return self.visit_TEXT(type_, **kw) 

1598 

1599 def visit_unicode_text(self, type_, **kw): 

1600 if self.dialect.deprecate_large_types: 

1601 return self.visit_NVARCHAR(type_, **kw) 

1602 else: 

1603 return self.visit_NTEXT(type_, **kw) 

1604 

1605 def visit_NTEXT(self, type_, **kw): 

1606 return self._extend("NTEXT", type_) 

1607 

1608 def visit_TEXT(self, type_, **kw): 

1609 return self._extend("TEXT", type_) 

1610 

1611 def visit_VARCHAR(self, type_, **kw): 

1612 return self._extend("VARCHAR", type_, length=type_.length or "max") 

1613 

1614 def visit_CHAR(self, type_, **kw): 

1615 return self._extend("CHAR", type_) 

1616 

1617 def visit_NCHAR(self, type_, **kw): 

1618 return self._extend("NCHAR", type_) 

1619 

1620 def visit_NVARCHAR(self, type_, **kw): 

1621 return self._extend("NVARCHAR", type_, length=type_.length or "max") 

1622 

1623 def visit_date(self, type_, **kw): 

1624 if self.dialect.server_version_info < MS_2008_VERSION: 

1625 return self.visit_DATETIME(type_, **kw) 

1626 else: 

1627 return self.visit_DATE(type_, **kw) 

1628 

1629 def visit__BASETIMEIMPL(self, type_, **kw): 

1630 return self.visit_time(type_, **kw) 

1631 

1632 def visit_time(self, type_, **kw): 

1633 if self.dialect.server_version_info < MS_2008_VERSION: 

1634 return self.visit_DATETIME(type_, **kw) 

1635 else: 

1636 return self.visit_TIME(type_, **kw) 

1637 

1638 def visit_large_binary(self, type_, **kw): 

1639 if self.dialect.deprecate_large_types: 

1640 return self.visit_VARBINARY(type_, **kw) 

1641 else: 

1642 return self.visit_IMAGE(type_, **kw) 

1643 

1644 def visit_IMAGE(self, type_, **kw): 

1645 return "IMAGE" 

1646 

1647 def visit_XML(self, type_, **kw): 

1648 return "XML" 

1649 

1650 def visit_VARBINARY(self, type_, **kw): 

1651 text = self._extend("VARBINARY", type_, length=type_.length or "max") 

1652 if getattr(type_, "filestream", False): 

1653 text += " FILESTREAM" 

1654 return text 

1655 

1656 def visit_boolean(self, type_, **kw): 

1657 return self.visit_BIT(type_) 

1658 

1659 def visit_BIT(self, type_, **kw): 

1660 return "BIT" 

1661 

1662 def visit_JSON(self, type_, **kw): 

1663 # this is a bit of a break with SQLAlchemy's convention of 

1664 # "UPPERCASE name goes to UPPERCASE type name with no modification" 

1665 return self._extend("NVARCHAR", type_, length="max") 

1666 

1667 def visit_MONEY(self, type_, **kw): 

1668 return "MONEY" 

1669 

1670 def visit_SMALLMONEY(self, type_, **kw): 

1671 return "SMALLMONEY" 

1672 

1673 def visit_UNIQUEIDENTIFIER(self, type_, **kw): 

1674 return "UNIQUEIDENTIFIER" 

1675 

1676 def visit_SQL_VARIANT(self, type_, **kw): 

1677 return "SQL_VARIANT" 

1678 

1679 

1680class MSExecutionContext(default.DefaultExecutionContext): 

1681 _enable_identity_insert = False 

1682 _select_lastrowid = False 

1683 _lastrowid = None 

1684 _rowcount = None 

1685 

1686 def _opt_encode(self, statement): 

1687 

1688 if not self.dialect.supports_unicode_statements: 

1689 encoded = self.dialect._encoder(statement)[0] 

1690 else: 

1691 encoded = statement 

1692 

1693 if self.compiled and self.compiled.schema_translate_map: 

1694 

1695 rst = self.compiled.preparer._render_schema_translates 

1696 encoded = rst(encoded, self.compiled.schema_translate_map) 

1697 

1698 return encoded 

1699 

1700 def pre_exec(self): 

1701 """Activate IDENTITY_INSERT if needed.""" 

1702 

1703 if self.isinsert: 

1704 tbl = self.compiled.compile_state.dml_table 

1705 id_column = tbl._autoincrement_column 

1706 insert_has_identity = (id_column is not None) and ( 

1707 not isinstance(id_column.default, Sequence) 

1708 ) 

1709 

1710 if insert_has_identity: 

1711 compile_state = self.compiled.dml_compile_state 

1712 self._enable_identity_insert = ( 

1713 id_column.key in self.compiled_parameters[0] 

1714 ) or ( 

1715 compile_state._dict_parameters 

1716 and (id_column.key in compile_state._insert_col_keys) 

1717 ) 

1718 

1719 else: 

1720 self._enable_identity_insert = False 

1721 

1722 self._select_lastrowid = ( 

1723 not self.compiled.inline 

1724 and insert_has_identity 

1725 and not self.compiled.returning 

1726 and not self._enable_identity_insert 

1727 and not self.executemany 

1728 ) 

1729 

1730 if self._enable_identity_insert: 

1731 self.root_connection._cursor_execute( 

1732 self.cursor, 

1733 self._opt_encode( 

1734 "SET IDENTITY_INSERT %s ON" 

1735 % self.identifier_preparer.format_table(tbl) 

1736 ), 

1737 (), 

1738 self, 

1739 ) 

1740 

1741 def post_exec(self): 

1742 """Disable IDENTITY_INSERT if enabled.""" 

1743 

1744 conn = self.root_connection 

1745 

1746 if self.isinsert or self.isupdate or self.isdelete: 

1747 self._rowcount = self.cursor.rowcount 

1748 

1749 if self._select_lastrowid: 

1750 if self.dialect.use_scope_identity: 

1751 conn._cursor_execute( 

1752 self.cursor, 

1753 "SELECT scope_identity() AS lastrowid", 

1754 (), 

1755 self, 

1756 ) 

1757 else: 

1758 conn._cursor_execute( 

1759 self.cursor, "SELECT @@identity AS lastrowid", (), self 

1760 ) 

1761 # fetchall() ensures the cursor is consumed without closing it 

1762 row = self.cursor.fetchall()[0] 

1763 self._lastrowid = int(row[0]) 

1764 

1765 elif ( 

1766 self.isinsert or self.isupdate or self.isdelete 

1767 ) and self.compiled.returning: 

1768 self.cursor_fetch_strategy = ( 

1769 _cursor.FullyBufferedCursorFetchStrategy( 

1770 self.cursor, 

1771 self.cursor.description, 

1772 self.cursor.fetchall(), 

1773 ) 

1774 ) 

1775 

1776 if self._enable_identity_insert: 

1777 conn._cursor_execute( 

1778 self.cursor, 

1779 self._opt_encode( 

1780 "SET IDENTITY_INSERT %s OFF" 

1781 % self.identifier_preparer.format_table( 

1782 self.compiled.compile_state.dml_table 

1783 ) 

1784 ), 

1785 (), 

1786 self, 

1787 ) 

1788 

1789 def get_lastrowid(self): 

1790 return self._lastrowid 

1791 

1792 @property 

1793 def rowcount(self): 

1794 if self._rowcount is not None: 

1795 return self._rowcount 

1796 else: 

1797 return self.cursor.rowcount 

1798 

1799 def handle_dbapi_exception(self, e): 

1800 if self._enable_identity_insert: 

1801 try: 

1802 self.cursor.execute( 

1803 self._opt_encode( 

1804 "SET IDENTITY_INSERT %s OFF" 

1805 % self.identifier_preparer.format_table( 

1806 self.compiled.compile_state.dml_table 

1807 ) 

1808 ) 

1809 ) 

1810 except Exception: 

1811 pass 

1812 

1813 def fire_sequence(self, seq, type_): 

1814 return self._execute_scalar( 

1815 ( 

1816 "SELECT NEXT VALUE FOR %s" 

1817 % self.identifier_preparer.format_sequence(seq) 

1818 ), 

1819 type_, 

1820 ) 

1821 

1822 def get_insert_default(self, column): 

1823 if ( 

1824 isinstance(column, sa_schema.Column) 

1825 and column is column.table._autoincrement_column 

1826 and isinstance(column.default, sa_schema.Sequence) 

1827 and column.default.optional 

1828 ): 

1829 return None 

1830 return super(MSExecutionContext, self).get_insert_default(column) 

1831 

1832 

1833class MSSQLCompiler(compiler.SQLCompiler): 

1834 returning_precedes_values = True 

1835 

1836 extract_map = util.update_copy( 

1837 compiler.SQLCompiler.extract_map, 

1838 { 

1839 "doy": "dayofyear", 

1840 "dow": "weekday", 

1841 "milliseconds": "millisecond", 

1842 "microseconds": "microsecond", 

1843 }, 

1844 ) 

1845 

1846 def __init__(self, *args, **kwargs): 

1847 self.tablealiases = {} 

1848 super(MSSQLCompiler, self).__init__(*args, **kwargs) 

1849 

1850 def _with_legacy_schema_aliasing(fn): 

1851 def decorate(self, *arg, **kw): 

1852 if self.dialect.legacy_schema_aliasing: 

1853 return fn(self, *arg, **kw) 

1854 else: 

1855 super_ = getattr(super(MSSQLCompiler, self), fn.__name__) 

1856 return super_(*arg, **kw) 

1857 

1858 return decorate 

1859 

1860 def visit_now_func(self, fn, **kw): 

1861 return "CURRENT_TIMESTAMP" 

1862 

1863 def visit_current_date_func(self, fn, **kw): 

1864 return "GETDATE()" 

1865 

1866 def visit_length_func(self, fn, **kw): 

1867 return "LEN%s" % self.function_argspec(fn, **kw) 

1868 

1869 def visit_char_length_func(self, fn, **kw): 

1870 return "LEN%s" % self.function_argspec(fn, **kw) 

1871 

1872 def visit_concat_op_binary(self, binary, operator, **kw): 

1873 return "%s + %s" % ( 

1874 self.process(binary.left, **kw), 

1875 self.process(binary.right, **kw), 

1876 ) 

1877 

1878 def visit_true(self, expr, **kw): 

1879 return "1" 

1880 

1881 def visit_false(self, expr, **kw): 

1882 return "0" 

1883 

1884 def visit_match_op_binary(self, binary, operator, **kw): 

1885 return "CONTAINS (%s, %s)" % ( 

1886 self.process(binary.left, **kw), 

1887 self.process(binary.right, **kw), 

1888 ) 

1889 

    def get_select_precolumns(self, select, **kw):
        """MS-SQL puts TOP, its version of LIMIT, here."""

        s = super(MSSQLCompiler, self).get_select_precolumns(select, **kw)

        if select._has_row_limiting_clause and self._use_top(select):
            # ODBC drivers and possibly others
            # don't support bind params in the SELECT clause on SQL Server,
            # so a literal has to be used here.
            kw["literal_execute"] = True
            s += "TOP %s " % self.process(
                self._get_limit_or_fetch(select), **kw
            )
            if select._fetch_clause is not None:
                if select._fetch_clause_options["percent"]:
                    s += "PERCENT "
                if select._fetch_clause_options["with_ties"]:
                    s += "WITH TIES "

        return s

1910 

1911 def get_from_hint_text(self, table, text): 

1912 return text 

1913 

1914 def get_crud_hint_text(self, table, text): 

1915 return text 

1916 

1917 def _get_limit_or_fetch(self, select): 

1918 if select._fetch_clause is None: 

1919 return select._limit_clause 

1920 else: 

1921 return select._fetch_clause 

1922 

    def _use_top(self, select):
        return (select._offset_clause is None) and (
            select._simple_int_clause(select._limit_clause)
            or (
                # limit can use TOP when it is by itself. fetch only uses TOP
                # when it needs to because of PERCENT and/or WITH TIES
                select._simple_int_clause(select._fetch_clause)
                and (
                    select._fetch_clause_options["percent"]
                    or select._fetch_clause_options["with_ties"]
                )
            )
        )

1936 

1937 def fetch_clause(self, cs, **kwargs): 

1938 return "" 

1939 

1940 def limit_clause(self, cs, **kwargs): 

1941 return "" 

1942 

    def _check_can_use_fetch_limit(self, select):
        # to use ROW_NUMBER(), an ORDER BY is required.
        # OFFSET and FETCH are options of the ORDER BY clause
        if not select._order_by_clause.clauses:
            raise exc.CompileError(
                "MSSQL requires an order_by when "
                "using an OFFSET or a non-simple "
                "LIMIT clause"
            )

        if select._fetch_clause_options is not None and (
            select._fetch_clause_options["percent"]
            or select._fetch_clause_options["with_ties"]
        ):
            raise exc.CompileError(
                "MSSQL needs TOP to use PERCENT and/or WITH TIES. "
                "Only simple fetch without offset can be used."
            )

1961 

1962 def _row_limit_clause(self, select, **kw): 

1963 """MSSQL 2012 supports OFFSET/FETCH operators 

1964 Use them instead of a subquery with row_number 

1965 

1966 """ 

1967 

1968 if self.dialect._supports_offset_fetch and not self._use_top(select): 

1969 self._check_can_use_fetch_limit(select) 

1970 

1971 text = "" 

1972 

1973 if select._offset_clause is not None: 

1974 offset_str = self.process(select._offset_clause, **kw) 

1975 else: 

1976 offset_str = "0" 

1977 text += "\n OFFSET %s ROWS" % offset_str 

1978 

1979 limit = self._get_limit_or_fetch(select) 

1980 

1981 if limit is not None: 

1982 text += "\n FETCH FIRST %s ROWS ONLY" % self.process( 

1983 limit, **kw 

1984 ) 

1985 return text 

1986 else: 

1987 return "" 

1988 

1989 def visit_try_cast(self, element, **kw): 

1990 return "TRY_CAST (%s AS %s)" % ( 

1991 self.process(element.clause, **kw), 

1992 self.process(element.typeclause, **kw), 

1993 ) 

1994 

1995 def translate_select_structure(self, select_stmt, **kwargs): 

1996 """Look for ``LIMIT`` and ``OFFSET`` in a select statement, and if 

1997 present, try to wrap it in a subquery with a ``row_number()`` criterion. 

1998 MSSQL 2012 and above are excluded. 

1999 

2000 """ 

2001 select = select_stmt 

2002 

2003 if ( 

2004 select._has_row_limiting_clause 

2005 and not self.dialect._supports_offset_fetch 

2006 and not self._use_top(select) 

2007 and not getattr(select, "_mssql_visit", None) 

2008 ): 

2009 self._check_can_use_fetch_limit(select) 

2010 

2011 _order_by_clauses = [ 

2012 sql_util.unwrap_label_reference(elem) 

2013 for elem in select._order_by_clause.clauses 

2014 ] 

2015 

2016 limit_clause = self._get_limit_or_fetch(select) 

2017 offset_clause = select._offset_clause 

2018 

2019 select = select._generate() 

2020 select._mssql_visit = True 

2021 select = ( 

2022 select.add_columns( 

2023 sql.func.ROW_NUMBER() 

2024 .over(order_by=_order_by_clauses) 

2025 .label("mssql_rn") 

2026 ) 

2027 .order_by(None) 

2028 .alias() 

2029 ) 

2030 

2031 mssql_rn = sql.column("mssql_rn") 

2032 limitselect = sql.select( 

2033 *[c for c in select.c if c.key != "mssql_rn"] 

2034 ) 

2035 if offset_clause is not None: 

2036 limitselect = limitselect.where(mssql_rn > offset_clause) 

2037 if limit_clause is not None: 

2038 limitselect = limitselect.where( 

2039 mssql_rn <= (limit_clause + offset_clause) 

2040 ) 

2041 else: 

2042 limitselect = limitselect.where(mssql_rn <= (limit_clause)) 

2043 return limitselect 

2044 else: 

2045 return select 

2046 

2047 @_with_legacy_schema_aliasing 

2048 def visit_table(self, table, mssql_aliased=False, iscrud=False, **kwargs): 

2049 if mssql_aliased is table or iscrud: 

2050 return super(MSSQLCompiler, self).visit_table(table, **kwargs) 

2051 

2052 # alias schema-qualified tables 

2053 alias = self._schema_aliased_table(table) 

2054 if alias is not None: 

2055 return self.process(alias, mssql_aliased=table, **kwargs) 

2056 else: 

2057 return super(MSSQLCompiler, self).visit_table(table, **kwargs) 

2058 

2059 @_with_legacy_schema_aliasing 

2060 def visit_alias(self, alias, **kw): 

2061 # translate for schema-qualified table aliases 

2062 kw["mssql_aliased"] = alias.element 

2063 return super(MSSQLCompiler, self).visit_alias(alias, **kw) 

2064 

2065 @_with_legacy_schema_aliasing 

2066 def visit_column(self, column, add_to_result_map=None, **kw): 

2067 if ( 

2068 column.table is not None 

2069 and (not self.isupdate and not self.isdelete) 

2070 or self.is_subquery() 

2071 ): 

2072 # translate for schema-qualified table aliases 

2073 t = self._schema_aliased_table(column.table) 

2074 if t is not None: 

2075 converted = elements._corresponding_column_or_error(t, column) 

2076 if add_to_result_map is not None: 

2077 add_to_result_map( 

2078 column.name, 

2079 column.name, 

2080 (column, column.name, column.key), 

2081 column.type, 

2082 ) 

2083 

2084 return super(MSSQLCompiler, self).visit_column(converted, **kw) 

2085 

2086 return super(MSSQLCompiler, self).visit_column( 

2087 column, add_to_result_map=add_to_result_map, **kw 

2088 ) 

2089 

2090 def _schema_aliased_table(self, table): 

2091 if getattr(table, "schema", None) is not None: 

2092 if table not in self.tablealiases: 

2093 self.tablealiases[table] = table.alias() 

2094 return self.tablealiases[table] 

2095 else: 

2096 return None 

2097 

2098 def visit_extract(self, extract, **kw): 

2099 field = self.extract_map.get(extract.field, extract.field) 

2100 return "DATEPART(%s, %s)" % (field, self.process(extract.expr, **kw)) 

2101 

2102 def visit_savepoint(self, savepoint_stmt): 

2103 return "SAVE TRANSACTION %s" % self.preparer.format_savepoint( 

2104 savepoint_stmt 

2105 ) 

2106 

2107 def visit_rollback_to_savepoint(self, savepoint_stmt): 

2108 return "ROLLBACK TRANSACTION %s" % self.preparer.format_savepoint( 

2109 savepoint_stmt 

2110 ) 

2111 

2112 def visit_binary(self, binary, **kwargs): 

2113 """Move bind parameters to the right-hand side of an operator, where 

2114 possible. 

2115 

2116 """ 

2117 if ( 

2118 isinstance(binary.left, expression.BindParameter) 

2119 and binary.operator == operator.eq 

2120 and not isinstance(binary.right, expression.BindParameter) 

2121 ): 

2122 return self.process( 

2123 expression.BinaryExpression( 

2124 binary.right, binary.left, binary.operator 

2125 ), 

2126 **kwargs 

2127 ) 

2128 return super(MSSQLCompiler, self).visit_binary(binary, **kwargs) 

2129 

2130 def returning_clause(self, stmt, returning_cols): 

2131 # The SQL Server returning clause requires that the columns refer to 

2132 # the virtual table names "inserted" or "deleted". Here, we make 

2133 # a simple alias of our table with that name, and then adapt the 

2134 # columns we have from the list of RETURNING columns to that new name 

2135 # so that they render as "inserted.<colname>" / "deleted.<colname>". 

2136 

2137 if self.isinsert or self.isupdate: 

2138 target = stmt.table.alias("inserted") 

2139 else: 

2140 target = stmt.table.alias("deleted") 

2141 

2142 adapter = sql_util.ClauseAdapter(target) 

2143 

2144 # adapter.traverse() takes a column from our target table and returns 

2145 # the one that is linked to the "inserted" / "deleted" tables. So in 

2146 # order to retrieve these values back from the result (e.g. like 

2147 # row[column]), tell the compiler to also add the original unadapted 

2148 # column to the result map. Before #4877, these were (unknowingly) 

2149 # falling back using string name matching in the result set which 

2150 # necessarily used an expensive KeyError in order to match. 

2151 

2152 columns = [ 

2153 self._label_returning_column( 

2154 stmt, 

2155 adapter.traverse(c), 

2156 {"result_map_targets": (c,)}, 

2157 fallback_label_name=c._non_anon_label, 

2158 ) 

2159 for c in expression._select_iterables(returning_cols) 

2160 ] 

2161 

2162 return "OUTPUT " + ", ".join(columns) 

2163 

2164 def get_cte_preamble(self, recursive): 

2165 # SQL Server finds it too inconvenient to accept 

2166 # an entirely optional, SQL standard specified, 

2167 # "RECURSIVE" word with their "WITH", 

2168 # so here we go 

2169 return "WITH" 

2170 

2171 def label_select_column(self, select, column, asfrom): 

2172 if isinstance(column, expression.Function): 

2173 return column.label(None) 

2174 else: 

2175 return super(MSSQLCompiler, self).label_select_column( 

2176 select, column, asfrom 

2177 ) 

2178 

2179 def for_update_clause(self, select, **kw): 

2180 # "FOR UPDATE" is only allowed on "DECLARE CURSOR" which 

2181 # SQLAlchemy doesn't use 

2182 return "" 

2183 

2184 def order_by_clause(self, select, **kw): 

2185 # MSSQL only allows ORDER BY in subqueries if there is a LIMIT 

2186 if ( 

2187 self.is_subquery() 

2188 and not select._limit 

2189 and ( 

2190 select._offset is None 

2191 or not self.dialect._supports_offset_fetch 

2192 ) 

2193 ): 

2194 # avoid processing the order by clause if we won't end up 

2195 # using it, because we don't want all the bind params tacked 

2196 # onto the positional list if that is what the dbapi requires 

2197 return "" 

2198 

2199 order_by = self.process(select._order_by_clause, **kw) 

2200 

2201 if order_by: 

2202 return " ORDER BY " + order_by 

2203 else: 

2204 return "" 

2205 

2206 def update_from_clause( 

2207 self, update_stmt, from_table, extra_froms, from_hints, **kw 

2208 ): 

2209 """Render the UPDATE..FROM clause specific to MSSQL. 

2210 

2211 In MSSQL, if the UPDATE statement involves an alias of the table to 

2212 be updated, then the table itself must be added to the FROM list as 

2213 well. Otherwise, it is optional. Here, we add it regardless. 

2214 

2215 """ 

2216 return "FROM " + ", ".join( 

2217 t._compiler_dispatch(self, asfrom=True, fromhints=from_hints, **kw) 

2218 for t in [from_table] + extra_froms 

2219 ) 

2220 

2221 def delete_table_clause(self, delete_stmt, from_table, extra_froms): 

2222 """If we have extra froms make sure we render any alias as hint.""" 

2223 ashint = False 

2224 if extra_froms: 

2225 ashint = True 

2226 return from_table._compiler_dispatch( 

2227 self, asfrom=True, iscrud=True, ashint=ashint 

2228 ) 

2229 

2230 def delete_extra_from_clause( 

2231 self, delete_stmt, from_table, extra_froms, from_hints, **kw 

2232 ): 

2233 """Render the DELETE .. FROM clause specific to MSSQL. 

2234 

2235 Yes, it has the FROM keyword twice. 

2236 

2237 """ 

2238 return "FROM " + ", ".join( 

2239 t._compiler_dispatch(self, asfrom=True, fromhints=from_hints, **kw) 

2240 for t in [from_table] + extra_froms 

2241 ) 

2242 

2243 def visit_empty_set_expr(self, type_): 

2244 return "SELECT 1 WHERE 1!=1" 

2245 

2246 def visit_is_distinct_from_binary(self, binary, operator, **kw): 

2247 return "NOT EXISTS (SELECT %s INTERSECT SELECT %s)" % ( 

2248 self.process(binary.left), 

2249 self.process(binary.right), 

2250 ) 

2251 

2252 def visit_is_not_distinct_from_binary(self, binary, operator, **kw): 

2253 return "EXISTS (SELECT %s INTERSECT SELECT %s)" % ( 

2254 self.process(binary.left), 

2255 self.process(binary.right), 

2256 ) 

2257 

2258 def _render_json_extract_from_binary(self, binary, operator, **kw): 

2259 # note we are intentionally calling upon the process() calls in the 

2260 # order in which they appear in the SQL String as this is used 

2261 # by positional parameter rendering 

2262 

2263 if binary.type._type_affinity is sqltypes.JSON: 

2264 return "JSON_QUERY(%s, %s)" % ( 

2265 self.process(binary.left, **kw), 

2266 self.process(binary.right, **kw), 

2267 ) 

2268 

2269 # as with other dialects, start with an explicit test for NULL 

2270 case_expression = "CASE JSON_VALUE(%s, %s) WHEN NULL THEN NULL" % ( 

2271 self.process(binary.left, **kw), 

2272 self.process(binary.right, **kw), 

2273 ) 

2274 

2275 if binary.type._type_affinity is sqltypes.Integer: 

2276 type_expression = "ELSE CAST(JSON_VALUE(%s, %s) AS INTEGER)" % ( 

2277 self.process(binary.left, **kw), 

2278 self.process(binary.right, **kw), 

2279 ) 

2280 elif binary.type._type_affinity is sqltypes.Numeric: 

2281 type_expression = "ELSE CAST(JSON_VALUE(%s, %s) AS %s)" % ( 

2282 self.process(binary.left, **kw), 

2283 self.process(binary.right, **kw), 

2284 "FLOAT" 

2285 if isinstance(binary.type, sqltypes.Float) 

2286 else "NUMERIC(%s, %s)" 

2287 % (binary.type.precision, binary.type.scale), 

2288 ) 

2289 elif binary.type._type_affinity is sqltypes.Boolean: 

2290 # the NULL handling is particularly weird with boolean, so 

2291 # explicitly return numeric (BIT) constants 

2292 type_expression = ( 

2293 "WHEN 'true' THEN 1 WHEN 'false' THEN 0 ELSE NULL" 

2294 ) 

2295 elif binary.type._type_affinity is sqltypes.String: 

2296 # TODO: does this comment (from mysql) apply to here, too? 

2297 # this fails with a JSON value that's a four byte unicode 

2298 # string. SQLite has the same problem at the moment 

2299 type_expression = "ELSE JSON_VALUE(%s, %s)" % ( 

2300 self.process(binary.left, **kw), 

2301 self.process(binary.right, **kw), 

2302 ) 

2303 else: 

2304 # other affinity....this is not expected right now 

2305 type_expression = "ELSE JSON_QUERY(%s, %s)" % ( 

2306 self.process(binary.left, **kw), 

2307 self.process(binary.right, **kw), 

2308 ) 

2309 

2310 return case_expression + " " + type_expression + " END" 

2311 

2312 def visit_json_getitem_op_binary(self, binary, operator, **kw): 

2313 return self._render_json_extract_from_binary(binary, operator, **kw) 

2314 

2315 def visit_json_path_getitem_op_binary(self, binary, operator, **kw): 

2316 return self._render_json_extract_from_binary(binary, operator, **kw) 

2317 

2318 def visit_sequence(self, seq, **kw): 

2319 return "NEXT VALUE FOR %s" % self.preparer.format_sequence(seq) 

2320 

2321 

2322class MSSQLStrictCompiler(MSSQLCompiler): 

2323 

2324 """A subclass of MSSQLCompiler which disables the usage of bind 

2325 parameters where not allowed natively by MS-SQL. 

2326 

2327 A dialect may use this compiler on a platform where native 

2328 binds are used. 

2329 

2330 """ 

2331 

2332 ansi_bind_rules = True 

2333 

2334 def visit_in_op_binary(self, binary, operator, **kw): 

2335 kw["literal_execute"] = True 

2336 return "%s IN %s" % ( 

2337 self.process(binary.left, **kw), 

2338 self.process(binary.right, **kw), 

2339 ) 

2340 

2341 def visit_not_in_op_binary(self, binary, operator, **kw): 

2342 kw["literal_execute"] = True 

2343 return "%s NOT IN %s" % ( 

2344 self.process(binary.left, **kw), 

2345 self.process(binary.right, **kw), 

2346 ) 

2347 

2348 def render_literal_value(self, value, type_): 

2349 """ 

2350 For date and datetime values, convert to a string 

2351 format acceptable to MSSQL. That seems to be the 

2352 so-called ODBC canonical date format which looks 

2353 like this: 

2354 

2355 yyyy-mm-dd hh:mi:ss.mmm(24h) 

2356 

2357 For other data types, call the base class implementation. 

2358 """ 

2359 # datetime and date are both subclasses of datetime.date 

2360 if issubclass(type(value), datetime.date): 

2361 # SQL Server wants single quotes around the date string. 

2362 return "'" + str(value) + "'" 

2363 else: 

2364 return super(MSSQLStrictCompiler, self).render_literal_value( 

2365 value, type_ 

2366 ) 

2367 

2368 

2369class MSDDLCompiler(compiler.DDLCompiler): 

2370 def get_column_specification(self, column, **kwargs): 

2371 colspec = self.preparer.format_column(column) 

2372 

2373 # type is not accepted in a computed column 

2374 if column.computed is not None: 

2375 colspec += " " + self.process(column.computed) 

2376 else: 

2377 colspec += " " + self.dialect.type_compiler.process( 

2378 column.type, type_expression=column 

2379 ) 

2380 

2381 if column.nullable is not None: 

2382 if ( 

2383 not column.nullable 

2384 or column.primary_key 

2385 or isinstance(column.default, sa_schema.Sequence) 

2386 or column.autoincrement is True 

2387 or column.identity 

2388 ): 

2389 colspec += " NOT NULL" 

2390 elif column.computed is None: 

2391 # don't specify "NULL" for computed columns 

2392 colspec += " NULL" 

2393 

2394 if column.table is None: 

2395 raise exc.CompileError( 

2396 "mssql requires Table-bound columns " 

2397 "in order to generate DDL" 

2398 ) 

2399 

2400 d_opt = column.dialect_options["mssql"] 

2401 start = d_opt["identity_start"] 

2402 increment = d_opt["identity_increment"] 

2403 if start is not None or increment is not None: 

2404 if column.identity: 

2405 raise exc.CompileError( 

2406 "Cannot specify options 'mssql_identity_start' and/or " 

2407 "'mssql_identity_increment' while also using the " 

2408 "'Identity' construct." 

2409 ) 

2410 util.warn_deprecated( 

2411 "The dialect options 'mssql_identity_start' and " 

2412 "'mssql_identity_increment' are deprecated. " 

2413 "Use the 'Identity' object instead.", 

2414 "1.4", 

2415 ) 

2416 

2417 if column.identity: 

2418 colspec += self.process(column.identity, **kwargs) 

2419 elif ( 

2420 column is column.table._autoincrement_column 

2421 or column.autoincrement is True 

2422 ) and ( 

2423 not isinstance(column.default, Sequence) or column.default.optional 

2424 ): 

2425 colspec += self.process(Identity(start=start, increment=increment)) 

2426 else: 

2427 default = self.get_column_default_string(column) 

2428 if default is not None: 

2429 colspec += " DEFAULT " + default 

2430 

2431 return colspec 

2432 

2433 def visit_create_index(self, create, include_schema=False): 

2434 index = create.element 

2435 self._verify_index_table(index) 

2436 preparer = self.preparer 

2437 text = "CREATE " 

2438 if index.unique: 

2439 text += "UNIQUE " 

2440 

2441 # handle clustering option 

2442 clustered = index.dialect_options["mssql"]["clustered"] 

2443 if clustered is not None: 

2444 if clustered: 

2445 text += "CLUSTERED " 

2446 else: 

2447 text += "NONCLUSTERED " 

2448 

2449 text += "INDEX %s ON %s (%s)" % ( 

2450 self._prepared_index_name(index, include_schema=include_schema), 

2451 preparer.format_table(index.table), 

2452 ", ".join( 

2453 self.sql_compiler.process( 

2454 expr, include_table=False, literal_binds=True 

2455 ) 

2456 for expr in index.expressions 

2457 ), 

2458 ) 

2459 

2460 # handle other included columns 

2461 if index.dialect_options["mssql"]["include"]: 

2462 inclusions = [ 

2463 index.table.c[col] 

2464 if isinstance(col, util.string_types) 

2465 else col 

2466 for col in index.dialect_options["mssql"]["include"] 

2467 ] 

2468 

2469 text += " INCLUDE (%s)" % ", ".join( 

2470 [preparer.quote(c.name) for c in inclusions] 

2471 ) 

2472 

2473 whereclause = index.dialect_options["mssql"]["where"] 

2474 

2475 if whereclause is not None: 

2476 whereclause = coercions.expect( 

2477 roles.DDLExpressionRole, whereclause 

2478 ) 

2479 

2480 where_compiled = self.sql_compiler.process( 

2481 whereclause, include_table=False, literal_binds=True 

2482 ) 

2483 text += " WHERE " + where_compiled 

2484 

2485 return text 

2486 

2487 def visit_drop_index(self, drop): 

2488 return "\nDROP INDEX %s ON %s" % ( 

2489 self._prepared_index_name(drop.element, include_schema=False), 

2490 self.preparer.format_table(drop.element.table), 

2491 ) 

2492 

2493 def visit_primary_key_constraint(self, constraint): 

2494 if len(constraint) == 0: 

2495 return "" 

2496 text = "" 

2497 if constraint.name is not None: 

2498 text += "CONSTRAINT %s " % self.preparer.format_constraint( 

2499 constraint 

2500 ) 

2501 text += "PRIMARY KEY " 

2502 

2503 clustered = constraint.dialect_options["mssql"]["clustered"] 

2504 if clustered is not None: 

2505 if clustered: 

2506 text += "CLUSTERED " 

2507 else: 

2508 text += "NONCLUSTERED " 

2509 

2510 text += "(%s)" % ", ".join( 

2511 self.preparer.quote(c.name) for c in constraint 

2512 ) 

2513 text += self.define_constraint_deferrability(constraint) 

2514 return text 

2515 

2516 def visit_unique_constraint(self, constraint): 

2517 if len(constraint) == 0: 

2518 return "" 

2519 text = "" 

2520 if constraint.name is not None: 

2521 formatted_name = self.preparer.format_constraint(constraint) 

2522 if formatted_name is not None: 

2523 text += "CONSTRAINT %s " % formatted_name 

2524 text += "UNIQUE " 

2525 

2526 clustered = constraint.dialect_options["mssql"]["clustered"] 

2527 if clustered is not None: 

2528 if clustered: 

2529 text += "CLUSTERED " 

2530 else: 

2531 text += "NONCLUSTERED " 

2532 

2533 text += "(%s)" % ", ".join( 

2534 self.preparer.quote(c.name) for c in constraint 

2535 ) 

2536 text += self.define_constraint_deferrability(constraint) 

2537 return text 

2538 

2539 def visit_computed_column(self, generated): 

2540 text = "AS (%s)" % self.sql_compiler.process( 

2541 generated.sqltext, include_table=False, literal_binds=True 

2542 ) 

2543 # explicitly check for True|False since None means server default 

2544 if generated.persisted is True: 

2545 text += " PERSISTED" 

2546 return text 

2547 

2548 def visit_create_sequence(self, create, **kw): 

2549 prefix = None 

2550 if create.element.data_type is not None: 

2551 data_type = create.element.data_type 

2552 prefix = " AS %s" % self.type_compiler.process(data_type) 

2553 return super(MSDDLCompiler, self).visit_create_sequence( 

2554 create, prefix=prefix, **kw 

2555 ) 

2556 

2557 def visit_identity_column(self, identity, **kw): 

2558 text = " IDENTITY" 

2559 if identity.start is not None or identity.increment is not None: 

2560 start = 1 if identity.start is None else identity.start 

2561 increment = 1 if identity.increment is None else identity.increment 

2562 text += "(%s,%s)" % (start, increment) 

2563 return text 
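
The IDENTITY rendering rule in ``visit_identity_column`` above can be tried in isolation. The following is a minimal standalone sketch, not part of the module: ``render_identity`` is a hypothetical helper name, and it only mirrors the string-building logic shown in the listing (a missing start or increment falls back to 1 once either one is given).

```python
def render_identity(start=None, increment=None):
    """Build the IDENTITY suffix the way the listing above does.

    Hypothetical helper for illustration only; it takes plain values
    instead of an Identity construct.
    """
    text = " IDENTITY"
    if start is not None or increment is not None:
        # Once either value is given, both are rendered, defaulting to 1.
        start = 1 if start is None else start
        increment = 1 if increment is None else increment
        text += "(%s,%s)" % (start, increment)
    return text


if __name__ == "__main__":
    print(repr(render_identity()))
    print(repr(render_identity(start=100)))
    print(repr(render_identity(increment=5)))
```

With neither value set, only the bare keyword is produced, which leaves the seed and increment to the server defaults.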

2564 

2565 

2566class MSIdentifierPreparer(compiler.IdentifierPreparer): 

2567 reserved_words = RESERVED_WORDS 

2568 

2569 def __init__(self, dialect): 

2570 super(MSIdentifierPreparer, self).__init__( 

2571 dialect, 

2572 initial_quote="[", 

2573 final_quote="]", 

2574 quote_case_sensitive_collations=False, 

2575 ) 

2576 

2577 def _escape_identifier(self, value): 

2578 return value.replace("]", "]]") 

2579 

2580 def _unescape_identifier(self, value): 

2581 return value.replace("]]", "]") 

2582 

2583 def quote_schema(self, schema, force=None): 

2584 """Prepare a quoted table and schema name.""" 

2585 

2586 # need to re-implement the deprecation warning entirely 

2587 if force is not None: 

2588 # not using the util.deprecated_params() decorator in this 

2589 # case because of the additional function call overhead on this 

2590 # very performance-critical spot. 

2591 util.warn_deprecated( 

2592 "The IdentifierPreparer.quote_schema.force parameter is " 

2593 "deprecated and will be removed in a future release. This " 

2594 "flag has no effect on the behavior of the " 

2595 "IdentifierPreparer.quote method; please refer to " 

2596 "quoted_name().", 

2597 version="1.3", 

2598 ) 

2599 

2600 dbname, owner = _schema_elements(schema) 

2601 if dbname: 

2602 result = "%s.%s" % (self.quote(dbname), self.quote(owner)) 

2603 elif owner: 

2604 result = self.quote(owner) 

2605 else: 

2606 result = "" 

2607 return result 
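
The bracket escaping used by ``_escape_identifier`` and ``_unescape_identifier`` above can be illustrated without the preparer class. This is a minimal sketch under that assumption: ``quote_bracketed`` and ``unquote_bracketed`` are hypothetical names, and the real preparer also decides whether quoting is needed at all, which is omitted here.

```python
def quote_bracketed(name):
    """Wrap a name in [ ] and double any closing bracket inside it."""
    return "[" + name.replace("]", "]]") + "]"


def unquote_bracketed(quoted):
    """Reverse quote_bracketed: strip the outer brackets, undouble ']]'."""
    return quoted[1:-1].replace("]]", "]")


if __name__ == "__main__":
    quoted = quote_bracketed("my]table")
    print(quoted)
    print(unquote_bracketed(quoted))
```

Only the closing bracket needs escaping, because an opening bracket inside a bracketed identifier has no special meaning.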

2608 

2609 

2610def _db_plus_owner_listing(fn): 

2611 def wrap(dialect, connection, schema=None, **kw): 

2612 dbname, owner = _owner_plus_db(dialect, schema) 

2613 return _switch_db( 

2614 dbname, 

2615 connection, 

2616 fn, 

2617 dialect, 

2618 connection, 

2619 dbname, 

2620 owner, 

2621 schema, 

2622 **kw 

2623 ) 

2624 

2625 return update_wrapper(wrap, fn) 

2626 

2627 

2628def _db_plus_owner(fn): 

2629 def wrap(dialect, connection, tablename, schema=None, **kw): 

2630 dbname, owner = _owner_plus_db(dialect, schema) 

2631 return _switch_db( 

2632 dbname, 

2633 connection, 

2634 fn, 

2635 dialect, 

2636 connection, 

2637 tablename, 

2638 dbname, 

2639 owner, 

2640 schema, 

2641 **kw 

2642 ) 

2643 

2644 return update_wrapper(wrap, fn) 

2645 

2646 

2647def _switch_db(dbname, connection, fn, *arg, **kw): 

2648 if dbname: 

2649 current_db = connection.exec_driver_sql("select db_name()").scalar() 

2650 if current_db != dbname: 

2651 connection.exec_driver_sql( 

2652 "use %s" % connection.dialect.identifier_preparer.quote(dbname) 

2653 ) 

2654 try: 

2655 return fn(*arg, **kw) 

2656 finally: 

2657 if dbname and current_db != dbname: 

2658 connection.exec_driver_sql( 

2659 "use %s" 

2660 % connection.dialect.identifier_preparer.quote(current_db) 

2661 ) 
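
The switch-and-restore pattern in ``_switch_db`` above can be sketched against a stand-in connection. Everything here is an assumption made for illustration: ``FakeConnection`` and ``call_in_db`` are hypothetical, the fake tracks the current database itself instead of querying ``db_name()``, and identifier quoting is left out.

```python
class FakeConnection:
    """Stand-in connection that records statements (illustration only)."""

    def __init__(self, current_db):
        self.current_db = current_db
        self.log = []

    def run(self, statement):
        self.log.append(statement)
        if statement.startswith("use "):
            self.current_db = statement[len("use "):]


def call_in_db(dbname, conn, fn, *args, **kw):
    """Call fn with dbname active, then restore the original database."""
    original = conn.current_db
    switched = bool(dbname) and original != dbname
    if switched:
        conn.run("use %s" % dbname)
    try:
        return fn(*args, **kw)
    finally:
        # Restore even if fn raised, mirroring the try/finally above.
        if switched:
            conn.run("use %s" % original)


if __name__ == "__main__":
    conn = FakeConnection("master")
    print(call_in_db("reports", conn, lambda: conn.current_db))
    print(conn.current_db, conn.log)
```

The ``finally`` clause is the important part: the connection returns to its original database whether or not the wrapped call succeeds.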

2662 

2663 

2664def _owner_plus_db(dialect, schema): 

2665 if not schema: 

2666 return None, dialect.default_schema_name 

2667 elif "." in schema: 

2668 return _schema_elements(schema) 

2669 else: 

2670 return None, schema 

2671 

2672 

2673_memoized_schema = util.LRUCache() 

2674 

2675 

2676def _schema_elements(schema): 

2677 if isinstance(schema, quoted_name) and schema.quote: 

2678 return None, schema 

2679 

2680 if schema in _memoized_schema: 

2681 return _memoized_schema[schema] 

2682 

2683 # tests for this function are in: 

2684 # test/dialect/mssql/test_reflection.py -> 

2685 # OwnerPlusDBTest.test_owner_database_pairs 

2686 # test/dialect/mssql/test_compiler.py -> test_force_schema_* 

2687 # test/dialect/mssql/test_compiler.py -> test_schema_many_tokens_* 

2688 # 

2689 

2690 if schema.startswith("__[SCHEMA_"): 

2691 return None, schema 

2692 

2693 push = [] 

2694 symbol = "" 

2695 bracket = False 

2696 has_brackets = False 

2697 for token in re.split(r"(\[|\]|\.)", schema): 

2698 if not token: 

2699 continue 

2700 if token == "[": 

2701 bracket = True 

2702 has_brackets = True 

2703 elif token == "]": 

2704 bracket = False 

2705 elif not bracket and token == ".": 

2706 if has_brackets: 

2707 push.append("[%s]" % symbol) 

2708 else: 

2709 push.append(symbol) 

2710 symbol = "" 

2711 has_brackets = False 

2712 else: 

2713 symbol += token 

2714 if symbol: 

2715 push.append(symbol) 

2716 if len(push) > 1: 

2717 dbname, owner = ".".join(push[0:-1]), push[-1] 

2718 

2719 # test for internal brackets 

2720 if re.match(r".*\].*\[.*", dbname[1:-1]): 

2721 dbname = quoted_name(dbname, quote=False) 

2722 else: 

2723 dbname = dbname.lstrip("[").rstrip("]") 

2724 

2725 elif len(push): 

2726 dbname, owner = None, push[0] 

2727 else: 

2728 dbname, owner = None, None 

2729 

2730 _memoized_schema[schema] = dbname, owner 

2731 return dbname, owner 
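
The tokenizing loop in ``_schema_elements`` above splits a schema string into a database part and an owner part, treating dots inside brackets as literal. The following is a simplified standalone sketch of that loop: ``split_schema`` is a hypothetical name, and the sketch leaves out the LRU cache, the ``quoted_name`` handling, the ``__[SCHEMA_`` shortcut and the internal-brackets check.

```python
import re


def split_schema(schema):
    """Split 'database.owner' into (dbname, owner); simplified sketch."""
    parts = []
    symbol = ""
    in_bracket = False
    had_brackets = False
    for token in re.split(r"(\[|\]|\.)", schema):
        if not token:
            continue
        if token == "[":
            in_bracket = True
            had_brackets = True
        elif token == "]":
            in_bracket = False
        elif not in_bracket and token == ".":
            # A dot outside brackets ends the current name.
            parts.append("[%s]" % symbol if had_brackets else symbol)
            symbol = ""
            had_brackets = False
        else:
            # Plain text, or a dot that sits inside brackets.
            symbol += token
    if symbol:
        parts.append(symbol)
    if len(parts) > 1:
        dbname = ".".join(parts[:-1]).lstrip("[").rstrip("]")
        return dbname, parts[-1]
    elif parts:
        return None, parts[0]
    return None, None


if __name__ == "__main__":
    for value in ("dbo", "mydb.dbo", "[my.db].dbo"):
        print(value, "->", split_schema(value))
```

In this sketch the last dot-separated token outside brackets is always the owner, and everything before it is joined back into the database name.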

2732 

2733 

2734class MSDialect(default.DefaultDialect): 

2735 # will assume it's at least mssql2005 

2736 name = "mssql" 

2737 supports_statement_cache = True 

2738 supports_default_values = True 

2739 supports_empty_insert = False 

2740 execution_ctx_cls = MSExecutionContext 

2741 use_scope_identity = True 

2742 max_identifier_length = 128 

2743 schema_name = "dbo" 

2744 

2745 implicit_returning = True 

2746 full_returning = True 

2747 

2748 colspecs = { 

2749 sqltypes.DateTime: _MSDateTime, 

2750 sqltypes.Date: _MSDate, 

2751 sqltypes.JSON: JSON, 

2752 sqltypes.JSON.JSONIndexType: JSONIndexType, 

2753 sqltypes.JSON.JSONPathType: JSONPathType, 

2754 sqltypes.Time: _BASETIMEIMPL, 

2755 sqltypes.Unicode: _MSUnicode, 

2756 sqltypes.UnicodeText: _MSUnicodeText, 

2757 DATETIMEOFFSET: DATETIMEOFFSET, 

2758 DATETIME2: DATETIME2, 

2759 SMALLDATETIME: SMALLDATETIME, 

2760 DATETIME: DATETIME, 

2761 } 

2762 

2763 engine_config_types = default.DefaultDialect.engine_config_types.union( 

2764 {"legacy_schema_aliasing": util.asbool} 

2765 ) 

2766 

2767 ischema_names = ischema_names 

2768 

2769 supports_sequences = True 

2770 sequences_optional = True 

2771 # T-SQL's actual default is -9223372036854775808 

2772 default_sequence_base = 1 

2773 

2774 supports_native_boolean = False 

2775 non_native_boolean_check_constraint = False 

2776 supports_unicode_binds = True 

2777 postfetch_lastrowid = True 

2778 _supports_offset_fetch = False 

2779 _supports_nvarchar_max = False 

2780 

2781 legacy_schema_aliasing = False 

2782 

2783 server_version_info = () 

2784 

2785 statement_compiler = MSSQLCompiler 

2786 ddl_compiler = MSDDLCompiler 

2787 type_compiler = MSTypeCompiler 

2788 preparer = MSIdentifierPreparer 

2789 

2790 construct_arguments = [ 

2791 (sa_schema.PrimaryKeyConstraint, {"clustered": None}), 

2792 (sa_schema.UniqueConstraint, {"clustered": None}), 

2793 (sa_schema.Index, {"clustered": None, "include": None, "where": None}), 

2794 ( 

2795 sa_schema.Column, 

2796 {"identity_start": None, "identity_increment": None}, 

2797 ), 

2798 ] 

2799 

2800 def __init__( 

2801 self, 

2802 query_timeout=None, 

2803 use_scope_identity=True, 

2804 schema_name="dbo", 

2805 isolation_level=None, 

2806 deprecate_large_types=None, 

2807 json_serializer=None, 

2808 json_deserializer=None, 

2809 legacy_schema_aliasing=None, 

2810 ignore_no_transaction_on_rollback=False, 

2811 **opts 

2812 ): 

2813 self.query_timeout = int(query_timeout or 0) 

2814 self.schema_name = schema_name 

2815 

2816 self.use_scope_identity = use_scope_identity 

2817 self.deprecate_large_types = deprecate_large_types 

2818 self.ignore_no_transaction_on_rollback = ( 

2819 ignore_no_transaction_on_rollback 

2820 ) 

2821 

2822 if legacy_schema_aliasing is not None: 

2823 util.warn_deprecated( 

2824 "The legacy_schema_aliasing parameter is " 

2825 "deprecated and will be removed in a future release.", 

2826 "1.4", 

2827 ) 

2828 self.legacy_schema_aliasing = legacy_schema_aliasing 

2829 

2830 super(MSDialect, self).__init__(**opts) 

2831 

2832 self.isolation_level = isolation_level 

2833 self._json_serializer = json_serializer 

2834 self._json_deserializer = json_deserializer 

2835 

2836 def do_savepoint(self, connection, name): 

2837 # give the DBAPI a push 

2838 connection.exec_driver_sql("IF @@TRANCOUNT = 0 BEGIN TRANSACTION") 

2839 super(MSDialect, self).do_savepoint(connection, name) 

2840 

2841 def do_release_savepoint(self, connection, name): 

2842 # SQL Server does not support RELEASE SAVEPOINT 

2843 pass 

2844 

2845 def do_rollback(self, dbapi_connection): 

2846 try: 

2847 super(MSDialect, self).do_rollback(dbapi_connection) 

2848 except self.dbapi.ProgrammingError as e: 

2849 if self.ignore_no_transaction_on_rollback and re.match( 

2850 r".*\b111214\b", str(e) 

2851 ): 

2852 util.warn( 

2853 "ProgrammingError 111214 " 

2854 "'No corresponding transaction found.' " 

2855 "has been suppressed via " 

2856 "ignore_no_transaction_on_rollback=True" 

2857 ) 

2858 else: 

2859 raise 

2860 

2861 _isolation_lookup = set( 

2862 [ 

2863 "SERIALIZABLE", 

2864 "READ UNCOMMITTED", 

2865 "READ COMMITTED", 

2866 "REPEATABLE READ", 

2867 "SNAPSHOT", 

2868 ] 

2869 ) 

2870 

2871 def set_isolation_level(self, connection, level): 

2872 level = level.replace("_", " ") 

2873 if level not in self._isolation_lookup: 

2874 raise exc.ArgumentError( 

2875 "Invalid value '%s' for isolation_level. " 

2876 "Valid isolation levels for %s are %s" 

2877 % (level, self.name, ", ".join(self._isolation_lookup)) 

2878 ) 

2879 cursor = connection.cursor() 

2880 cursor.execute("SET TRANSACTION ISOLATION LEVEL %s" % level) 

2881 cursor.close() 

2882 if level == "SNAPSHOT": 

2883 connection.commit() 

2884 

2885 def get_isolation_level(self, dbapi_connection): 

2886 cursor = dbapi_connection.cursor() 

2887 view_name = "sys.system_views" 

2888 try: 

2889 cursor.execute( 

2890 ( 

2891 "SELECT name FROM {} WHERE name IN " 

2892 "('dm_exec_sessions', 'dm_pdw_nodes_exec_sessions')" 

2893 ).format(view_name) 

2894 ) 

2895 row = cursor.fetchone() 

2896 if not row: 

2897 raise NotImplementedError( 

2898 "Can't fetch isolation level on this particular " 

2899 "SQL Server version." 

2900 ) 

2901 

2902 view_name = "sys.{}".format(row[0]) 

2903 

2904 cursor.execute( 

2905 """ 

2906 SELECT CASE transaction_isolation_level 

2907 WHEN 0 THEN NULL 

2908 WHEN 1 THEN 'READ UNCOMMITTED' 

2909 WHEN 2 THEN 'READ COMMITTED' 

2910 WHEN 3 THEN 'REPEATABLE READ' 

2911 WHEN 4 THEN 'SERIALIZABLE' 

2912 WHEN 5 THEN 'SNAPSHOT' END 

2913 AS TRANSACTION_ISOLATION_LEVEL 

2914 FROM {} 

2915 where session_id = @@SPID 

2916 """.format( 

2917 view_name 

2918 ) 

2919 ) 

2920 except self.dbapi.Error as err: 

2921 util.raise_( 

2922 NotImplementedError( 

2923 "Can't fetch isolation level; encountered error {} when " 

2924 'attempting to query the "{}" view.'.format(err, view_name) 

2925 ), 

2926 from_=err, 

2927 ) 

2928 else: 

2929 row = cursor.fetchone() 

2930 return row[0].upper() 

2931 finally: 

2932 cursor.close() 

2933 

2934 def initialize(self, connection): 

2935 super(MSDialect, self).initialize(connection) 

2936 self._setup_version_attributes() 

2937 self._setup_supports_nvarchar_max(connection) 

2938 

2939 def on_connect(self): 

2940 if self.isolation_level is not None: 

2941 

2942 def connect(conn): 

2943 self.set_isolation_level(conn, self.isolation_level) 

2944 

2945 return connect 

2946 else: 

2947 return None 

2948 

2949 def _setup_version_attributes(self): 

2950 if self.server_version_info[0] not in range(8, 17): 

2951 util.warn( 

2952 "Unrecognized server version info '%s'. Some SQL Server " 

2953 "features may not function properly." 

2954 % ".".join(str(x) for x in self.server_version_info) 

2955 ) 

2956 

2957 if self.server_version_info >= MS_2008_VERSION: 

2958 self.supports_multivalues_insert = True 

2959 if self.deprecate_large_types is None: 

2960 self.deprecate_large_types = ( 

2961 self.server_version_info >= MS_2012_VERSION 

2962 ) 

2963 

2964 self._supports_offset_fetch = ( 

2965 self.server_version_info and self.server_version_info[0] >= 11 

2966 ) 

2967 

2968 def _setup_supports_nvarchar_max(self, connection): 

2969 try: 

2970 connection.scalar( 

2971 sql.text("SELECT CAST('test max support' AS NVARCHAR(max))") 

2972 ) 

2973 except exc.DBAPIError: 

2974 self._supports_nvarchar_max = False 

2975 else: 

2976 self._supports_nvarchar_max = True 

2977 

2978 def _get_default_schema_name(self, connection): 

2979 query = sql.text("SELECT schema_name()") 

2980 default_schema_name = connection.scalar(query) 

2981 if default_schema_name is not None: 

2982 # guard against the case where the default_schema_name is being 

2983 # fed back into a table reflection function. 

2984 return quoted_name(default_schema_name, quote=True) 

2985 else: 

2986 return self.schema_name 

2987 

2988 @_db_plus_owner 

2989 def has_table(self, connection, tablename, dbname, owner, schema): 

2990 self._ensure_has_table_connection(connection) 

2991 

2992 if tablename.startswith("#"): # temporary table 

2993 # mssql does not support temporary views 

2994 # SQL Error [4103] [S0001]: "#v": Temporary views are not allowed 

2995 return bool( 

2996 connection.scalar( 

2997 # U filters on user tables only. 

2998 text("SELECT object_id(:table_name, 'U')"), 

2999 {"table_name": "tempdb.dbo.[{}]".format(tablename)}, 

3000 ) 

3001 ) 

3002 

3003 else: 

3004 tables = ischema.tables 

3005 

3006 s = sql.select(tables.c.table_name, tables.c.table_type).where( 

3007 tables.c.table_name == tablename, 

3008 ) 

3009 

3010 if owner: 

3011 s = s.where(tables.c.table_schema == owner) 

3012 

3013 c = connection.execute(s) 

3014 

3015 return c.first() is not None 

3016 

3017 @_db_plus_owner 

3018 def has_sequence(self, connection, sequencename, dbname, owner, schema): 

3019 sequences = ischema.sequences 

3020 

3021 s = sql.select(sequences.c.sequence_name).where( 

3022 sequences.c.sequence_name == sequencename 

3023 ) 

3024 

3025 if owner: 

3026 s = s.where(sequences.c.sequence_schema == owner) 

3027 

3028 c = connection.execute(s) 

3029 

3030 return c.first() is not None 

3031 

3032 @reflection.cache 

3033 @_db_plus_owner_listing 

3034 def get_sequence_names(self, connection, dbname, owner, schema, **kw): 

3035 sequences = ischema.sequences 

3036 

3037 s = sql.select(sequences.c.sequence_name) 

3038 if owner: 

3039 s = s.where(sequences.c.sequence_schema == owner) 

3040 

3041 c = connection.execute(s) 

3042 

3043 return [row[0] for row in c] 

3044 

3045 @reflection.cache 

3046 def get_schema_names(self, connection, **kw): 

3047 s = sql.select(ischema.schemata.c.schema_name).order_by( 

3048 ischema.schemata.c.schema_name 

3049 ) 

3050 schema_names = [r[0] for r in connection.execute(s)] 

3051 return schema_names 

3052 

3053 @reflection.cache 

3054 @_db_plus_owner_listing 

3055 def get_table_names(self, connection, dbname, owner, schema, **kw): 

3056 tables = ischema.tables 

3057 s = ( 

3058 sql.select(tables.c.table_name) 

3059 .where( 

3060 sql.and_( 

3061 tables.c.table_schema == owner, 

3062 tables.c.table_type == "BASE TABLE", 

3063 ) 

3064 ) 

3065 .order_by(tables.c.table_name) 

3066 ) 

3067 table_names = [r[0] for r in connection.execute(s)] 

3068 return table_names 

3069 

3070 @reflection.cache 

3071 @_db_plus_owner_listing 

3072 def get_view_names(self, connection, dbname, owner, schema, **kw): 

3073 tables = ischema.tables 

3074 s = ( 

3075 sql.select(tables.c.table_name) 

3076 .where( 

3077 sql.and_( 

3078 tables.c.table_schema == owner, 

3079 tables.c.table_type == "VIEW", 

3080 ) 

3081 ) 

3082 .order_by(tables.c.table_name) 

3083 ) 

3084 view_names = [r[0] for r in connection.execute(s)] 

3085 return view_names 

3086 

3087 @reflection.cache 

3088 @_db_plus_owner 

3089 def get_indexes(self, connection, tablename, dbname, owner, schema, **kw): 

3090 filter_definition = ( 

3091 "ind.filter_definition" 

3092 if self.server_version_info >= MS_2008_VERSION 

3093 else "NULL as filter_definition" 

3094 ) 

3095 rp = connection.execution_options(future_result=True).execute( 

3096 sql.text( 

3097 "select ind.index_id, ind.is_unique, ind.name, " 

3098 "%s " 

3099 "from sys.indexes as ind join sys.tables as tab on " 

3100 "ind.object_id=tab.object_id " 

3101 "join sys.schemas as sch on sch.schema_id=tab.schema_id " 

3102 "where tab.name = :tabname " 

3103 "and sch.name=:schname " 

3104 "and ind.is_primary_key=0 and ind.type != 0" 

3105 % filter_definition 

3106 ) 

3107 .bindparams( 

3108 sql.bindparam("tabname", tablename, ischema.CoerceUnicode()), 

3109 sql.bindparam("schname", owner, ischema.CoerceUnicode()), 

3110 ) 

3111 .columns(name=sqltypes.Unicode()) 

3112 ) 

3113 indexes = {} 

3114 for row in rp.mappings(): 

3115 indexes[row["index_id"]] = { 

3116 "name": row["name"], 

3117 "unique": row["is_unique"] == 1, 

3118 "column_names": [], 

3119 "include_columns": [], 

3120 } 

3121 

3122 if row["filter_definition"] is not None: 

3123 indexes[row["index_id"]].setdefault("dialect_options", {})[ 

3124 "mssql_where" 

3125 ] = row["filter_definition"] 

3126 

3127 rp = connection.execution_options(future_result=True).execute( 

3128 sql.text( 

3129 "select ind_col.index_id, ind_col.object_id, col.name, " 

3130 "ind_col.is_included_column " 

3131 "from sys.columns as col " 

3132 "join sys.tables as tab on tab.object_id=col.object_id " 

3133 "join sys.index_columns as ind_col on " 

3134 "(ind_col.column_id=col.column_id and " 

3135 "ind_col.object_id=tab.object_id) " 

3136 "join sys.schemas as sch on sch.schema_id=tab.schema_id " 

3137 "where tab.name=:tabname " 

3138 "and sch.name=:schname" 

3139 ) 

3140 .bindparams( 

3141 sql.bindparam("tabname", tablename, ischema.CoerceUnicode()), 

3142 sql.bindparam("schname", owner, ischema.CoerceUnicode()), 

3143 ) 

3144 .columns(name=sqltypes.Unicode()) 

3145 ) 

3146 for row in rp.mappings(): 

3147 if row["index_id"] in indexes: 

3148 if row["is_included_column"]: 

3149 indexes[row["index_id"]]["include_columns"].append( 

3150 row["name"] 

3151 ) 

3152 else: 

3153 indexes[row["index_id"]]["column_names"].append( 

3154 row["name"] 

3155 ) 

3156 for index_info in indexes.values(): 

3157 # NOTE: "root level" include_columns is legacy, now part of 

3158 # dialect_options (issue #7382) 

3159 index_info.setdefault("dialect_options", {})[ 

3160 "mssql_include" 

3161 ] = index_info["include_columns"] 

3162 

3163 return list(indexes.values()) 

3164 

3165 @reflection.cache 

3166 @_db_plus_owner 

3167 def get_view_definition( 

3168 self, connection, viewname, dbname, owner, schema, **kw 

3169 ): 

3170 rp = connection.execute( 

3171 sql.text( 

3172 "select definition from sys.sql_modules as mod, " 

3173 "sys.views as views, " 

3174 "sys.schemas as sch" 

3175 " where " 

3176 "mod.object_id=views.object_id and " 

3177 "views.schema_id=sch.schema_id and " 

3178 "views.name=:viewname and sch.name=:schname" 

3179 ).bindparams( 

3180 sql.bindparam("viewname", viewname, ischema.CoerceUnicode()), 

3181 sql.bindparam("schname", owner, ischema.CoerceUnicode()), 

3182 ) 

3183 ) 

3184 

3185 if rp: 

3186 view_def = rp.scalar() 

3187 return view_def 

3188 

3189 def _temp_table_name_like_pattern(self, tablename): 

3190 # LIKE uses '%' to match zero or more characters and '_' to match any 

3191 # single character. We want to match literal underscores, so T-SQL 

3192 # requires that we enclose them in square brackets. 

3193 return tablename + ( 

3194 ("[_][_][_]%") if not tablename.startswith("##") else "" 

3195 ) 

3196 

3197 def _get_internal_temp_table_name(self, connection, tablename): 

3198 # it's likely that schema is always "dbo", but since we can 

3199 # get it here, let's get it. 

3200 # see https://stackoverflow.com/questions/8311959/ 

3201 # specifying-schema-for-temporary-tables 

3202 

3203 try: 

3204 return connection.execute( 

3205 sql.text( 

3206 "select table_schema, table_name " 

3207 "from tempdb.information_schema.tables " 

3208 "where table_name like :p1" 

3209 ), 

3210 {"p1": self._temp_table_name_like_pattern(tablename)}, 

3211 ).one() 

3212 except exc.MultipleResultsFound as me: 

3213 util.raise_( 

3214 exc.UnreflectableTableError( 

3215 "Found more than one temporary table named '%s' in tempdb " 

3216 "at this time. Cannot reliably resolve that name to its " 

3217 "internal table name." % tablename 

3218 ), 

3219 replace_context=me, 

3220 ) 

3221 except exc.NoResultFound as ne: 

3222 util.raise_( 

3223 exc.NoSuchTableError( 

3224 "Unable to find a temporary table named '%s' in tempdb." 

3225 % tablename 

3226 ), 

3227 replace_context=ne, 

3228 ) 

3229 

3230 @reflection.cache 

3231 @_db_plus_owner 

3232 def get_columns(self, connection, tablename, dbname, owner, schema, **kw): 

3233 is_temp_table = tablename.startswith("#") 

3234 if is_temp_table: 

3235 owner, tablename = self._get_internal_temp_table_name( 

3236 connection, tablename 

3237 ) 

3238 

3239 columns = ischema.mssql_temp_table_columns 

3240 else: 

3241 columns = ischema.columns 

3242 

3243 computed_cols = ischema.computed_columns 

3244 identity_cols = ischema.identity_columns 

3245 if owner: 

3246 whereclause = sql.and_( 

3247 columns.c.table_name == tablename, 

3248 columns.c.table_schema == owner, 

3249 ) 

3250 full_name = columns.c.table_schema + "." + columns.c.table_name 

3251 else: 

3252 whereclause = columns.c.table_name == tablename 

3253 full_name = columns.c.table_name 

3254 

3255 join = columns.join( 

3256 computed_cols, 

3257 onclause=sql.and_( 

3258 computed_cols.c.object_id == func.object_id(full_name), 

3259 computed_cols.c.name 

3260 == columns.c.column_name.collate("DATABASE_DEFAULT"), 

3261 ), 

3262 isouter=True, 

3263 ).join( 

3264 identity_cols, 

3265 onclause=sql.and_( 

3266 identity_cols.c.object_id == func.object_id(full_name), 

3267 identity_cols.c.name 

3268 == columns.c.column_name.collate("DATABASE_DEFAULT"), 

3269 ), 

3270 isouter=True, 

3271 ) 

3272 

3273 if self._supports_nvarchar_max: 

3274 computed_definition = computed_cols.c.definition 

3275 else: 

3276 # tds_version 4.2 does not support NVARCHAR(MAX) 

3277 computed_definition = sql.cast( 

3278 computed_cols.c.definition, NVARCHAR(4000) 

3279 ) 

3280 

3281 s = ( 

3282 sql.select( 

3283 columns, 

3284 computed_definition, 

3285 computed_cols.c.is_persisted, 

3286 identity_cols.c.is_identity, 

3287 identity_cols.c.seed_value, 

3288 identity_cols.c.increment_value, 

3289 ) 

3290 .where(whereclause) 

3291 .select_from(join) 

3292 .order_by(columns.c.ordinal_position) 

3293 ) 

3294 

3295 c = connection.execution_options(future_result=True).execute(s) 

3296 

3297 cols = [] 

3298 for row in c.mappings(): 

3299 name = row[columns.c.column_name] 

3300 type_ = row[columns.c.data_type] 

3301 nullable = row[columns.c.is_nullable] == "YES" 

3302 charlen = row[columns.c.character_maximum_length] 

3303 numericprec = row[columns.c.numeric_precision] 

3304 numericscale = row[columns.c.numeric_scale] 

3305 default = row[columns.c.column_default] 

3306 collation = row[columns.c.collation_name] 

3307 definition = row[computed_definition] 

3308 is_persisted = row[computed_cols.c.is_persisted] 

3309 is_identity = row[identity_cols.c.is_identity] 

3310 identity_start = row[identity_cols.c.seed_value] 

3311 identity_increment = row[identity_cols.c.increment_value] 

3312 

3313 coltype = self.ischema_names.get(type_, None) 

3314 

3315 kwargs = {} 

3316 if coltype in ( 

3317 MSString, 

3318 MSChar, 

3319 MSNVarchar, 

3320 MSNChar, 

3321 MSText, 

3322 MSNText, 

3323 MSBinary, 

3324 MSVarBinary, 

3325 sqltypes.LargeBinary, 

3326 ): 

3327 if charlen == -1: 

3328 charlen = None 

3329 kwargs["length"] = charlen 

3330 if collation: 

3331 kwargs["collation"] = collation 

3332 

3333 if coltype is None: 

3334 util.warn( 

3335 "Did not recognize type '%s' of column '%s'" 

3336 % (type_, name) 

3337 ) 

3338 coltype = sqltypes.NULLTYPE 

3339 else: 

3340 if issubclass(coltype, sqltypes.Numeric): 

3341 kwargs["precision"] = numericprec 

3342 

3343 if not issubclass(coltype, sqltypes.Float): 

3344 kwargs["scale"] = numericscale 

3345 

3346 coltype = coltype(**kwargs) 

3347 cdict = { 

3348 "name": name, 

3349 "type": coltype, 

3350 "nullable": nullable, 

3351 "default": default, 

3352 "autoincrement": is_identity is not None, 

3353 } 

3354 

3355 if definition is not None and is_persisted is not None: 

3356 cdict["computed"] = { 

3357 "sqltext": definition, 

3358 "persisted": is_persisted, 

3359 } 

3360 

3361 if is_identity is not None: 

3362 # identity_start and identity_increment are Decimal or None 

3363 if identity_start is None or identity_increment is None: 

3364 cdict["identity"] = {} 

3365 else: 

3366 if isinstance(coltype, sqltypes.BigInteger): 

3367 start = compat.long_type(identity_start) 

3368 increment = compat.long_type(identity_increment) 

3369 elif isinstance(coltype, sqltypes.Integer): 

3370 start = int(identity_start) 

3371 increment = int(identity_increment) 

3372 else: 

3373 start = identity_start 

3374 increment = identity_increment 

3375 

3376 cdict["identity"] = { 

3377 "start": start, 

3378 "increment": increment, 

3379 } 

3380 

3381 cols.append(cdict) 

3382 

3383 return cols 

3384 

3385 @reflection.cache 

3386 @_db_plus_owner 

3387 def get_pk_constraint( 

3388 self, connection, tablename, dbname, owner, schema, **kw 

3389 ): 

3390 pkeys = [] 

3391 TC = ischema.constraints 

3392 C = ischema.key_constraints.alias("C") 

3393 

3394 # Primary key constraints 

3395 s = ( 

3396 sql.select( 

3397 C.c.column_name, TC.c.constraint_type, C.c.constraint_name 

3398 ) 

3399 .where( 

3400 sql.and_( 

3401 TC.c.constraint_name == C.c.constraint_name, 

3402 TC.c.table_schema == C.c.table_schema, 

3403 C.c.table_name == tablename, 

3404 C.c.table_schema == owner, 

3405 ), 

3406 ) 

3407 .order_by(TC.c.constraint_name, C.c.ordinal_position) 

3408 ) 

3409 c = connection.execution_options(future_result=True).execute(s) 

3410 constraint_name = None 

3411 for row in c.mappings(): 

3412 if "PRIMARY" in row[TC.c.constraint_type.name]: 

3413 pkeys.append(row["COLUMN_NAME"]) 

3414 if constraint_name is None: 

3415 constraint_name = row[C.c.constraint_name.name] 

3416 return {"constrained_columns": pkeys, "name": constraint_name} 

3417 

3418 @reflection.cache 

3419 @_db_plus_owner 

3420 def get_foreign_keys( 

3421 self, connection, tablename, dbname, owner, schema, **kw 

3422 ): 

3423 # Foreign key constraints 

3424 s = ( 

3425 text( 

3426 """\ 

3427WITH fk_info AS ( 

3428 SELECT 

3429 ischema_ref_con.constraint_schema, 

3430 ischema_ref_con.constraint_name, 

3431 ischema_key_col.ordinal_position, 

3432 ischema_key_col.table_schema, 

3433 ischema_key_col.table_name, 

3434 ischema_ref_con.unique_constraint_schema, 

3435 ischema_ref_con.unique_constraint_name, 

3436 ischema_ref_con.match_option, 

3437 ischema_ref_con.update_rule, 

3438 ischema_ref_con.delete_rule, 

3439 ischema_key_col.column_name AS constrained_column 

3440 FROM 

3441 INFORMATION_SCHEMA.REFERENTIAL_CONSTRAINTS ischema_ref_con 

3442 INNER JOIN 

3443 INFORMATION_SCHEMA.KEY_COLUMN_USAGE ischema_key_col ON 

3444 ischema_key_col.table_schema = ischema_ref_con.constraint_schema 

3445 AND ischema_key_col.constraint_name = 

3446 ischema_ref_con.constraint_name 

3447 WHERE ischema_key_col.table_name = :tablename 

3448 AND ischema_key_col.table_schema = :owner 

3449), 

3450constraint_info AS ( 

3451 SELECT 

3452 ischema_key_col.constraint_schema, 

3453 ischema_key_col.constraint_name, 

3454 ischema_key_col.ordinal_position, 

3455 ischema_key_col.table_schema, 

3456 ischema_key_col.table_name, 

3457 ischema_key_col.column_name 

3458 FROM 

3459 INFORMATION_SCHEMA.KEY_COLUMN_USAGE ischema_key_col 

3460), 

3461index_info AS ( 

3462 SELECT 

3463 sys.schemas.name AS index_schema, 

3464 sys.indexes.name AS index_name, 

3465 sys.index_columns.key_ordinal AS ordinal_position, 

3466 sys.schemas.name AS table_schema, 

3467 sys.objects.name AS table_name, 

3468 sys.columns.name AS column_name 

3469 FROM 

3470 sys.indexes 

3471 INNER JOIN 

3472 sys.objects ON 

3473 sys.objects.object_id = sys.indexes.object_id 

3474 INNER JOIN 

3475 sys.schemas ON 

3476 sys.schemas.schema_id = sys.objects.schema_id 

3477 INNER JOIN 

3478 sys.index_columns ON 

3479 sys.index_columns.object_id = sys.objects.object_id 

3480 AND sys.index_columns.index_id = sys.indexes.index_id 

3481 INNER JOIN 

3482 sys.columns ON 

3483 sys.columns.object_id = sys.indexes.object_id 

3484 AND sys.columns.column_id = sys.index_columns.column_id 

3485) 

3486 SELECT 

3487 fk_info.constraint_schema, 

3488 fk_info.constraint_name, 

3489 fk_info.ordinal_position, 

3490 fk_info.constrained_column, 

3491 constraint_info.table_schema AS referred_table_schema, 

3492 constraint_info.table_name AS referred_table_name, 

3493 constraint_info.column_name AS referred_column, 

3494 fk_info.match_option, 

3495 fk_info.update_rule, 

3496 fk_info.delete_rule 

3497 FROM 

3498 fk_info INNER JOIN constraint_info ON 

3499 constraint_info.constraint_schema = 

3500 fk_info.unique_constraint_schema 

3501 AND constraint_info.constraint_name = 

3502 fk_info.unique_constraint_name 

3503 AND constraint_info.ordinal_position = fk_info.ordinal_position 

3504 UNION 

3505 SELECT 

3506 fk_info.constraint_schema, 

3507 fk_info.constraint_name, 

3508 fk_info.ordinal_position, 

3509 fk_info.constrained_column, 

3510 index_info.table_schema AS referred_table_schema, 

3511 index_info.table_name AS referred_table_name, 

3512 index_info.column_name AS referred_column, 

3513 fk_info.match_option, 

3514 fk_info.update_rule, 

3515 fk_info.delete_rule 

3516 FROM 

3517 fk_info INNER JOIN index_info ON 

3518 index_info.index_schema = fk_info.unique_constraint_schema 

3519 AND index_info.index_name = fk_info.unique_constraint_name 

3520 AND index_info.ordinal_position = fk_info.ordinal_position 

3521 

3522 ORDER BY fk_info.constraint_schema, fk_info.constraint_name, 

3523 fk_info.ordinal_position 

3524""" 

3525 ) 

3526 .bindparams( 

3527 sql.bindparam("tablename", tablename, ischema.CoerceUnicode()), 

3528 sql.bindparam("owner", owner, ischema.CoerceUnicode()), 

3529 ) 

3530 .columns( 

3531 constraint_schema=sqltypes.Unicode(), 

3532 constraint_name=sqltypes.Unicode(), 

3533 table_schema=sqltypes.Unicode(), 

3534 table_name=sqltypes.Unicode(), 

3535 constrained_column=sqltypes.Unicode(), 

3536 referred_table_schema=sqltypes.Unicode(), 

3537 referred_table_name=sqltypes.Unicode(), 

3538 referred_column=sqltypes.Unicode(), 

3539 ) 

3540 ) 

3541 

3542 # group rows by constraint name, to handle multi-column FKs 

3543 fkeys = [] 

3544 

3545 def fkey_rec(): 

3546 return { 

3547 "name": None, 

3548 "constrained_columns": [], 

3549 "referred_schema": None, 

3550 "referred_table": None, 

3551 "referred_columns": [], 

3552 "options": {}, 

3553 } 

3554 

3555 fkeys = util.defaultdict(fkey_rec) 

3556 

3557 for r in connection.execute(s).fetchall(): 

3558 ( 

3559 _, # constraint schema 

3560 rfknm, 

3561 _, # ordinal position 

3562 scol, 

3563 rschema, 

3564 rtbl, 

3565 rcol, 

3566 # TODO: we support match=<keyword> for foreign keys, so 

3567 # this could be supported too; PG has match=FULL, for example, 

3568 # but that does not appear to be a valid value for SQL Server 

3569 _, # match rule 

3570 fkuprule, 

3571 fkdelrule, 

3572 ) = r 

3573 

3574 rec = fkeys[rfknm] 

3575 rec["name"] = rfknm 

3576 

3577 if fkuprule != "NO ACTION": 

3578 rec["options"]["onupdate"] = fkuprule 

3579 

3580 if fkdelrule != "NO ACTION": 

3581 rec["options"]["ondelete"] = fkdelrule 

3582 

3583 if not rec["referred_table"]: 

3584 rec["referred_table"] = rtbl 

3585 if schema is not None or owner != rschema: 

3586 if dbname: 

3587 rschema = dbname + "." + rschema 

3588 rec["referred_schema"] = rschema 

3589 

3590 local_cols, remote_cols = ( 

3591 rec["constrained_columns"], 

3592 rec["referred_columns"], 

3593 ) 

3594 

3595 local_cols.append(scol) 

3596 remote_cols.append(rcol) 

3597 

3598 return list(fkeys.values())