1# dialects/mssql/pyodbc.py
2# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: ignore-errors
8
9r"""
10.. dialect:: mssql+pyodbc
11 :name: PyODBC
12 :dbapi: pyodbc
13 :connectstring: mssql+pyodbc://<username>:<password>@<dsnname>
14 :url: https://pypi.org/project/pyodbc/
15
16Connecting to PyODBC
17--------------------
18
19The URL here is to be translated to PyODBC connection strings, as
20detailed in `ConnectionStrings <https://code.google.com/p/pyodbc/wiki/ConnectionStrings>`_.
21
22DSN Connections
23^^^^^^^^^^^^^^^
24
25A DSN connection in ODBC means that a pre-existing ODBC datasource is
26configured on the client machine. The application then specifies the name
27of this datasource, which encompasses details such as the specific ODBC driver
28in use as well as the network address of the database. Assuming a datasource
29is configured on the client, a basic DSN-based connection looks like::
30
31 engine = create_engine("mssql+pyodbc://scott:tiger@some_dsn")
32
The above will pass the following connection string to PyODBC:
34
35.. sourcecode:: text
36
37 DSN=some_dsn;UID=scott;PWD=tiger
38
39If the username and password are omitted, the DSN form will also add
40the ``Trusted_Connection=yes`` directive to the ODBC string.
41
42Hostname Connections
43^^^^^^^^^^^^^^^^^^^^
44
45Hostname-based connections are also supported by pyodbc. These are often
46easier to use than a DSN and have the additional advantage that the specific
47database name to connect towards may be specified locally in the URL, rather
48than it being fixed as part of a datasource configuration.
49
50When using a hostname connection, the driver name must also be specified in the
51query parameters of the URL. As these names usually have spaces in them, the
52name must be URL encoded which means using plus signs for spaces::
53
54 engine = create_engine(
55 "mssql+pyodbc://scott:tiger@myhost:port/databasename?driver=ODBC+Driver+17+for+SQL+Server"
56 )
57
58The ``driver`` keyword is significant to the pyodbc dialect and must be
59specified in lowercase.
60
61Any other names passed in the query string are passed through in the pyodbc
62connect string, such as ``authentication``, ``TrustServerCertificate``, etc.
63Multiple keyword arguments must be separated by an ampersand (``&``); these
64will be translated to semicolons when the pyodbc connect string is generated
65internally::
66
67 e = create_engine(
68 "mssql+pyodbc://scott:tiger@mssql2017:1433/test?"
69 "driver=ODBC+Driver+18+for+SQL+Server&TrustServerCertificate=yes"
70 "&authentication=ActiveDirectoryIntegrated"
71 )
72
73The equivalent URL can be constructed using :class:`_sa.engine.URL`::
74
75 from sqlalchemy.engine import URL
76
77 connection_url = URL.create(
78 "mssql+pyodbc",
79 username="scott",
80 password="tiger",
81 host="mssql2017",
82 port=1433,
83 database="test",
84 query={
85 "driver": "ODBC Driver 18 for SQL Server",
86 "TrustServerCertificate": "yes",
87 "authentication": "ActiveDirectoryIntegrated",
88 },
89 )
90
91Pass through exact Pyodbc string
92^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
93
94A PyODBC connection string can also be sent in pyodbc's format directly, as
95specified in `the PyODBC documentation
96<https://github.com/mkleehammer/pyodbc/wiki/Connecting-to-databases>`_,
97using the parameter ``odbc_connect``. A :class:`_sa.engine.URL` object
98can help make this easier::
99
100 from sqlalchemy.engine import URL
101
102 connection_string = "DRIVER={SQL Server Native Client 10.0};SERVER=dagger;DATABASE=test;UID=user;PWD=password"
103 connection_url = URL.create(
104 "mssql+pyodbc", query={"odbc_connect": connection_string}
105 )
106
107 engine = create_engine(connection_url)
108
109.. _mssql_pyodbc_access_tokens:
110
111Connecting to databases with access tokens
112^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
113
114Some database servers are set up to only accept access tokens for login. For
115example, SQL Server allows the use of Azure Active Directory tokens to connect
116to databases. This requires creating a credential object using the
117``azure-identity`` library. More information about the authentication step can be
118found in `Microsoft's documentation
119<https://docs.microsoft.com/en-us/azure/developer/python/azure-sdk-authenticate?tabs=bash>`_.
120
121After getting an engine, the credentials need to be sent to ``pyodbc.connect``
122each time a connection is requested. One way to do this is to set up an event
123listener on the engine that adds the credential token to the dialect's connect
124call. This is discussed more generally in :ref:`engines_dynamic_tokens`. For
125SQL Server in particular, this is passed as an ODBC connection attribute with
126a data structure `described by Microsoft
127<https://docs.microsoft.com/en-us/sql/connect/odbc/using-azure-active-directory#authenticating-with-an-access-token>`_.
128
129The following code snippet will create an engine that connects to an Azure SQL
130database using Azure credentials::
131
132 import struct
133 from sqlalchemy import create_engine, event
134 from sqlalchemy.engine.url import URL
135 from azure import identity
136
137 # Connection option for access tokens, as defined in msodbcsql.h
138 SQL_COPT_SS_ACCESS_TOKEN = 1256
139 TOKEN_URL = "https://database.windows.net/" # The token URL for any Azure SQL database
140
141 connection_string = "mssql+pyodbc://@my-server.database.windows.net/myDb?driver=ODBC+Driver+17+for+SQL+Server"
142
143 engine = create_engine(connection_string)
144
145 azure_credentials = identity.DefaultAzureCredential()
146
147
148 @event.listens_for(engine, "do_connect")
149 def provide_token(dialect, conn_rec, cargs, cparams):
150 # remove the "Trusted_Connection" parameter that SQLAlchemy adds
151 cargs[0] = cargs[0].replace(";Trusted_Connection=Yes", "")
152
153 # create token credential
154 raw_token = azure_credentials.get_token(TOKEN_URL).token.encode(
155 "utf-16-le"
156 )
157 token_struct = struct.pack(
158 f"<I{len(raw_token)}s", len(raw_token), raw_token
159 )
160
161 # apply it to keyword arguments
162 cparams["attrs_before"] = {SQL_COPT_SS_ACCESS_TOKEN: token_struct}
163
164.. tip::
165
166 The ``Trusted_Connection`` token is currently added by the SQLAlchemy
167 pyodbc dialect when no username or password is present. This needs
168 to be removed per Microsoft's
169 `documentation for Azure access tokens
170 <https://docs.microsoft.com/en-us/sql/connect/odbc/using-azure-active-directory#authenticating-with-an-access-token>`_,
171 stating that a connection string when using an access token must not contain
172 ``UID``, ``PWD``, ``Authentication`` or ``Trusted_Connection`` parameters.
173
174.. _azure_synapse_ignore_no_transaction_on_rollback:
175
176Avoiding transaction-related exceptions on Azure Synapse Analytics
177^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
178
179Azure Synapse Analytics has a significant difference in its transaction
180handling compared to plain SQL Server; in some cases an error within a Synapse
181transaction can cause it to be arbitrarily terminated on the server side, which
182then causes the DBAPI ``.rollback()`` method (as well as ``.commit()``) to
183fail. The issue prevents the usual DBAPI contract of allowing ``.rollback()``
184to pass silently if no transaction is present as the driver does not expect
185this condition. The symptom of this failure is an exception with a message
186resembling 'No corresponding transaction found. (111214)' when attempting to
187emit a ``.rollback()`` after an operation had a failure of some kind.
188
189This specific case can be handled by passing ``ignore_no_transaction_on_rollback=True`` to
190the SQL Server dialect via the :func:`_sa.create_engine` function as follows::
191
192 engine = create_engine(
193 connection_url, ignore_no_transaction_on_rollback=True
194 )
195
196Using the above parameter, the dialect will catch ``ProgrammingError``
197exceptions raised during ``connection.rollback()`` and emit a warning
198if the error message contains code ``111214``, however will not raise
199an exception.
200
201.. versionadded:: 1.4.40 Added the
202 ``ignore_no_transaction_on_rollback=True`` parameter.
203
204Enable autocommit for Azure SQL Data Warehouse (DW) connections
205^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
206
207Azure SQL Data Warehouse does not support transactions,
208and that can cause problems with SQLAlchemy's "autobegin" (and implicit
209commit/rollback) behavior. We can avoid these problems by enabling autocommit
210at both the pyodbc and engine levels::
211
212 connection_url = sa.engine.URL.create(
213 "mssql+pyodbc",
214 username="scott",
215 password="tiger",
216 host="dw.azure.example.com",
217 database="mydb",
218 query={
219 "driver": "ODBC Driver 17 for SQL Server",
220 "autocommit": "True",
221 },
222 )
223
224 engine = create_engine(connection_url).execution_options(
225 isolation_level="AUTOCOMMIT"
226 )
227
228Avoiding sending large string parameters as TEXT/NTEXT
229^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
230
231By default, for historical reasons, Microsoft's ODBC drivers for SQL Server
232send long string parameters (greater than 4000 SBCS characters or 2000 Unicode
233characters) as TEXT/NTEXT values. TEXT and NTEXT have been deprecated for many
234years and are starting to cause compatibility issues with newer versions of
SQL Server/Azure. For example, see `this
236issue <https://github.com/mkleehammer/pyodbc/issues/835>`_.
237
238Starting with ODBC Driver 18 for SQL Server we can override the legacy
239behavior and pass long strings as varchar(max)/nvarchar(max) using the
240``LongAsMax=Yes`` connection string parameter::
241
242 connection_url = sa.engine.URL.create(
243 "mssql+pyodbc",
244 username="scott",
245 password="tiger",
246 host="mssqlserver.example.com",
247 database="mydb",
248 query={
249 "driver": "ODBC Driver 18 for SQL Server",
250 "LongAsMax": "Yes",
251 },
252 )
253
254Pyodbc Pooling / connection close behavior
255------------------------------------------
256
257PyODBC uses internal `pooling
258<https://github.com/mkleehammer/pyodbc/wiki/The-pyodbc-Module#pooling>`_ by
259default, which means connections will be longer lived than they are within
260SQLAlchemy itself. As SQLAlchemy has its own pooling behavior, it is often
261preferable to disable this behavior. This behavior can only be disabled
262globally at the PyODBC module level, **before** any connections are made::
263
264 import pyodbc
265
266 pyodbc.pooling = False
267
268 # don't use the engine before pooling is set to False
269 engine = create_engine("mssql+pyodbc://user:pass@dsn")
270
271If this variable is left at its default value of ``True``, **the application
272will continue to maintain active database connections**, even when the
273SQLAlchemy engine itself fully discards a connection or if the engine is
274disposed.
275
276.. seealso::
277
278 `pooling <https://github.com/mkleehammer/pyodbc/wiki/The-pyodbc-Module#pooling>`_ -
279 in the PyODBC documentation.
280
281Driver / Unicode Support
282-------------------------
283
284PyODBC works best with Microsoft ODBC drivers, particularly in the area
285of Unicode support on both Python 2 and Python 3.
286
287Using the FreeTDS ODBC drivers on Linux or OSX with PyODBC is **not**
288recommended; there have been historically many Unicode-related issues
289in this area, including before Microsoft offered ODBC drivers for Linux
290and OSX. Now that Microsoft offers drivers for all platforms, for
291PyODBC support these are recommended. FreeTDS remains relevant for
292non-ODBC drivers such as pymssql where it works very well.
293
294
295Rowcount Support
296----------------
297
298Previous limitations with the SQLAlchemy ORM's "versioned rows" feature with
299Pyodbc have been resolved as of SQLAlchemy 2.0.5. See the notes at
300:ref:`mssql_rowcount_versioning`.
301
302.. _mssql_pyodbc_fastexecutemany:
303
304Fast Executemany Mode
305---------------------
306
307The PyODBC driver includes support for a "fast executemany" mode of execution
308which greatly reduces round trips for a DBAPI ``executemany()`` call when using
309Microsoft ODBC drivers, for **limited size batches that fit in memory**. The
310feature is enabled by setting the attribute ``.fast_executemany`` on the DBAPI
311cursor when an executemany call is to be used. The SQLAlchemy PyODBC SQL
312Server dialect supports this parameter by passing the
313``fast_executemany`` parameter to
314:func:`_sa.create_engine` , when using the **Microsoft ODBC driver only**::
315
316 engine = create_engine(
317 "mssql+pyodbc://scott:tiger@mssql2017:1433/test?driver=ODBC+Driver+17+for+SQL+Server",
318 fast_executemany=True,
319 )
320
.. versionchanged:: 2.0.9 - the ``fast_executemany`` parameter now has its
   intended effect: the PyODBC feature is used for all INSERT statements
   that are executed with multiple parameter sets and that don't include
   RETURNING. Previously, SQLAlchemy 2.0's :term:`insertmanyvalues`
   feature would cause ``fast_executemany`` to not be used in most cases
   even if specified.
327
328.. versionadded:: 1.3
329
330.. seealso::
331
332 `fast executemany <https://github.com/mkleehammer/pyodbc/wiki/Features-beyond-the-DB-API#fast_executemany>`_
333 - on github
334
335.. _mssql_pyodbc_setinputsizes:
336
337Setinputsizes Support
338-----------------------
339
340As of version 2.0, the pyodbc ``cursor.setinputsizes()`` method is used for
341all statement executions, except for ``cursor.executemany()`` calls when
342fast_executemany=True where it is not supported (assuming
343:ref:`insertmanyvalues <engine_insertmanyvalues>` is kept enabled,
344"fastexecutemany" will not take place for INSERT statements in any case).
345
346The use of ``cursor.setinputsizes()`` can be disabled by passing
347``use_setinputsizes=False`` to :func:`_sa.create_engine`.
348
349When ``use_setinputsizes`` is left at its default of ``True``, the
350specific per-type symbols passed to ``cursor.setinputsizes()`` can be
351programmatically customized using the :meth:`.DialectEvents.do_setinputsizes`
352hook. See that method for usage examples.
353
354.. versionchanged:: 2.0 The mssql+pyodbc dialect now defaults to using
355 ``use_setinputsizes=True`` for all statement executions with the exception of
356 cursor.executemany() calls when fast_executemany=True. The behavior can
357 be turned off by passing ``use_setinputsizes=False`` to
358 :func:`_sa.create_engine`.
359
360""" # noqa
361
362import datetime
363import decimal
364import re
365import struct
366
367from .base import _MSDateTime
368from .base import _MSUnicode
369from .base import _MSUnicodeText
370from .base import BINARY
371from .base import DATETIMEOFFSET
372from .base import MSDialect
373from .base import MSExecutionContext
374from .base import VARBINARY
375from .json import JSON as _MSJson
376from .json import JSONIndexType as _MSJsonIndexType
377from .json import JSONPathType as _MSJsonPathType
378from ... import exc
379from ... import types as sqltypes
380from ... import util
381from ...connectors.pyodbc import PyODBCConnector
382from ...engine import cursor as _cursor
383
384
class _ms_numeric_pyodbc:
    """Turns Decimals with adjusted() < 0 or > 7 into strings.

    The routines here are needed for older pyodbc versions
    as well as current mxODBC versions.

    This is a mixin: ``bind_processor()`` delegates to the next class in
    the MRO via ``super()``, so it has to be combined with a class that
    provides ``bind_processor()`` and an ``asdecimal`` attribute.

    """

    def bind_processor(self, dialect):
        """Return a bind processor that stringifies out-of-range Decimals.

        When ``dialect._need_decimal_fix`` is false, the processor obtained
        from ``super()`` (which may be ``None``) is returned unchanged.
        Otherwise the returned callable converts ``decimal.Decimal`` values
        whose ``adjusted()`` exponent is below 0 or above 7 into plain,
        non-scientific strings; every other value is handed to the
        ``super()`` processor when there is one, else returned as-is.
        """
        super_process = super().bind_processor(dialect)

        if not dialect._need_decimal_fix:
            return super_process

        def process(value):
            if self.asdecimal and isinstance(value, decimal.Decimal):
                adjusted = value.adjusted()
                if adjusted < 0:
                    return self._small_dec_to_string(value)
                elif adjusted > 7:
                    return self._large_dec_to_string(value)

            if super_process:
                return super_process(value)
            else:
                return value

        return process

    # these routines needed for older versions of pyodbc.
    # as of 2.1.8 this logic is integrated.

    def _small_dec_to_string(self, value):
        """Render a Decimal whose ``adjusted()`` is negative as ``0.00ddd``.

        The coefficient digits are emitted after ``abs(adjusted) - 1``
        leading zeros, with a ``-`` prefix when ``value < 0``.
        """
        sign = "-" if value < 0 else ""
        leading_zeros = "0" * (abs(value.adjusted()) - 1)
        digits = "".join(map(str, value.as_tuple()[1]))
        return f"{sign}0.{leading_zeros}{digits}"

    def _large_dec_to_string(self, value):
        """Render a large Decimal as a plain digit string without exponent.

        Three shapes are handled: a value whose ``str()`` uses ``E``
        notation (coefficient padded with trailing zeros), a value that
        still has fractional digits (split at the decimal point), and a
        plain integer-valued coefficient.
        """
        sign = "-" if value < 0 else ""
        digits = value.as_tuple()[1]
        adjusted = value.adjusted()

        if "E" in str(value):
            # scientific notation: pad the coefficient out to full width
            padding = "0" * (adjusted - (len(digits) - 1))
            return f"{sign}{''.join(map(str, digits))}{padding}"

        whole = "".join(map(str, digits[: adjusted + 1]))
        if (len(digits) - 1) > adjusted:
            # digits remain past the decimal point
            fraction = "".join(map(str, digits[adjusted + 1 :]))
            return f"{sign}{whole}.{fraction}"
        return f"{sign}{whole}"
445
446
class _MSNumeric_pyodbc(_ms_numeric_pyodbc, sqltypes.Numeric):
    """``sqltypes.Numeric`` combined with the ``_ms_numeric_pyodbc`` mixin."""

    pass
449
450
class _MSFloat_pyodbc(_ms_numeric_pyodbc, sqltypes.Float):
    """``sqltypes.Float`` combined with the ``_ms_numeric_pyodbc`` mixin."""

    pass
453
454
class _ms_binary_pyodbc:
    """Mixin that adapts binary bind values for pyodbc.

    Non-NULL values are wrapped in the DBAPI's ``Binary`` constructor.
    NULL values are replaced by the pyodbc-specific ``BinaryNull`` object
    to prevent pyODBC [and FreeTDS] from defaulting binary NULL types to
    SQLWCHAR and causing implicit conversion errors.
    """

    def bind_processor(self, dialect):
        """Return the binary bind processor, or ``None`` without a DBAPI."""
        dbapi = dialect.dbapi
        if dbapi is None:
            return None

        wrap_binary = dbapi.Binary

        def process(value):
            if value is None:
                # pyodbc-specific; resolved at bind time, as before
                return dialect.dbapi.BinaryNull
            return wrap_binary(value)

        return process
476
477
class _ODBCDateTimeBindProcessor:
    """Mixin supplying a bind processor that handles datetimeoffset values.

    Timezone-aware datetimes are rendered as T-SQL offset strings when the
    type has a true ``timezone`` attribute or the class sets ``has_tz``;
    everything else is passed through unchanged.
    """

    has_tz = False

    def bind_processor(self, dialect):
        def process(value):
            # NULLs and strings passed directly go through untouched
            if value is None:
                return None
            if isinstance(value, str):
                return value

            # naive values, and any value bound to a type that is neither
            # DateTime(timezone=True) nor DATETIMEOFFSET, are sent as-is
            wants_offset = value.tzinfo and (self.timezone or self.has_tz)
            if not wants_offset:
                return value

            # string format required by T-SQL; strftime's %z yields
            # (+-)HHMM[SS[.ffffff]] whereas the offset needs a colon,
            # e.g. -0700 -> -07:00.  The backend currently rejects
            # seconds / fractional seconds in the offset.
            rendered = value.strftime("%Y-%m-%d %H:%M:%S.%f %z")
            return re.sub(r"([\+\-]\d{2})([\d\.]+)$", r"\1:\2", rendered)

        return process
507
508
class _ODBCDateTime(_ODBCDateTimeBindProcessor, _MSDateTime):
    """``_MSDateTime`` using the ``_ODBCDateTimeBindProcessor`` bind logic."""

    pass
511
512
class _ODBCDATETIMEOFFSET(_ODBCDateTimeBindProcessor, DATETIMEOFFSET):
    """``DATETIMEOFFSET`` using the ``_ODBCDateTimeBindProcessor`` bind logic."""

    # makes the bind processor format timezone-aware values as offset
    # strings regardless of the type's ``timezone`` attribute
    has_tz = True
515
516
class _VARBINARY_pyodbc(_ms_binary_pyodbc, VARBINARY):
    """``VARBINARY`` combined with the ``_ms_binary_pyodbc`` mixin."""

    pass
519
520
class _BINARY_pyodbc(_ms_binary_pyodbc, BINARY):
    """``BINARY`` combined with the ``_ms_binary_pyodbc`` mixin."""

    pass
523
524
class _String_pyodbc(sqltypes.String):
    """``sqltypes.String`` variant reporting a pyodbc ``SQL_VARCHAR`` type."""

    def get_dbapi_type(self, dbapi):
        """Return the pyodbc type symbol for this string type.

        A ``(SQL_VARCHAR, 0, 0)`` tuple is returned when the length is
        unset, ``"max"`` or at least 2000; otherwise the bare constant.
        """
        is_large = self.length in (None, "max") or self.length >= 2000
        return (dbapi.SQL_VARCHAR, 0, 0) if is_large else dbapi.SQL_VARCHAR
531
532
class _Unicode_pyodbc(_MSUnicode):
    """``_MSUnicode`` variant reporting a pyodbc ``SQL_WVARCHAR`` type."""

    def get_dbapi_type(self, dbapi):
        """Return the pyodbc type symbol for this unicode type.

        A ``(SQL_WVARCHAR, 0, 0)`` tuple is returned when the length is
        unset, ``"max"`` or at least 2000; otherwise the bare constant.
        """
        is_large = self.length in (None, "max") or self.length >= 2000
        return (dbapi.SQL_WVARCHAR, 0, 0) if is_large else dbapi.SQL_WVARCHAR
539
540
class _UnicodeText_pyodbc(_MSUnicodeText):
    """``_MSUnicodeText`` variant reporting a pyodbc ``SQL_WVARCHAR`` type."""

    def get_dbapi_type(self, dbapi):
        """Return the pyodbc type symbol for this unicode text type.

        A ``(SQL_WVARCHAR, 0, 0)`` tuple is returned when the length is
        unset, ``"max"`` or at least 2000; otherwise the bare constant.
        """
        is_large = self.length in (None, "max") or self.length >= 2000
        return (dbapi.SQL_WVARCHAR, 0, 0) if is_large else dbapi.SQL_WVARCHAR
547
548
class _JSON_pyodbc(_MSJson):
    """JSON type reporting ``(SQL_WVARCHAR, 0, 0)`` as its pyodbc type."""

    def get_dbapi_type(self, dbapi):
        # always the tuple form, the same one the string types in this
        # module use for unbounded / large lengths
        return (dbapi.SQL_WVARCHAR, 0, 0)
552
553
class _JSONIndexType_pyodbc(_MSJsonIndexType):
    """JSON index type reporting ``SQL_WVARCHAR`` as its pyodbc type."""

    def get_dbapi_type(self, dbapi):
        return dbapi.SQL_WVARCHAR
557
558
class _JSONPathType_pyodbc(_MSJsonPathType):
    """JSON path type reporting ``SQL_WVARCHAR`` as its pyodbc type."""

    def get_dbapi_type(self, dbapi):
        return dbapi.SQL_WVARCHAR
562
563
class MSExecutionContext_pyodbc(MSExecutionContext):
    """Execution context that can embed ``select scope_identity()`` into
    the executed statement and read the generated identity value back
    afterwards.
    """

    # set by pre_exec() when "; select scope_identity()" has been appended
    # to the statement; post_exec() uses it to decide whether to fetch the
    # identity value itself
    _embedded_scope_identity = False

    def pre_exec(self):
        """where appropriate, issue "select scope_identity()" in the same
        statement.

        Background on why "scope_identity()" is preferable to "@@identity":
        https://msdn.microsoft.com/en-us/library/ms190315.aspx

        Background on why we attempt to embed "scope_identity()" into the same
        statement as the INSERT:
        https://code.google.com/p/pyodbc/wiki/FAQs#How_do_I_retrieve_autogenerated/identity_values?

        """

        super().pre_exec()

        # don't embed the scope_identity select into an
        # "INSERT .. DEFAULT VALUES"
        # (i.e. only embed when the first parameter set is non-empty)
        if (
            self._select_lastrowid
            and self.dialect.use_scope_identity
            and len(self.parameters[0])
        ):
            self._embedded_scope_identity = True

            self.statement += "; select scope_identity()"

    def post_exec(self):
        """Fetch the embedded ``scope_identity()`` value, if one was requested.

        When pre_exec() embedded the select, result sets are advanced until
        one yields a row; the first column of that row is stored in
        ``self._lastrowid`` as an int and the cursor fetch strategy is
        switched to ``_cursor._NO_CURSOR_DML``.  Otherwise this defers to
        the superclass implementation.
        """
        if self._embedded_scope_identity:
            # Fetch the last inserted id from the manipulated statement
            # We may have to skip over a number of result sets with
            # no data (due to triggers, etc.)
            while True:
                try:
                    # fetchall() ensures the cursor is consumed
                    # without closing it (FreeTDS particularly)
                    rows = self.cursor.fetchall()
                except self.dialect.dbapi.Error:
                    # no way around this - nextset() consumes the previous set
                    # so we need to just keep flipping
                    self.cursor.nextset()
                else:
                    if not rows:
                        # async adapter drivers just return None here
                        self.cursor.nextset()
                        continue
                    row = rows[0]
                    break

            self._lastrowid = int(row[0])

            self.cursor_fetch_strategy = _cursor._NO_CURSOR_DML
        else:
            super().post_exec()
621
class MSDialect_pyodbc(PyODBCConnector, MSDialect):
    """SQL Server dialect built on ``PyODBCConnector`` and ``MSDialect``.

    Adds pyodbc-specific type adaptations (``colspecs``), optional
    ``fast_executemany`` support, an output converter for datetimeoffset
    values and SQLSTATE-based disconnect detection.
    """

    supports_statement_cache = True

    # note this parameter is no longer used by the ORM or default dialect
    # see #9414
    supports_sane_rowcount_returning = False

    execution_ctx_cls = MSExecutionContext_pyodbc

    colspecs = util.update_copy(
        MSDialect.colspecs,
        {
            sqltypes.Numeric: _MSNumeric_pyodbc,
            sqltypes.Float: _MSFloat_pyodbc,
            BINARY: _BINARY_pyodbc,
            # support DateTime(timezone=True)
            sqltypes.DateTime: _ODBCDateTime,
            DATETIMEOFFSET: _ODBCDATETIMEOFFSET,
            # SQL Server dialect has a VARBINARY that is just to support
            # "deprecate_large_types" w/ VARBINARY(max), but also we must
            # handle the usual SQL standard VARBINARY
            VARBINARY: _VARBINARY_pyodbc,
            sqltypes.VARBINARY: _VARBINARY_pyodbc,
            sqltypes.LargeBinary: _VARBINARY_pyodbc,
            sqltypes.String: _String_pyodbc,
            sqltypes.Unicode: _Unicode_pyodbc,
            sqltypes.UnicodeText: _UnicodeText_pyodbc,
            sqltypes.JSON: _JSON_pyodbc,
            sqltypes.JSON.JSONIndexType: _JSONIndexType_pyodbc,
            sqltypes.JSON.JSONPathType: _JSONPathType_pyodbc,
            # this excludes Enum from the string/VARCHAR thing for now
            # it looks like Enum's adaptation doesn't really support the
            # String type itself having a dialect-level impl
            sqltypes.Enum: sqltypes.Enum,
        },
    )

    def __init__(
        self,
        fast_executemany=False,
        use_setinputsizes=True,
        **params,
    ):
        """Construct the dialect.

        :param fast_executemany: when true, ``do_executemany()`` sets
         ``cursor.fast_executemany = True`` and
         ``use_insertmanyvalues_wo_returning`` is switched off.
        :param use_setinputsizes: forwarded to the superclass constructor.
        :param \\**params: remaining keyword arguments, forwarded to the
         superclass constructor.
        """
        super().__init__(use_setinputsizes=use_setinputsizes, **params)
        # scope_identity() retrieval needs a DBAPI whose Cursor offers
        # nextset(), which post_exec() uses to skip result sets
        self.use_scope_identity = (
            self.use_scope_identity
            and self.dbapi
            and hasattr(self.dbapi.Cursor, "nextset")
        )
        # DBAPI versions before 2.1.8 need Decimals converted to strings
        # by the numeric bind processors in this module
        self._need_decimal_fix = self.dbapi and self._dbapi_version() < (
            2,
            1,
            8,
        )
        self.fast_executemany = fast_executemany
        if fast_executemany:
            self.use_insertmanyvalues_wo_returning = False

    def _get_server_version_info(self, connection):
        """Return the server version as a tuple of ints.

        Queries ``SERVERPROPERTY('ProductVersion')`` and keeps the integer
        components found between ``.`` / ``-`` separators; on a
        ``DBAPIError`` the superclass implementation is used instead.
        """
        try:
            # "Version of the instance of SQL Server, in the form
            # of 'major.minor.build.revision'"
            raw = connection.exec_driver_sql(
                "SELECT CAST(SERVERPROPERTY('ProductVersion') AS VARCHAR)"
            ).scalar()
        except exc.DBAPIError:
            # SQL Server docs indicate this function isn't present prior to
            # 2008. Before we had the VARCHAR cast above, pyodbc would also
            # fail on this query.
            return super()._get_server_version_info(connection)
        else:
            version = []
            r = re.compile(r"[.\-]")
            for n in r.split(raw):
                try:
                    version.append(int(n))
                except ValueError:
                    # non-numeric components are skipped
                    pass
            return tuple(version)

    def on_connect(self):
        """Return a connect hook that also installs the datetimeoffset
        output converter on each new DBAPI connection.
        """
        super_ = super().on_connect()

        def on_connect(conn):
            if super_ is not None:
                super_(conn)

            self._setup_timestampoffset_type(conn)

        return on_connect

    def _setup_timestampoffset_type(self, connection):
        """Register an output converter for ODBC type code -155 that turns
        the raw value into a timezone-aware ``datetime.datetime``.
        """

        # output converter function for datetimeoffset
        def _handle_datetimeoffset(dto_value):
            # little-endian: six int16 (year .. second), one uint32
            # fraction, then two int16 for the offset hours and minutes
            tup = struct.unpack("<6hI2h", dto_value)
            return datetime.datetime(
                tup[0],
                tup[1],
                tup[2],
                tup[3],
                tup[4],
                tup[5],
                # scaled down to fit datetime's microsecond argument
                # (presumably the wire value is nanoseconds - confirm)
                tup[6] // 1000,
                datetime.timezone(
                    datetime.timedelta(hours=tup[7], minutes=tup[8])
                ),
            )

        odbc_SQL_SS_TIMESTAMPOFFSET = -155  # as defined in SQLNCLI.h
        connection.add_output_converter(
            odbc_SQL_SS_TIMESTAMPOFFSET, _handle_datetimeoffset
        )

    def do_executemany(self, cursor, statement, parameters, context=None):
        """Run executemany, enabling ``cursor.fast_executemany`` first when
        the dialect was created with ``fast_executemany=True``.
        """
        if self.fast_executemany:
            cursor.fast_executemany = True
        super().do_executemany(cursor, statement, parameters, context=context)

    def is_disconnect(self, e, connection, cursor):
        """Return True if ``e`` indicates a dropped connection.

        A DBAPI ``Error`` whose first argument is one of the codes below is
        treated as a disconnect; anything else is decided by the
        superclass.
        """
        if isinstance(e, self.dbapi.Error):
            # an Error raised without arguments carries no code to inspect;
            # indexing args[0] unguarded would raise IndexError from inside
            # error handling and mask the original exception
            code = e.args[0] if e.args else None
            if code in {
                "08S01",
                "01000",
                "01002",
                "08003",
                "08007",
                "08S02",
                "08001",
                "HYT00",
                "HY010",
                "10054",
            }:
                return True
        return super().is_disconnect(e, connection, cursor)
757
758
# module-level name bound to the dialect class defined in this module
# (presumably the attribute the dialect loader looks up - confirm)
dialect = MSDialect_pyodbc