# mssql/base.py
# Copyright (C) 2005-2023 the SQLAlchemy authors and contributors
# <see AUTHORS file>
#
# This module is part of SQLAlchemy and is released under
# the MIT License: https://www.opensource.org/licenses/mit-license.php
"""
.. dialect:: mssql
    :name: Microsoft SQL Server
    :full_support: 2017
    :normal_support: 2012+
    :best_effort: 2005+

.. _mssql_external_dialects:

External Dialects
-----------------

In addition to the above DBAPI layers with native SQLAlchemy support, there
are third-party dialects for other DBAPI layers that are compatible
with SQL Server. See the "External Dialects" list on the
:ref:`dialect_toplevel` page.

.. _mssql_identity:

Auto Increment Behavior / IDENTITY Columns
------------------------------------------

SQL Server provides so-called "auto incrementing" behavior using the
``IDENTITY`` construct, which can be placed on any single integer column in a
table. SQLAlchemy considers ``IDENTITY`` within its default "autoincrement"
behavior for an integer primary key column, described at
:paramref:`_schema.Column.autoincrement`. This means that by default,
the first integer primary key column in a :class:`_schema.Table` will be
considered to be the identity column - unless it is associated with a
:class:`.Sequence` - and will generate DDL as such::

    from sqlalchemy import Table, MetaData, Column, Integer

    m = MetaData()
    t = Table('t', m,
            Column('id', Integer, primary_key=True),
            Column('x', Integer))
    m.create_all(engine)

The above example will generate DDL as:

.. sourcecode:: sql

    CREATE TABLE t (
        id INTEGER NOT NULL IDENTITY,
        x INTEGER NULL,
        PRIMARY KEY (id)
    )

For the case where this default generation of ``IDENTITY`` is not desired,
specify ``False`` for the :paramref:`_schema.Column.autoincrement` flag,
on the first integer primary key column::

    m = MetaData()
    t = Table('t', m,
            Column('id', Integer, primary_key=True, autoincrement=False),
            Column('x', Integer))
    m.create_all(engine)

To add the ``IDENTITY`` keyword to a non-primary key column, specify
``True`` for the :paramref:`_schema.Column.autoincrement` flag on the desired
:class:`_schema.Column` object, and ensure that
:paramref:`_schema.Column.autoincrement`
is set to ``False`` on any integer primary key column::

    m = MetaData()
    t = Table('t', m,
            Column('id', Integer, primary_key=True, autoincrement=False),
            Column('x', Integer, autoincrement=True))
    m.create_all(engine)
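
The three ``autoincrement`` configurations described above can be compared without a database by compiling the ``CREATE TABLE`` statement against the SQL Server dialect. The following is a minimal sketch, assuming SQLAlchemy 1.4 or later; the helper ``ddl_for`` and the table names are illustrative and not part of the dialect.

```python
from sqlalchemy import Column, Integer, MetaData, Table
from sqlalchemy.dialects import mssql
from sqlalchemy.schema import CreateTable


def ddl_for(table):
    """Render CREATE TABLE for the SQL Server dialect; no connection is needed."""
    return str(CreateTable(table).compile(dialect=mssql.dialect()))


m = MetaData()

# default: the first integer primary key column receives IDENTITY
t_default = Table(
    "t_default", m,
    Column("id", Integer, primary_key=True),
    Column("x", Integer),
)

# autoincrement=False on the primary key: no IDENTITY keyword is rendered
t_plain = Table(
    "t_plain", m,
    Column("id", Integer, primary_key=True, autoincrement=False),
    Column("x", Integer),
)

# IDENTITY placed on a non-primary key column instead
t_moved = Table(
    "t_moved", m,
    Column("id", Integer, primary_key=True, autoincrement=False),
    Column("x", Integer, autoincrement=True),
)

default_ddl = ddl_for(t_default)
plain_ddl = ddl_for(t_plain)
moved_ddl = ddl_for(t_moved)
print(default_ddl, plain_ddl, moved_ddl, sep="\n")
```

Compiling against ``mssql.dialect()`` only renders strings, so the sketch can be run without a SQL Server instance or a DBAPI driver installed.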

.. versionchanged::  1.4   Added :class:`_schema.Identity` construct
   in a :class:`_schema.Column` to specify the start and increment
   parameters of an IDENTITY. These replace
   the use of the :class:`.Sequence` object in order to specify these values.

.. deprecated:: 1.4

   The ``mssql_identity_start`` and ``mssql_identity_increment`` parameters
   to :class:`_schema.Column` are deprecated and should be replaced by
   an :class:`_schema.Identity` object. Specifying both ways of configuring
   an IDENTITY will result in a compile error.
   These options are also no longer returned as part of the
   ``dialect_options`` key in :meth:`_reflection.Inspector.get_columns`.
   Use the information in the ``identity`` key instead.

.. deprecated:: 1.3

   The use of :class:`.Sequence` to specify IDENTITY characteristics is
   deprecated and will be removed in a future release.   Please use
   the :class:`_schema.Identity` object parameters
   :paramref:`_schema.Identity.start` and
   :paramref:`_schema.Identity.increment`.

.. versionchanged::  1.4   Removed the ability to use a :class:`.Sequence`
   object to modify IDENTITY characteristics. :class:`.Sequence` objects
   now only manipulate true T-SQL SEQUENCE types.

.. note::

    There can only be one IDENTITY column on the table.  When using
    ``autoincrement=True`` to enable the IDENTITY keyword, SQLAlchemy does not
    guard against multiple columns specifying the option simultaneously.  The
    SQL Server database will instead reject the ``CREATE TABLE`` statement.

.. note::

    An INSERT statement which attempts to provide a value for a column that is
    marked with IDENTITY will be rejected by SQL Server.   In order for the
    value to be accepted, a session-level option "SET IDENTITY_INSERT" must be
    enabled.   The SQLAlchemy SQL Server dialect will perform this operation
    automatically when using a core :class:`_expression.Insert`
    construct; if the
    execution specifies a value for the IDENTITY column, the "IDENTITY_INSERT"
    option will be enabled for the span of that statement's invocation. However,
    this scenario is not high performing and should not be relied upon for
    normal use.   If a table doesn't actually require IDENTITY behavior in its
    integer primary key column, the keyword should be disabled when creating
    the table by ensuring that ``autoincrement=False`` is set.

Controlling "Start" and "Increment"
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Specific control over the "start" and "increment" values for
the ``IDENTITY`` generator is provided using the
:paramref:`_schema.Identity.start` and :paramref:`_schema.Identity.increment`
parameters passed to the :class:`_schema.Identity` object::

    from sqlalchemy import Table, MetaData, Integer, String, Column, Identity

    metadata = MetaData()

    test = Table(
        'test', metadata,
        Column(
            'id',
            Integer,
            primary_key=True,
            Identity(start=100, increment=10)
        ),
        Column('name', String(20))
    )

The CREATE TABLE for the above :class:`_schema.Table` object would be:

.. sourcecode:: sql

   CREATE TABLE test (
     id INTEGER NOT NULL IDENTITY(100,10),
     name VARCHAR(20) NULL,
     PRIMARY KEY (id)
   )
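
The effect of ``start`` and ``increment`` on the rendered DDL can be checked by compiling the table against the SQL Server dialect without connecting. This is a minimal sketch, assuming SQLAlchemy 1.4 or later (where :class:`_schema.Identity` is available); the variable names are illustrative.

```python
from sqlalchemy import Column, Identity, Integer, MetaData, String, Table
from sqlalchemy.dialects import mssql
from sqlalchemy.schema import CreateTable

metadata = MetaData()

test = Table(
    "test", metadata,
    Column("id", Integer, Identity(start=100, increment=10), primary_key=True),
    Column("name", String(20)),
)

# compile only; nothing is sent to a database
ddl = str(CreateTable(test).compile(dialect=mssql.dialect()))
print(ddl)
```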

.. note::

   The :class:`_schema.Identity` object supports many other parameters in
   addition to ``start`` and ``increment``. These are not supported by
   SQL Server and will be ignored when generating the CREATE TABLE DDL.

.. versionchanged:: 1.3.19  The :class:`_schema.Identity` object is
   now used to affect the
   ``IDENTITY`` generator for a :class:`_schema.Column` under  SQL Server.
   Previously, the :class:`.Sequence` object was used.  As SQL Server now
   supports real sequences as a separate construct, :class:`.Sequence` will be
   functional in the normal way starting from SQLAlchemy version 1.4.

Using IDENTITY with Non-Integer numeric types
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

SQL Server also allows ``IDENTITY`` to be used with ``NUMERIC`` columns.  To
implement this pattern smoothly in SQLAlchemy, the primary datatype of the
column should remain as ``Integer``; however, the underlying implementation
type deployed to the SQL Server database can be specified as ``Numeric`` using
:meth:`.TypeEngine.with_variant`::

    from sqlalchemy import Column
    from sqlalchemy import Integer
    from sqlalchemy import Numeric
    from sqlalchemy import String
    from sqlalchemy.orm import declarative_base

    Base = declarative_base()

    class TestTable(Base):
        __tablename__ = "test"
        id = Column(
            Integer().with_variant(Numeric(10, 0), "mssql"),
            primary_key=True,
            autoincrement=True,
        )
        name = Column(String)

In the above example, ``Integer().with_variant()`` provides clear usage
information that accurately describes the intent of the code. The general
restriction that ``autoincrement`` only applies to ``Integer`` is established
at the metadata level and not at the per-dialect level.

When using the above pattern, the primary key identifier that comes back from
the insertion of a row, which is also the value that would be assigned to an
ORM object such as ``TestTable`` above, will be an instance of ``Decimal()``
and not ``int`` when using SQL Server. The numeric return type of the
:class:`_types.Numeric` type can be changed to return floats by passing ``False``
to :paramref:`_types.Numeric.asdecimal`. To normalize the return type of the
above ``Numeric(10, 0)`` to return Python ints (which also support "long"
integer values in Python 3), use :class:`_types.TypeDecorator` as follows::

    from sqlalchemy import TypeDecorator

    class NumericAsInteger(TypeDecorator):
        '''normalize floating point return values into ints'''

        impl = Numeric(10, 0, asdecimal=False)
        cache_ok = True

        def process_result_value(self, value, dialect):
            if value is not None:
                value = int(value)
            return value

    class TestTable(Base):
        __tablename__ = "test"
        id = Column(
            Integer().with_variant(NumericAsInteger, "mssql"),
            primary_key=True,
            autoincrement=True,
        )
        name = Column(String)

INSERT behavior
^^^^^^^^^^^^^^^^

Handling of the ``IDENTITY`` column at INSERT time involves two key
techniques. The most common is being able to fetch the "last inserted value"
for a given ``IDENTITY`` column, a process which SQLAlchemy performs
implicitly in many cases, most importantly within the ORM.

The process for fetching this value has several variants:

* In the vast majority of cases, RETURNING is used in conjunction with INSERT
  statements on SQL Server in order to get newly generated primary key values:

  .. sourcecode:: sql

    INSERT INTO t (x) OUTPUT inserted.id VALUES (?)

* When RETURNING is not available or has been disabled via
  ``implicit_returning=False``, either the ``scope_identity()`` function or
  the ``@@identity`` variable is used; behavior varies by backend:

  * When using PyODBC, the phrase ``; select scope_identity()`` will be
    appended to the end of the INSERT statement; a second result set will be
    fetched in order to receive the value.  Given a table as::

        t = Table('t', m, Column('id', Integer, primary_key=True),
                Column('x', Integer),
                implicit_returning=False)

    an INSERT will look like:

    .. sourcecode:: sql

        INSERT INTO t (x) VALUES (?); select scope_identity()

  * Other dialects such as pymssql will call upon
    ``SELECT scope_identity() AS lastrowid`` subsequent to an INSERT
    statement. If the flag ``use_scope_identity=False`` is passed to
    :func:`_sa.create_engine`,
    the statement ``SELECT @@identity AS lastrowid``
    is used instead.
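
The ``OUTPUT inserted.<column>`` form described in the first variant above can be observed directly by adding an explicit RETURNING clause to a Core INSERT and compiling it for SQL Server. This is a minimal sketch, assuming SQLAlchemy 1.4 or later; it only renders the statement and does not exercise the ``scope_identity()`` or ``@@identity`` fallbacks, which occur at execution time.

```python
from sqlalchemy import Column, Integer, MetaData, Table
from sqlalchemy.dialects import mssql

m = MetaData()
t = Table(
    "t", m,
    Column("id", Integer, primary_key=True),
    Column("x", Integer),
)

# explicit RETURNING on the Core construct; the SQL Server compiler
# renders it as an OUTPUT clause placed before VALUES
stmt = t.insert().values(x=5).returning(t.c.id)
sql = str(stmt.compile(dialect=mssql.dialect()))
print(sql)
```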

A table that contains an ``IDENTITY`` column will prohibit an INSERT statement
that refers to the identity column explicitly.  The SQLAlchemy dialect will
detect when an INSERT construct, created using a core
:func:`_expression.insert`
construct (not a plain string SQL), refers to the identity column, and
in this case will emit ``SET IDENTITY_INSERT ON`` prior to the insert
statement proceeding, and ``SET IDENTITY_INSERT OFF`` subsequent to the
execution.  Given this example::

    m = MetaData()
    t = Table('t', m, Column('id', Integer, primary_key=True),
                    Column('x', Integer))
    m.create_all(engine)

    with engine.begin() as conn:
        # multiple parameter sets are passed as a single list
        conn.execute(t.insert(), [{'id': 1, 'x': 1}, {'id': 2, 'x': 2}])

The above column will be created with IDENTITY; however, the INSERT statement
we emit specifies explicit values.  In the echo output we can see
how SQLAlchemy handles this:

.. sourcecode:: sql

    CREATE TABLE t (
        id INTEGER NOT NULL IDENTITY(1,1),
        x INTEGER NULL,
        PRIMARY KEY (id)
    )

    COMMIT
    SET IDENTITY_INSERT t ON
    INSERT INTO t (id, x) VALUES (?, ?)
    ((1, 1), (2, 2))
    SET IDENTITY_INSERT t OFF
    COMMIT

This is an auxiliary use case suitable for testing and bulk insert scenarios.

SEQUENCE support
----------------

The :class:`.Sequence` object now creates "real" sequences, i.e.,
``CREATE SEQUENCE``. To provide compatibility with other dialects,
:class:`.Sequence` defaults to a start value of 1, even though the
T-SQL default is -9223372036854775808.

.. versionadded:: 1.4.0

MAX on VARCHAR / NVARCHAR
-------------------------

SQL Server supports the special string "MAX" within the
:class:`_types.VARCHAR` and :class:`_types.NVARCHAR` datatypes,
to indicate "maximum length possible".   The dialect currently handles this as
a length of "None" in the base type, rather than supplying a
dialect-specific version of these types, so that a base type
specified such as ``VARCHAR(None)`` can assume "unlengthed" behavior on
more than one backend without using dialect-specific types.

To build a SQL Server VARCHAR or NVARCHAR with MAX length, use None::

    my_table = Table(
        'my_table', metadata,
        Column('my_data', VARCHAR(None)),
        Column('my_n_data', NVARCHAR(None))
    )
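
To see how a length of ``None`` renders on more than one backend, the same base type can be compiled against two dialects. This is a minimal sketch, assuming SQLAlchemy 1.4 or later; the PostgreSQL dialect is used only as an arbitrary second backend for comparison, and the table names are illustrative.

```python
from sqlalchemy import Column, MetaData, NVARCHAR, Table, VARCHAR
from sqlalchemy.dialects import mssql, postgresql
from sqlalchemy.schema import CreateTable

metadata = MetaData()

my_table = Table(
    "my_table", metadata,
    Column("my_data", VARCHAR(None)),
    Column("my_n_data", NVARCHAR(None)),
)

# a second table with only the portable VARCHAR column, for comparison
portable = Table("portable", metadata, Column("my_data", VARCHAR(None)))

# SQL Server renders the missing length as "max"
mssql_ddl = str(CreateTable(my_table).compile(dialect=mssql.dialect()))

# another backend renders the same type without any length
pg_ddl = str(CreateTable(portable).compile(dialect=postgresql.dialect()))

print(mssql_ddl)
print(pg_ddl)
```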

Collation Support
-----------------

Character collations are supported by the base string types,
specified by the string argument "collation"::

    from sqlalchemy import VARCHAR
    Column('login', VARCHAR(32, collation='Latin1_General_CI_AS'))

When such a column is associated with a :class:`_schema.Table`, the
CREATE TABLE statement for this column will yield::

    login VARCHAR(32) COLLATE Latin1_General_CI_AS NULL

LIMIT/OFFSET Support
--------------------

MSSQL has added support for LIMIT / OFFSET as of SQL Server 2012, via the
"OFFSET n ROWS" and "FETCH NEXT n ROWS" clauses.  SQLAlchemy supports these
syntaxes automatically if SQL Server 2012 or greater is detected.

.. versionchanged:: 1.4 support added for SQL Server "OFFSET n ROWS" and
   "FETCH NEXT n ROWS" syntax.

For statements that specify only LIMIT and no OFFSET, all versions of SQL
Server support the TOP keyword.   This syntax is used for all SQL Server
versions when no OFFSET clause is present.  A statement such as::

    select(some_table).limit(5)

will render similarly to::

    SELECT TOP 5 col1, col2.. FROM table

For versions of SQL Server prior to SQL Server 2012, a statement that uses
LIMIT and OFFSET, or just OFFSET alone, will be rendered using the
``ROW_NUMBER()`` window function.   A statement such as::

    select(some_table).order_by(some_table.c.col3).limit(5).offset(10)

will render similarly to::

    SELECT anon_1.col1, anon_1.col2 FROM (SELECT col1, col2,
    ROW_NUMBER() OVER (ORDER BY col3) AS
    mssql_rn FROM table WHERE t.x = :x_1) AS
    anon_1 WHERE mssql_rn > :param_1 AND mssql_rn <= :param_2 + :param_1

Note that when using LIMIT and/or OFFSET, whether using the older
or newer SQL Server syntaxes, the statement must have an ORDER BY as well,
else a :class:`.CompileError` is raised.
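
The LIMIT-only case described in this section can be reproduced by compiling a statement against the SQL Server dialect. This is a minimal sketch, assuming SQLAlchemy 1.4 or later (for the ``select(table)`` calling style); the table definition is illustrative. Depending on the SQLAlchemy version, the limit value may appear in the string as a parameter placeholder rather than as the literal ``5``.

```python
from sqlalchemy import Column, Integer, MetaData, Table, select
from sqlalchemy.dialects import mssql

some_table = Table(
    "some_table", MetaData(),
    Column("col1", Integer),
    Column("col2", Integer),
)

# LIMIT with no OFFSET is rendered with the TOP keyword
stmt = select(some_table).limit(5)
sql = str(stmt.compile(dialect=mssql.dialect()))
print(sql)
```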

.. _mssql_isolation_level:

Transaction Isolation Level
---------------------------

All SQL Server dialects support setting of transaction isolation level
both via a dialect-specific parameter
:paramref:`_sa.create_engine.isolation_level`
accepted by :func:`_sa.create_engine`,
as well as the :paramref:`.Connection.execution_options.isolation_level`
argument as passed to
:meth:`_engine.Connection.execution_options`.
This feature works by issuing the
command ``SET TRANSACTION ISOLATION LEVEL <level>`` for
each new connection.

To set isolation level using :func:`_sa.create_engine`::

    engine = create_engine(
        "mssql+pyodbc://scott:tiger@ms_2008",
        isolation_level="REPEATABLE READ"
    )

To set using per-connection execution options::

    connection = engine.connect()
    connection = connection.execution_options(
        isolation_level="READ COMMITTED"
    )

Valid values for ``isolation_level`` include:

* ``AUTOCOMMIT`` - pyodbc / pymssql-specific
* ``READ COMMITTED``
* ``READ UNCOMMITTED``
* ``REPEATABLE READ``
* ``SERIALIZABLE``
* ``SNAPSHOT`` - specific to SQL Server

There are also more options for isolation level configurations, such as
"sub-engine" objects linked to a main :class:`_engine.Engine` which each apply
different isolation level settings.  See the discussion at
:ref:`dbapi_autocommit` for background.

.. seealso::

    :ref:`dbapi_autocommit`
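
The relationship between the ``isolation_level`` values listed in this section and the ``SET TRANSACTION ISOLATION LEVEL <level>`` command can be sketched in plain Python. The helper ``isolation_statement`` below is hypothetical and is not the dialect's implementation; it assumes that ``AUTOCOMMIT`` is applied through the driver rather than through a T-SQL statement, and that underscores may stand in for spaces.

```python
# Levels from the list above that correspond to a T-SQL SET statement.
SET_LEVELS = {
    "READ COMMITTED",
    "READ UNCOMMITTED",
    "REPEATABLE READ",
    "SERIALIZABLE",
    "SNAPSHOT",
}


def isolation_statement(level):
    """Return the T-SQL statement for a level, or None for AUTOCOMMIT.

    Hypothetical helper for illustration only.
    """
    normalized = level.upper().replace("_", " ")
    if normalized == "AUTOCOMMIT":
        # assumed to be a driver-level switch, so no statement is emitted
        return None
    if normalized not in SET_LEVELS:
        raise ValueError(
            "Invalid isolation level %r; valid levels are: %s"
            % (level, ", ".join(sorted(SET_LEVELS | {"AUTOCOMMIT"})))
        )
    return "SET TRANSACTION ISOLATION LEVEL " + normalized


print(isolation_statement("REPEATABLE READ"))
```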

.. _mssql_reset_on_return:

Temporary Table / Resource Reset for Connection Pooling
-------------------------------------------------------

The :class:`.QueuePool` connection pool implementation used
by the SQLAlchemy :class:`_sa.Engine` object includes
:ref:`reset on return <pool_reset_on_return>` behavior that will invoke
the DBAPI ``.rollback()`` method when connections are returned to the pool.
While this rollback will clear out the immediate state used by the previous
transaction, it does not cover a wider range of session-level state, including
temporary tables as well as other server state such as prepared statement
handles and statement caches.   An undocumented SQL Server procedure named
``sp_reset_connection`` is known to be a workaround for this issue; it
resets most of the session state that builds up on a connection, including
temporary tables.

To install ``sp_reset_connection`` as the means of performing reset-on-return,
the :meth:`.PoolEvents.reset` event hook may be used, as demonstrated in the
example below (**requires SQLAlchemy 1.4.43 or greater**). The
:paramref:`_sa.create_engine.pool_reset_on_return` parameter is set to ``None``
so that the custom scheme can replace the default behavior completely.   The
custom hook implementation calls ``.rollback()`` in any case, as it's usually
important that the DBAPI's own tracking of commit/rollback will remain
consistent with the state of the transaction::

    from sqlalchemy import create_engine
    from sqlalchemy import event

    mssql_engine = create_engine(
        "mssql+pyodbc://scott:tiger^5HHH@mssql2017:1433/test?driver=ODBC+Driver+17+for+SQL+Server",

        # disable default reset-on-return scheme
        pool_reset_on_return=None,
    )


    @event.listens_for(mssql_engine, "reset")
    def _reset_mssql(dbapi_connection, connection_record, reset_state):
        dbapi_connection.execute("{call sys.sp_reset_connection}")

        # so that the DBAPI itself knows that the connection has been
        # reset
        dbapi_connection.rollback()

.. versionchanged:: 1.4.43  Ensured the :meth:`.PoolEvents.reset` event
   is invoked for all "reset" occurrences, so that it's appropriate
   as a place for custom "reset" handlers.   Previous schemes which
   use the :meth:`.PoolEvents.checkin` handler remain usable as well.

.. seealso::

    :ref:`pool_reset_on_return` - in the :ref:`pooling_toplevel` documentation

Nullability
-----------

MSSQL has support for three levels of column nullability. The default
nullability allows nulls and is explicit in the CREATE TABLE
construct::

    name VARCHAR(20) NULL

If ``nullable=None`` is specified then no specification is made. In
other words the database's configured default is used. This will
render::

    name VARCHAR(20)

If ``nullable`` is ``True`` or ``False`` then the column will be
``NULL`` or ``NOT NULL`` respectively.

Date / Time Handling
--------------------

DATE and TIME are supported.   Bind parameters are converted
to ``datetime.datetime`` objects as required by most MSSQL drivers,
and results are processed from strings if needed.
The DATE and TIME types are not available for MSSQL 2005 and
previous - if a server version below 2008 is detected, DDL
for these types will be issued as DATETIME.

.. _mssql_large_type_deprecation:

Large Text/Binary Type Deprecation
----------------------------------

Per
`SQL Server 2012/2014 Documentation <https://technet.microsoft.com/en-us/library/ms187993.aspx>`_,
the ``NTEXT``, ``TEXT`` and ``IMAGE`` datatypes are to be removed from SQL
Server in a future release.   SQLAlchemy normally relates these types to the
:class:`.UnicodeText`, :class:`_types.Text` and
:class:`.LargeBinary` datatypes.

In order to accommodate this change, a new flag ``deprecate_large_types``
is added to the dialect, which will be automatically set based on detection
of the server version in use, if not otherwise set by the user.  The
behavior of this flag is as follows:

* When this flag is ``True``, the :class:`.UnicodeText`,
  :class:`_types.Text` and
  :class:`.LargeBinary` datatypes, when used to render DDL, will render the
  types ``NVARCHAR(max)``, ``VARCHAR(max)``, and ``VARBINARY(max)``,
  respectively.  This is a new behavior as of the addition of this flag.

* When this flag is ``False``, the :class:`.UnicodeText`,
  :class:`_types.Text` and
  :class:`.LargeBinary` datatypes, when used to render DDL, will render the
  types ``NTEXT``, ``TEXT``, and ``IMAGE``,
  respectively.  This is the long-standing behavior of these types.

* The flag begins with the value ``None``, before a database connection is
  established.   If the dialect is used to render DDL without the flag being
  set, it is interpreted the same as ``False``.

* On first connection, the dialect detects if SQL Server version 2012 or
  greater is in use; if the flag is still at ``None``, it sets it to ``True``
  or ``False`` based on whether 2012 or greater is detected.

* The flag can be set to either ``True`` or ``False`` when the dialect
  is created, typically via :func:`_sa.create_engine`::

        eng = create_engine("mssql+pymssql://user:pass@host/db",
                        deprecate_large_types=True)

* Complete control over whether the "old" or "new" types are rendered is
  available in all SQLAlchemy versions by using the UPPERCASE type objects
  instead: :class:`_types.NVARCHAR`, :class:`_types.VARCHAR`,
  :class:`_types.VARBINARY`, :class:`_types.TEXT`, :class:`_mssql.NTEXT`, and
  :class:`_mssql.IMAGE` always remain fixed and always output exactly that
  type.

.. versionadded:: 1.0.0
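
The rules listed in this section for resolving ``deprecate_large_types`` amount to a small decision table, sketched below in plain Python. The names ``effective_flag`` and ``ddl_type`` are hypothetical and do not exist in SQLAlchemy; the sketch assumes the server version is available as a tuple and relies on SQL Server 2012 reporting major version 11.

```python
# DDL names used when the flag resolves to False (long-standing behavior).
LEGACY = {"UnicodeText": "NTEXT", "Text": "TEXT", "LargeBinary": "IMAGE"}

# DDL names used when the flag resolves to True.
MODERN = {
    "UnicodeText": "NVARCHAR(max)",
    "Text": "VARCHAR(max)",
    "LargeBinary": "VARBINARY(max)",
}


def effective_flag(user_flag, server_version_info=None):
    """Resolve the flag: explicit setting, then server detection, then False.

    ``server_version_info`` is None before the first connection.
    """
    if user_flag is not None:
        return user_flag
    if server_version_info is None:
        return False
    # SQL Server 2012 is major version 11
    return server_version_info >= (11,)


def ddl_type(generic_name, user_flag=None, server_version_info=None):
    """Pick the DDL type name for one of the three generic large types."""
    names = MODERN if effective_flag(user_flag, server_version_info) else LEGACY
    return names[generic_name]


print(ddl_type("Text"))
print(ddl_type("Text", server_version_info=(11, 0, 2100)))
```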

.. _multipart_schema_names:

Multipart Schema Names
----------------------

SQL Server schemas sometimes require multiple parts to their "schema"
qualifier, that is, including the database name and owner name as separate
tokens, such as ``mydatabase.dbo.some_table``. These multipart names can be set
at once using the :paramref:`_schema.Table.schema` argument of
:class:`_schema.Table`::

    Table(
        "some_table", metadata,
        Column("q", String(50)),
        schema="mydatabase.dbo"
    )

When performing operations such as table or component reflection, a schema
argument that contains a dot will be split into separate
"database" and "owner"  components in order to correctly query the SQL
Server information schema tables, as these two values are stored separately.
Additionally, when rendering the schema name for DDL or SQL, the two
components will be quoted separately for case sensitive names and other
special characters.   Given an argument as below::

    Table(
        "some_table", metadata,
        Column("q", String(50)),
        schema="MyDataBase.dbo"
    )

The above schema would be rendered as ``[MyDataBase].dbo``, and also in
reflection, would be reflected using "dbo" as the owner and "MyDataBase"
as the database name.

To control how the schema name is broken into database / owner,
specify brackets (which in SQL Server are quoting characters) in the name.
Below, the "owner" will be considered as ``MyDataBase.dbo`` and the
"database" will be None::

    Table(
        "some_table", metadata,
        Column("q", String(50)),
        schema="[MyDataBase.dbo]"
    )

To individually specify both database and owner name with special characters
or embedded dots, use two sets of brackets::

    Table(
        "some_table", metadata,
        Column("q", String(50)),
        schema="[MyDataBase.Period].[MyOwner.Dot]"
    )

.. versionchanged:: 1.2 the SQL Server dialect now treats brackets as
   identifier delimiters splitting the schema into separate database
   and owner tokens, to allow dots within either name itself.
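
The splitting rules described in this section (a dot separates database from owner unless it is enclosed in brackets) can be illustrated with a short standalone function. This is a sketch of the documented behavior only; ``split_schema`` is a hypothetical name and this is not the parser the dialect uses internally.

```python
import re

# A token is either a bracketed chunk (dots allowed inside) or a run of
# characters containing no dots or brackets.
_TOKEN = re.compile(r"\[([^\]]+)\]|([^.\[\]]+)")


def split_schema(schema):
    """Split a schema argument into a (database, owner) pair.

    Hypothetical helper: the last token is the owner and anything
    before it is treated as the database name.
    """
    tokens = [bracketed or bare for bracketed, bare in _TOKEN.findall(schema)]
    if len(tokens) == 1:
        return None, tokens[0]
    return ".".join(tokens[:-1]), tokens[-1]


for name in (
    "mydatabase.dbo",
    "[MyDataBase.dbo]",
    "[MyDataBase.Period].[MyOwner.Dot]",
):
    print(name, "->", split_schema(name))
```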

.. _legacy_schema_rendering:

Legacy Schema Mode
------------------

Very old versions of the MSSQL dialect introduced the behavior such that a
schema-qualified table would be auto-aliased when used in a
SELECT statement; given a table::

    account_table = Table(
        'account', metadata,
        Column('id', Integer, primary_key=True),
        Column('info', String(100)),
        schema="customer_schema"
    )

this legacy mode of rendering would assume that "customer_schema.account"
would not be accepted by all parts of the SQL statement, as illustrated
below::

    >>> eng = create_engine("mssql+pymssql://mydsn", legacy_schema_aliasing=True)
    >>> print(account_table.select().compile(eng))
    SELECT account_1.id, account_1.info
    FROM customer_schema.account AS account_1

This mode of behavior is now off by default, as it appears to have served
no purpose; however in the case that legacy applications rely upon it,
it is available using the ``legacy_schema_aliasing`` argument to
:func:`_sa.create_engine` as illustrated above.

.. versionchanged:: 1.1 the ``legacy_schema_aliasing`` flag introduced
   in version 1.0.5 to allow disabling of legacy mode for schemas now
   defaults to False.

.. deprecated:: 1.4

   The ``legacy_schema_aliasing`` flag is now
   deprecated and will be removed in a future release.

.. _mssql_indexes:

Clustered Index Support
-----------------------

The MSSQL dialect supports clustered indexes (and primary keys) via the
``mssql_clustered`` option.  This option is available to :class:`.Index`,
:class:`.UniqueConstraint`, and :class:`.PrimaryKeyConstraint`.

To generate a clustered index::

    Index("my_index", table.c.x, mssql_clustered=True)

which renders the index as ``CREATE CLUSTERED INDEX my_index ON table (x)``.

To generate a clustered primary key use::

    Table('my_table', metadata,
          Column('x', ...),
          Column('y', ...),
          PrimaryKeyConstraint("x", "y", mssql_clustered=True))

which will render the table, for example, as::

  CREATE TABLE my_table (x INTEGER NOT NULL, y INTEGER NOT NULL,
                         PRIMARY KEY CLUSTERED (x, y))

Similarly, we can generate a clustered unique constraint using::

    Table('my_table', metadata,
          Column('x', ...),
          Column('y', ...),
          PrimaryKeyConstraint("x"),
          UniqueConstraint("y", mssql_clustered=True),
          )

To explicitly request a non-clustered primary key (for example, when
a separate clustered index is desired), use::

    Table('my_table', metadata,
          Column('x', ...),
          Column('y', ...),
          PrimaryKeyConstraint("x", "y", mssql_clustered=False))

which will render the table, for example, as::

  CREATE TABLE my_table (x INTEGER NOT NULL, y INTEGER NOT NULL,
                         PRIMARY KEY NONCLUSTERED (x, y))

.. versionchanged:: 1.1 the ``mssql_clustered`` option now defaults
   to None, rather than False.  ``mssql_clustered=False`` now explicitly
   renders the NONCLUSTERED clause, whereas None omits the CLUSTERED
   clause entirely, allowing SQL Server defaults to take effect.
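
The three states of ``mssql_clustered`` described in this section (``True``, ``False`` and ``None``) can be compared side by side by compiling the DDL without a database. This is a minimal sketch, assuming SQLAlchemy 1.4 or later; the helper ``pk_ddl`` is illustrative and not part of the dialect.

```python
from sqlalchemy import Column, Integer, MetaData, PrimaryKeyConstraint, Table
from sqlalchemy.dialects import mssql
from sqlalchemy.schema import CreateTable


def pk_ddl(clustered):
    """Render CREATE TABLE for a two-column primary key with the given option."""
    table = Table(
        "my_table", MetaData(),
        Column("x", Integer),
        Column("y", Integer),
        PrimaryKeyConstraint("x", "y", mssql_clustered=clustered),
    )
    return str(CreateTable(table).compile(dialect=mssql.dialect()))


clustered_ddl = pk_ddl(True)      # PRIMARY KEY CLUSTERED
nonclustered_ddl = pk_ddl(False)  # PRIMARY KEY NONCLUSTERED
default_ddl = pk_ddl(None)        # no clustering keyword; server default applies

print(clustered_ddl, nonclustered_ddl, default_ddl, sep="\n")
```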

MSSQL-Specific Index Options
-----------------------------

In addition to clustering, the MSSQL dialect supports other special options
for :class:`.Index`.

INCLUDE
^^^^^^^

The ``mssql_include`` option renders INCLUDE(colname) for the given string
names::

    Index("my_index", table.c.x, mssql_include=['y'])

would render the index as ``CREATE INDEX my_index ON table (x) INCLUDE (y)``.

.. _mssql_index_where:

Filtered Indexes
^^^^^^^^^^^^^^^^

The ``mssql_where`` option renders WHERE(condition) for the given SQL
expression::

    Index("my_index", table.c.x, mssql_where=table.c.x > 10)

would render the index as ``CREATE INDEX my_index ON table (x) WHERE x > 10``.

.. versionadded:: 1.3.4

Index ordering
^^^^^^^^^^^^^^

Index ordering is available via functional expressions, such as::

    Index("my_index", table.c.x.desc())

would render the index as ``CREATE INDEX my_index ON table (x DESC)``.

.. seealso::

    :ref:`schema_indexes_functional`
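
The index options covered in this section can be checked by compiling ``CREATE INDEX`` statements against the SQL Server dialect. This is a minimal sketch, assuming SQLAlchemy 1.4 or later; the table is named ``tbl`` here (an illustrative choice) so that the name is not a reserved word requiring quoting, and ``index_ddl`` is a hypothetical helper.

```python
from sqlalchemy import Column, Index, Integer, MetaData, Table
from sqlalchemy.dialects import mssql
from sqlalchemy.schema import CreateIndex

tbl = Table("tbl", MetaData(), Column("x", Integer), Column("y", Integer))


def index_ddl(index):
    """Render CREATE INDEX for the SQL Server dialect; no connection is needed."""
    return str(CreateIndex(index).compile(dialect=mssql.dialect()))


# INCLUDE: extra column named by string
include_sql = index_ddl(Index("ix_include", tbl.c.x, mssql_include=["y"]))

# filtered index: the condition is a SQL expression, not a string name
where_sql = index_ddl(Index("ix_where", tbl.c.x, mssql_where=tbl.c.x > 10))

# ordering via a column expression
desc_sql = index_ddl(Index("ix_desc", tbl.c.x.desc()))

print(include_sql, where_sql, desc_sql, sep="\n")
```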

Compatibility Levels
--------------------

MSSQL supports the notion of setting compatibility levels at the
database level. This makes it possible, for instance, to run a database that
is compatible with SQL2000 while running on a SQL2005 database
server. ``server_version_info`` will always return the database
server version information (in this case SQL2005) and not the
compatibility level information. Because of this, if running under
a backwards compatibility mode SQLAlchemy may attempt to use T-SQL
statements that are unable to be parsed by the database server.

Triggers
--------

SQLAlchemy by default uses OUTPUT INSERTED to get at newly
generated primary key values via IDENTITY columns or other
server side defaults.   MS-SQL does not
allow the usage of OUTPUT INSERTED on tables that have triggers.
To disable the usage of OUTPUT INSERTED on a per-table basis,
specify ``implicit_returning=False`` for each :class:`_schema.Table`
which has triggers::

    Table('mytable', metadata,
        Column('id', Integer, primary_key=True),
        # ...,
        implicit_returning=False
    )

Declarative form::

    class MyClass(Base):
        # ...
        __table_args__ = {'implicit_returning': False}

This option can also be specified engine-wide using the
``implicit_returning=False`` argument on :func:`_sa.create_engine`.

.. _mssql_rowcount_versioning:

Rowcount Support / ORM Versioning
---------------------------------

The SQL Server drivers may have limited ability to return the number
of rows updated from an UPDATE or DELETE statement.

As of this writing, the PyODBC driver is not able to return a rowcount when
OUTPUT INSERTED is used. This impacts the SQLAlchemy ORM's versioning feature
in many cases where server-side value generators are in use: while the
versioning operations can succeed, the ORM cannot always check that an UPDATE
or DELETE statement matched the number of rows expected, which is how it
verifies that the version identifier matched. When this condition occurs, a
warning will be emitted but the operation will proceed.

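The rowcount check described above can be sketched with the standard
library's ``sqlite3`` module standing in for SQL Server. The table, the
column names and the ``update_versioned`` helper are assumptions made for
illustration, and the ORM's own implementation differs. The principle is
the same: the UPDATE carries the expected version in its WHERE clause, and
a rowcount other than one means the version identifier did not match.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE mytable "
    "(id INTEGER PRIMARY KEY, stuff TEXT, version INTEGER)"
)
conn.execute("INSERT INTO mytable (id, stuff, version) VALUES (1, 'a', 1)")


def update_versioned(conn, row_id, expected_version, stuff):
    # Hypothetical helper: the WHERE clause includes the version that was
    # read earlier, so a concurrent change makes the UPDATE match zero rows.
    cursor = conn.execute(
        "UPDATE mytable SET stuff = ?, version = version + 1 "
        "WHERE id = ? AND version = ?",
        (stuff, row_id, expected_version),
    )
    # Without a usable rowcount from the driver this check cannot be made,
    # which is the limitation described in the text.
    return cursor.rowcount == 1


first = update_versioned(conn, 1, 1, "b")  # stored version is 1: matches
stale = update_versioned(conn, 1, 1, "c")  # stored version is now 2
print(first, stale)
```

The second call fails the check because the first call already advanced the
stored version, which is the situation optimistic versioning is meant to
detect.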

The use of OUTPUT INSERTED can be disabled by setting the
:paramref:`_schema.Table.implicit_returning` flag to ``False`` on a particular
:class:`_schema.Table`, which in declarative looks like::

    class MyTable(Base):
        __tablename__ = 'mytable'
        id = Column(Integer, primary_key=True)
        stuff = Column(String(10))
        timestamp = Column(TIMESTAMP(), default=text('DEFAULT'))
        __mapper_args__ = {
            'version_id_col': timestamp,
            'version_id_generator': False,
        }
        __table_args__ = {
            'implicit_returning': False
        }

Enabling Snapshot Isolation
---------------------------

SQL Server has a default transaction isolation mode that locks entire
tables, and causes even mildly concurrent applications to hold locks for a
long time and to deadlock frequently. Enabling snapshot isolation for the
database as a whole is recommended for modern levels of concurrency support.
This is accomplished via the following ALTER DATABASE commands executed at
the SQL prompt::

    ALTER DATABASE MyDatabase SET ALLOW_SNAPSHOT_ISOLATION ON

    ALTER DATABASE MyDatabase SET READ_COMMITTED_SNAPSHOT ON

Background on SQL Server snapshot isolation is available at
https://msdn.microsoft.com/en-us/library/ms175095.aspx.

"""  # noqa

import codecs
import datetime
import operator
import re

from . import information_schema as ischema
from .json import JSON
from .json import JSONIndexType
from .json import JSONPathType
from ... import exc
from ... import Identity
from ... import schema as sa_schema
from ... import Sequence
from ... import sql
from ... import text
from ... import types as sqltypes
from ... import util
from ...engine import cursor as _cursor
from ...engine import default
from ...engine import reflection
from ...sql import coercions
from ...sql import compiler
from ...sql import elements
from ...sql import expression
from ...sql import func
from ...sql import quoted_name
from ...sql import roles
from ...sql import util as sql_util
from ...types import BIGINT
from ...types import BINARY
from ...types import CHAR
from ...types import DATE
from ...types import DATETIME
from ...types import DECIMAL
from ...types import FLOAT
from ...types import INTEGER
from ...types import NCHAR
from ...types import NUMERIC
from ...types import NVARCHAR
from ...types import SMALLINT
from ...types import TEXT
from ...types import VARCHAR
from ...util import compat
from ...util import update_wrapper
from ...util.langhelpers import public_factory


# https://sqlserverbuilds.blogspot.com/
MS_2017_VERSION = (14,)
MS_2016_VERSION = (13,)
MS_2014_VERSION = (12,)
MS_2012_VERSION = (11,)
MS_2008_VERSION = (10,)
MS_2005_VERSION = (9,)
MS_2000_VERSION = (8,)

921RESERVED_WORDS = set( 

922 [ 

923 "add", 

924 "all", 

925 "alter", 

926 "and", 

927 "any", 

928 "as", 

929 "asc", 

930 "authorization", 

931 "backup", 

932 "begin", 

933 "between", 

934 "break", 

935 "browse", 

936 "bulk", 

937 "by", 

938 "cascade", 

939 "case", 

940 "check", 

941 "checkpoint", 

942 "close", 

943 "clustered", 

944 "coalesce", 

945 "collate", 

946 "column", 

947 "commit", 

948 "compute", 

949 "constraint", 

950 "contains", 

951 "containstable", 

952 "continue", 

953 "convert", 

954 "create", 

955 "cross", 

956 "current", 

957 "current_date", 

958 "current_time", 

959 "current_timestamp", 

960 "current_user", 

961 "cursor", 

962 "database", 

963 "dbcc", 

964 "deallocate", 

965 "declare", 

966 "default", 

967 "delete", 

968 "deny", 

969 "desc", 

970 "disk", 

971 "distinct", 

972 "distributed", 

973 "double", 

974 "drop", 

975 "dump", 

976 "else", 

977 "end", 

978 "errlvl", 

979 "escape", 

980 "except", 

981 "exec", 

982 "execute", 

983 "exists", 

984 "exit", 

985 "external", 

986 "fetch", 

987 "file", 

988 "fillfactor", 

989 "for", 

990 "foreign", 

991 "freetext", 

992 "freetexttable", 

993 "from", 

994 "full", 

995 "function", 

996 "goto", 

997 "grant", 

998 "group", 

999 "having", 

1000 "holdlock", 

1001 "identity", 

1002 "identity_insert", 

1003 "identitycol", 

1004 "if", 

1005 "in", 

1006 "index", 

1007 "inner", 

1008 "insert", 

1009 "intersect", 

1010 "into", 

1011 "is", 

1012 "join", 

1013 "key", 

1014 "kill", 

1015 "left", 

1016 "like", 

1017 "lineno", 

1018 "load", 

1019 "merge", 

1020 "national", 

1021 "nocheck", 

1022 "nonclustered", 

1023 "not", 

1024 "null", 

1025 "nullif", 

1026 "of", 

1027 "off", 

1028 "offsets", 

1029 "on", 

1030 "open", 

1031 "opendatasource", 

1032 "openquery", 

1033 "openrowset", 

1034 "openxml", 

1035 "option", 

1036 "or", 

1037 "order", 

1038 "outer", 

1039 "over", 

1040 "percent", 

1041 "pivot", 

1042 "plan", 

1043 "precision", 

1044 "primary", 

1045 "print", 

1046 "proc", 

1047 "procedure", 

1048 "public", 

1049 "raiserror", 

1050 "read", 

1051 "readtext", 

1052 "reconfigure", 

1053 "references", 

1054 "replication", 

1055 "restore", 

1056 "restrict", 

1057 "return", 

1058 "revert", 

1059 "revoke", 

1060 "right", 

1061 "rollback", 

1062 "rowcount", 

1063 "rowguidcol", 

1064 "rule", 

1065 "save", 

1066 "schema", 

1067 "securityaudit", 

1068 "select", 

1069 "session_user", 

1070 "set", 

1071 "setuser", 

1072 "shutdown", 

1073 "some", 

1074 "statistics", 

1075 "system_user", 

1076 "table", 

1077 "tablesample", 

1078 "textsize", 

1079 "then", 

1080 "to", 

1081 "top", 

1082 "tran", 

1083 "transaction", 

1084 "trigger", 

1085 "truncate", 

1086 "tsequal", 

1087 "union", 

1088 "unique", 

1089 "unpivot", 

1090 "update", 

1091 "updatetext", 

1092 "use", 

1093 "user", 

1094 "values", 

1095 "varying", 

1096 "view", 

1097 "waitfor", 

1098 "when", 

1099 "where", 

1100 "while", 

1101 "with", 

1102 "writetext", 

1103 ] 

1104) 

1105 

1106 

class REAL(sqltypes.REAL):
    __visit_name__ = "REAL"

    def __init__(self, **kw):
        # REAL is a synonym for FLOAT(24) on SQL server.
        # it is only accepted as the word "REAL" in DDL, the numeric
        # precision value is not allowed to be present
        kw.setdefault("precision", 24)
        super(REAL, self).__init__(**kw)


class TINYINT(sqltypes.Integer):
    __visit_name__ = "TINYINT"


# MSSQL DATE/TIME types have varied behavior, sometimes returning
# strings. MSDate/TIME check for everything, and always
# filter bind parameters into datetime objects (required by pyodbc,
# not sure about other dialects).


class _MSDate(sqltypes.Date):
    def bind_processor(self, dialect):
        def process(value):
            if type(value) == datetime.date:
                return datetime.datetime(value.year, value.month, value.day)
            else:
                return value

        return process

    _reg = re.compile(r"(\d+)-(\d+)-(\d+)")

    def result_processor(self, dialect, coltype):
        def process(value):
            if isinstance(value, datetime.datetime):
                return value.date()
            elif isinstance(value, util.string_types):
                m = self._reg.match(value)
                if not m:
                    raise ValueError(
                        "could not parse %r as a date value" % (value,)
                    )
                return datetime.date(*[int(x or 0) for x in m.groups()])
            else:
                return value

        return process


class TIME(sqltypes.TIME):
    def __init__(self, precision=None, **kwargs):
        self.precision = precision
        super(TIME, self).__init__()

    __zero_date = datetime.date(1900, 1, 1)

    def bind_processor(self, dialect):
        def process(value):
            if isinstance(value, datetime.datetime):
                value = datetime.datetime.combine(
                    self.__zero_date, value.time()
                )
            elif isinstance(value, datetime.time):
                """issue #5339
                per: https://github.com/mkleehammer/pyodbc/wiki/Tips-and-Tricks-by-Database-Platform#time-columns
                pass TIME value as string
                """  # noqa
                value = str(value)
            return value

        return process

    _reg = re.compile(r"(\d+):(\d+):(\d+)(?:\.(\d{0,6}))?")

    def result_processor(self, dialect, coltype):
        def process(value):
            if isinstance(value, datetime.datetime):
                return value.time()
            elif isinstance(value, util.string_types):
                m = self._reg.match(value)
                if not m:
                    raise ValueError(
                        "could not parse %r as a time value" % (value,)
                    )
                return datetime.time(*[int(x or 0) for x in m.groups()])
            else:
                return value

        return process


_MSTime = TIME

1200 

1201 

1202class _BASETIMEIMPL(TIME): 

1203 __visit_name__ = "_BASETIMEIMPL" 

1204 

1205 

1206class _DateTimeBase(object): 

1207 def bind_processor(self, dialect): 

1208 def process(value): 

1209 if type(value) == datetime.date: 

1210 return datetime.datetime(value.year, value.month, value.day) 

1211 else: 

1212 return value 

1213 

1214 return process 

1215 

1216 

1217class _MSDateTime(_DateTimeBase, sqltypes.DateTime): 

1218 pass 

1219 

1220 

1221class SMALLDATETIME(_DateTimeBase, sqltypes.DateTime): 

1222 __visit_name__ = "SMALLDATETIME" 

1223 

1224 

1225class DATETIME2(_DateTimeBase, sqltypes.DateTime): 

1226 __visit_name__ = "DATETIME2" 

1227 

1228 def __init__(self, precision=None, **kw): 

1229 super(DATETIME2, self).__init__(**kw) 

1230 self.precision = precision 

1231 

1232 

1233class DATETIMEOFFSET(_DateTimeBase, sqltypes.DateTime): 

1234 __visit_name__ = "DATETIMEOFFSET" 

1235 

1236 def __init__(self, precision=None, **kw): 

1237 super(DATETIMEOFFSET, self).__init__(**kw) 

1238 self.precision = precision 

1239 

1240 

1241class _UnicodeLiteral(object): 

1242 def literal_processor(self, dialect): 

1243 def process(value): 

1244 

1245 value = value.replace("'", "''") 

1246 

1247 if dialect.identifier_preparer._double_percents: 

1248 value = value.replace("%", "%%") 

1249 

1250 return "N'%s'" % value 

1251 

1252 return process 

1253 

1254 

1255class _MSUnicode(_UnicodeLiteral, sqltypes.Unicode): 

1256 pass 

1257 

1258 

1259class _MSUnicodeText(_UnicodeLiteral, sqltypes.UnicodeText): 

1260 pass 

1261 

1262 

1263class TIMESTAMP(sqltypes._Binary): 

1264 """Implement the SQL Server TIMESTAMP type. 

1265 

1266 Note this is **completely different** than the SQL Standard 

1267 TIMESTAMP type, which is not supported by SQL Server. It 

1268 is a read-only datatype that does not support INSERT of values. 

1269 

1270 .. versionadded:: 1.2 

1271 

1272 .. seealso:: 

1273 

1274 :class:`_mssql.ROWVERSION` 

1275 

1276 """ 

1277 

1278 __visit_name__ = "TIMESTAMP" 

1279 

1280 # expected by _Binary to be present 

1281 length = None 

1282 

1283 def __init__(self, convert_int=False): 

1284 """Construct a TIMESTAMP or ROWVERSION type. 

1285 

1286 :param convert_int: if True, binary integer values will 

1287 be converted to integers on read. 

1288 

1289 .. versionadded:: 1.2 

1290 
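As an illustration of what ``convert_int=True`` does, the following sketch
applies the same hex-based conversion used by the result processor to an
8-byte value. The sample bytes are made up, and the ``int.from_bytes``
comparison is an equivalent standard-library spelling added here for
illustration; it is not taken from the dialect.

```python
import codecs

# A made-up 8-byte ROWVERSION/TIMESTAMP value as a driver might return it.
raw = bytes.fromhex("00000000000007d1")

# Hex-encode the bytes, then parse the hex digits as a base-16 integer.
as_int = int(codecs.encode(raw, "hex"), 16)

# Equivalent big-endian interpretation using the standard library.
assert as_int == int.from_bytes(raw, "big")
print(as_int)
```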

1291 """ 

1292 self.convert_int = convert_int 

1293 

1294 def result_processor(self, dialect, coltype): 

1295 super_ = super(TIMESTAMP, self).result_processor(dialect, coltype) 

1296 if self.convert_int: 

1297 

1298 def process(value): 

1299 value = super_(value) 

1300 if value is not None: 

1301 # https://stackoverflow.com/a/30403242/34549 

1302 value = int(codecs.encode(value, "hex"), 16) 

1303 return value 

1304 

1305 return process 

1306 else: 

1307 return super_ 

1308 

1309 

1310class ROWVERSION(TIMESTAMP): 

1311 """Implement the SQL Server ROWVERSION type. 

1312 

1313 The ROWVERSION datatype is a SQL Server synonym for the TIMESTAMP 

1314 datatype, however current SQL Server documentation suggests using 

1315 ROWVERSION for new datatypes going forward. 

1316 

1317 The ROWVERSION datatype does **not** reflect (e.g. introspect) from the 

1318 database as itself; the returned datatype will be 

1319 :class:`_mssql.TIMESTAMP`. 

1320 

1321 This is a read-only datatype that does not support INSERT of values. 

1322 

1323 .. versionadded:: 1.2 

1324 

1325 .. seealso:: 

1326 

1327 :class:`_mssql.TIMESTAMP` 

1328 

1329 """ 

1330 

1331 __visit_name__ = "ROWVERSION" 

1332 

1333 

1334class NTEXT(sqltypes.UnicodeText): 

1335 

1336 """MSSQL NTEXT type, for variable-length unicode text up to 2^30 

1337 characters.""" 

1338 

1339 __visit_name__ = "NTEXT" 

1340 

1341 

1342class VARBINARY(sqltypes.VARBINARY, sqltypes.LargeBinary): 

1343 """The MSSQL VARBINARY type. 

1344 

1345 This type adds additional features to the core :class:`_types.VARBINARY` 

1346 type, including "deprecate_large_types" mode where 

1347 either ``VARBINARY(max)`` or IMAGE is rendered, as well as the SQL 

1348 Server ``FILESTREAM`` option. 

1349 

1350 .. versionadded:: 1.0.0 

1351 

1352 .. seealso:: 

1353 

1354 :ref:`mssql_large_type_deprecation` 

1355 

1356 """ 

1357 

1358 __visit_name__ = "VARBINARY" 

1359 

1360 def __init__(self, length=None, filestream=False): 

1361 """ 

1362 Construct a VARBINARY type. 

1363 

1364 :param length: optional, a length for the column for use in 

1365 DDL statements, for those binary types that accept a length, 

1366 such as the MySQL BLOB type. 

1367 

1368 :param filestream=False: if True, renders the ``FILESTREAM`` keyword 

1369 in the table definition. In this case ``length`` must be ``None`` 

1370 or ``'max'``. 

1371 

1372 .. versionadded:: 1.4.31 

1373 

1374 """ 

1375 

1376 self.filestream = filestream 

1377 if self.filestream and length not in (None, "max"): 

1378 raise ValueError( 

1379 "length must be None or 'max' when setting filestream" 

1380 ) 

1381 super(VARBINARY, self).__init__(length=length) 

1382 

1383 

1384class IMAGE(sqltypes.LargeBinary): 

1385 __visit_name__ = "IMAGE" 

1386 

1387 

1388class XML(sqltypes.Text): 

1389 """MSSQL XML type. 

1390 

1391 This is a placeholder type for reflection purposes that does not include 

1392 any Python-side datatype support. It also does not currently support 

1393 additional arguments, such as "CONTENT", "DOCUMENT", 

1394 "xml_schema_collection". 

1395 

1396 .. versionadded:: 1.1.11 

1397 

1398 """ 

1399 

1400 __visit_name__ = "XML" 

1401 

1402 

1403class BIT(sqltypes.Boolean): 

1404 """MSSQL BIT type. 

1405 

1406 Both pyodbc and pymssql return values from BIT columns as 

1407 Python <class 'bool'> so just subclass Boolean. 

1408 

1409 """ 

1410 

1411 __visit_name__ = "BIT" 

1412 

1413 

1414class MONEY(sqltypes.TypeEngine): 

1415 __visit_name__ = "MONEY" 

1416 

1417 

1418class SMALLMONEY(sqltypes.TypeEngine): 

1419 __visit_name__ = "SMALLMONEY" 

1420 

1421 

1422class UNIQUEIDENTIFIER(sqltypes.TypeEngine): 

1423 __visit_name__ = "UNIQUEIDENTIFIER" 

1424 

1425 

1426class SQL_VARIANT(sqltypes.TypeEngine): 

1427 __visit_name__ = "SQL_VARIANT" 

1428 

1429 

1430class TryCast(sql.elements.Cast): 

1431 """Represent a SQL Server TRY_CAST expression.""" 

1432 

1433 __visit_name__ = "try_cast" 

1434 

1435 stringify_dialect = "mssql" 

1436 inherit_cache = True 

1437 

1438 def __init__(self, *arg, **kw): 

1439 """Create a TRY_CAST expression. 

1440 

1441 :class:`.TryCast` is a subclass of SQLAlchemy's :class:`.Cast` 

1442 construct, and works in the same way, except that the SQL expression 

1443 rendered is "TRY_CAST" rather than "CAST":: 

1444 

1445 from sqlalchemy import select 

1446 from sqlalchemy import Numeric 

1447 from sqlalchemy.dialects.mssql import try_cast 

1448 

1449 stmt = select( 

1450 try_cast(product_table.c.unit_price, Numeric(10, 4)) 

1451 ) 

1452 

1453 The above would render:: 

1454 

1455 SELECT TRY_CAST (product_table.unit_price AS NUMERIC(10, 4)) 

1456 FROM product_table 

1457 

1458 .. versionadded:: 1.3.7 

1459 

1460 """ 

1461 super(TryCast, self).__init__(*arg, **kw) 

1462 

1463 

1464try_cast = public_factory(TryCast, ".dialects.mssql.try_cast") 

1465 

1466# old names. 

1467MSDateTime = _MSDateTime 

1468MSDate = _MSDate 

1469MSReal = REAL 

1470MSTinyInteger = TINYINT 

1471MSTime = TIME 

1472MSSmallDateTime = SMALLDATETIME 

1473MSDateTime2 = DATETIME2 

1474MSDateTimeOffset = DATETIMEOFFSET 

1475MSText = TEXT 

1476MSNText = NTEXT 

1477MSString = VARCHAR 

1478MSNVarchar = NVARCHAR 

1479MSChar = CHAR 

1480MSNChar = NCHAR 

1481MSBinary = BINARY 

1482MSVarBinary = VARBINARY 

1483MSImage = IMAGE 

1484MSBit = BIT 

1485MSMoney = MONEY 

1486MSSmallMoney = SMALLMONEY 

1487MSUniqueIdentifier = UNIQUEIDENTIFIER 

1488MSVariant = SQL_VARIANT 

1489 

1490ischema_names = { 

1491 "int": INTEGER, 

1492 "bigint": BIGINT, 

1493 "smallint": SMALLINT, 

1494 "tinyint": TINYINT, 

1495 "varchar": VARCHAR, 

1496 "nvarchar": NVARCHAR, 

1497 "char": CHAR, 

1498 "nchar": NCHAR, 

1499 "text": TEXT, 

1500 "ntext": NTEXT, 

1501 "decimal": DECIMAL, 

1502 "numeric": NUMERIC, 

1503 "float": FLOAT, 

1504 "datetime": DATETIME, 

1505 "datetime2": DATETIME2, 

1506 "datetimeoffset": DATETIMEOFFSET, 

1507 "date": DATE, 

1508 "time": TIME, 

1509 "smalldatetime": SMALLDATETIME, 

1510 "binary": BINARY, 

1511 "varbinary": VARBINARY, 

1512 "bit": BIT, 

1513 "real": REAL, 

1514 "image": IMAGE, 

1515 "xml": XML, 

1516 "timestamp": TIMESTAMP, 

1517 "money": MONEY, 

1518 "smallmoney": SMALLMONEY, 

1519 "uniqueidentifier": UNIQUEIDENTIFIER, 

1520 "sql_variant": SQL_VARIANT, 

1521} 

1522 

1523 

1524class MSTypeCompiler(compiler.GenericTypeCompiler): 

1525 def _extend(self, spec, type_, length=None): 

1526 """Extend a string-type declaration with standard SQL 

1527 COLLATE annotations. 

1528 

1529 """ 

1530 

1531 if getattr(type_, "collation", None): 

1532 collation = "COLLATE %s" % type_.collation 

1533 else: 

1534 collation = None 

1535 

1536 if not length: 

1537 length = type_.length 

1538 

1539 if length: 

1540 spec = spec + "(%s)" % length 

1541 

1542 return " ".join([c for c in (spec, collation) if c is not None]) 

1543 

1544 def visit_FLOAT(self, type_, **kw): 

1545 precision = getattr(type_, "precision", None) 

1546 if precision is None: 

1547 return "FLOAT" 

1548 else: 

1549 return "FLOAT(%(precision)s)" % {"precision": precision} 

1550 

1551 def visit_TINYINT(self, type_, **kw): 

1552 return "TINYINT" 

1553 

1554 def visit_TIME(self, type_, **kw): 

1555 precision = getattr(type_, "precision", None) 

1556 if precision is not None: 

1557 return "TIME(%s)" % precision 

1558 else: 

1559 return "TIME" 

1560 

1561 def visit_TIMESTAMP(self, type_, **kw): 

1562 return "TIMESTAMP" 

1563 

1564 def visit_ROWVERSION(self, type_, **kw): 

1565 return "ROWVERSION" 

1566 

1567 def visit_datetime(self, type_, **kw): 

1568 if type_.timezone: 

1569 return self.visit_DATETIMEOFFSET(type_, **kw) 

1570 else: 

1571 return self.visit_DATETIME(type_, **kw) 

1572 

1573 def visit_DATETIMEOFFSET(self, type_, **kw): 

1574 precision = getattr(type_, "precision", None) 

1575 if precision is not None: 

1576 return "DATETIMEOFFSET(%s)" % type_.precision 

1577 else: 

1578 return "DATETIMEOFFSET" 

1579 

1580 def visit_DATETIME2(self, type_, **kw): 

1581 precision = getattr(type_, "precision", None) 

1582 if precision is not None: 

1583 return "DATETIME2(%s)" % precision 

1584 else: 

1585 return "DATETIME2" 

1586 

1587 def visit_SMALLDATETIME(self, type_, **kw): 

1588 return "SMALLDATETIME" 

1589 

1590 def visit_unicode(self, type_, **kw): 

1591 return self.visit_NVARCHAR(type_, **kw) 

1592 

1593 def visit_text(self, type_, **kw): 

1594 if self.dialect.deprecate_large_types: 

1595 return self.visit_VARCHAR(type_, **kw) 

1596 else: 

1597 return self.visit_TEXT(type_, **kw) 

1598 

1599 def visit_unicode_text(self, type_, **kw): 

1600 if self.dialect.deprecate_large_types: 

1601 return self.visit_NVARCHAR(type_, **kw) 

1602 else: 

1603 return self.visit_NTEXT(type_, **kw) 

1604 

1605 def visit_NTEXT(self, type_, **kw): 

1606 return self._extend("NTEXT", type_) 

1607 

1608 def visit_TEXT(self, type_, **kw): 

1609 return self._extend("TEXT", type_) 

1610 

1611 def visit_VARCHAR(self, type_, **kw): 

1612 return self._extend("VARCHAR", type_, length=type_.length or "max") 

1613 

1614 def visit_CHAR(self, type_, **kw): 

1615 return self._extend("CHAR", type_) 

1616 

1617 def visit_NCHAR(self, type_, **kw): 

1618 return self._extend("NCHAR", type_) 

1619 

1620 def visit_NVARCHAR(self, type_, **kw): 

1621 return self._extend("NVARCHAR", type_, length=type_.length or "max") 

1622 

1623 def visit_date(self, type_, **kw): 

1624 if self.dialect.server_version_info < MS_2008_VERSION: 

1625 return self.visit_DATETIME(type_, **kw) 

1626 else: 

1627 return self.visit_DATE(type_, **kw) 

1628 

1629 def visit__BASETIMEIMPL(self, type_, **kw): 

1630 return self.visit_time(type_, **kw) 

1631 

1632 def visit_time(self, type_, **kw): 

1633 if self.dialect.server_version_info < MS_2008_VERSION: 

1634 return self.visit_DATETIME(type_, **kw) 

1635 else: 

1636 return self.visit_TIME(type_, **kw) 

1637 

1638 def visit_large_binary(self, type_, **kw): 

1639 if self.dialect.deprecate_large_types: 

1640 return self.visit_VARBINARY(type_, **kw) 

1641 else: 

1642 return self.visit_IMAGE(type_, **kw) 

1643 

1644 def visit_IMAGE(self, type_, **kw): 

1645 return "IMAGE" 

1646 

1647 def visit_XML(self, type_, **kw): 

1648 return "XML" 

1649 

1650 def visit_VARBINARY(self, type_, **kw): 

1651 text = self._extend("VARBINARY", type_, length=type_.length or "max") 

1652 if getattr(type_, "filestream", False): 

1653 text += " FILESTREAM" 

1654 return text 

1655 

1656 def visit_boolean(self, type_, **kw): 

1657 return self.visit_BIT(type_) 

1658 

1659 def visit_BIT(self, type_, **kw): 

1660 return "BIT" 

1661 

1662 def visit_JSON(self, type_, **kw): 

1663 # this is a bit of a break with SQLAlchemy's convention of 

1664 # "UPPERCASE name goes to UPPERCASE type name with no modification" 

1665 return self._extend("NVARCHAR", type_, length="max") 

1666 

1667 def visit_MONEY(self, type_, **kw): 

1668 return "MONEY" 

1669 

1670 def visit_SMALLMONEY(self, type_, **kw): 

1671 return "SMALLMONEY" 

1672 

1673 def visit_UNIQUEIDENTIFIER(self, type_, **kw): 

1674 return "UNIQUEIDENTIFIER" 

1675 

1676 def visit_SQL_VARIANT(self, type_, **kw): 

1677 return "SQL_VARIANT" 

1678 

1679 

1680class MSExecutionContext(default.DefaultExecutionContext): 

1681 _enable_identity_insert = False 

1682 _select_lastrowid = False 

1683 _lastrowid = None 

1684 _rowcount = None 

1685 

1686 def _opt_encode(self, statement): 

1687 

1688 if not self.dialect.supports_unicode_statements: 

1689 encoded = self.dialect._encoder(statement)[0] 

1690 else: 

1691 encoded = statement 

1692 

1693 if self.compiled and self.compiled.schema_translate_map: 

1694 

1695 rst = self.compiled.preparer._render_schema_translates 

1696 encoded = rst(encoded, self.compiled.schema_translate_map) 

1697 

1698 return encoded 

1699 

1700 def pre_exec(self): 

1701 """Activate IDENTITY_INSERT if needed.""" 

1702 

1703 if self.isinsert: 

1704 tbl = self.compiled.compile_state.dml_table 

1705 id_column = tbl._autoincrement_column 

1706 insert_has_identity = (id_column is not None) and ( 

1707 not isinstance(id_column.default, Sequence) 

1708 ) 

1709 

1710 if insert_has_identity: 

1711 compile_state = self.compiled.dml_compile_state 

1712 self._enable_identity_insert = ( 

1713 id_column.key in self.compiled_parameters[0] 

1714 ) or ( 

1715 compile_state._dict_parameters 

1716 and (id_column.key in compile_state._insert_col_keys) 

1717 ) 

1718 

1719 else: 

1720 self._enable_identity_insert = False 

1721 

1722 self._select_lastrowid = ( 

1723 not self.compiled.inline 

1724 and insert_has_identity 

1725 and not self.compiled.returning 

1726 and not self._enable_identity_insert 

1727 and not self.executemany 

1728 ) 

1729 

1730 if self._enable_identity_insert: 

1731 self.root_connection._cursor_execute( 

1732 self.cursor, 

1733 self._opt_encode( 

1734 "SET IDENTITY_INSERT %s ON" 

1735 % self.identifier_preparer.format_table(tbl) 

1736 ), 

1737 (), 

1738 self, 

1739 ) 

1740 

1741 def post_exec(self): 

1742 """Disable IDENTITY_INSERT if enabled.""" 

1743 

1744 conn = self.root_connection 

1745 

1746 if self.isinsert or self.isupdate or self.isdelete: 

1747 self._rowcount = self.cursor.rowcount 

1748 

1749 if self._select_lastrowid: 

1750 if self.dialect.use_scope_identity: 

1751 conn._cursor_execute( 

1752 self.cursor, 

1753 "SELECT scope_identity() AS lastrowid", 

1754 (), 

1755 self, 

1756 ) 

1757 else: 

1758 conn._cursor_execute( 

1759 self.cursor, "SELECT @@identity AS lastrowid", (), self 

1760 ) 

1761 # fetchall() ensures the cursor is consumed without closing it 

1762 row = self.cursor.fetchall()[0] 

1763 self._lastrowid = int(row[0]) 

1764 

1765 elif ( 

1766 self.isinsert or self.isupdate or self.isdelete 

1767 ) and self.compiled.returning: 

1768 self.cursor_fetch_strategy = ( 

1769 _cursor.FullyBufferedCursorFetchStrategy( 

1770 self.cursor, 

1771 self.cursor.description, 

1772 self.cursor.fetchall(), 

1773 ) 

1774 ) 

1775 

1776 if self._enable_identity_insert: 

1777 conn._cursor_execute( 

1778 self.cursor, 

1779 self._opt_encode( 

1780 "SET IDENTITY_INSERT %s OFF" 

1781 % self.identifier_preparer.format_table( 

1782 self.compiled.compile_state.dml_table 

1783 ) 

1784 ), 

1785 (), 

1786 self, 

1787 ) 

1788 

1789 def get_lastrowid(self): 

1790 return self._lastrowid 

1791 

1792 @property 

1793 def rowcount(self): 

1794 if self._rowcount is not None: 

1795 return self._rowcount 

1796 else: 

1797 return self.cursor.rowcount 

1798 

1799 def handle_dbapi_exception(self, e): 

1800 if self._enable_identity_insert: 

1801 try: 

1802 self.cursor.execute( 

1803 self._opt_encode( 

1804 "SET IDENTITY_INSERT %s OFF" 

1805 % self.identifier_preparer.format_table( 

1806 self.compiled.compile_state.dml_table 

1807 ) 

1808 ) 

1809 ) 

1810 except Exception: 

1811 pass 

1812 

1813 def fire_sequence(self, seq, type_): 

1814 return self._execute_scalar( 

1815 ( 

1816 "SELECT NEXT VALUE FOR %s" 

1817 % self.identifier_preparer.format_sequence(seq) 

1818 ), 

1819 type_, 

1820 ) 

1821 

1822 def get_insert_default(self, column): 

1823 if ( 

1824 isinstance(column, sa_schema.Column) 

1825 and column is column.table._autoincrement_column 

1826 and isinstance(column.default, sa_schema.Sequence) 

1827 and column.default.optional 

1828 ): 

1829 return None 

1830 return super(MSExecutionContext, self).get_insert_default(column) 

1831 

1832 

1833class MSSQLCompiler(compiler.SQLCompiler): 

1834 returning_precedes_values = True 

1835 

1836 extract_map = util.update_copy( 

1837 compiler.SQLCompiler.extract_map, 

1838 { 

1839 "doy": "dayofyear", 

1840 "dow": "weekday", 

1841 "milliseconds": "millisecond", 

1842 "microseconds": "microsecond", 

1843 }, 

1844 ) 

1845 

1846 def __init__(self, *args, **kwargs): 

1847 self.tablealiases = {} 

1848 super(MSSQLCompiler, self).__init__(*args, **kwargs) 

1849 

1850 def _with_legacy_schema_aliasing(fn): 

1851 def decorate(self, *arg, **kw): 

1852 if self.dialect.legacy_schema_aliasing: 

1853 return fn(self, *arg, **kw) 

1854 else: 

1855 super_ = getattr(super(MSSQLCompiler, self), fn.__name__) 

1856 return super_(*arg, **kw) 

1857 

1858 return decorate 

1859 

1860 def visit_now_func(self, fn, **kw): 

1861 return "CURRENT_TIMESTAMP" 

1862 

1863 def visit_current_date_func(self, fn, **kw): 

1864 return "GETDATE()" 

1865 

1866 def visit_length_func(self, fn, **kw): 

1867 return "LEN%s" % self.function_argspec(fn, **kw) 

1868 

1869 def visit_char_length_func(self, fn, **kw): 

1870 return "LEN%s" % self.function_argspec(fn, **kw) 

1871 

1872 def visit_concat_op_binary(self, binary, operator, **kw): 

1873 return "%s + %s" % ( 

1874 self.process(binary.left, **kw), 

1875 self.process(binary.right, **kw), 

1876 ) 

1877 

1878 def visit_true(self, expr, **kw): 

1879 return "1" 

1880 

1881 def visit_false(self, expr, **kw): 

1882 return "0" 

1883 

1884 def visit_match_op_binary(self, binary, operator, **kw): 

1885 return "CONTAINS (%s, %s)" % ( 

1886 self.process(binary.left, **kw), 

1887 self.process(binary.right, **kw), 

1888 ) 

1889 

    def get_select_precolumns(self, select, **kw):
        """MS-SQL puts TOP, its version of LIMIT, here"""

        s = super(MSSQLCompiler, self).get_select_precolumns(select, **kw)

        if select._has_row_limiting_clause and self._use_top(select):
            # ODBC drivers and possibly others
            # don't support bind params in the SELECT clause on SQL Server.
            # so have to use literal here.
            kw["literal_execute"] = True
            s += "TOP %s " % self.process(
                self._get_limit_or_fetch(select), **kw
            )
            if select._fetch_clause is not None:
                if select._fetch_clause_options["percent"]:
                    s += "PERCENT "
                if select._fetch_clause_options["with_ties"]:
                    s += "WITH TIES "

        return s

    def get_from_hint_text(self, table, text):
        return text

    def get_crud_hint_text(self, table, text):
        return text

    def _get_limit_or_fetch(self, select):
        if select._fetch_clause is None:
            return select._limit_clause
        else:
            return select._fetch_clause

    def _use_top(self, select):
        return (select._offset_clause is None) and (
            select._simple_int_clause(select._limit_clause)
            or (
                # limit can use TOP when it is by itself. fetch only uses TOP
                # when it needs to because of PERCENT and/or WITH TIES
                select._simple_int_clause(select._fetch_clause)
                and (
                    select._fetch_clause_options["percent"]
                    or select._fetch_clause_options["with_ties"]
                )
            )
        )

    def fetch_clause(self, cs, **kwargs):
        return ""

    def limit_clause(self, cs, **kwargs):
        return ""

1943 def _check_can_use_fetch_limit(self, select): 

1944 # to use ROW_NUMBER(), an ORDER BY is required. 

1945 # OFFSET are FETCH are options of the ORDER BY clause 

1946 if not select._order_by_clause.clauses: 

1947 raise exc.CompileError( 

1948 "MSSQL requires an order_by when " 

1949 "using an OFFSET or a non-simple " 

1950 "LIMIT clause" 

1951 ) 

1952 

1953 if select._fetch_clause_options is not None and ( 

1954 select._fetch_clause_options["percent"] 

1955 or select._fetch_clause_options["with_ties"] 

1956 ): 

1957 raise exc.CompileError( 

1958 "MSSQL needs TOP to use PERCENT and/or WITH TIES. " 

1959 "Only simple fetch without offset can be used." 

1960 ) 

1961 

1962 def _row_limit_clause(self, select, **kw): 

1963 """MSSQL 2012 supports the OFFSET/FETCH operators.

1964 Use them instead of a subquery with row_number.

1965 

1966 """ 

1967 

1968 if self.dialect._supports_offset_fetch and not self._use_top(select): 

1969 self._check_can_use_fetch_limit(select) 

1970 

1971 text = "" 

1972 

1973 if select._offset_clause is not None: 

1974 offset_str = self.process(select._offset_clause, **kw) 

1975 else: 

1976 offset_str = "0" 

1977 text += "\n OFFSET %s ROWS" % offset_str 

1978 

1979 limit = self._get_limit_or_fetch(select) 

1980 

1981 if limit is not None: 

1982 text += "\n FETCH FIRST %s ROWS ONLY" % self.process( 

1983 limit, **kw 

1984 ) 

1985 return text 

1986 else: 

1987 return "" 

1988 
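The choice between ``TOP`` and ``OFFSET``/``FETCH`` made by ``_use_top`` and ``_row_limit_clause`` above can be illustrated with a minimal standalone sketch. It assumes plain integer limit and offset values and a server that supports ``OFFSET``/``FETCH``. ``render_row_limit`` is a hypothetical helper, not part of SQLAlchemy, and it ignores ``PERCENT`` and ``WITH TIES``.

```python
def render_row_limit(limit=None, offset=None, has_order_by=True):
    """Return (precolumn_text, trailing_text) for integer limit/offset.

    Hypothetical helper mirroring the decision in the listing:
    a simple limit without offset becomes TOP, anything with an
    offset becomes OFFSET ... FETCH and needs an ORDER BY.
    """
    if limit is None and offset is None:
        return "", ""
    if offset is None:
        # simple integer limit, no offset: rendered before the columns
        return "TOP %d " % limit, ""
    if not has_order_by:
        # OFFSET and FETCH are options of the ORDER BY clause
        raise ValueError("an ORDER BY is required when using OFFSET")
    text = "\n OFFSET %d ROWS" % offset
    if limit is not None:
        text += "\n FETCH FIRST %d ROWS ONLY" % limit
    return "", text


top_only = render_row_limit(limit=10)
windowed = render_row_limit(limit=10, offset=20)
offset_only = render_row_limit(offset=5)
```

The error case is raised only on the ``OFFSET`` path, which matches the listing: ``TOP`` does not require an ``ORDER BY``.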

1989 def visit_try_cast(self, element, **kw): 

1990 return "TRY_CAST (%s AS %s)" % ( 

1991 self.process(element.clause, **kw), 

1992 self.process(element.typeclause, **kw), 

1993 ) 

1994 

1995 def translate_select_structure(self, select_stmt, **kwargs): 

1996 """Look for ``LIMIT`` and ``OFFSET`` in a select statement and, if

1997 found, wrap it in a subquery with a ``row_number()`` criterion.

1998 MSSQL 2012 and above are excluded.

1999 

2000 """ 

2001 select = select_stmt 

2002 

2003 if ( 

2004 select._has_row_limiting_clause 

2005 and not self.dialect._supports_offset_fetch 

2006 and not self._use_top(select) 

2007 and not getattr(select, "_mssql_visit", None) 

2008 ): 

2009 self._check_can_use_fetch_limit(select) 

2010 

2011 _order_by_clauses = [ 

2012 sql_util.unwrap_label_reference(elem) 

2013 for elem in select._order_by_clause.clauses 

2014 ] 

2015 

2016 limit_clause = self._get_limit_or_fetch(select) 

2017 offset_clause = select._offset_clause 

2018 

2019 select = select._generate() 

2020 select._mssql_visit = True 

2021 select = ( 

2022 select.add_columns( 

2023 sql.func.ROW_NUMBER() 

2024 .over(order_by=_order_by_clauses) 

2025 .label("mssql_rn") 

2026 ) 

2027 .order_by(None) 

2028 .alias() 

2029 ) 

2030 

2031 mssql_rn = sql.column("mssql_rn") 

2032 limitselect = sql.select( 

2033 *[c for c in select.c if c.key != "mssql_rn"] 

2034 ) 

2035 if offset_clause is not None: 

2036 limitselect = limitselect.where(mssql_rn > offset_clause) 

2037 if limit_clause is not None: 

2038 limitselect = limitselect.where( 

2039 mssql_rn <= (limit_clause + offset_clause) 

2040 ) 

2041 else: 

2042 limitselect = limitselect.where(mssql_rn <= (limit_clause)) 

2043 return limitselect 

2044 else: 

2045 return select 

2046 
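The ``mssql_rn`` bounds applied by ``translate_select_structure`` above (``mssql_rn > offset`` and ``mssql_rn <= limit + offset``) can be checked against an in-memory list. This is only a sketch of the window arithmetic; ``paginate_with_row_number`` is a hypothetical name and the input is assumed to be already ordered.

```python
def paginate_with_row_number(ordered_rows, limit=None, offset=None):
    """Keep rows whose 1-based row number falls inside the window."""
    selected = []
    # ROW_NUMBER() numbers rows starting at 1
    for rn, row in enumerate(ordered_rows, start=1):
        if offset is not None and not rn > offset:
            continue
        # with no offset the upper bound is simply the limit
        if limit is not None and not rn <= limit + (offset or 0):
            continue
        selected.append(row)
    return selected


rows = list("abcdefg")
page = paginate_with_row_number(rows, limit=3, offset=2)
```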

2047 @_with_legacy_schema_aliasing 

2048 def visit_table(self, table, mssql_aliased=False, iscrud=False, **kwargs): 

2049 if mssql_aliased is table or iscrud: 

2050 return super(MSSQLCompiler, self).visit_table(table, **kwargs) 

2051 

2052 # alias schema-qualified tables 

2053 alias = self._schema_aliased_table(table) 

2054 if alias is not None: 

2055 return self.process(alias, mssql_aliased=table, **kwargs) 

2056 else: 

2057 return super(MSSQLCompiler, self).visit_table(table, **kwargs) 

2058 

2059 @_with_legacy_schema_aliasing 

2060 def visit_alias(self, alias, **kw): 

2061 # translate for schema-qualified table aliases 

2062 kw["mssql_aliased"] = alias.element 

2063 return super(MSSQLCompiler, self).visit_alias(alias, **kw) 

2064 

2065 @_with_legacy_schema_aliasing 

2066 def visit_column(self, column, add_to_result_map=None, **kw): 

2067 if ( 

2068 column.table is not None 

2069 and (not self.isupdate and not self.isdelete) 

2070 or self.is_subquery() 

2071 ): 

2072 # translate for schema-qualified table aliases 

2073 t = self._schema_aliased_table(column.table) 

2074 if t is not None: 

2075 converted = elements._corresponding_column_or_error(t, column) 

2076 if add_to_result_map is not None: 

2077 add_to_result_map( 

2078 column.name, 

2079 column.name, 

2080 (column, column.name, column.key), 

2081 column.type, 

2082 ) 

2083 

2084 return super(MSSQLCompiler, self).visit_column(converted, **kw) 

2085 

2086 return super(MSSQLCompiler, self).visit_column( 

2087 column, add_to_result_map=add_to_result_map, **kw 

2088 ) 

2089 

2090 def _schema_aliased_table(self, table): 

2091 if getattr(table, "schema", None) is not None: 

2092 if table not in self.tablealiases: 

2093 self.tablealiases[table] = table.alias() 

2094 return self.tablealiases[table] 

2095 else: 

2096 return None 

2097 

2098 def visit_extract(self, extract, **kw): 

2099 field = self.extract_map.get(extract.field, extract.field) 

2100 return "DATEPART(%s, %s)" % (field, self.process(extract.expr, **kw)) 

2101 

2102 def visit_savepoint(self, savepoint_stmt): 

2103 return "SAVE TRANSACTION %s" % self.preparer.format_savepoint( 

2104 savepoint_stmt 

2105 ) 

2106 

2107 def visit_rollback_to_savepoint(self, savepoint_stmt): 

2108 return "ROLLBACK TRANSACTION %s" % self.preparer.format_savepoint( 

2109 savepoint_stmt 

2110 ) 

2111 

2112 def visit_binary(self, binary, **kwargs): 

2113 """Move bind parameters to the right-hand side of an operator, where 

2114 possible. 

2115 

2116 """ 

2117 if ( 

2118 isinstance(binary.left, expression.BindParameter) 

2119 and binary.operator == operator.eq 

2120 and not isinstance(binary.right, expression.BindParameter) 

2121 ): 

2122 return self.process( 

2123 expression.BinaryExpression( 

2124 binary.right, binary.left, binary.operator 

2125 ), 

2126 **kwargs 

2127 ) 

2128 return super(MSSQLCompiler, self).visit_binary(binary, **kwargs) 

2129 

2130 def returning_clause(self, stmt, returning_cols): 

2131 # SQL server returning clause requires that the columns refer to 

2132 # the virtual table names "inserted" or "deleted". Here, we make 

2133 # a simple alias of our table with that name, and then adapt the 

2134 # columns we have from the list of RETURNING columns to that new name 

2135 # so that they render as "inserted.<colname>" / "deleted.<colname>". 

2136 

2137 if self.isinsert or self.isupdate: 

2138 target = stmt.table.alias("inserted") 

2139 else: 

2140 target = stmt.table.alias("deleted") 

2141 

2142 adapter = sql_util.ClauseAdapter(target) 

2143 

2144 # adapter.traverse() takes a column from our target table and returns 

2145 # the one that is linked to the "inserted" / "deleted" tables. So in 

2146 # order to retrieve these values back from the result (e.g. like 

2147 # row[column]), tell the compiler to also add the original unadapted 

2148 # column to the result map. Before #4877, these were (unknowingly) 

2149 # falling back using string name matching in the result set which 

2150 # necessarily used an expensive KeyError in order to match. 

2151 

2152 columns = [ 

2153 self._label_returning_column( 

2154 stmt, 

2155 adapter.traverse(c), 

2156 {"result_map_targets": (c,)}, 

2157 fallback_label_name=c._non_anon_label, 

2158 ) 

2159 for c in expression._select_iterables(returning_cols) 

2160 ] 

2161 

2162 return "OUTPUT " + ", ".join(columns) 

2163 

2164 def get_cte_preamble(self, recursive): 

2165 # SQL Server finds it too inconvenient to accept 

2166 # an entirely optional, SQL standard specified, 

2167 # "RECURSIVE" word with their "WITH", 

2168 # so here we go 

2169 return "WITH" 

2170 

2171 def label_select_column(self, select, column, asfrom): 

2172 if isinstance(column, expression.Function): 

2173 return column.label(None) 

2174 else: 

2175 return super(MSSQLCompiler, self).label_select_column( 

2176 select, column, asfrom 

2177 ) 

2178 

2179 def for_update_clause(self, select, **kw): 

2180 # "FOR UPDATE" is only allowed on "DECLARE CURSOR" which 

2181 # SQLAlchemy doesn't use 

2182 return "" 

2183 

2184 def order_by_clause(self, select, **kw): 

2185 # MSSQL only allows ORDER BY in subqueries if there is a LIMIT 

2186 if ( 

2187 self.is_subquery() 

2188 and not select._limit 

2189 and ( 

2190 select._offset is None 

2191 or not self.dialect._supports_offset_fetch 

2192 ) 

2193 ): 

2194 # avoid processing the order by clause if we won't end up 

2195 # using it, because we don't want all the bind params tacked 

2196 # onto the positional list if that is what the dbapi requires 

2197 return "" 

2198 

2199 order_by = self.process(select._order_by_clause, **kw) 

2200 

2201 if order_by: 

2202 return " ORDER BY " + order_by 

2203 else: 

2204 return "" 

2205 

2206 def update_from_clause( 

2207 self, update_stmt, from_table, extra_froms, from_hints, **kw 

2208 ): 

2209 """Render the UPDATE..FROM clause specific to MSSQL. 

2210 

2211 In MSSQL, if the UPDATE statement involves an alias of the table to 

2212 be updated, then the table itself must be added to the FROM list as 

2213 well. Otherwise, it is optional. Here, we add it regardless. 

2214 

2215 """ 

2216 return "FROM " + ", ".join( 

2217 t._compiler_dispatch(self, asfrom=True, fromhints=from_hints, **kw) 

2218 for t in [from_table] + extra_froms 

2219 ) 

2220 

2221 def delete_table_clause(self, delete_stmt, from_table, extra_froms): 

2222 """If we have extra froms make sure we render any alias as hint.""" 

2223 ashint = False 

2224 if extra_froms: 

2225 ashint = True 

2226 return from_table._compiler_dispatch( 

2227 self, asfrom=True, iscrud=True, ashint=ashint 

2228 ) 

2229 

2230 def delete_extra_from_clause( 

2231 self, delete_stmt, from_table, extra_froms, from_hints, **kw 

2232 ): 

2233 """Render the DELETE .. FROM clause specific to MSSQL. 

2234 

2235 Yes, it has the FROM keyword twice. 

2236 

2237 """ 

2238 return "FROM " + ", ".join( 

2239 t._compiler_dispatch(self, asfrom=True, fromhints=from_hints, **kw) 

2240 for t in [from_table] + extra_froms 

2241 ) 

2242 

2243 def visit_empty_set_expr(self, type_): 

2244 return "SELECT 1 WHERE 1!=1" 

2245 

2246 def visit_is_distinct_from_binary(self, binary, operator, **kw): 

2247 return "NOT EXISTS (SELECT %s INTERSECT SELECT %s)" % ( 

2248 self.process(binary.left), 

2249 self.process(binary.right), 

2250 ) 

2251 

2252 def visit_is_not_distinct_from_binary(self, binary, operator, **kw): 

2253 return "EXISTS (SELECT %s INTERSECT SELECT %s)" % ( 

2254 self.process(binary.left), 

2255 self.process(binary.right), 

2256 ) 

2257 
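The ``INTERSECT`` form rendered above relies on set operations treating two NULLs as equal, which an ordinary ``=`` comparison does not. The sketch below runs the same SQL shape on SQLite from the standard library purely as a stand-in; it does not talk to SQL Server, and ``is_distinct_from`` is a hypothetical helper.

```python
import sqlite3


def is_distinct_from(conn, left, right):
    """Evaluate NOT EXISTS (SELECT left INTERSECT SELECT right)."""
    row = conn.execute(
        "SELECT NOT EXISTS (SELECT ? INTERSECT SELECT ?)", (left, right)
    ).fetchone()
    return bool(row[0])


conn = sqlite3.connect(":memory:")
both_null = is_distinct_from(conn, None, None)   # NULLs compare as equal
one_null = is_distinct_from(conn, 1, None)
same_value = is_distinct_from(conn, 1, 1)
```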

2258 def _render_json_extract_from_binary(self, binary, operator, **kw): 

2259 # note we are intentionally calling upon the process() calls in the 

2260 # order in which they appear in the SQL String as this is used 

2261 # by positional parameter rendering 

2262 

2263 if binary.type._type_affinity is sqltypes.JSON: 

2264 return "JSON_QUERY(%s, %s)" % ( 

2265 self.process(binary.left, **kw), 

2266 self.process(binary.right, **kw), 

2267 ) 

2268 

2269 # as with other dialects, start with an explicit test for NULL 

2270 case_expression = "CASE JSON_VALUE(%s, %s) WHEN NULL THEN NULL" % ( 

2271 self.process(binary.left, **kw), 

2272 self.process(binary.right, **kw), 

2273 ) 

2274 

2275 if binary.type._type_affinity is sqltypes.Integer: 

2276 type_expression = "ELSE CAST(JSON_VALUE(%s, %s) AS INTEGER)" % ( 

2277 self.process(binary.left, **kw), 

2278 self.process(binary.right, **kw), 

2279 ) 

2280 elif binary.type._type_affinity is sqltypes.Numeric: 

2281 type_expression = "ELSE CAST(JSON_VALUE(%s, %s) AS %s)" % ( 

2282 self.process(binary.left, **kw), 

2283 self.process(binary.right, **kw), 

2284 "FLOAT" 

2285 if isinstance(binary.type, sqltypes.Float) 

2286 else "NUMERIC(%s, %s)" 

2287 % (binary.type.precision, binary.type.scale), 

2288 ) 

2289 elif binary.type._type_affinity is sqltypes.Boolean: 

2290 # the NULL handling is particularly weird with boolean, so 

2291 # explicitly return numeric (BIT) constants 

2292 type_expression = ( 

2293 "WHEN 'true' THEN 1 WHEN 'false' THEN 0 ELSE NULL" 

2294 ) 

2295 elif binary.type._type_affinity is sqltypes.String: 

2296 # TODO: does this comment (from mysql) apply to here, too? 

2297 # this fails with a JSON value that's a four byte unicode 

2298 # string. SQLite has the same problem at the moment 

2299 type_expression = "ELSE JSON_VALUE(%s, %s)" % ( 

2300 self.process(binary.left, **kw), 

2301 self.process(binary.right, **kw), 

2302 ) 

2303 else: 

2304 # other affinity....this is not expected right now 

2305 type_expression = "ELSE JSON_QUERY(%s, %s)" % ( 

2306 self.process(binary.left, **kw), 

2307 self.process(binary.right, **kw), 

2308 ) 

2309 

2310 return case_expression + " " + type_expression + " END" 

2311 

2312 def visit_json_getitem_op_binary(self, binary, operator, **kw): 

2313 return self._render_json_extract_from_binary(binary, operator, **kw) 

2314 

2315 def visit_json_path_getitem_op_binary(self, binary, operator, **kw): 

2316 return self._render_json_extract_from_binary(binary, operator, **kw) 

2317 

2318 def visit_sequence(self, seq, **kw): 

2319 return "NEXT VALUE FOR %s" % self.preparer.format_sequence(seq) 

2320 

2321 

class MSSQLStrictCompiler(MSSQLCompiler):

    """A subclass of MSSQLCompiler which disables the usage of bind
    parameters where not allowed natively by MS-SQL.

    A dialect may use this compiler on a platform where native
    binds are used.

    """

    ansi_bind_rules = True

    def visit_in_op_binary(self, binary, operator, **kw):
        kw["literal_execute"] = True
        return "%s IN %s" % (
            self.process(binary.left, **kw),
            self.process(binary.right, **kw),
        )

    def visit_not_in_op_binary(self, binary, operator, **kw):
        kw["literal_execute"] = True
        return "%s NOT IN %s" % (
            self.process(binary.left, **kw),
            self.process(binary.right, **kw),
        )

    def render_literal_value(self, value, type_):
        """
        For date and datetime values, convert to a string
        format acceptable to MSSQL. That seems to be the
        so-called ODBC canonical date format which looks
        like this:

            yyyy-mm-dd hh:mi:ss.mmm(24h)

        For other data types, call the base class implementation.
        """
        # datetime and date are both subclasses of datetime.date
        if issubclass(type(value), datetime.date):
            # SQL Server wants single quotes around the date string.
            return "'" + str(value) + "'"
        else:
            return super(MSSQLStrictCompiler, self).render_literal_value(
                value, type_
            )

2367 
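As a quick illustration of the date branch in ``render_literal_value`` above, the sketch below applies the same ``str()``-and-quote step outside the compiler. ``render_date_literal`` is a hypothetical standalone function; the real method falls back to the base class for other types, which is replaced here by a ``TypeError``.

```python
import datetime


def render_date_literal(value):
    """Quote a date or datetime the way the listing does: str() in quotes."""
    # datetime.datetime is a subclass of datetime.date
    if isinstance(value, datetime.date):
        return "'" + str(value) + "'"
    raise TypeError("only date and datetime values are handled in this sketch")


date_sql = render_date_literal(datetime.date(2023, 6, 7))
datetime_sql = render_date_literal(datetime.datetime(2023, 6, 7, 6, 35, 0))
```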

2368 

2369class MSDDLCompiler(compiler.DDLCompiler): 

2370 def get_column_specification(self, column, **kwargs): 

2371 colspec = self.preparer.format_column(column) 

2372 

2373 # type is not accepted in a computed column 

2374 if column.computed is not None: 

2375 colspec += " " + self.process(column.computed) 

2376 else: 

2377 colspec += " " + self.dialect.type_compiler.process( 

2378 column.type, type_expression=column 

2379 ) 

2380 

2381 if column.nullable is not None: 

2382 if ( 

2383 not column.nullable 

2384 or column.primary_key 

2385 or isinstance(column.default, sa_schema.Sequence) 

2386 or column.autoincrement is True 

2387 or column.identity 

2388 ): 

2389 colspec += " NOT NULL" 

2390 elif column.computed is None: 

2391 # don't specify "NULL" for computed columns 

2392 colspec += " NULL" 

2393 

2394 if column.table is None: 

2395 raise exc.CompileError( 

2396 "mssql requires Table-bound columns " 

2397 "in order to generate DDL" 

2398 ) 

2399 

2400 d_opt = column.dialect_options["mssql"] 

2401 start = d_opt["identity_start"] 

2402 increment = d_opt["identity_increment"] 

2403 if start is not None or increment is not None: 

2404 if column.identity: 

2405 raise exc.CompileError( 

2406 "Cannot specify options 'mssql_identity_start' and/or " 

2407 "'mssql_identity_increment' while also using the " 

2408 "'Identity' construct." 

2409 ) 

2410 util.warn_deprecated( 

2411 "The dialect options 'mssql_identity_start' and " 

2412 "'mssql_identity_increment' are deprecated. " 

2413 "Use the 'Identity' object instead.", 

2414 "1.4", 

2415 ) 

2416 

2417 if column.identity: 

2418 colspec += self.process(column.identity, **kwargs) 

2419 elif ( 

2420 column is column.table._autoincrement_column 

2421 or column.autoincrement is True 

2422 ) and ( 

2423 not isinstance(column.default, Sequence) or column.default.optional 

2424 ): 

2425 colspec += self.process(Identity(start=start, increment=increment)) 

2426 else: 

2427 default = self.get_column_default_string(column) 

2428 if default is not None: 

2429 colspec += " DEFAULT " + default 

2430 

2431 return colspec 

2432 

2433 def visit_create_index(self, create, include_schema=False): 

2434 index = create.element 

2435 self._verify_index_table(index) 

2436 preparer = self.preparer 

2437 text = "CREATE " 

2438 if index.unique: 

2439 text += "UNIQUE " 

2440 

2441 # handle clustering option 

2442 clustered = index.dialect_options["mssql"]["clustered"] 

2443 if clustered is not None: 

2444 if clustered: 

2445 text += "CLUSTERED " 

2446 else: 

2447 text += "NONCLUSTERED " 

2448 

2449 text += "INDEX %s ON %s (%s)" % ( 

2450 self._prepared_index_name(index, include_schema=include_schema), 

2451 preparer.format_table(index.table), 

2452 ", ".join( 

2453 self.sql_compiler.process( 

2454 expr, include_table=False, literal_binds=True 

2455 ) 

2456 for expr in index.expressions 

2457 ), 

2458 ) 

2459 

2460 # handle other included columns 

2461 if index.dialect_options["mssql"]["include"]: 

2462 inclusions = [ 

2463 index.table.c[col] 

2464 if isinstance(col, util.string_types) 

2465 else col 

2466 for col in index.dialect_options["mssql"]["include"] 

2467 ] 

2468 

2469 text += " INCLUDE (%s)" % ", ".join( 

2470 [preparer.quote(c.name) for c in inclusions] 

2471 ) 

2472 

2473 whereclause = index.dialect_options["mssql"]["where"] 

2474 

2475 if whereclause is not None: 

2476 whereclause = coercions.expect( 

2477 roles.DDLExpressionRole, whereclause 

2478 ) 

2479 

2480 where_compiled = self.sql_compiler.process( 

2481 whereclause, include_table=False, literal_binds=True 

2482 ) 

2483 text += " WHERE " + where_compiled 

2484 

2485 return text 

2486 

2487 def visit_drop_index(self, drop): 

2488 return "\nDROP INDEX %s ON %s" % ( 

2489 self._prepared_index_name(drop.element, include_schema=False), 

2490 self.preparer.format_table(drop.element.table), 

2491 ) 

2492 

2493 def visit_primary_key_constraint(self, constraint): 

2494 if len(constraint) == 0: 

2495 return "" 

2496 text = "" 

2497 if constraint.name is not None: 

2498 text += "CONSTRAINT %s " % self.preparer.format_constraint( 

2499 constraint 

2500 ) 

2501 text += "PRIMARY KEY " 

2502 

2503 clustered = constraint.dialect_options["mssql"]["clustered"] 

2504 if clustered is not None: 

2505 if clustered: 

2506 text += "CLUSTERED " 

2507 else: 

2508 text += "NONCLUSTERED " 

2509 

2510 text += "(%s)" % ", ".join( 

2511 self.preparer.quote(c.name) for c in constraint 

2512 ) 

2513 text += self.define_constraint_deferrability(constraint) 

2514 return text 

2515 

2516 def visit_unique_constraint(self, constraint): 

2517 if len(constraint) == 0: 

2518 return "" 

2519 text = "" 

2520 if constraint.name is not None: 

2521 formatted_name = self.preparer.format_constraint(constraint) 

2522 if formatted_name is not None: 

2523 text += "CONSTRAINT %s " % formatted_name 

2524 text += "UNIQUE " 

2525 

2526 clustered = constraint.dialect_options["mssql"]["clustered"] 

2527 if clustered is not None: 

2528 if clustered: 

2529 text += "CLUSTERED " 

2530 else: 

2531 text += "NONCLUSTERED " 

2532 

2533 text += "(%s)" % ", ".join( 

2534 self.preparer.quote(c.name) for c in constraint 

2535 ) 

2536 text += self.define_constraint_deferrability(constraint) 

2537 return text 

2538 

2539 def visit_computed_column(self, generated): 

2540 text = "AS (%s)" % self.sql_compiler.process( 

2541 generated.sqltext, include_table=False, literal_binds=True 

2542 ) 

2543 # explicitly check for True|False since None means server default 

2544 if generated.persisted is True: 

2545 text += " PERSISTED" 

2546 return text 

2547 

2548 def visit_create_sequence(self, create, **kw): 

2549 prefix = None 

2550 if create.element.data_type is not None: 

2551 data_type = create.element.data_type 

2552 prefix = " AS %s" % self.type_compiler.process(data_type) 

2553 return super(MSDDLCompiler, self).visit_create_sequence( 

2554 create, prefix=prefix, **kw 

2555 ) 

2556 

2557 def visit_identity_column(self, identity, **kw): 

2558 text = " IDENTITY" 

2559 if identity.start is not None or identity.increment is not None: 

2560 start = 1 if identity.start is None else identity.start 

2561 increment = 1 if identity.increment is None else identity.increment 

2562 text += "(%s,%s)" % (start, increment) 

2563 return text 

2564 
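The defaulting rule in ``visit_identity_column`` above (a missing start or increment becomes 1 as soon as the other is given) is easy to see in isolation. The following is a minimal sketch with a hypothetical ``render_identity`` function that takes plain values instead of an ``Identity`` object.

```python
def render_identity(start=None, increment=None):
    """Build the IDENTITY suffix from optional start/increment values."""
    text = " IDENTITY"
    if start is not None or increment is not None:
        # whichever value is missing defaults to 1
        start = 1 if start is None else start
        increment = 1 if increment is None else increment
        text += "(%s,%s)" % (start, increment)
    return text


bare = render_identity()
with_start = render_identity(start=5)
with_increment = render_identity(increment=2)
```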

2565 

class MSIdentifierPreparer(compiler.IdentifierPreparer):
    reserved_words = RESERVED_WORDS

    def __init__(self, dialect):
        super(MSIdentifierPreparer, self).__init__(
            dialect,
            initial_quote="[",
            final_quote="]",
            quote_case_sensitive_collations=False,
        )

    def _escape_identifier(self, value):
        return value.replace("]", "]]")

    def _unescape_identifier(self, value):
        return value.replace("]]", "]")

    def quote_schema(self, schema, force=None):
        """Prepare a quoted table and schema name."""

        # need to re-implement the deprecation warning entirely
        if force is not None:
            # not using the util.deprecated_params() decorator in this
            # case because of the additional function call overhead on this
            # very performance-critical spot.
            util.warn_deprecated(
                "The IdentifierPreparer.quote_schema.force parameter is "
                "deprecated and will be removed in a future release. This "
                "flag has no effect on the behavior of the "
                "IdentifierPreparer.quote method; please refer to "
                "quoted_name().",
                version="1.3",
            )

        dbname, owner = _schema_elements(schema)
        if dbname:
            result = "%s.%s" % (self.quote(dbname), self.quote(owner))
        elif owner:
            result = self.quote(owner)
        else:
            result = ""
        return result

2608 
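The bracket quoting configured in ``MSIdentifierPreparer`` above doubles any closing bracket inside the name. A minimal sketch of that round trip, using hypothetical free functions rather than the preparer class, might look like this:

```python
def escape_identifier(value):
    """Double each closing bracket so it survives inside [ ... ]."""
    return value.replace("]", "]]")


def unescape_identifier(value):
    """Reverse escape_identifier."""
    return value.replace("]]", "]")


def quote_identifier(value):
    """Wrap an escaped identifier in square brackets."""
    return "[" + escape_identifier(value) + "]"


quoted = quote_identifier("my]table")
round_trip = unescape_identifier(escape_identifier("a]b"))
```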

2609 

def _db_plus_owner_listing(fn):
    def wrap(dialect, connection, schema=None, **kw):
        dbname, owner = _owner_plus_db(dialect, schema)
        return _switch_db(
            dbname,
            connection,
            fn,
            dialect,
            connection,
            dbname,
            owner,
            schema,
            **kw
        )

    return update_wrapper(wrap, fn)


def _db_plus_owner(fn):
    def wrap(dialect, connection, tablename, schema=None, **kw):
        dbname, owner = _owner_plus_db(dialect, schema)
        return _switch_db(
            dbname,
            connection,
            fn,
            dialect,
            connection,
            tablename,
            dbname,
            owner,
            schema,
            **kw
        )

    return update_wrapper(wrap, fn)


def _switch_db(dbname, connection, fn, *arg, **kw):
    if dbname:
        current_db = connection.exec_driver_sql("select db_name()").scalar()
        if current_db != dbname:
            connection.exec_driver_sql(
                "use %s" % connection.dialect.identifier_preparer.quote(dbname)
            )
    try:
        return fn(*arg, **kw)
    finally:
        if dbname and current_db != dbname:
            connection.exec_driver_sql(
                "use %s"
                % connection.dialect.identifier_preparer.quote(current_db)
            )


def _owner_plus_db(dialect, schema):
    if not schema:
        return None, dialect.default_schema_name
    else:
        return _schema_elements(schema)


_memoized_schema = util.LRUCache()


def _schema_elements(schema):
    if isinstance(schema, quoted_name) and schema.quote:
        return None, schema

    if schema in _memoized_schema:
        return _memoized_schema[schema]

    # tests for this function are in:
    # test/dialect/mssql/test_reflection.py ->
    # OwnerPlusDBTest.test_owner_database_pairs
    # test/dialect/mssql/test_compiler.py -> test_force_schema_*
    # test/dialect/mssql/test_compiler.py -> test_schema_many_tokens_*
    #

    if schema.startswith("__[SCHEMA_"):
        return None, schema

    push = []
    symbol = ""
    bracket = False
    has_brackets = False
    for token in re.split(r"(\[|\]|\.)", schema):
        if not token:
            continue
        if token == "[":
            bracket = True
            has_brackets = True
        elif token == "]":
            bracket = False
        elif not bracket and token == ".":
            if has_brackets:
                push.append("[%s]" % symbol)
            else:
                push.append(symbol)
            symbol = ""
            has_brackets = False
        else:
            symbol += token
    if symbol:
        push.append(symbol)
    if len(push) > 1:
        dbname, owner = ".".join(push[0:-1]), push[-1]

        # test for internal brackets
        if re.match(r".*\].*\[.*", dbname[1:-1]):
            dbname = quoted_name(dbname, quote=False)
        else:
            dbname = dbname.lstrip("[").rstrip("]")

    elif len(push):
        dbname, owner = None, push[0]
    else:
        dbname, owner = None, None

    _memoized_schema[schema] = dbname, owner
    return dbname, owner

2730 
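To see how ``_schema_elements`` above splits a schema string into a database name and an owner, the tokenizing loop can be run on its own. The sketch below is a simplified copy under stated assumptions: ``split_schema`` is a hypothetical name, the LRU cache and the ``quoted_name`` handling are left out, and a database name with internal brackets is returned unchanged as a plain string.

```python
import re


def split_schema(schema):
    """Split 'db.owner' style names; dots inside [brackets] do not split."""
    push = []
    symbol = ""
    bracket = False
    has_brackets = False
    for token in re.split(r"(\[|\]|\.)", schema):
        if not token:
            continue
        if token == "[":
            bracket = True
            has_brackets = True
        elif token == "]":
            bracket = False
        elif not bracket and token == ".":
            # a dot outside brackets ends the current name
            push.append("[%s]" % symbol if has_brackets else symbol)
            symbol = ""
            has_brackets = False
        else:
            symbol += token
    if symbol:
        push.append(symbol)
    if len(push) > 1:
        dbname, owner = ".".join(push[:-1]), push[-1]
        if not re.match(r".*\].*\[.*", dbname[1:-1]):
            dbname = dbname.lstrip("[").rstrip("]")
        return dbname, owner
    if push:
        return None, push[0]
    return None, None


owner_only = split_schema("dbo")
db_and_owner = split_schema("mydb.dbo")
bracketed_db = split_schema("[my.db].dbo")
```

With more than two tokens, everything before the last dot is treated as the database name.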

2731 

2732class MSDialect(default.DefaultDialect): 

2733 # will assume it's at least mssql2005 

2734 name = "mssql" 

2735 supports_statement_cache = True 

2736 supports_default_values = True 

2737 supports_empty_insert = False 

2738 execution_ctx_cls = MSExecutionContext 

2739 use_scope_identity = True 

2740 max_identifier_length = 128 

2741 schema_name = "dbo" 

2742 

2743 implicit_returning = True 

2744 full_returning = True 

2745 

2746 colspecs = { 

2747 sqltypes.DateTime: _MSDateTime, 

2748 sqltypes.Date: _MSDate, 

2749 sqltypes.JSON: JSON, 

2750 sqltypes.JSON.JSONIndexType: JSONIndexType, 

2751 sqltypes.JSON.JSONPathType: JSONPathType, 

2752 sqltypes.Time: _BASETIMEIMPL, 

2753 sqltypes.Unicode: _MSUnicode, 

2754 sqltypes.UnicodeText: _MSUnicodeText, 

2755 DATETIMEOFFSET: DATETIMEOFFSET, 

2756 DATETIME2: DATETIME2, 

2757 SMALLDATETIME: SMALLDATETIME, 

2758 DATETIME: DATETIME, 

2759 } 

2760 

2761 engine_config_types = default.DefaultDialect.engine_config_types.union( 

2762 {"legacy_schema_aliasing": util.asbool} 

2763 ) 

2764 

2765 ischema_names = ischema_names 

2766 

2767 supports_sequences = True 

2768 sequences_optional = True 

2769 # T-SQL's actual default is -9223372036854775808 

2770 default_sequence_base = 1 

2771 

2772 supports_native_boolean = False 

2773 non_native_boolean_check_constraint = False 

2774 supports_unicode_binds = True 

2775 postfetch_lastrowid = True 

2776 _supports_offset_fetch = False 

2777 _supports_nvarchar_max = False 

2778 

2779 legacy_schema_aliasing = False 

2780 

2781 server_version_info = () 

2782 

2783 statement_compiler = MSSQLCompiler 

2784 ddl_compiler = MSDDLCompiler 

2785 type_compiler = MSTypeCompiler 

2786 preparer = MSIdentifierPreparer 

2787 

2788 construct_arguments = [ 

2789 (sa_schema.PrimaryKeyConstraint, {"clustered": None}), 

2790 (sa_schema.UniqueConstraint, {"clustered": None}), 

2791 (sa_schema.Index, {"clustered": None, "include": None, "where": None}), 

2792 ( 

2793 sa_schema.Column, 

2794 {"identity_start": None, "identity_increment": None}, 

2795 ), 

2796 ] 

2797 

2798 def __init__( 

2799 self, 

2800 query_timeout=None, 

2801 use_scope_identity=True, 

2802 schema_name="dbo", 

2803 isolation_level=None, 

2804 deprecate_large_types=None, 

2805 json_serializer=None, 

2806 json_deserializer=None, 

2807 legacy_schema_aliasing=None, 

2808 ignore_no_transaction_on_rollback=False, 

2809 **opts 

2810 ): 

2811 self.query_timeout = int(query_timeout or 0) 

2812 self.schema_name = schema_name 

2813 

2814 self.use_scope_identity = use_scope_identity 

2815 self.deprecate_large_types = deprecate_large_types 

2816 self.ignore_no_transaction_on_rollback = ( 

2817 ignore_no_transaction_on_rollback 

2818 ) 

2819 

2820 if legacy_schema_aliasing is not None: 

2821 util.warn_deprecated( 

2822 "The legacy_schema_aliasing parameter is " 

2823 "deprecated and will be removed in a future release.", 

2824 "1.4", 

2825 ) 

2826 self.legacy_schema_aliasing = legacy_schema_aliasing 

2827 

2828 super(MSDialect, self).__init__(**opts) 

2829 

2830 self.isolation_level = isolation_level 

2831 self._json_serializer = json_serializer 

2832 self._json_deserializer = json_deserializer 

2833 

2834 def do_savepoint(self, connection, name): 

2835 # give the DBAPI a push 

2836 connection.exec_driver_sql("IF @@TRANCOUNT = 0 BEGIN TRANSACTION") 

2837 super(MSDialect, self).do_savepoint(connection, name) 

2838 

2839 def do_release_savepoint(self, connection, name): 

2840 # SQL Server does not support RELEASE SAVEPOINT 

2841 pass 

2842 

2843 def do_rollback(self, dbapi_connection): 

2844 try: 

2845 super(MSDialect, self).do_rollback(dbapi_connection) 

2846 except self.dbapi.ProgrammingError as e: 

2847 if self.ignore_no_transaction_on_rollback and re.match( 

2848 r".*\b111214\b", str(e) 

2849 ): 

2850 util.warn( 

2851 "ProgrammingError 111214 " 

2852 "'No corresponding transaction found.' " 

2853 "has been suppressed via " 

2854 "ignore_no_transaction_on_rollback=True" 

2855 ) 

2856 else: 

2857 raise 

2858 

2859 _isolation_lookup = set( 

2860 [ 

2861 "SERIALIZABLE", 

2862 "READ UNCOMMITTED", 

2863 "READ COMMITTED", 

2864 "REPEATABLE READ", 

2865 "SNAPSHOT", 

2866 ] 

2867 ) 

2868 

2869 def set_isolation_level(self, connection, level): 

2870 level = level.replace("_", " ") 

2871 if level not in self._isolation_lookup: 

2872 raise exc.ArgumentError( 

2873 "Invalid value '%s' for isolation_level. " 

2874 "Valid isolation levels for %s are %s" 

2875 % (level, self.name, ", ".join(self._isolation_lookup)) 

2876 ) 

2877 cursor = connection.cursor() 

2878 cursor.execute("SET TRANSACTION ISOLATION LEVEL %s" % level) 

2879 cursor.close() 

2880 if level == "SNAPSHOT": 

2881 connection.commit() 

2882 
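The validation step in ``set_isolation_level`` above replaces underscores with spaces before checking the name against the allowed set. A minimal sketch of just that step follows; ``isolation_level_statement`` is a hypothetical function that returns the statement text instead of executing it, and it raises ``ValueError`` where the listing raises ``exc.ArgumentError``.

```python
VALID_LEVELS = {
    "SERIALIZABLE",
    "READ UNCOMMITTED",
    "READ COMMITTED",
    "REPEATABLE READ",
    "SNAPSHOT",
}


def isolation_level_statement(level):
    """Normalize a level name and build the SET statement text."""
    # "READ_COMMITTED" and "READ COMMITTED" are treated the same;
    # as in the listing, no case folding is applied
    level = level.replace("_", " ")
    if level not in VALID_LEVELS:
        raise ValueError(
            "Invalid value %r for isolation_level. Valid levels are %s"
            % (level, ", ".join(sorted(VALID_LEVELS)))
        )
    return "SET TRANSACTION ISOLATION LEVEL %s" % level


statement = isolation_level_statement("READ_COMMITTED")
```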

2883 def get_isolation_level(self, dbapi_connection): 

2884 cursor = dbapi_connection.cursor() 

2885 view_name = "sys.system_views" 

2886 try: 

2887 cursor.execute( 

2888 ( 

2889 "SELECT name FROM {} WHERE name IN " 

2890 "('dm_exec_sessions', 'dm_pdw_nodes_exec_sessions')" 

2891 ).format(view_name) 

2892 ) 

2893 row = cursor.fetchone() 

2894 if not row: 

2895 raise NotImplementedError( 

2896 "Can't fetch isolation level on this particular " 

2897 "SQL Server version." 

2898 ) 

2899 

2900 view_name = "sys.{}".format(row[0]) 

2901 

2902 cursor.execute( 

2903 """ 

2904 SELECT CASE transaction_isolation_level 

2905 WHEN 0 THEN NULL 

2906 WHEN 1 THEN 'READ UNCOMMITTED' 

2907 WHEN 2 THEN 'READ COMMITTED' 

2908 WHEN 3 THEN 'REPEATABLE READ' 

2909 WHEN 4 THEN 'SERIALIZABLE' 

2910 WHEN 5 THEN 'SNAPSHOT' END 

2911 AS TRANSACTION_ISOLATION_LEVEL 

2912 FROM {} 

2913 where session_id = @@SPID 

2914 """.format( 

2915 view_name 

2916 ) 

2917 ) 

2918 except self.dbapi.Error as err: 

2919 util.raise_( 

2920 NotImplementedError( 

2921 "Can't fetch isolation level; encountered error {} when " 

2922 'attempting to query the "{}" view.'.format(err, view_name) 

2923 ), 

2924 from_=err, 

2925 ) 

2926 else: 

2927 row = cursor.fetchone() 

2928 return row[0].upper() 

2929 finally: 

2930 cursor.close() 

2931 

2932 def initialize(self, connection): 

2933 super(MSDialect, self).initialize(connection) 

2934 self._setup_version_attributes() 

2935 self._setup_supports_nvarchar_max(connection) 

2936 

2937 def on_connect(self): 

2938 if self.isolation_level is not None: 

2939 

2940 def connect(conn): 

2941 self.set_isolation_level(conn, self.isolation_level) 

2942 

2943 return connect 

2944 else: 

2945 return None 

2946 

2947 def _setup_version_attributes(self): 

2948 if self.server_version_info[0] not in range(8, 17): 

2949 util.warn( 

2950 "Unrecognized server version info '%s'. Some SQL Server " 

2951 "features may not function properly." 

2952 % ".".join(str(x) for x in self.server_version_info) 

2953 ) 

2954 

2955 if self.server_version_info >= MS_2008_VERSION: 

2956 self.supports_multivalues_insert = True 

2957 if self.deprecate_large_types is None: 

2958 self.deprecate_large_types = ( 

2959 self.server_version_info >= MS_2012_VERSION 

2960 ) 

2961 

2962 self._supports_offset_fetch = ( 

2963 self.server_version_info and self.server_version_info[0] >= 11 

2964 ) 

2965 

2966 def _setup_supports_nvarchar_max(self, connection): 

2967 try: 

2968 connection.scalar( 

2969 sql.text("SELECT CAST('test max support' AS NVARCHAR(max))") 

2970 ) 

2971 except exc.DBAPIError: 

2972 self._supports_nvarchar_max = False 

2973 else: 

2974 self._supports_nvarchar_max = True 

2975 

2976 def _get_default_schema_name(self, connection): 

2977 query = sql.text("SELECT schema_name()") 

2978 default_schema_name = connection.scalar(query) 

2979 if default_schema_name is not None: 

2980 # guard against the case where the default_schema_name is being 

2981 # fed back into a table reflection function. 

2982 return quoted_name(default_schema_name, quote=True) 

2983 else: 

2984 return self.schema_name 

2985 

2986 @_db_plus_owner 

2987 def has_table(self, connection, tablename, dbname, owner, schema): 

2988 self._ensure_has_table_connection(connection) 

2989 

2990 if tablename.startswith("#"): # temporary table 

2991 # mssql does not support temporary views 

2992 # SQL Error [4103] [S0001]: "#v": Temporary views are not allowed 

2993 return bool( 

2994 connection.scalar( 

2995 # U filters on user tables only. 

2996 text("SELECT object_id(:table_name, 'U')"), 

2997 {"table_name": "tempdb.dbo.[{}]".format(tablename)}, 

2998 ) 

2999 ) 

3000 

3001 else: 

3002 tables = ischema.tables 

3003 

3004 s = sql.select(tables.c.table_name, tables.c.table_type).where( 

3005 tables.c.table_name == tablename, 

3006 ) 

3007 

3008 if owner: 

3009 s = s.where(tables.c.table_schema == owner) 

3010 

3011 c = connection.execute(s) 

3012 

3013 return c.first() is not None 

3014 

3015 @_db_plus_owner 

3016 def has_sequence(self, connection, sequencename, dbname, owner, schema): 

3017 sequences = ischema.sequences 

3018 

3019 s = sql.select(sequences.c.sequence_name).where( 

3020 sequences.c.sequence_name == sequencename 

3021 ) 

3022 

3023 if owner: 

3024 s = s.where(sequences.c.sequence_schema == owner) 

3025 

3026 c = connection.execute(s) 

3027 

3028 return c.first() is not None 

3029 

3030 @reflection.cache 

3031 @_db_plus_owner_listing 

3032 def get_sequence_names(self, connection, dbname, owner, schema, **kw): 

3033 sequences = ischema.sequences 

3034 

3035 s = sql.select(sequences.c.sequence_name) 

3036 if owner: 

3037 s = s.where(sequences.c.sequence_schema == owner) 

3038 

3039 c = connection.execute(s) 

3040 

3041 return [row[0] for row in c] 

3042 

3043 @reflection.cache 

3044 def get_schema_names(self, connection, **kw): 

3045 s = sql.select(ischema.schemata.c.schema_name).order_by( 

3046 ischema.schemata.c.schema_name 

3047 ) 

3048 schema_names = [r[0] for r in connection.execute(s)] 

3049 return schema_names 

3050 

3051 @reflection.cache 

3052 @_db_plus_owner_listing 

3053 def get_table_names(self, connection, dbname, owner, schema, **kw): 

3054 tables = ischema.tables 

3055 s = ( 

3056 sql.select(tables.c.table_name) 

3057 .where( 

3058 sql.and_( 

3059 tables.c.table_schema == owner, 

3060 tables.c.table_type == "BASE TABLE", 

3061 ) 

3062 ) 

3063 .order_by(tables.c.table_name) 

3064 ) 

3065 table_names = [r[0] for r in connection.execute(s)] 

3066 return table_names 

3067 

3068 @reflection.cache 

3069 @_db_plus_owner_listing 

3070 def get_view_names(self, connection, dbname, owner, schema, **kw): 

3071 tables = ischema.tables 

3072 s = ( 

3073 sql.select(tables.c.table_name) 

3074 .where( 

3075 sql.and_( 

3076 tables.c.table_schema == owner, 

3077 tables.c.table_type == "VIEW", 

3078 ) 

3079 ) 

3080 .order_by(tables.c.table_name) 

3081 ) 

3082 view_names = [r[0] for r in connection.execute(s)] 

3083 return view_names 

3084 

3085 @reflection.cache 

3086 @_db_plus_owner 

3087 def get_indexes(self, connection, tablename, dbname, owner, schema, **kw): 

3088 filter_definition = ( 

3089 "ind.filter_definition" 

3090 if self.server_version_info >= MS_2008_VERSION 

3091 else "NULL as filter_definition" 

3092 ) 

3093 rp = connection.execution_options(future_result=True).execute( 

3094 sql.text( 

3095 "select ind.index_id, ind.is_unique, ind.name, " 

3096 "%s " 

3097 "from sys.indexes as ind join sys.tables as tab on " 

3098 "ind.object_id=tab.object_id " 

3099 "join sys.schemas as sch on sch.schema_id=tab.schema_id " 

3100 "where tab.name = :tabname " 

3101 "and sch.name=:schname " 

3102 "and ind.is_primary_key=0 and ind.type != 0" 

3103 % filter_definition 

3104 ) 

3105 .bindparams( 

3106 sql.bindparam("tabname", tablename, ischema.CoerceUnicode()), 

3107 sql.bindparam("schname", owner, ischema.CoerceUnicode()), 

3108 ) 

3109 .columns(name=sqltypes.Unicode()) 

3110 ) 

3111 indexes = {} 

3112 for row in rp.mappings(): 

3113 indexes[row["index_id"]] = { 

3114 "name": row["name"], 

3115 "unique": row["is_unique"] == 1, 

3116 "column_names": [], 

3117 "include_columns": [], 

3118 } 

3119 

3120 if row["filter_definition"] is not None: 

3121 indexes[row["index_id"]].setdefault("dialect_options", {})[ 

3122 "mssql_where" 

3123 ] = row["filter_definition"] 

3124 

3125 rp = connection.execution_options(future_result=True).execute( 

3126 sql.text( 

3127 "select ind_col.index_id, ind_col.object_id, col.name, " 

3128 "ind_col.is_included_column " 

3129 "from sys.columns as col " 

3130 "join sys.tables as tab on tab.object_id=col.object_id " 

3131 "join sys.index_columns as ind_col on " 

3132 "(ind_col.column_id=col.column_id and " 

3133 "ind_col.object_id=tab.object_id) " 

3134 "join sys.schemas as sch on sch.schema_id=tab.schema_id " 

3135 "where tab.name=:tabname " 

3136 "and sch.name=:schname" 

3137 ) 

3138 .bindparams( 

3139 sql.bindparam("tabname", tablename, ischema.CoerceUnicode()), 

3140 sql.bindparam("schname", owner, ischema.CoerceUnicode()), 

3141 ) 

3142 .columns(name=sqltypes.Unicode()) 

3143 ) 

3144 for row in rp.mappings(): 

3145 if row["index_id"] in indexes: 

3146 if row["is_included_column"]: 

3147 indexes[row["index_id"]]["include_columns"].append( 

3148 row["name"] 

3149 ) 

3150 else: 

3151 indexes[row["index_id"]]["column_names"].append( 

3152 row["name"] 

3153 ) 

3154 for index_info in indexes.values(): 

3155 # NOTE: "root level" include_columns is legacy, now part of 

3156 # dialect_options (issue #7382) 

3157 index_info.setdefault("dialect_options", {})[ 

3158 "mssql_include" 

3159 ] = index_info["include_columns"] 

3160 

3161 return list(indexes.values()) 

3162 

3163 @reflection.cache 

3164 @_db_plus_owner 

3165 def get_view_definition( 

3166 self, connection, viewname, dbname, owner, schema, **kw 

3167 ): 

3168 rp = connection.execute( 

3169 sql.text( 

3170 "select definition from sys.sql_modules as mod, " 

3171 "sys.views as views, " 

3172 "sys.schemas as sch" 

3173 " where " 

3174 "mod.object_id=views.object_id and " 

3175 "views.schema_id=sch.schema_id and " 

3176 "views.name=:viewname and sch.name=:schname" 

3177 ).bindparams( 

3178 sql.bindparam("viewname", viewname, ischema.CoerceUnicode()), 

3179 sql.bindparam("schname", owner, ischema.CoerceUnicode()), 

3180 ) 

3181 ) 

3182 

3183 if rp: 

3184 view_def = rp.scalar() 

3185 return view_def 

3186 

3187 def _temp_table_name_like_pattern(self, tablename): 

3188 # LIKE uses '%' to match zero or more characters and '_' to match any 

3189 # single character. We want to match literal underscores, so T-SQL 

3190 # requires that we enclose them in square brackets. 

3191 return tablename + ( 

3192 ("[_][_][_]%") if not tablename.startswith("##") else "" 

3193 ) 

3194 

3195 def _get_internal_temp_table_name(self, connection, tablename): 

3196 # the schema is most likely always "dbo", but since it is 

3197 # available here, fetch it along with the table name. 

3198 # see https://stackoverflow.com/questions/8311959/ 

3199 # specifying-schema-for-temporary-tables 

3200 

3201 try: 

3202 return connection.execute( 

3203 sql.text( 

3204 "select table_schema, table_name " 

3205 "from tempdb.information_schema.tables " 

3206 "where table_name like :p1" 

3207 ), 

3208 {"p1": self._temp_table_name_like_pattern(tablename)}, 

3209 ).one() 

3210 except exc.MultipleResultsFound as me: 

3211 util.raise_( 

3212 exc.UnreflectableTableError( 

3213 "Found more than one temporary table named '%s' in tempdb " 

3214 "at this time. Cannot reliably resolve that name to its " 

3215 "internal table name." % tablename 

3216 ), 

3217 replace_context=me, 

3218 ) 

3219 except exc.NoResultFound as ne: 

3220 util.raise_( 

3221 exc.NoSuchTableError( 

3222 "Unable to find a temporary table named '%s' in tempdb." 

3223 % tablename 

3224 ), 

3225 replace_context=ne, 

3226 ) 

3227 

3228 @reflection.cache 

3229 @_db_plus_owner 

3230 def get_columns(self, connection, tablename, dbname, owner, schema, **kw): 

3231 is_temp_table = tablename.startswith("#") 

3232 if is_temp_table: 

3233 owner, tablename = self._get_internal_temp_table_name( 

3234 connection, tablename 

3235 ) 

3236 

3237 columns = ischema.mssql_temp_table_columns 

3238 else: 

3239 columns = ischema.columns 

3240 

3241 computed_cols = ischema.computed_columns 

3242 identity_cols = ischema.identity_columns 

3243 if owner: 

3244 whereclause = sql.and_( 

3245 columns.c.table_name == tablename, 

3246 columns.c.table_schema == owner, 

3247 ) 

3248 full_name = columns.c.table_schema + "." + columns.c.table_name 

3249 else: 

3250 whereclause = columns.c.table_name == tablename 

3251 full_name = columns.c.table_name 

3252 

3253 join = columns.join( 

3254 computed_cols, 

3255 onclause=sql.and_( 

3256 computed_cols.c.object_id == func.object_id(full_name), 

3257 computed_cols.c.name 

3258 == columns.c.column_name.collate("DATABASE_DEFAULT"), 

3259 ), 

3260 isouter=True, 

3261 ).join( 

3262 identity_cols, 

3263 onclause=sql.and_( 

3264 identity_cols.c.object_id == func.object_id(full_name), 

3265 identity_cols.c.name 

3266 == columns.c.column_name.collate("DATABASE_DEFAULT"), 

3267 ), 

3268 isouter=True, 

3269 ) 

3270 

3271 if self._supports_nvarchar_max: 

3272 computed_definition = computed_cols.c.definition 

3273 else: 

3274 # tds_version 4.2 does not support NVARCHAR(MAX) 

3275 computed_definition = sql.cast( 

3276 computed_cols.c.definition, NVARCHAR(4000) 

3277 ) 

3278 

3279 s = ( 

3280 sql.select( 

3281 columns, 

3282 computed_definition, 

3283 computed_cols.c.is_persisted, 

3284 identity_cols.c.is_identity, 

3285 identity_cols.c.seed_value, 

3286 identity_cols.c.increment_value, 

3287 ) 

3288 .where(whereclause) 

3289 .select_from(join) 

3290 .order_by(columns.c.ordinal_position) 

3291 ) 

3292 

3293 c = connection.execution_options(future_result=True).execute(s) 

3294 

3295 cols = [] 

3296 for row in c.mappings(): 

3297 name = row[columns.c.column_name] 

3298 type_ = row[columns.c.data_type] 

3299 nullable = row[columns.c.is_nullable] == "YES" 

3300 charlen = row[columns.c.character_maximum_length] 

3301 numericprec = row[columns.c.numeric_precision] 

3302 numericscale = row[columns.c.numeric_scale] 

3303 default = row[columns.c.column_default] 

3304 collation = row[columns.c.collation_name] 

3305 definition = row[computed_definition] 

3306 is_persisted = row[computed_cols.c.is_persisted] 

3307 is_identity = row[identity_cols.c.is_identity] 

3308 identity_start = row[identity_cols.c.seed_value] 

3309 identity_increment = row[identity_cols.c.increment_value] 

3310 

3311 coltype = self.ischema_names.get(type_, None) 

3312 

3313 kwargs = {} 

3314 if coltype in ( 

3315 MSString, 

3316 MSChar, 

3317 MSNVarchar, 

3318 MSNChar, 

3319 MSText, 

3320 MSNText, 

3321 MSBinary, 

3322 MSVarBinary, 

3323 sqltypes.LargeBinary, 

3324 ): 

3325 if charlen == -1: 

3326 charlen = None 

3327 kwargs["length"] = charlen 

3328 if collation: 

3329 kwargs["collation"] = collation 

3330 

3331 if coltype is None: 

3332 util.warn( 

3333 "Did not recognize type '%s' of column '%s'" 

3334 % (type_, name) 

3335 ) 

3336 coltype = sqltypes.NULLTYPE 

3337 else: 

3338 if issubclass(coltype, sqltypes.Numeric): 

3339 kwargs["precision"] = numericprec 

3340 

3341 if not issubclass(coltype, sqltypes.Float): 

3342 kwargs["scale"] = numericscale 

3343 

3344 coltype = coltype(**kwargs) 

3345 cdict = { 

3346 "name": name, 

3347 "type": coltype, 

3348 "nullable": nullable, 

3349 "default": default, 

3350 "autoincrement": is_identity is not None, 

3351 } 

3352 

3353 if definition is not None and is_persisted is not None: 

3354 cdict["computed"] = { 

3355 "sqltext": definition, 

3356 "persisted": is_persisted, 

3357 } 

3358 

3359 if is_identity is not None: 

3360 # identity_start and identity_increment are Decimal or None 

3361 if identity_start is None or identity_increment is None: 

3362 cdict["identity"] = {} 

3363 else: 

3364 if isinstance(coltype, sqltypes.BigInteger): 

3365 start = compat.long_type(identity_start) 

3366 increment = compat.long_type(identity_increment) 

3367 elif isinstance(coltype, sqltypes.Integer): 

3368 start = int(identity_start) 

3369 increment = int(identity_increment) 

3370 else: 

3371 start = identity_start 

3372 increment = identity_increment 

3373 

3374 cdict["identity"] = { 

3375 "start": start, 

3376 "increment": increment, 

3377 } 

3378 

3379 cols.append(cdict) 

3380 

3381 return cols 

3382 

3383 @reflection.cache 

3384 @_db_plus_owner 

3385 def get_pk_constraint( 

3386 self, connection, tablename, dbname, owner, schema, **kw 

3387 ): 

3388 pkeys = [] 

3389 TC = ischema.constraints 

3390 C = ischema.key_constraints.alias("C") 

3391 

3392 # Primary key constraints 

3393 s = ( 

3394 sql.select( 

3395 C.c.column_name, TC.c.constraint_type, C.c.constraint_name 

3396 ) 

3397 .where( 

3398 sql.and_( 

3399 TC.c.constraint_name == C.c.constraint_name, 

3400 TC.c.table_schema == C.c.table_schema, 

3401 C.c.table_name == tablename, 

3402 C.c.table_schema == owner, 

3403 ), 

3404 ) 

3405 .order_by(TC.c.constraint_name, C.c.ordinal_position) 

3406 ) 

3407 c = connection.execution_options(future_result=True).execute(s) 

3408 constraint_name = None 

3409 for row in c.mappings(): 

3410 if "PRIMARY" in row[TC.c.constraint_type.name]: 

3411 pkeys.append(row["COLUMN_NAME"]) 

3412 if constraint_name is None: 

3413 constraint_name = row[C.c.constraint_name.name] 

3414 return {"constrained_columns": pkeys, "name": constraint_name} 

3415 

3416 @reflection.cache 

3417 @_db_plus_owner 

3418 def get_foreign_keys( 

3419 self, connection, tablename, dbname, owner, schema, **kw 

3420 ): 

3421 # Foreign key constraints 

3422 s = ( 

3423 text( 

3424 """\ 

3425WITH fk_info AS ( 

3426 SELECT 

3427 ischema_ref_con.constraint_schema, 

3428 ischema_ref_con.constraint_name, 

3429 ischema_key_col.ordinal_position, 

3430 ischema_key_col.table_schema, 

3431 ischema_key_col.table_name, 

3432 ischema_ref_con.unique_constraint_schema, 

3433 ischema_ref_con.unique_constraint_name, 

3434 ischema_ref_con.match_option, 

3435 ischema_ref_con.update_rule, 

3436 ischema_ref_con.delete_rule, 

3437 ischema_key_col.column_name AS constrained_column 

3438 FROM 

3439 INFORMATION_SCHEMA.REFERENTIAL_CONSTRAINTS ischema_ref_con 

3440 INNER JOIN 

3441 INFORMATION_SCHEMA.KEY_COLUMN_USAGE ischema_key_col ON 

3442 ischema_key_col.table_schema = ischema_ref_con.constraint_schema 

3443 AND ischema_key_col.constraint_name = 

3444 ischema_ref_con.constraint_name 

3445 WHERE ischema_key_col.table_name = :tablename 

3446 AND ischema_key_col.table_schema = :owner 

3447), 

3448constraint_info AS ( 

3449 SELECT 

3450 ischema_key_col.constraint_schema, 

3451 ischema_key_col.constraint_name, 

3452 ischema_key_col.ordinal_position, 

3453 ischema_key_col.table_schema, 

3454 ischema_key_col.table_name, 

3455 ischema_key_col.column_name 

3456 FROM 

3457 INFORMATION_SCHEMA.KEY_COLUMN_USAGE ischema_key_col 

3458), 

3459index_info AS ( 

3460 SELECT 

3461 sys.schemas.name AS index_schema, 

3462 sys.indexes.name AS index_name, 

3463 sys.index_columns.key_ordinal AS ordinal_position, 

3464 sys.schemas.name AS table_schema, 

3465 sys.objects.name AS table_name, 

3466 sys.columns.name AS column_name 

3467 FROM 

3468 sys.indexes 

3469 INNER JOIN 

3470 sys.objects ON 

3471 sys.objects.object_id = sys.indexes.object_id 

3472 INNER JOIN 

3473 sys.schemas ON 

3474 sys.schemas.schema_id = sys.objects.schema_id 

3475 INNER JOIN 

3476 sys.index_columns ON 

3477 sys.index_columns.object_id = sys.objects.object_id 

3478 AND sys.index_columns.index_id = sys.indexes.index_id 

3479 INNER JOIN 

3480 sys.columns ON 

3481 sys.columns.object_id = sys.indexes.object_id 

3482 AND sys.columns.column_id = sys.index_columns.column_id 

3483) 

3484 SELECT 

3485 fk_info.constraint_schema, 

3486 fk_info.constraint_name, 

3487 fk_info.ordinal_position, 

3488 fk_info.constrained_column, 

3489 constraint_info.table_schema AS referred_table_schema, 

3490 constraint_info.table_name AS referred_table_name, 

3491 constraint_info.column_name AS referred_column, 

3492 fk_info.match_option, 

3493 fk_info.update_rule, 

3494 fk_info.delete_rule 

3495 FROM 

3496 fk_info INNER JOIN constraint_info ON 

3497 constraint_info.constraint_schema = 

3498 fk_info.unique_constraint_schema 

3499 AND constraint_info.constraint_name = 

3500 fk_info.unique_constraint_name 

3501 AND constraint_info.ordinal_position = fk_info.ordinal_position 

3502 UNION 

3503 SELECT 

3504 fk_info.constraint_schema, 

3505 fk_info.constraint_name, 

3506 fk_info.ordinal_position, 

3507 fk_info.constrained_column, 

3508 index_info.table_schema AS referred_table_schema, 

3509 index_info.table_name AS referred_table_name, 

3510 index_info.column_name AS referred_column, 

3511 fk_info.match_option, 

3512 fk_info.update_rule, 

3513 fk_info.delete_rule 

3514 FROM 

3515 fk_info INNER JOIN index_info ON 

3516 index_info.index_schema = fk_info.unique_constraint_schema 

3517 AND index_info.index_name = fk_info.unique_constraint_name 

3518 AND index_info.ordinal_position = fk_info.ordinal_position 

3519 

3520 ORDER BY fk_info.constraint_schema, fk_info.constraint_name, 

3521 fk_info.ordinal_position 

3522""" 

3523 ) 

3524 .bindparams( 

3525 sql.bindparam("tablename", tablename, ischema.CoerceUnicode()), 

3526 sql.bindparam("owner", owner, ischema.CoerceUnicode()), 

3527 ) 

3528 .columns( 

3529 constraint_schema=sqltypes.Unicode(), 

3530 constraint_name=sqltypes.Unicode(), 

3531 table_schema=sqltypes.Unicode(), 

3532 table_name=sqltypes.Unicode(), 

3533 constrained_column=sqltypes.Unicode(), 

3534 referred_table_schema=sqltypes.Unicode(), 

3535 referred_table_name=sqltypes.Unicode(), 

3536 referred_column=sqltypes.Unicode(), 

3537 ) 

3538 ) 

3539 

3540 # group rows by constraint name, to handle multi-column FKs 

3541 fkeys = [] 

3542 

3543 def fkey_rec(): 

3544 return { 

3545 "name": None, 

3546 "constrained_columns": [], 

3547 "referred_schema": None, 

3548 "referred_table": None, 

3549 "referred_columns": [], 

3550 "options": {}, 

3551 } 

3552 

3553 fkeys = util.defaultdict(fkey_rec) 

3554 

3555 for r in connection.execute(s).fetchall(): 

3556 ( 

3557 _, # constraint schema 

3558 rfknm, 

3559 _, # ordinal position 

3560 scol, 

3561 rschema, 

3562 rtbl, 

3563 rcol, 

3564 # TODO: match=<keyword> is supported for foreign keys, so it 

3565 # could be reflected here too (PostgreSQL has match=FULL, for 

3566 # example), but FULL does not seem to be valid for SQL Server 

3567 _, # match rule 

3568 fkuprule, 

3569 fkdelrule, 

3570 ) = r 

3571 

3572 rec = fkeys[rfknm] 

3573 rec["name"] = rfknm 

3574 

3575 if fkuprule != "NO ACTION": 

3576 rec["options"]["onupdate"] = fkuprule 

3577 

3578 if fkdelrule != "NO ACTION": 

3579 rec["options"]["ondelete"] = fkdelrule 

3580 

3581 if not rec["referred_table"]: 

3582 rec["referred_table"] = rtbl 

3583 if schema is not None or owner != rschema: 

3584 if dbname: 

3585 rschema = dbname + "." + rschema 

3586 rec["referred_schema"] = rschema 

3587 

3588 local_cols, remote_cols = ( 

3589 rec["constrained_columns"], 

3590 rec["referred_columns"], 

3591 ) 

3592 

3593 local_cols.append(scol) 

3594 remote_cols.append(rcol) 

3595 

3596 return list(fkeys.values())