Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/default.py: 46%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1053 statements  

1# engine/default.py 

2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7# mypy: allow-untyped-defs, allow-untyped-calls 

8 

9"""Default implementations of per-dialect sqlalchemy.engine classes. 

10 

11These are semi-private implementation classes which are only of importance 

12to database dialect authors; dialects will usually use the classes here 

13as the base class for their own corresponding classes. 

14 

15""" 

16 

17from __future__ import annotations 

18 

19import functools 

20import operator 

21import random 

22import re 

23from time import perf_counter 

24import typing 

25from typing import Any 

26from typing import Callable 

27from typing import cast 

28from typing import Dict 

29from typing import Final 

30from typing import List 

31from typing import Literal 

32from typing import Mapping 

33from typing import MutableMapping 

34from typing import MutableSequence 

35from typing import Optional 

36from typing import Sequence 

37from typing import Set 

38from typing import Tuple 

39from typing import Type 

40from typing import TYPE_CHECKING 

41from typing import Union 

42import weakref 

43 

44from . import characteristics 

45from . import cursor as _cursor 

46from . import interfaces 

47from .base import Connection 

48from .interfaces import CacheStats 

49from .interfaces import DBAPICursor 

50from .interfaces import Dialect 

51from .interfaces import ExecuteStyle 

52from .interfaces import ExecutionContext 

53from .reflection import ObjectKind 

54from .reflection import ObjectScope 

55from .. import event 

56from .. import exc 

57from .. import pool 

58from .. import util 

59from ..sql import compiler 

60from ..sql import dml 

61from ..sql import expression 

62from ..sql import type_api 

63from ..sql import util as sql_util 

64from ..sql._typing import is_tuple_type 

65from ..sql.base import _NoArg 

66from ..sql.compiler import AggregateOrderByStyle 

67from ..sql.compiler import DDLCompiler 

68from ..sql.compiler import InsertmanyvaluesSentinelOpts 

69from ..sql.compiler import SQLCompiler 

70from ..sql.elements import quoted_name 

71from ..util.typing import TupleAny 

72from ..util.typing import Unpack 

73 

74if typing.TYPE_CHECKING: 

75 from types import ModuleType 

76 

77 from .base import Engine 

78 from .cursor import ResultFetchStrategy 

79 from .interfaces import _CoreMultiExecuteParams 

80 from .interfaces import _CoreSingleExecuteParams 

81 from .interfaces import _DBAPICursorDescription 

82 from .interfaces import _DBAPIMultiExecuteParams 

83 from .interfaces import _DBAPISingleExecuteParams 

84 from .interfaces import _ExecuteOptions 

85 from .interfaces import _MutableCoreSingleExecuteParams 

86 from .interfaces import _ParamStyle 

87 from .interfaces import ConnectArgsType 

88 from .interfaces import DBAPIConnection 

89 from .interfaces import DBAPIModule 

90 from .interfaces import IsolationLevel 

91 from .row import Row 

92 from .url import URL 

93 from ..event import _ListenerFnType 

94 from ..pool import Pool 

95 from ..pool import PoolProxiedConnection 

96 from ..sql import Executable 

97 from ..sql.compiler import Compiled 

98 from ..sql.compiler import Linting 

99 from ..sql.compiler import ResultColumnsEntry 

100 from ..sql.dml import DMLState 

101 from ..sql.dml import UpdateBase 

102 from ..sql.elements import BindParameter 

103 from ..sql.schema import Column 

104 from ..sql.type_api import _BindProcessorType 

105 from ..sql.type_api import _ResultProcessorType 

106 from ..sql.type_api import TypeEngine 

107 

108 

# When we're handed literal SQL, ensure it's a SELECT query.
# Case-insensitive match on optional leading whitespace followed by
# "SELECT"; presumably used to gate server-side-cursor use for plain
# string statements — the consuming code is outside this chunk.
SERVER_SIDE_CURSOR_RE = re.compile(r"\s*SELECT", re.I | re.UNICODE)

111 

112 

# Module-level aliases for the five CacheStats enum members, bound in
# declaration order; unpacking list(CacheStats) keeps these names in
# sync with the enum defined in .interfaces.
(
    CACHE_HIT,
    CACHE_MISS,
    CACHING_DISABLED,
    NO_CACHE_KEY,
    NO_DIALECT_SUPPORT,
) = list(CacheStats)

120 

121 

class DefaultDialect(Dialect):
    """Default implementation of Dialect"""

    # compiler classes used to render SQL / DDL / type expressions;
    # dialects override these with their own subclasses.
    statement_compiler = compiler.SQLCompiler
    ddl_compiler = compiler.DDLCompiler
    type_compiler_cls = compiler.GenericTypeCompiler

    preparer = compiler.IdentifierPreparer
    supports_alter = True
    supports_comments = False
    supports_constraint_comments = False
    inline_comments = False
    supports_statement_cache = True

    div_is_floordiv = True

    # how bound-parameter typing information is conveyed to the DBAPI;
    # __init__ may upgrade this to SETINPUTSIZES for legacy dialects.
    bind_typing = interfaces.BindTyping.NONE

    include_set_input_sizes: Optional[Set[Any]] = None
    exclude_set_input_sizes: Optional[Set[Any]] = None

    # the first value we'd get for an autoincrement column.
    default_sequence_base = 1

    # most DBAPIs happy with this for execute().
    # not cx_oracle.
    execute_sequence_format = tuple

    supports_schemas = True
    supports_views = True
    supports_sequences = False
    sequences_optional = False
    preexecute_autoincrement_sequences = False
    supports_identity_columns = False
    postfetch_lastrowid = True
    favor_returning_over_lastrowid = False
    insert_null_pk_still_autoincrements = False
    update_returning = False
    delete_returning = False
    update_returning_multifrom = False
    delete_returning_multifrom = False
    insert_returning = False

    aggregate_order_by_style = AggregateOrderByStyle.INLINE

    cte_follows_insert = False

    supports_native_enum = False
    supports_native_boolean = False
    supports_native_uuid = False
    returns_native_bytes = False

    non_native_boolean_check_constraint = True

    supports_simple_order_by_label = True

    tuple_in_values = False

    # per-connection settable characteristics; consulted by the
    # set_*_execution_options methods below.
    connection_characteristics = util.immutabledict(
        {
            "isolation_level": characteristics.IsolationLevelCharacteristic(),
            "logging_token": characteristics.LoggingTokenCharacteristic(),
        }
    )

    # coercion functions applied to create_engine() string config values
    engine_config_types: Mapping[str, Any] = util.immutabledict(
        {
            "pool_timeout": util.asint,
            "echo": util.bool_or_str("debug"),
            "echo_pool": util.bool_or_str("debug"),
            "pool_recycle": util.asint,
            "pool_size": util.asint,
            "max_overflow": util.asint,
            "future": util.asbool,
        }
    )

    # if the NUMERIC type
    # returns decimal.Decimal.
    # *not* the FLOAT type however.
    supports_native_decimal = False

    name = "default"

    # length at which to truncate
    # any identifier.
    max_identifier_length = 9999
    _user_defined_max_identifier_length: Optional[int] = None

    isolation_level: Optional[str] = None

    # sub-categories of max_identifier_length.
    # currently these accommodate for MySQL which allows alias names
    # of 255 but DDL names only of 64.
    max_index_name_length: Optional[int] = None
    max_constraint_name_length: Optional[int] = None

    supports_sane_rowcount = True
    supports_sane_multi_rowcount = True
    colspecs: MutableMapping[Type[TypeEngine[Any]], Type[TypeEngine[Any]]] = {}
    default_paramstyle = "named"

    supports_default_values = False
    """dialect supports INSERT... DEFAULT VALUES syntax"""

    supports_default_metavalue = False
    """dialect supports INSERT... VALUES (DEFAULT) syntax"""

    default_metavalue_token = "DEFAULT"
    """for INSERT... VALUES (DEFAULT) syntax, the token to put in the
    parenthesis."""

    # not sure if this is a real thing but the compiler will deliver it
    # if this is the only flag enabled.
    supports_empty_insert = True
    """dialect supports INSERT () VALUES ()"""

    supports_multivalues_insert = False

    use_insertmanyvalues: bool = False

    use_insertmanyvalues_wo_returning: bool = False

    insertmanyvalues_implicit_sentinel: InsertmanyvaluesSentinelOpts = (
        InsertmanyvaluesSentinelOpts.NOT_SUPPORTED
    )

    insertmanyvalues_page_size: int = 1000
    insertmanyvalues_max_parameters = 32700

    supports_is_distinct_from = True

    supports_server_side_cursors = False

    server_side_cursors = False

    # extra record-level locking features (#4860)
    supports_for_update_of = False

    server_version_info = None

    default_schema_name: Optional[str] = None

    # indicates symbol names are
    # UPPERCASED if they are case insensitive
    # within the database.
    # if this is True, the methods normalize_name()
    # and denormalize_name() must be provided.
    requires_name_normalize = False

    is_async = False

    has_terminate = False

    # TODO: this is not to be part of 2.0. implement rudimentary binary
    # literals for SQLite, PostgreSQL, MySQL only within
    # _Binary.literal_processor
    _legacy_binary_type_literal_encoding = "utf-8"

280 

    @util.deprecated_params(
        empty_in_strategy=(
            "1.4",
            "The :paramref:`_sa.create_engine.empty_in_strategy` keyword is "
            "deprecated, and no longer has any effect. All IN expressions "
            "are now rendered using "
            'the "expanding parameter" strategy which renders a set of bound'
            'expressions, or an "empty set" SELECT, at statement execution'
            "time.",
        ),
        server_side_cursors=(
            "1.4",
            "The :paramref:`_sa.create_engine.server_side_cursors` parameter "
            "is deprecated and will be removed in a future release. Please "
            "use the "
            ":paramref:`_engine.Connection.execution_options.stream_results` "
            "parameter.",
        ),
    )
    def __init__(
        self,
        paramstyle: Optional[_ParamStyle] = None,
        isolation_level: Optional[IsolationLevel] = None,
        dbapi: Optional[ModuleType] = None,
        # accepted for backwards compatibility; unused in this constructor
        implicit_returning: Literal[True] = True,
        supports_native_boolean: Optional[bool] = None,
        max_identifier_length: Optional[int] = None,
        label_length: Optional[int] = None,
        insertmanyvalues_page_size: Union[_NoArg, int] = _NoArg.NO_ARG,
        use_insertmanyvalues: Optional[bool] = None,
        # util.deprecated_params decorator cannot render the
        # Linting.NO_LINTING constant
        compiler_linting: Linting = int(compiler.NO_LINTING),  # type: ignore
        server_side_cursors: bool = False,
        skip_autocommit_rollback: bool = False,
        **kwargs: Any,
    ):
        """Construct the dialect, resolving paramstyle, type compiler,
        identifier preparer and insertmanyvalues settings.

        Unknown ``**kwargs`` are accepted and ignored so that dialect
        subclasses / create_engine() can pass through extra arguments.
        ``label_length`` is validated against ``max_identifier_length``
        later, in :meth:`.initialize`.
        """
        if server_side_cursors:
            # legacy flag; only honored when the dialect actually
            # supports server side cursors.
            if not self.supports_server_side_cursors:
                raise exc.ArgumentError(
                    "Dialect %s does not support server side cursors" % self
                )
            else:
                self.server_side_cursors = True

        if getattr(self, "use_setinputsizes", False):
            # legacy dialect attribute; translate to the modern
            # bind_typing enum.
            util.warn_deprecated(
                "The dialect-level use_setinputsizes attribute is "
                "deprecated. Please use "
                "bind_typing = BindTyping.SETINPUTSIZES",
                "2.0",
            )
            self.bind_typing = interfaces.BindTyping.SETINPUTSIZES

        self.positional = False
        self._ischema = None

        self.dbapi = dbapi

        self.skip_autocommit_rollback = skip_autocommit_rollback

        # explicit argument wins, then the DBAPI module's declared
        # paramstyle, then this dialect's default.
        if paramstyle is not None:
            self.paramstyle = paramstyle
        elif self.dbapi is not None:
            self.paramstyle = self.dbapi.paramstyle
        else:
            self.paramstyle = self.default_paramstyle
        # positional paramstyles pass parameters as a sequence rather
        # than a mapping (PEP 249).
        self.positional = self.paramstyle in (
            "qmark",
            "format",
            "numeric",
            "numeric_dollar",
        )
        self.identifier_preparer = self.preparer(self)
        self._on_connect_isolation_level = isolation_level

        # a dialect providing a legacy class-level "type_compiler"
        # callable takes precedence over type_compiler_cls.
        legacy_tt_callable = getattr(self, "type_compiler", None)
        if legacy_tt_callable is not None:
            tt_callable = cast(
                Type[compiler.GenericTypeCompiler],
                self.type_compiler,
            )
        else:
            tt_callable = self.type_compiler_cls

        self.type_compiler_instance = self.type_compiler = tt_callable(self)

        if supports_native_boolean is not None:
            self.supports_native_boolean = supports_native_boolean

        self._user_defined_max_identifier_length = max_identifier_length
        if self._user_defined_max_identifier_length:
            self.max_identifier_length = (
                self._user_defined_max_identifier_length
            )
        self.label_length = label_length
        self.compiler_linting = compiler_linting

        if use_insertmanyvalues is not None:
            self.use_insertmanyvalues = use_insertmanyvalues

        if insertmanyvalues_page_size is not _NoArg.NO_ARG:
            self.insertmanyvalues_page_size = insertmanyvalues_page_size

384 

    @property
    @util.deprecated(
        "2.0",
        "full_returning is deprecated, please use insert_returning, "
        "update_returning, delete_returning",
    )
    def full_returning(self):
        """Deprecated aggregate flag: True only when all three of
        insert/update/delete RETURNING are supported."""
        return (
            self.insert_returning
            and self.update_returning
            and self.delete_returning
        )

397 

    @util.memoized_property
    def insert_executemany_returning(self):
        """Default implementation for insert_executemany_returning, if not
        otherwise overridden by the specific dialect.

        The default dialect determines "insert_executemany_returning" is
        available if the dialect in use has opted into using the
        "use_insertmanyvalues" feature. If they haven't opted into that, then
        this attribute is False, unless the dialect in question overrides this
        and provides some other implementation (such as the Oracle Database
        dialects).

        """
        # memoized: evaluated once per dialect instance
        return self.insert_returning and self.use_insertmanyvalues

412 

    @util.memoized_property
    def insert_executemany_returning_sort_by_parameter_order(self):
        """Default implementation for
        insert_executemany_returning_deterministic_order, if not otherwise
        overridden by the specific dialect.

        The default dialect determines "insert_executemany_returning" can have
        deterministic order only if the dialect in use has opted into using the
        "use_insertmanyvalues" feature, which implements deterministic ordering
        using client side sentinel columns only by default. The
        "insertmanyvalues" feature also features alternate forms that can
        use server-generated PK values as "sentinels", but those are only
        used if the :attr:`.Dialect.insertmanyvalues_implicit_sentinel`
        bitflag enables those alternate SQL forms, which are disabled
        by default.

        If the dialect in use hasn't opted into that, then this attribute is
        False, unless the dialect in question overrides this and provides some
        other implementation (such as the Oracle Database dialects).

        """
        # same gate as insert_executemany_returning by default
        return self.insert_returning and self.use_insertmanyvalues

435 

    # executemany() + RETURNING for UPDATE/DELETE is opt-in per dialect
    update_executemany_returning = False
    delete_executemany_returning = False

    @util.memoized_property
    def loaded_dbapi(self) -> DBAPIModule:
        """Return the DBAPI module set in __init__, raising
        InvalidRequestError if the dialect was constructed without one."""
        if self.dbapi is None:
            raise exc.InvalidRequestError(
                f"Dialect {self} does not have a Python DBAPI established "
                "and cannot be used for actual database interaction"
            )
        return self.dbapi

    @util.memoized_property
    def _bind_typing_render_casts(self):
        # True when bound parameters are typed via rendered SQL CASTs
        return self.bind_typing is interfaces.BindTyping.RENDER_CASTS

451 

452 def _ensure_has_table_connection(self, arg: Connection) -> None: 

453 if not isinstance(arg, Connection): 

454 raise exc.ArgumentError( 

455 "The argument passed to Dialect.has_table() should be a " 

456 "%s, got %s. " 

457 "Additionally, the Dialect.has_table() method is for " 

458 "internal dialect " 

459 "use only; please use " 

460 "``inspect(some_engine).has_table(<tablename>>)`` " 

461 "for public API use." % (Connection, type(arg)) 

462 ) 

463 

    @util.memoized_property
    def _supports_statement_cache(self):
        """Return whether SQL compilation caching may be used, warning once
        when the dialect class has not declared a position either way.

        Only a ``supports_statement_cache`` attribute set directly on this
        dialect's own class counts; an inherited value is deliberately
        ignored so that each third-party dialect must opt in explicitly.
        """
        ssc = self.__class__.__dict__.get("supports_statement_cache", None)
        if ssc is None:
            util.warn(
                "Dialect %s:%s will not make use of SQL compilation caching "
                "as it does not set the 'supports_statement_cache' attribute "
                "to ``True``. This can have "
                "significant performance implications including some "
                "performance degradations in comparison to prior SQLAlchemy "
                "versions. Dialect maintainers should seek to set this "
                "attribute to True after appropriate development and testing "
                "for SQLAlchemy 1.4 caching support. Alternatively, this "
                "attribute may be set to False which will disable this "
                "warning." % (self.name, self.driver),
                code="cprf",
            )

        return bool(ssc)

483 

    @util.memoized_property
    def _type_memos(self):
        # per-dialect cache keyed on TypeEngine objects; weak keys so
        # cached entries do not keep type objects alive.
        return weakref.WeakKeyDictionary()

487 

488 @property 

489 def dialect_description(self): # type: ignore[override] 

490 return self.name + "+" + self.driver 

491 

    @property
    def supports_sane_rowcount_returning(self):
        """True if this dialect supports sane rowcount even if RETURNING is
        in use.

        For dialects that don't support RETURNING, this is synonymous with
        ``supports_sane_rowcount``.

        """
        # default: no distinction made for RETURNING statements
        return self.supports_sane_rowcount

502 

503 @classmethod 

504 def get_pool_class(cls, url: URL) -> Type[Pool]: 

505 default: Type[pool.Pool] 

506 if cls.is_async: 

507 default = pool.AsyncAdaptedQueuePool 

508 else: 

509 default = pool.QueuePool 

510 

511 return getattr(cls, "poolclass", default) 

512 

    def get_dialect_pool_class(self, url: URL) -> Type[Pool]:
        # instance-level hook; simply defers to the classmethod above
        return self.get_pool_class(url)

515 

516 @classmethod 

517 def load_provisioning(cls): 

518 package = ".".join(cls.__module__.split(".")[0:-1]) 

519 try: 

520 __import__(package + ".provision") 

521 except ImportError: 

522 pass 

523 

524 def _builtin_onconnect(self) -> Optional[_ListenerFnType]: 

525 if self._on_connect_isolation_level is not None: 

526 

527 def builtin_connect(dbapi_conn, conn_rec): 

528 self._assert_and_set_isolation_level( 

529 dbapi_conn, self._on_connect_isolation_level 

530 ) 

531 

532 return builtin_connect 

533 else: 

534 return None 

535 

    def initialize(self, connection: Connection) -> None:
        """Probe the database on first connect, filling in
        server_version_info, default_schema_name, the default isolation
        level and (optionally) a server-derived max_identifier_length.

        Each probe tolerates NotImplementedError from dialects that do
        not provide the corresponding hook, falling back to None.
        """
        try:
            self.server_version_info = self._get_server_version_info(
                connection
            )
        except NotImplementedError:
            self.server_version_info = None
        try:
            self.default_schema_name = self._get_default_schema_name(
                connection
            )
        except NotImplementedError:
            self.default_schema_name = None

        try:
            self.default_isolation_level = self.get_default_isolation_level(
                connection.connection.dbapi_connection
            )
        except NotImplementedError:
            self.default_isolation_level = None

        # a user-supplied max_identifier_length (from __init__) always
        # wins over a server-reported one.
        if not self._user_defined_max_identifier_length:
            max_ident_length = self._check_max_identifier_length(connection)
            if max_ident_length:
                self.max_identifier_length = max_ident_length

        # label_length was accepted unvalidated in __init__; check it
        # now that max_identifier_length is final.
        if (
            self.label_length
            and self.label_length > self.max_identifier_length
        ):
            raise exc.ArgumentError(
                "Label length of %d is greater than this dialect's"
                " maximum identifier length of %d"
                % (self.label_length, self.max_identifier_length)
            )

571 

    def on_connect(self) -> Optional[Callable[[Any], None]]:
        # inherits the docstring from interfaces.Dialect.on_connect
        # default: no per-connection setup callable
        return None

    def _check_max_identifier_length(self, connection):
        """Perform a connection / server version specific check to determine
        the max_identifier_length.

        If the dialect's class level max_identifier_length should be used,
        can return None.

        """
        # default: trust the class-level max_identifier_length
        return None

585 

    def get_default_isolation_level(self, dbapi_conn):
        """Given a DBAPI connection, return its isolation level, or
        a default isolation level if one cannot be retrieved.

        May be overridden by subclasses in order to provide a
        "fallback" isolation level for databases that cannot reliably
        retrieve the actual isolation level.

        By default, calls the :meth:`_engine.Interfaces.get_isolation_level`
        method, propagating any exceptions raised.

        """
        return self.get_isolation_level(dbapi_conn)

599 

    def type_descriptor(self, typeobj):
        """Provide a database-specific :class:`.TypeEngine` object, given
        the generic object which comes from the types module.

        This method looks for a dictionary called
        ``colspecs`` as a class or instance-level variable,
        and passes on to :func:`_types.adapt_type`.

        """
        return type_api.adapt_type(typeobj, self.colspecs)

610 

611 def has_index(self, connection, table_name, index_name, schema=None, **kw): 

612 if not self.has_table(connection, table_name, schema=schema, **kw): 

613 return False 

614 for idx in self.get_indexes( 

615 connection, table_name, schema=schema, **kw 

616 ): 

617 if idx["name"] == index_name: 

618 return True 

619 else: 

620 return False 

621 

622 def has_schema( 

623 self, connection: Connection, schema_name: str, **kw: Any 

624 ) -> bool: 

625 return schema_name in self.get_schema_names(connection, **kw) 

626 

627 def validate_identifier(self, ident: str) -> None: 

628 if len(ident) > self.max_identifier_length: 

629 raise exc.IdentifierError( 

630 "Identifier '%s' exceeds maximum length of %d characters" 

631 % (ident, self.max_identifier_length) 

632 ) 

633 

    def connect(self, *cargs: Any, **cparams: Any) -> DBAPIConnection:
        # inherits the docstring from interfaces.Dialect.connect
        # direct pass-through to the DBAPI module's connect()
        return self.loaded_dbapi.connect(*cargs, **cparams)  # type: ignore[no-any-return]  # NOQA: E501

637 

638 def create_connect_args(self, url: URL) -> ConnectArgsType: 

639 # inherits the docstring from interfaces.Dialect.create_connect_args 

640 opts = url.translate_connect_args() 

641 opts.update(url.query) 

642 return ([], opts) 

643 

    def set_engine_execution_options(
        self, engine: Engine, opts: Mapping[str, Any]
    ) -> None:
        """Apply engine-wide execution options that correspond to known
        connection characteristics, by installing an ``engine_connect``
        listener that re-applies them on every new connection.
        """
        supported_names = set(self.connection_characteristics).intersection(
            opts
        )
        if supported_names:
            # snapshot the relevant values; the closure below captures
            # this immutable mapping for the lifetime of the engine.
            characteristics: Mapping[str, Any] = util.immutabledict(
                (name, opts[name]) for name in supported_names
            )

            @event.listens_for(engine, "engine_connect")
            def set_connection_characteristics(connection):
                self._set_connection_characteristics(
                    connection, characteristics
                )

660 

    def set_connection_execution_options(
        self, connection: Connection, opts: Mapping[str, Any]
    ) -> None:
        """Apply execution options matching known connection
        characteristics immediately to a single connection."""
        supported_names = set(self.connection_characteristics).intersection(
            opts
        )
        if supported_names:
            characteristics: Mapping[str, Any] = util.immutabledict(
                (name, opts[name]) for name in supported_names
            )
            self._set_connection_characteristics(connection, characteristics)

672 

    def _set_connection_characteristics(self, connection, characteristics):
        """Apply the given characteristic values to a connection and
        arrange for them to be reset when the connection is returned to
        the pool.

        :raises sqlalchemy.exc.InvalidRequestError: if any transactional
         characteristic is changed while a SQLAlchemy Transaction is
         already in progress on the connection.
        """
        characteristic_values = [
            (name, self.connection_characteristics[name], value)
            for name, value in characteristics.items()
        ]

        if connection.in_transaction():
            # transactional characteristics (e.g. isolation level) may
            # not be changed mid-transaction.
            trans_objs = [
                (name, obj)
                for name, obj, _ in characteristic_values
                if obj.transactional
            ]
            if trans_objs:
                raise exc.InvalidRequestError(
                    "This connection has already initialized a SQLAlchemy "
                    "Transaction() object via begin() or autobegin; "
                    "%s may not be altered unless rollback() or commit() "
                    "is called first."
                    % (", ".join(name for name, obj in trans_objs))
                )

        dbapi_connection = connection.connection.dbapi_connection
        for _, characteristic, value in characteristic_values:
            characteristic.set_connection_characteristic(
                self, connection, dbapi_connection, value
            )
        # reset everything when the pooled connection is checked in
        connection.connection._connection_record.finalize_callback.append(
            functools.partial(self._reset_characteristics, characteristics)
        )

702 

703 def _reset_characteristics(self, characteristics, dbapi_connection): 

704 for characteristic_name in characteristics: 

705 characteristic = self.connection_characteristics[ 

706 characteristic_name 

707 ] 

708 characteristic.reset_characteristic(self, dbapi_connection) 

709 

    def do_begin(self, dbapi_connection):
        # no-op by default: PEP 249 connections begin implicitly;
        # dialects needing an explicit BEGIN override this.
        pass

    def do_rollback(self, dbapi_connection):
        # when skip_autocommit_rollback was requested in __init__ and
        # detect_autocommit_setting (provided elsewhere on the dialect)
        # reports the connection as autocommit, skip the useless
        # rollback round trip.
        if self.skip_autocommit_rollback and self.detect_autocommit_setting(
            dbapi_connection
        ):
            return
        dbapi_connection.rollback()

    def do_commit(self, dbapi_connection):
        # straight PEP 249 commit()
        dbapi_connection.commit()

    def do_terminate(self, dbapi_connection):
        # default "terminate" is an ordinary close; dialects with a
        # true hard-terminate (has_terminate = True) override this.
        self.do_close(dbapi_connection)

    def do_close(self, dbapi_connection):
        dbapi_connection.close()

    @util.memoized_property
    def _dialect_specific_select_one(self):
        # "SELECT 1" rendered through this dialect's compiler, computed
        # once; consumed by do_ping() below.
        return str(expression.select(1).compile(dialect=self))

732 

    def _do_ping_w_event(self, dbapi_connection: DBAPIConnection) -> bool:
        """Run :meth:`.do_ping`, translating DBAPI errors.

        Returns False when the error indicates a disconnect (so the pool
        can recycle the connection); re-raises otherwise. When event
        handlers are registered, the error is first routed through
        ``Connection._handle_dbapi_exception_noconnection`` with
        ``is_pre_ping=True``, which may revise the disconnect decision.
        """
        try:
            return self.do_ping(dbapi_connection)
        except self.loaded_dbapi.Error as err:
            is_disconnect = self.is_disconnect(err, dbapi_connection, None)

            if self._has_events:
                try:
                    Connection._handle_dbapi_exception_noconnection(
                        err,
                        self,
                        is_disconnect=is_disconnect,
                        invalidate_pool_on_disconnect=False,
                        is_pre_ping=True,
                    )
                except exc.StatementError as new_err:
                    # handlers may override the disconnect determination
                    is_disconnect = new_err.connection_invalidated

            if is_disconnect:
                return False
            else:
                raise

755 

756 def do_ping(self, dbapi_connection: DBAPIConnection) -> bool: 

757 cursor = dbapi_connection.cursor() 

758 try: 

759 cursor.execute(self._dialect_specific_select_one) 

760 finally: 

761 cursor.close() 

762 return True 

763 

    def create_xid(self):
        """Create a random two-phase transaction ID.

        This id will be passed to do_begin_twophase(), do_rollback_twophase(),
        do_commit_twophase(). Its format is unspecified.
        """

        # 128 random bits as zero-padded hex with a "_sa_" prefix;
        # uses random (not secrets) — uniqueness, not security, is the goal
        return "_sa_%032x" % random.randint(0, 2**128)

772 

    def do_savepoint(self, connection, name):
        # emits SAVEPOINT <name> via the compiler's SavepointClause
        connection.execute(expression.SavepointClause(name))

    def do_rollback_to_savepoint(self, connection, name):
        connection.execute(expression.RollbackToSavepointClause(name))

    def do_release_savepoint(self, connection, name):
        connection.execute(expression.ReleaseSavepointClause(name))

781 

    def _deliver_insertmanyvalues_batches(
        self,
        connection,
        cursor,
        statement,
        parameters,
        generic_setinputsizes,
        context,
    ):
        """Generator implementing the "insertmanyvalues" execution.

        Yields each batch produced by the compiler for the caller to
        execute; after each yield (i.e. after the caller has executed
        the batch), collects RETURNING rows from the cursor into
        ``context._insertmanyvalues_rows``, re-ordering them to match
        parameter order via sentinel columns when requested.
        """
        context = cast(DefaultExecutionContext, context)
        compiled = cast(SQLCompiler, context.compiled)

        # lazily-initialized result processors for the sentinel
        # column(s); shared across batches once computed.
        _composite_sentinel_proc: Sequence[
            Optional[_ResultProcessorType[Any]]
        ] = ()
        _scalar_sentinel_proc: Optional[_ResultProcessorType[Any]] = None
        _sentinel_proc_initialized: bool = False

        compiled_parameters = context.compiled_parameters

        imv = compiled._insertmanyvalues
        assert imv is not None

        is_returning: Final[bool] = bool(compiled.effective_returning)
        # per-execution option overrides the dialect-level page size
        batch_size = context.execution_options.get(
            "insertmanyvalues_page_size", self.insertmanyvalues_page_size
        )

        if compiled.schema_translate_map:
            schema_translate_map = context.execution_options.get(
                "schema_translate_map", {}
            )
        else:
            schema_translate_map = None

        if is_returning:
            # rows accumulate here; the context exposes them as the
            # statement's RETURNING result
            result: Optional[List[Any]] = []
            context._insertmanyvalues_rows = result

            sort_by_parameter_order = imv.sort_by_parameter_order

        else:
            sort_by_parameter_order = False
            result = None

        for imv_batch in compiled._deliver_insertmanyvalues_batches(
            statement,
            parameters,
            compiled_parameters,
            generic_setinputsizes,
            batch_size,
            sort_by_parameter_order,
            schema_translate_map,
        ):
            # caller executes the batch while we are suspended here
            yield imv_batch

            if is_returning:

                try:
                    rows = context.fetchall_for_returning(cursor)
                except BaseException as be:
                    # _handle_dbapi_exception re-raises appropriately
                    connection._handle_dbapi_exception(
                        be,
                        sql_util._long_statement(imv_batch.replaced_statement),
                        imv_batch.replaced_parameters,
                        None,
                        context,
                        is_sub_exec=True,
                    )

                # I would have thought "is_returning: Final[bool]"
                # would have assured this but pylance thinks not
                assert result is not None

                if imv.num_sentinel_columns and not imv_batch.is_downgraded:
                    composite_sentinel = imv.num_sentinel_columns > 1
                    if imv.implicit_sentinel:
                        # for implicit sentinel, which is currently single-col
                        # integer autoincrement, do a simple sort.
                        assert not composite_sentinel
                        result.extend(
                            sorted(rows, key=operator.itemgetter(-1))
                        )
                        continue

                    # otherwise, create dictionaries to match up batches
                    # with parameters
                    assert imv.sentinel_param_keys
                    assert imv.sentinel_columns

                    _nsc = imv.num_sentinel_columns

                    # build sentinel result processors on first use only
                    if not _sentinel_proc_initialized:
                        if composite_sentinel:
                            _composite_sentinel_proc = [
                                col.type._cached_result_processor(
                                    self, cursor_desc[1]
                                )
                                for col, cursor_desc in zip(
                                    imv.sentinel_columns,
                                    cursor.description[-_nsc:],
                                )
                            ]
                        else:
                            _scalar_sentinel_proc = (
                                imv.sentinel_columns[0]
                            ).type._cached_result_processor(
                                self, cursor.description[-1][1]
                            )
                        _sentinel_proc_initialized = True

                    # map sentinel value(s), taken from the trailing
                    # column(s) of each row, back to the row itself
                    rows_by_sentinel: Union[
                        Dict[Tuple[Any, ...], Any],
                        Dict[Any, Any],
                    ]
                    if composite_sentinel:
                        rows_by_sentinel = {
                            tuple(
                                (proc(val) if proc else val)
                                for val, proc in zip(
                                    row[-_nsc:], _composite_sentinel_proc
                                )
                            ): row
                            for row in rows
                        }
                    elif _scalar_sentinel_proc:
                        rows_by_sentinel = {
                            _scalar_sentinel_proc(row[-1]): row for row in rows
                        }
                    else:
                        rows_by_sentinel = {row[-1]: row for row in rows}

                    if len(rows_by_sentinel) != len(imv_batch.batch):
                        # see test_insert_exec.py::
                        # IMVSentinelTest::test_sentinel_incorrect_rowcount
                        # for coverage / demonstration
                        raise exc.InvalidRequestError(
                            f"Sentinel-keyed result set did not produce "
                            f"correct number of rows {len(imv_batch.batch)}; "
                            "produced "
                            f"{len(rows_by_sentinel)}. Please ensure the "
                            "sentinel column is fully unique and populated in "
                            "all cases."
                        )

                    try:
                        ordered_rows = [
                            rows_by_sentinel[sentinel_keys]
                            for sentinel_keys in imv_batch.sentinel_values
                        ]
                    except KeyError as ke:
                        # see test_insert_exec.py::
                        # IMVSentinelTest::test_sentinel_cant_match_keys
                        # for coverage / demonstration
                        raise exc.InvalidRequestError(
                            f"Can't match sentinel values in result set to "
                            f"parameter sets; key {ke.args[0]!r} was not "
                            "found. "
                            "There may be a mismatch between the datatype "
                            "passed to the DBAPI driver vs. that which it "
                            "returns in a result row. Ensure the given "
                            "Python value matches the expected result type "
                            "*exactly*, taking care to not rely upon implicit "
                            "conversions which may occur such as when using "
                            "strings in place of UUID or integer values, etc. "
                        ) from ke

                    result.extend(ordered_rows)

                else:
                    # no sentinel handling required; rows are appended
                    # in the order the server returned them
                    result.extend(rows)

953 

954 def do_executemany(self, cursor, statement, parameters, context=None): 

955 cursor.executemany(statement, parameters) 

956 

957 def do_execute(self, cursor, statement, parameters, context=None): 

958 cursor.execute(statement, parameters) 

959 

960 def do_execute_no_params(self, cursor, statement, context=None): 

961 cursor.execute(statement) 

962 

963 def is_disconnect( 

964 self, 

965 e: DBAPIModule.Error, 

966 connection: Union[ 

967 pool.PoolProxiedConnection, interfaces.DBAPIConnection, None 

968 ], 

969 cursor: Optional[interfaces.DBAPICursor], 

970 ) -> bool: 

971 return False 

972 

973 @util.memoized_instancemethod 

974 def _gen_allowed_isolation_levels(self, dbapi_conn): 

975 try: 

976 raw_levels = list(self.get_isolation_level_values(dbapi_conn)) 

977 except NotImplementedError: 

978 return None 

979 else: 

980 normalized_levels = [ 

981 level.replace("_", " ").upper() for level in raw_levels 

982 ] 

983 if raw_levels != normalized_levels: 

984 raise ValueError( 

985 f"Dialect {self.name!r} get_isolation_level_values() " 

986 f"method should return names as UPPERCASE using spaces, " 

987 f"not underscores; got " 

988 f"{sorted(set(raw_levels).difference(normalized_levels))}" 

989 ) 

990 return tuple(normalized_levels) 

991 

992 def _assert_and_set_isolation_level(self, dbapi_conn, level): 

993 level = level.replace("_", " ").upper() 

994 

995 _allowed_isolation_levels = self._gen_allowed_isolation_levels( 

996 dbapi_conn 

997 ) 

998 if ( 

999 _allowed_isolation_levels 

1000 and level not in _allowed_isolation_levels 

1001 ): 

1002 raise exc.ArgumentError( 

1003 f"Invalid value {level!r} for isolation_level. " 

1004 f"Valid isolation levels for {self.name!r} are " 

1005 f"{', '.join(_allowed_isolation_levels)}" 

1006 ) 

1007 

1008 self.set_isolation_level(dbapi_conn, level) 

1009 

1010 def reset_isolation_level(self, dbapi_conn): 

1011 if self._on_connect_isolation_level is not None: 

1012 assert ( 

1013 self._on_connect_isolation_level == "AUTOCOMMIT" 

1014 or self._on_connect_isolation_level 

1015 == self.default_isolation_level 

1016 ) 

1017 self._assert_and_set_isolation_level( 

1018 dbapi_conn, self._on_connect_isolation_level 

1019 ) 

1020 else: 

1021 assert self.default_isolation_level is not None 

1022 self._assert_and_set_isolation_level( 

1023 dbapi_conn, 

1024 self.default_isolation_level, 

1025 ) 

1026 

1027 def normalize_name(self, name): 

1028 if name is None: 

1029 return None 

1030 

1031 name_lower = name.lower() 

1032 name_upper = name.upper() 

1033 

1034 if name_upper == name_lower: 

1035 # name has no upper/lower conversion, e.g. non-european characters. 

1036 # return unchanged 

1037 return name 

1038 elif name_upper == name and not ( 

1039 self.identifier_preparer._requires_quotes 

1040 )(name_lower): 

1041 # name is all uppercase and doesn't require quoting; normalize 

1042 # to all lower case 

1043 return name_lower 

1044 elif name_lower == name: 

1045 # name is all lower case, which if denormalized means we need to 

1046 # force quoting on it 

1047 return quoted_name(name, quote=True) 

1048 else: 

1049 # name is mixed case, means it will be quoted in SQL when used 

1050 # later, no normalizes 

1051 return name 

1052 

1053 def denormalize_name(self, name): 

1054 if name is None: 

1055 return None 

1056 

1057 name_lower = name.lower() 

1058 name_upper = name.upper() 

1059 

1060 if name_upper == name_lower: 

1061 # name has no upper/lower conversion, e.g. non-european characters. 

1062 # return unchanged 

1063 return name 

1064 elif name_lower == name and not ( 

1065 self.identifier_preparer._requires_quotes 

1066 )(name_lower): 

1067 name = name_upper 

1068 return name 

1069 

1070 def get_driver_connection(self, connection: DBAPIConnection) -> Any: 

1071 return connection 

1072 

1073 def _overrides_default(self, method): 

1074 return ( 

1075 getattr(type(self), method).__code__ 

1076 is not getattr(DefaultDialect, method).__code__ 

1077 ) 

1078 

    def _default_multi_reflect(
        self,
        single_tbl_method,
        connection,
        kind,
        schema,
        filter_names,
        scope,
        **kw,
    ):
        """Generic "multi" reflection: emulate a get_multi_* method by
        enumerating object names and calling *single_tbl_method* once per
        name, yielding ``((schema, name), result)`` pairs.

        Tables that cannot be reflected are recorded in the caller-supplied
        ``unreflectable`` mapping (via ``kw``); missing tables are skipped.
        """
        # collect the name-listing functions that match the requested kinds
        names_fns = []
        temp_names_fns = []
        if ObjectKind.TABLE in kind:
            names_fns.append(self.get_table_names)
            temp_names_fns.append(self.get_temp_table_names)
        if ObjectKind.VIEW in kind:
            names_fns.append(self.get_view_names)
            temp_names_fns.append(self.get_temp_view_names)
        if ObjectKind.MATERIALIZED_VIEW in kind:
            names_fns.append(self.get_materialized_view_names)
            # no temp materialized view at the moment
            # temp_names_fns.append(self.get_temp_materialized_view_names)

        unreflectable = kw.pop("unreflectable", {})

        if (
            filter_names
            and scope is ObjectScope.ANY
            and kind is ObjectKind.ANY
        ):
            # if names are given and no qualification on type of table
            # (i.e. the Table(..., autoload) case), take the names as given,
            # don't run names queries. If a table does not exit
            # NoSuchTableError is raised and it's skipped

            # this also suits the case for mssql where we can reflect
            # individual temp tables but there's no temp_names_fn
            names = filter_names
        else:
            names = []
            name_kw = {"schema": schema, **kw}
            fns = []
            if ObjectScope.DEFAULT in scope:
                fns.extend(names_fns)
            if ObjectScope.TEMPORARY in scope:
                fns.extend(temp_names_fns)

            for fn in fns:
                try:
                    names.extend(fn(connection, **name_kw))
                except NotImplementedError:
                    # a dialect may not implement e.g. temp view listing
                    pass

        if filter_names:
            # set for O(1) membership tests in the loop below
            filter_names = set(filter_names)

        # iterate over all the tables/views and call the single table method
        for table in names:
            if not filter_names or table in filter_names:
                key = (schema, table)
                try:
                    yield (
                        key,
                        single_tbl_method(
                            connection, table, schema=schema, **kw
                        ),
                    )
                except exc.UnreflectableTableError as err:
                    # record only the first failure per key
                    if key not in unreflectable:
                        unreflectable[key] = err
                except exc.NoSuchTableError:
                    # table disappeared or was never present; skip silently
                    pass

1151 

1152 def get_multi_table_options(self, connection, **kw): 

1153 return self._default_multi_reflect( 

1154 self.get_table_options, connection, **kw 

1155 ) 

1156 

1157 def get_multi_columns(self, connection, **kw): 

1158 return self._default_multi_reflect(self.get_columns, connection, **kw) 

1159 

1160 def get_multi_pk_constraint(self, connection, **kw): 

1161 return self._default_multi_reflect( 

1162 self.get_pk_constraint, connection, **kw 

1163 ) 

1164 

1165 def get_multi_foreign_keys(self, connection, **kw): 

1166 return self._default_multi_reflect( 

1167 self.get_foreign_keys, connection, **kw 

1168 ) 

1169 

1170 def get_multi_indexes(self, connection, **kw): 

1171 return self._default_multi_reflect(self.get_indexes, connection, **kw) 

1172 

1173 def get_multi_unique_constraints(self, connection, **kw): 

1174 return self._default_multi_reflect( 

1175 self.get_unique_constraints, connection, **kw 

1176 ) 

1177 

1178 def get_multi_check_constraints(self, connection, **kw): 

1179 return self._default_multi_reflect( 

1180 self.get_check_constraints, connection, **kw 

1181 ) 

1182 

1183 def get_multi_table_comment(self, connection, **kw): 

1184 return self._default_multi_reflect( 

1185 self.get_table_comment, connection, **kw 

1186 ) 

1187 

1188 

class StrCompileDialect(DefaultDialect):
    """Dialect used for string compilation of expressions (``str(stmt)``),
    with most capability flags enabled so constructs can render."""

    # string-oriented compiler implementations
    statement_compiler = compiler.StrSQLCompiler
    ddl_compiler = compiler.DDLCompiler
    type_compiler_cls = compiler.StrSQLTypeCompiler
    preparer = compiler.IdentifierPreparer

    # allow RETURNING clauses to render for all DML forms
    insert_returning = True
    update_returning = True
    delete_returning = True

    supports_statement_cache = True

    supports_identity_columns = True

    # sequences render but are not required for primary keys
    supports_sequences = True
    sequences_optional = True
    preexecute_autoincrement_sequences = False

    supports_native_boolean = True

    supports_multivalues_insert = True
    supports_simple_order_by_label = True

1211 

1212 

class DefaultExecutionContext(ExecutionContext):
    """Default implementation of :class:`.ExecutionContext`; built
    field-by-field via the ``_init_*`` classmethod constructors."""

    # statement-type flags, set by the _init_* constructors
    isinsert = False
    isupdate = False
    isdelete = False
    is_crud = False
    is_text = False
    isddl = False

    execute_style: ExecuteStyle = ExecuteStyle.EXECUTE

    compiled: Optional[Compiled] = None
    result_column_struct: Optional[
        Tuple[List[ResultColumnsEntry], bool, bool, bool, bool]
    ] = None
    returned_default_rows: Optional[Sequence[Row[Unpack[TupleAny]]]] = None

    execution_options: _ExecuteOptions = util.EMPTY_DICT

    cursor_fetch_strategy = _cursor._DEFAULT_FETCH

    invoked_statement: Optional[Executable] = None

    # RETURNING-related state flags
    _is_implicit_returning = False
    _is_explicit_returning = False
    _is_supplemental_returning = False
    _is_server_side = False

    _soft_closed = False

    # explicitly captured rowcount, when the "preserve_rowcount"
    # execution option is in effect
    _rowcount: Optional[int] = None

    # a hook for SQLite's translation of
    # result column names
    # NOTE: pyhive is using this hook, can't remove it :(
    _translate_colname: Optional[Callable[[str], str]] = None

    _expanded_parameters: Mapping[str, List[str]] = util.immutabledict()
    """used by set_input_sizes().

    This collection comes from ``ExpandedState.parameter_expansion``.

    """

    cache_hit = NO_CACHE_KEY

    # attributes assigned by the _init_* constructors
    root_connection: Connection
    _dbapi_connection: PoolProxiedConnection
    dialect: Dialect
    unicode_statement: str
    cursor: DBAPICursor
    compiled_parameters: List[_MutableCoreSingleExecuteParams]
    parameters: _DBAPIMultiExecuteParams
    extracted_parameters: Optional[Sequence[BindParameter[Any]]]

    _empty_dict_params = cast("Mapping[str, Any]", util.EMPTY_DICT)

    # rows buffered by an "insertmanyvalues" execution, plus the count of
    # trailing sentinel columns appended to each row
    _insertmanyvalues_rows: Optional[List[Tuple[Any, ...]]] = None
    _num_sentinel_cols: int = 0

1271 

    @classmethod
    def _init_ddl(
        cls,
        dialect: Dialect,
        connection: Connection,
        dbapi_connection: PoolProxiedConnection,
        execution_options: _ExecuteOptions,
        compiled_ddl: DDLCompiler,
    ) -> ExecutionContext:
        """Initialize execution context for an ExecutableDDLElement
        construct."""

        # bypass __init__; contexts are constructed field-by-field
        self = cls.__new__(cls)
        self.root_connection = connection
        self._dbapi_connection = dbapi_connection
        self.dialect = connection.dialect

        self.compiled = compiled = compiled_ddl
        self.isddl = True

        self.execution_options = execution_options

        self.unicode_statement = str(compiled)
        if compiled.schema_translate_map:
            # rewrite schema names in the rendered DDL string per the
            # schema_translate_map execution option
            schema_translate_map = self.execution_options.get(
                "schema_translate_map", {}
            )

            rst = compiled.preparer._render_schema_translates
            self.unicode_statement = rst(
                self.unicode_statement, schema_translate_map
            )

        self.statement = self.unicode_statement

        self.cursor = self.create_cursor()
        self.compiled_parameters = []

        # DDL carries no bound parameters; supply the empty structure the
        # DBAPI paramstyle expects
        if dialect.positional:
            self.parameters = [dialect.execute_sequence_format()]
        else:
            self.parameters = [self._empty_dict_params]

        return self

1316 

    @classmethod
    def _init_compiled(
        cls,
        dialect: Dialect,
        connection: Connection,
        dbapi_connection: PoolProxiedConnection,
        execution_options: _ExecuteOptions,
        compiled: SQLCompiler,
        parameters: _CoreMultiExecuteParams,
        invoked_statement: Executable,
        extracted_parameters: Optional[Sequence[BindParameter[Any]]],
        cache_hit: CacheStats = CacheStats.CACHING_DISABLED,
    ) -> ExecutionContext:
        """Initialize execution context for a Compiled construct."""

        # bypass __init__; contexts are constructed field-by-field
        self = cls.__new__(cls)
        self.root_connection = connection
        self._dbapi_connection = dbapi_connection
        self.dialect = connection.dialect
        self.extracted_parameters = extracted_parameters
        self.invoked_statement = invoked_statement
        self.compiled = compiled
        self.cache_hit = cache_hit

        self.execution_options = execution_options

        # metadata the CursorResult uses to match cursor.description
        # entries to compiled result columns
        self.result_column_struct = (
            compiled._result_columns,
            compiled._ordered_columns,
            compiled._textual_ordered_columns,
            compiled._ad_hoc_textual,
            compiled._loose_column_name_matching,
        )

        self.isinsert = ii = compiled.isinsert
        self.isupdate = iu = compiled.isupdate
        self.isdelete = id_ = compiled.isdelete
        self.is_text = compiled.isplaintext

        if ii or iu or id_:
            dml_statement = compiled.compile_state.statement  # type: ignore
            if TYPE_CHECKING:
                assert isinstance(dml_statement, UpdateBase)
            self.is_crud = True
            self._is_explicit_returning = ier = bool(dml_statement._returning)
            self._is_implicit_returning = iir = bool(
                compiled.implicit_returning
            )
            if iir and dml_statement._supplemental_returning:
                self._is_supplemental_returning = True

            # dont mix implicit and explicit returning
            assert not (iir and ier)

            # RETURNING + executemany is only legal when the dialect (and
            # for INSERT, the insertmanyvalues feature) supports it; raise
            # up front rather than failing at the driver level
            if (ier or iir) and compiled.for_executemany:
                if ii and not self.dialect.insert_executemany_returning:
                    raise exc.InvalidRequestError(
                        f"Dialect {self.dialect.dialect_description} with "
                        f"current server capabilities does not support "
                        "INSERT..RETURNING when executemany is used"
                    )
                elif (
                    ii
                    and dml_statement._sort_by_parameter_order
                    and not self.dialect.insert_executemany_returning_sort_by_parameter_order  # noqa: E501
                ):
                    raise exc.InvalidRequestError(
                        f"Dialect {self.dialect.dialect_description} with "
                        f"current server capabilities does not support "
                        "INSERT..RETURNING with deterministic row ordering "
                        "when executemany is used"
                    )
                elif (
                    ii
                    and self.dialect.use_insertmanyvalues
                    and not compiled._insertmanyvalues
                ):
                    raise exc.InvalidRequestError(
                        'Statement does not have "insertmanyvalues" '
                        "enabled, can't use INSERT..RETURNING with "
                        "executemany in this case."
                    )
                elif iu and not self.dialect.update_executemany_returning:
                    raise exc.InvalidRequestError(
                        f"Dialect {self.dialect.dialect_description} with "
                        f"current server capabilities does not support "
                        "UPDATE..RETURNING when executemany is used"
                    )
                elif id_ and not self.dialect.delete_executemany_returning:
                    raise exc.InvalidRequestError(
                        f"Dialect {self.dialect.dialect_description} with "
                        f"current server capabilities does not support "
                        "DELETE..RETURNING when executemany is used"
                    )

        # construct the per-execution parameter dictionaries from the
        # caller-supplied parameters (or defaults when none were given)
        if not parameters:
            self.compiled_parameters = [
                compiled.construct_params(
                    extracted_parameters=extracted_parameters,
                    escape_names=False,
                )
            ]
        else:
            self.compiled_parameters = [
                compiled.construct_params(
                    m,
                    escape_names=False,
                    _group_number=grp,
                    extracted_parameters=extracted_parameters,
                )
                for grp, m in enumerate(parameters)
            ]

            if len(parameters) > 1:
                if self.isinsert and compiled._insertmanyvalues:
                    self.execute_style = ExecuteStyle.INSERTMANYVALUES

                    imv = compiled._insertmanyvalues
                    if imv.sentinel_columns is not None:
                        self._num_sentinel_cols = imv.num_sentinel_columns
                else:
                    self.execute_style = ExecuteStyle.EXECUTEMANY

        self.unicode_statement = compiled.string

        self.cursor = self.create_cursor()

        if self.compiled.insert_prefetch or self.compiled.update_prefetch:
            # evaluate pre-execution column defaults now
            self._process_execute_defaults()

        processors = compiled._bind_processors

        flattened_processors: Mapping[
            str, _BindProcessorType[Any]
        ] = processors  # type: ignore[assignment]

        if compiled.literal_execute_params or compiled.post_compile_params:
            if self.executemany:
                raise exc.InvalidRequestError(
                    "'literal_execute' or 'expanding' parameters can't be "
                    "used with executemany()"
                )

            # render post-compile ("expanding") parameters into the
            # statement text now that parameter values are known
            expanded_state = compiled._process_parameters_for_postcompile(
                self.compiled_parameters[0]
            )

            # re-assign self.unicode_statement
            self.unicode_statement = expanded_state.statement

            self._expanded_parameters = expanded_state.parameter_expansion

            flattened_processors = dict(processors)  # type: ignore
            flattened_processors.update(expanded_state.processors)
            positiontup = expanded_state.positiontup
        elif compiled.positional:
            positiontup = self.compiled.positiontup
        else:
            positiontup = None

        if compiled.schema_translate_map:
            # rewrite schema names in the statement text per the
            # schema_translate_map execution option
            schema_translate_map = self.execution_options.get(
                "schema_translate_map", {}
            )
            rst = compiled.preparer._render_schema_translates
            self.unicode_statement = rst(
                self.unicode_statement, schema_translate_map
            )

        # final self.unicode_statement is now assigned, encode if needed
        # by dialect
        self.statement = self.unicode_statement

        # Convert the dictionary of bind parameter values
        # into a dict or list to be sent to the DBAPI's
        # execute() or executemany() method.

        if compiled.positional:
            core_positional_parameters: MutableSequence[Sequence[Any]] = []
            assert positiontup is not None
            for compiled_params in self.compiled_parameters:
                # apply bind processors while flattening params into the
                # positional order the paramstyle requires
                l_param: List[Any] = [
                    (
                        flattened_processors[key](compiled_params[key])
                        if key in flattened_processors
                        else compiled_params[key]
                    )
                    for key in positiontup
                ]
                core_positional_parameters.append(
                    dialect.execute_sequence_format(l_param)
                )

            self.parameters = core_positional_parameters
        else:
            core_dict_parameters: MutableSequence[Dict[str, Any]] = []
            escaped_names = compiled.escaped_bind_names

            # note that currently, "expanded" parameters will be present
            # in self.compiled_parameters in their quoted form. This is
            # slightly inconsistent with the approach taken as of
            # #8056 where self.compiled_parameters is meant to contain unquoted
            # param names.
            d_param: Dict[str, Any]
            for compiled_params in self.compiled_parameters:
                if escaped_names:
                    # translate bind names to their DBAPI-safe escaped form
                    d_param = {
                        escaped_names.get(key, key): (
                            flattened_processors[key](compiled_params[key])
                            if key in flattened_processors
                            else compiled_params[key]
                        )
                        for key in compiled_params
                    }
                else:
                    d_param = {
                        key: (
                            flattened_processors[key](compiled_params[key])
                            if key in flattened_processors
                            else compiled_params[key]
                        )
                        for key in compiled_params
                    }

                core_dict_parameters.append(d_param)

            self.parameters = core_dict_parameters

        return self

1546 

1547 @classmethod 

1548 def _init_statement( 

1549 cls, 

1550 dialect: Dialect, 

1551 connection: Connection, 

1552 dbapi_connection: PoolProxiedConnection, 

1553 execution_options: _ExecuteOptions, 

1554 statement: str, 

1555 parameters: _DBAPIMultiExecuteParams, 

1556 ) -> ExecutionContext: 

1557 """Initialize execution context for a string SQL statement.""" 

1558 

1559 self = cls.__new__(cls) 

1560 self.root_connection = connection 

1561 self._dbapi_connection = dbapi_connection 

1562 self.dialect = connection.dialect 

1563 self.is_text = True 

1564 

1565 self.execution_options = execution_options 

1566 

1567 if not parameters: 

1568 if self.dialect.positional: 

1569 self.parameters = [dialect.execute_sequence_format()] 

1570 else: 

1571 self.parameters = [self._empty_dict_params] 

1572 elif isinstance(parameters[0], dialect.execute_sequence_format): 

1573 self.parameters = parameters 

1574 elif isinstance(parameters[0], dict): 

1575 self.parameters = parameters 

1576 else: 

1577 self.parameters = [ 

1578 dialect.execute_sequence_format(p) for p in parameters 

1579 ] 

1580 

1581 if len(parameters) > 1: 

1582 self.execute_style = ExecuteStyle.EXECUTEMANY 

1583 

1584 self.statement = self.unicode_statement = statement 

1585 

1586 self.cursor = self.create_cursor() 

1587 return self 

1588 

1589 @classmethod 

1590 def _init_default( 

1591 cls, 

1592 dialect: Dialect, 

1593 connection: Connection, 

1594 dbapi_connection: PoolProxiedConnection, 

1595 execution_options: _ExecuteOptions, 

1596 ) -> ExecutionContext: 

1597 """Initialize execution context for a ColumnDefault construct.""" 

1598 

1599 self = cls.__new__(cls) 

1600 self.root_connection = connection 

1601 self._dbapi_connection = dbapi_connection 

1602 self.dialect = connection.dialect 

1603 

1604 self.execution_options = execution_options 

1605 

1606 self.cursor = self.create_cursor() 

1607 return self 

1608 

1609 def _get_cache_stats(self) -> str: 

1610 if self.compiled is None: 

1611 return "raw sql" 

1612 

1613 now = perf_counter() 

1614 

1615 ch = self.cache_hit 

1616 

1617 gen_time = self.compiled._gen_time 

1618 assert gen_time is not None 

1619 

1620 if ch is NO_CACHE_KEY: 

1621 return "no key %.5fs" % (now - gen_time,) 

1622 elif ch is CACHE_HIT: 

1623 return "cached since %.4gs ago" % (now - gen_time,) 

1624 elif ch is CACHE_MISS: 

1625 return "generated in %.5fs" % (now - gen_time,) 

1626 elif ch is CACHING_DISABLED: 

1627 if "_cache_disable_reason" in self.execution_options: 

1628 return "caching disabled (%s) %.5fs " % ( 

1629 self.execution_options["_cache_disable_reason"], 

1630 now - gen_time, 

1631 ) 

1632 else: 

1633 return "caching disabled %.5fs" % (now - gen_time,) 

1634 elif ch is NO_DIALECT_SUPPORT: 

1635 return "dialect %s+%s does not support caching %.5fs" % ( 

1636 self.dialect.name, 

1637 self.dialect.driver, 

1638 now - gen_time, 

1639 ) 

1640 else: 

1641 return "unknown" 

1642 

1643 @property 

1644 def executemany(self): # type: ignore[override] 

1645 return self.execute_style in ( 

1646 ExecuteStyle.EXECUTEMANY, 

1647 ExecuteStyle.INSERTMANYVALUES, 

1648 ) 

1649 

1650 @util.memoized_property 

1651 def identifier_preparer(self): 

1652 if self.compiled: 

1653 return self.compiled.preparer 

1654 elif "schema_translate_map" in self.execution_options: 

1655 return self.dialect.identifier_preparer._with_schema_translate( 

1656 self.execution_options["schema_translate_map"] 

1657 ) 

1658 else: 

1659 return self.dialect.identifier_preparer 

1660 

1661 @util.memoized_property 

1662 def engine(self): 

1663 return self.root_connection.engine 

1664 

1665 @util.memoized_property 

1666 def postfetch_cols(self) -> Optional[Sequence[Column[Any]]]: 

1667 if TYPE_CHECKING: 

1668 assert isinstance(self.compiled, SQLCompiler) 

1669 return self.compiled.postfetch 

1670 

1671 @util.memoized_property 

1672 def prefetch_cols(self) -> Optional[Sequence[Column[Any]]]: 

1673 if TYPE_CHECKING: 

1674 assert isinstance(self.compiled, SQLCompiler) 

1675 if self.isinsert: 

1676 return self.compiled.insert_prefetch 

1677 elif self.isupdate: 

1678 return self.compiled.update_prefetch 

1679 else: 

1680 return () 

1681 

1682 @util.memoized_property 

1683 def no_parameters(self): 

1684 return self.execution_options.get("no_parameters", False) 

1685 

    def _execute_scalar(
        self,
        stmt: str,
        type_: Optional[TypeEngine[Any]],
        parameters: Optional[_DBAPISingleExecuteParams] = None,
    ) -> Any:
        """Execute a string statement on the current cursor, returning a
        scalar result.

        Used to fire off sequences, default phrases, and "select lastrowid"
        types of statements individually or in the context of a parent INSERT
        or UPDATE statement.

        """

        conn = self.root_connection

        if "schema_translate_map" in self.execution_options:
            # rewrite schema names in the statement per the
            # schema_translate_map execution option
            schema_translate_map = self.execution_options.get(
                "schema_translate_map", {}
            )

            rst = self.identifier_preparer._render_schema_translates
            stmt = rst(stmt, schema_translate_map)

        # supply the empty parameter structure the paramstyle expects
        if not parameters:
            if self.dialect.positional:
                parameters = self.dialect.execute_sequence_format()
            else:
                parameters = {}

        conn._cursor_execute(self.cursor, stmt, parameters, context=self)
        # first column of the first row is the scalar result; None when
        # the statement produced no row
        row = self.cursor.fetchone()
        if row is not None:
            r = row[0]
        else:
            r = None
        if type_ is not None:
            # apply type post processors to the result
            proc = type_._cached_result_processor(
                self.dialect, self.cursor.description[0][1]
            )
            if proc:
                return proc(r)
        return r

1731 

1732 @util.memoized_property 

1733 def connection(self): 

1734 return self.root_connection 

1735 

    def _use_server_side_cursor(self):
        # Decide whether this execution should run on a server-side cursor;
        # only consulted when the dialect supports them at all.
        if not self.dialect.supports_server_side_cursors:
            return False

        if self.dialect.server_side_cursors:
            # this is deprecated
            # legacy dialect-wide mode: stream by default (unless the
            # "stream_results" option disables it) but only for SELECT-like
            # statements.  NOTE: `and` binds tighter than `or` below -- the
            # SERVER_SIDE_CURSOR_RE match applies only to the textual /
            # uncompiled branch.
            use_server_side = self.execution_options.get(
                "stream_results", True
            ) and (
                self.compiled
                and isinstance(self.compiled.statement, expression.Selectable)
                or (
                    (
                        not self.compiled
                        or isinstance(
                            self.compiled.statement, expression.TextClause
                        )
                    )
                    and self.unicode_statement
                    and SERVER_SIDE_CURSOR_RE.match(self.unicode_statement)
                )
            )
        else:
            # modern behavior: opt-in per execution via "stream_results"
            use_server_side = self.execution_options.get(
                "stream_results", False
            )

        return use_server_side

1764 

1765 def create_cursor(self) -> DBAPICursor: 

1766 if ( 

1767 # inlining initial preference checks for SS cursors 

1768 self.dialect.supports_server_side_cursors 

1769 and ( 

1770 self.execution_options.get("stream_results", False) 

1771 or ( 

1772 self.dialect.server_side_cursors 

1773 and self._use_server_side_cursor() 

1774 ) 

1775 ) 

1776 ): 

1777 self._is_server_side = True 

1778 return self.create_server_side_cursor() 

1779 else: 

1780 self._is_server_side = False 

1781 return self.create_default_cursor() 

1782 

1783 def fetchall_for_returning(self, cursor): 

1784 return cursor.fetchall() 

1785 

1786 def create_default_cursor(self) -> DBAPICursor: 

1787 return self._dbapi_connection.cursor() 

1788 

1789 def create_server_side_cursor(self) -> DBAPICursor: 

1790 raise NotImplementedError() 

1791 

1792 def pre_exec(self): 

1793 pass 

1794 

1795 def get_out_parameter_values(self, names): 

1796 raise NotImplementedError( 

1797 "This dialect does not support OUT parameters" 

1798 ) 

1799 

1800 def post_exec(self): 

1801 pass 

1802 

1803 def get_result_processor(self, type_, colname, coltype): 

1804 """Return a 'result processor' for a given type as present in 

1805 cursor.description. 

1806 

1807 This has a default implementation that dialects can override 

1808 for context-sensitive result type handling. 

1809 

1810 """ 

1811 return type_._cached_result_processor(self.dialect, coltype) 

1812 

1813 def get_lastrowid(self): 

1814 """return self.cursor.lastrowid, or equivalent, after an INSERT. 

1815 

1816 This may involve calling special cursor functions, issuing a new SELECT 

1817 on the cursor (or a new one), or returning a stored value that was 

1818 calculated within post_exec(). 

1819 

1820 This function will only be called for dialects which support "implicit" 

1821 primary key generation, keep preexecute_autoincrement_sequences set to 

1822 False, and when no explicit id value was bound to the statement. 

1823 

1824 The function is called once for an INSERT statement that would need to 

1825 return the last inserted primary key for those dialects that make use 

1826 of the lastrowid concept. In these cases, it is called directly after 

1827 :meth:`.ExecutionContext.post_exec`. 

1828 

1829 """ 

1830 return self.cursor.lastrowid 

1831 

1832 def handle_dbapi_exception(self, e): 

1833 pass 

1834 

1835 @util.non_memoized_property 

1836 def rowcount(self) -> int: 

1837 if self._rowcount is not None: 

1838 return self._rowcount 

1839 else: 

1840 return self.cursor.rowcount 

1841 

1842 @property 

1843 def _has_rowcount(self): 

1844 return self._rowcount is not None 

1845 

1846 def supports_sane_rowcount(self): 

1847 return self.dialect.supports_sane_rowcount 

1848 

1849 def supports_sane_multi_rowcount(self): 

1850 return self.dialect.supports_sane_multi_rowcount 

1851 

    def _setup_result_proxy(self):
        """Construct and return the CursorResult for this execution,
        selecting the row-fetch strategy and applying yield_per and OUT
        parameter handling."""
        exec_opt = self.execution_options

        # snapshot cursor.rowcount now if the user asked to preserve it
        if self._rowcount is None and exec_opt.get("preserve_rowcount", False):
            self._rowcount = self.cursor.rowcount

        yp: Optional[Union[int, bool]]
        if self.is_crud or self.is_text:
            # DML and textual statements get specialized result setup
            result = self._setup_dml_or_text_result()
            yp = False
        else:
            yp = exec_opt.get("yield_per", None)
            sr = self._is_server_side or exec_opt.get("stream_results", False)
            strategy = self.cursor_fetch_strategy
            if sr and strategy is _cursor._DEFAULT_FETCH:
                # streaming requested: buffer rows incrementally rather
                # than fetching everything at once
                strategy = _cursor.BufferedRowCursorFetchStrategy(
                    self.cursor, self.execution_options
                )
            cursor_description: _DBAPICursorDescription = (
                strategy.alternate_cursor_description
                or self.cursor.description
            )
            if cursor_description is None:
                # no rows to return; swap in the no-op DQL strategy
                strategy = _cursor._NO_CURSOR_DQL

            result = _cursor.CursorResult(self, strategy, cursor_description)

        compiled = self.compiled

        if (
            compiled
            and not self.isddl
            and cast(SQLCompiler, compiled).has_out_parameters
        ):
            self._setup_out_parameters(result)

        self._soft_closed = result._soft_closed

        if yp:
            result = result.yield_per(yp)

        return result

1894 

    def _setup_out_parameters(self, result):
        """Retrieve OUT parameter values from the dialect and attach the
        processed values to *result* as ``result.out_parameters``."""
        compiled = cast(SQLCompiler, self.compiled)

        # (bindparam, rendered name) pairs for all OUT parameters
        out_bindparams = [
            (param, name)
            for param, name in compiled.bind_names.items()
            if param.isoutparam
        ]
        out_parameters = {}

        # pair each OUT bindparam with its raw value as returned by the
        # dialect, in matching order
        for bindparam, raw_value in zip(
            [param for param, name in out_bindparams],
            self.get_out_parameter_values(
                [name for param, name in out_bindparams]
            ),
        ):
            type_ = bindparam.type
            impl_type = type_.dialect_impl(self.dialect)
            dbapi_type = impl_type.get_dbapi_type(self.dialect.loaded_dbapi)
            result_processor = impl_type.result_processor(
                self.dialect, dbapi_type
            )
            if result_processor is not None:
                # run the raw driver value through result-type conversion
                raw_value = result_processor(raw_value)
            out_parameters[bindparam.key] = raw_value

        result.out_parameters = out_parameters

1922 

def _setup_dml_or_text_result(self):
    """Build and return the :class:`.CursorResult` for an INSERT,
    UPDATE, DELETE, or textual statement.

    Handles selection of the cursor fetch strategy (including the
    fully-buffered strategy for insertmanyvalues with RETURNING),
    population of ``inserted_primary_key_rows`` and
    ``returned_default_rows``, soft-closing of results that return no
    further rows, and rowcount capture for UPDATE/DELETE.
    """
    compiled = cast(SQLCompiler, self.compiled)

    strategy: ResultFetchStrategy = self.cursor_fetch_strategy

    if self.isinsert:
        if (
            self.execute_style is ExecuteStyle.INSERTMANYVALUES
            and compiled.effective_returning
        ):
            # rows were accumulated across the insertmanyvalues
            # batches; serve them from an in-memory buffer
            strategy = _cursor.FullyBufferedCursorFetchStrategy(
                self.cursor,
                initial_buffer=self._insertmanyvalues_rows,
                # maintain alt cursor description if set by the
                # dialect, e.g. mssql preserves it
                alternate_description=(
                    strategy.alternate_cursor_description
                ),
            )

        if compiled.postfetch_lastrowid:
            self.inserted_primary_key_rows = (
                self._setup_ins_pk_from_lastrowid()
            )
        # else if not self._is_implicit_returning,
        # the default inserted_primary_key_rows accessor will
        # return an "empty" primary key collection when accessed.

    if self._is_server_side and strategy is _cursor._DEFAULT_FETCH:
        strategy = _cursor.BufferedRowCursorFetchStrategy(
            self.cursor, self.execution_options
        )

    if strategy is _cursor._NO_CURSOR_DML:
        cursor_description = None
    else:
        cursor_description = (
            strategy.alternate_cursor_description
            or self.cursor.description
        )

    if cursor_description is None:
        # statement produced no result columns; use the no-op
        # DML strategy
        strategy = _cursor._NO_CURSOR_DML
    elif self._num_sentinel_cols:
        assert self.execute_style is ExecuteStyle.INSERTMANYVALUES
        # the sentinel columns are handled in CursorResult._init_metadata
        # using essentially _reduce

    result: _cursor.CursorResult[Any] = _cursor.CursorResult(
        self, strategy, cursor_description
    )

    if self.isinsert:
        if self._is_implicit_returning:
            rows = result.all()

            self.returned_default_rows = rows

            self.inserted_primary_key_rows = (
                self._setup_ins_pk_from_implicit_returning(result, rows)
            )

            # test that it has a cursor metadata that is accurate. the
            # first row will have been fetched and current assumptions
            # are that the result has only one row, until executemany()
            # support is added here.
            assert result._metadata.returns_rows

            # Insert statement has both return_defaults() and
            # returning(). rewind the result on the list of rows
            # we just used.
            if self._is_supplemental_returning:
                result._rewind(rows)
            else:
                result._soft_close()
        elif not self._is_explicit_returning:
            result._soft_close()

            # we assume here the result does not return any rows.
            # *usually*, this will be true. However, some dialects
            # such as that of MSSQL/pyodbc need to SELECT a post fetch
            # function so this is not necessarily true.
            # assert not result.returns_rows

    elif self._is_implicit_returning:
        rows = result.all()

        if rows:
            self.returned_default_rows = rows
        self._rowcount = len(rows)

        if self._is_supplemental_returning:
            result._rewind(rows)
        else:
            result._soft_close()

        # test that it has a cursor metadata that is accurate.
        # the rows have all been fetched however.
        assert result._metadata.returns_rows

    elif not result._metadata.returns_rows:
        # no results, get rowcount
        # (which requires open cursor on some drivers)
        if self._rowcount is None:
            self._rowcount = self.cursor.rowcount
        result._soft_close()
    elif self.isupdate or self.isdelete:
        if self._rowcount is None:
            self._rowcount = self.cursor.rowcount
    return result

2033 

@util.memoized_property
def inserted_primary_key_rows(self):
    # if no specific "get primary key" strategy was set up
    # during execution, return a "default" primary key based
    # on what's in the compiled_parameters and nothing else.
    # NOTE: _setup_dml_or_text_result() may assign this attribute
    # directly (lastrowid / implicit RETURNING paths), in which case
    # this memoized default is never computed.
    return self._setup_ins_pk_from_empty()

2040 

def _setup_ins_pk_from_lastrowid(self):
    """Return the single inserted-primary-key row derived from the
    DBAPI ``lastrowid`` and the first compiled parameter set.
    """
    compiled = cast(SQLCompiler, self.compiled)
    pk_getter = compiled._inserted_primary_key_from_lastrowid_getter
    return [
        pk_getter(self.get_lastrowid(), self.compiled_parameters[0])
    ]

2047 

def _setup_ins_pk_from_empty(self):
    """Return "default" primary-key rows, one per compiled parameter
    set, when no lastrowid or RETURNING value is available.
    """
    compiled = cast(SQLCompiler, self.compiled)
    pk_getter = compiled._inserted_primary_key_from_lastrowid_getter
    return [
        pk_getter(None, params) for params in self.compiled_parameters
    ]

2053 

def _setup_ins_pk_from_implicit_returning(self, result, rows):
    """Return primary-key rows derived from implicit RETURNING rows,
    pairing each returned row with its compiled parameter set.
    """
    if not rows:
        return []

    compiled = cast(SQLCompiler, self.compiled)
    pk_getter = compiled._inserted_primary_key_from_returning_getter

    return [
        pk_getter(returned_row, params)
        for returned_row, params in zip(rows, self.compiled_parameters)
    ]

2066 

def lastrow_has_defaults(self):
    """Return True if the last INSERT or UPDATE row left columns with
    server-generated values that require a post-fetch.
    """
    if not (self.isinsert or self.isupdate):
        return False
    return bool(cast(SQLCompiler, self.compiled).postfetch)

2071 

def _prepare_set_input_sizes(
    self,
) -> Optional[List[Tuple[str, Any, TypeEngine[Any]]]]:
    """Given a cursor and ClauseParameters, prepare arguments
    in order to call the appropriate
    style of ``setinputsizes()`` on the cursor, using DB-API types
    from the bind parameter's ``TypeEngine`` objects.

    This method only called by those dialects which set the
    :attr:`.Dialect.bind_typing` attribute to
    :attr:`.BindTyping.SETINPUTSIZES`. Python-oracledb and cx_Oracle are
    the only DBAPIs that requires setinputsizes(); pyodbc offers it as an
    option.

    Prior to SQLAlchemy 2.0, the setinputsizes() approach was also used
    for pg8000 and asyncpg, which has been changed to inline rendering
    of casts.

    :return: list of ``(param_name, dbapi_type, sqlalchemy_type)``
      tuples, or ``None`` if setinputsizes does not apply to this
      statement.
    """
    # DDL and plain-text statements have no compiled bind information
    if self.isddl or self.is_text:
        return None

    compiled = cast(SQLCompiler, self.compiled)

    inputsizes = compiled._get_set_input_sizes_lookup()

    if inputsizes is None:
        return None

    dialect = self.dialect

    # all of the rest of this... cython?

    if dialect._has_events:
        # copy before handing to event listeners, which may mutate it
        inputsizes = dict(inputsizes)
        dialect.dispatch.do_setinputsizes(
            inputsizes, self.cursor, self.statement, self.parameters, self
        )

    if compiled.escaped_bind_names:
        escaped_bind_names = compiled.escaped_bind_names
    else:
        escaped_bind_names = None

    # build (key, bindparam) pairs in statement order for positional
    # dialects, otherwise in bind_names order
    if dialect.positional:
        items = [
            (key, compiled.binds[key])
            for key in compiled.positiontup or ()
        ]
    else:
        items = [
            (key, bindparam)
            for bindparam, key in compiled.bind_names.items()
        ]

    generic_inputsizes: List[Tuple[str, Any, TypeEngine[Any]]] = []
    for key, bindparam in items:
        # literal-execute params are rendered inline, not bound
        if bindparam in compiled.literal_execute_params:
            continue

        if key in self._expanded_parameters:
            # "expanding" IN parameter: one entry per expanded name
            if is_tuple_type(bindparam.type):
                # tuple type: cycle through the tuple's member types
                num = len(bindparam.type.types)
                dbtypes = inputsizes[bindparam]
                generic_inputsizes.extend(
                    (
                        (
                            escaped_bind_names.get(paramname, paramname)
                            if escaped_bind_names is not None
                            else paramname
                        ),
                        dbtypes[idx % num],
                        bindparam.type.types[idx % num],
                    )
                    for idx, paramname in enumerate(
                        self._expanded_parameters[key]
                    )
                )
            else:
                dbtype = inputsizes.get(bindparam, None)
                generic_inputsizes.extend(
                    (
                        (
                            escaped_bind_names.get(paramname, paramname)
                            if escaped_bind_names is not None
                            else paramname
                        ),
                        dbtype,
                        bindparam.type,
                    )
                    for paramname in self._expanded_parameters[key]
                )
        else:
            dbtype = inputsizes.get(bindparam, None)

            escaped_name = (
                escaped_bind_names.get(key, key)
                if escaped_bind_names is not None
                else key
            )

            generic_inputsizes.append(
                (escaped_name, dbtype, bindparam.type)
            )

    return generic_inputsizes

2178 

2179 def _exec_default(self, column, default, type_): 

2180 if default.is_sequence: 

2181 return self.fire_sequence(default, type_) 

2182 elif default.is_callable: 

2183 # this codepath is not normally used as it's inlined 

2184 # into _process_execute_defaults 

2185 self.current_column = column 

2186 return default.arg(self) 

2187 elif default.is_clause_element: 

2188 return self._exec_default_clause_element(column, default, type_) 

2189 else: 

2190 # this codepath is not normally used as it's inlined 

2191 # into _process_execute_defaults 

2192 return default.arg 

2193 

def _exec_default_clause_element(self, column, default, type_):
    """Execute a default that is a complete SQL clause element.

    Re-implements a miniature version of the compile -> parameters ->
    cursor.execute() sequence, since the state of the connection /
    result in progress must not be modified and no new connection /
    result objects should be created.

    .. versionchanged:: 1.4
    """
    if default._arg_is_typed:
        default_arg = default.arg
    else:
        # coerce the expression to the expected type
        default_arg = expression.type_coerce(default.arg, type_)

    compiled = expression.select(default_arg).compile(dialect=self.dialect)
    raw_params = compiled.construct_params()
    processors = compiled._bind_processors

    def _value(key):
        # apply the bind processor for this parameter, if any
        if key in processors:
            return processors[key](raw_params[key])  # type: ignore
        return raw_params[key]

    if compiled.positional:
        parameters = self.dialect.execute_sequence_format(
            [_value(key) for key in compiled.positiontup or ()]
        )
    else:
        parameters = {key: _value(key) for key in raw_params}

    return self._execute_scalar(
        str(compiled), type_, parameters=parameters
    )

2232 

current_parameters: Optional[_CoreSingleExecuteParams] = None
"""A dictionary of parameters applied to the current row.

This attribute is only available in the context of a user-defined default
generation function, e.g. as described at :ref:`context_default_functions`.
It consists of a dictionary which includes entries for each column/value
pair that is to be part of the INSERT or UPDATE statement. The keys of the
dictionary will be the key value of each :class:`_schema.Column`,
which is usually
synonymous with the name.

The attribute is assigned and then removed by
``_process_execute_defaults()`` as each row's parameter set is processed.

Note that the :attr:`.DefaultExecutionContext.current_parameters` attribute
does not accommodate for the "multi-values" feature of the
:meth:`_expression.Insert.values` method. The
:meth:`.DefaultExecutionContext.get_current_parameters` method should be
preferred.

.. seealso::

    :meth:`.DefaultExecutionContext.get_current_parameters`

    :ref:`context_default_functions`

"""

2257 

def get_current_parameters(self, isolate_multiinsert_groups=True):
    """Return a dictionary of parameters applied to the current row.

    This method can only be used in the context of a user-defined default
    generation function, e.g. as described at
    :ref:`context_default_functions`. When invoked, a dictionary is
    returned which includes entries for each column/value pair that is part
    of the INSERT or UPDATE statement. The keys of the dictionary will be
    the key value of each :class:`_schema.Column`,
    which is usually synonymous
    with the name.

    :param isolate_multiinsert_groups=True: indicates that multi-valued
     INSERT constructs created using :meth:`_expression.Insert.values`
     should be
     handled by returning only the subset of parameters that are local
     to the current column default invocation. When ``False``, the
     raw parameters of the statement are returned including the
     naming convention used in the case of multi-valued INSERT.

    :raises: :class:`~sqlalchemy.exc.InvalidRequestError` if called
     outside of a column default generation function.

    .. seealso::

        :attr:`.DefaultExecutionContext.current_parameters`

        :ref:`context_default_functions`

    """
    try:
        # both attributes are only present while
        # _process_execute_defaults is running
        parameters = self.current_parameters
        column = self.current_column
    except AttributeError:
        raise exc.InvalidRequestError(
            "get_current_parameters() can only be invoked in the "
            "context of a Python side column default function"
        )
    else:
        assert column is not None
        assert parameters is not None
        compile_state = cast(
            "DMLState", cast(SQLCompiler, self.compiled).compile_state
        )
        assert compile_state is not None
        if (
            isolate_multiinsert_groups
            and dml.isinsert(compile_state)
            and compile_state._has_multi_parameters
        ):
            # multi-values INSERT: parameter names carry a "_m<N>"
            # suffix per values() group; rebuild a plain dict for the
            # group this column belongs to
            if column._is_multiparam_column:
                index = column.index + 1
                d = {column.original.key: parameters[column.key]}
            else:
                # the first group uses the unsuffixed names
                d = {column.key: parameters[column.key]}
                index = 0
            assert compile_state._dict_parameters is not None
            keys = compile_state._dict_parameters.keys()
            d.update(
                (key, parameters["%s_m%d" % (key, index)]) for key in keys
            )
            return d
        else:
            return parameters

2319 

def get_insert_default(self, column):
    """Return the INSERT default value for ``column``, or ``None``
    when the column declares no default.
    """
    default = column.default
    if default is None:
        return None
    return self._exec_default(column, default, column.type)

2325 

def get_update_default(self, column):
    """Return the UPDATE (onupdate) default value for ``column``, or
    ``None`` when the column declares no onupdate default.
    """
    onupdate = column.onupdate
    if onupdate is None:
        return None
    return self._exec_default(column, onupdate, column.type)

2331 

def _process_execute_defaults(self):
    """Evaluate pre-execution ("prefetched") column defaults and write
    the resulting values into each compiled parameter dictionary.

    For each parameter set in ``self.compiled_parameters``, fills in
    values for columns listed in the compiled statement's
    ``insert_prefetch`` or ``update_prefetch`` collections: sentinel
    counters, scalar defaults, Python callables, and (via the
    ``fallback`` accessor) sequence/clause-element defaults.
    ``self.current_parameters`` / ``self.current_column`` are exposed
    to user-defined default callables while this runs, then removed.
    """
    compiled = cast(SQLCompiler, self.compiled)

    key_getter = compiled._within_exec_param_key_getter

    # monotonically increasing value shared across all parameter sets,
    # used for insertmanyvalues sentinel columns
    sentinel_counter = 0

    if compiled.insert_prefetch:
        prefetch_recs = [
            (
                c,
                key_getter(c),
                c._default_description_tuple,
                self.get_insert_default,
            )
            for c in compiled.insert_prefetch
        ]
    elif compiled.update_prefetch:
        prefetch_recs = [
            (
                c,
                key_getter(c),
                c._onupdate_description_tuple,
                self.get_update_default,
            )
            for c in compiled.update_prefetch
        ]
    else:
        prefetch_recs = []

    for param in self.compiled_parameters:
        # visible to user-defined default callables via
        # get_current_parameters()
        self.current_parameters = param

        for (
            c,
            param_key,
            (arg, is_scalar, is_callable, is_sentinel),
            fallback,
        ) in prefetch_recs:
            if is_sentinel:
                param[param_key] = sentinel_counter
                sentinel_counter += 1
            elif is_scalar:
                # plain scalar default; use as-is
                param[param_key] = arg
            elif is_callable:
                # inlined fast path for Python-callable defaults
                self.current_column = c
                param[param_key] = arg(self)
            else:
                # sequence or clause-element default; delegate to
                # get_insert_default / get_update_default
                val = fallback(c)
                if val is not None:
                    param[param_key] = val

    del self.current_parameters

2385 

2386 

# wire the default execution context class onto the default dialect;
# assigned here since DefaultExecutionContext is defined after
# DefaultDialect in this module
DefaultDialect.execution_ctx_cls = DefaultExecutionContext