Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/default.py: 46%

1051 statements  

1# engine/default.py 

2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7# mypy: allow-untyped-defs, allow-untyped-calls 

8 

9"""Default implementations of per-dialect sqlalchemy.engine classes. 

10 

11These are semi-private implementation classes which are only of importance 

12to database dialect authors; dialects will usually use the classes here 

13as the base class for their own corresponding classes. 

14 

15""" 
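
# --- Editorial sketch (illustrative, not part of default.py) ---
# The docstring above notes that dialect authors use these classes as base
# classes.  A minimal, hypothetical example of that pattern; "MyDialect",
# "mydialect" and "mydriver" are invented names.

from sqlalchemy.engine.default import DefaultDialect


class MyDialect(DefaultDialect):
    name = "mydialect"                # hypothetical dialect name
    driver = "mydriver"               # hypothetical DBAPI driver name
    supports_statement_cache = True   # opt in to SQL compilation caching

# --- end editorial sketch ---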

16 

17from __future__ import annotations 

18 

19import functools 

20import operator 

21import random 

22import re 

23from time import perf_counter 

24import typing 

25from typing import Any 

26from typing import Callable 

27from typing import cast 

28from typing import Dict 

29from typing import Final 

30from typing import List 

31from typing import Mapping 

32from typing import MutableMapping 

33from typing import MutableSequence 

34from typing import Optional 

35from typing import Sequence 

36from typing import Set 

37from typing import Tuple 

38from typing import Type 

39from typing import TYPE_CHECKING 

40from typing import Union 

41import weakref 

42 

43from . import characteristics 

44from . import cursor as _cursor 

45from . import interfaces 

46from .base import Connection 

47from .interfaces import CacheStats 

48from .interfaces import DBAPICursor 

49from .interfaces import Dialect 

50from .interfaces import ExecuteStyle 

51from .interfaces import ExecutionContext 

52from .reflection import ObjectKind 

53from .reflection import ObjectScope 

54from .. import event 

55from .. import exc 

56from .. import pool 

57from .. import util 

58from ..sql import compiler 

59from ..sql import dml 

60from ..sql import expression 

61from ..sql import type_api 

62from ..sql import util as sql_util 

63from ..sql._typing import is_tuple_type 

64from ..sql.base import _NoArg 

65from ..sql.compiler import DDLCompiler 

66from ..sql.compiler import InsertmanyvaluesSentinelOpts 

67from ..sql.compiler import SQLCompiler 

68from ..sql.elements import quoted_name 

69from ..util.typing import Literal 

70from ..util.typing import TupleAny 

71from ..util.typing import Unpack 

72 

73 

74if typing.TYPE_CHECKING: 

75 from types import ModuleType 

76 

77 from .base import Engine 

78 from .cursor import ResultFetchStrategy 

79 from .interfaces import _CoreMultiExecuteParams 

80 from .interfaces import _CoreSingleExecuteParams 

81 from .interfaces import _DBAPICursorDescription 

82 from .interfaces import _DBAPIMultiExecuteParams 

83 from .interfaces import _DBAPISingleExecuteParams 

84 from .interfaces import _ExecuteOptions 

85 from .interfaces import _MutableCoreSingleExecuteParams 

86 from .interfaces import _ParamStyle 

87 from .interfaces import ConnectArgsType 

88 from .interfaces import DBAPIConnection 

89 from .interfaces import DBAPIModule 

90 from .interfaces import IsolationLevel 

91 from .row import Row 

92 from .url import URL 

93 from ..event import _ListenerFnType 

94 from ..pool import Pool 

95 from ..pool import PoolProxiedConnection 

96 from ..sql import Executable 

97 from ..sql.compiler import Compiled 

98 from ..sql.compiler import Linting 

99 from ..sql.compiler import ResultColumnsEntry 

100 from ..sql.dml import DMLState 

101 from ..sql.dml import UpdateBase 

102 from ..sql.elements import BindParameter 

103 from ..sql.schema import Column 

104 from ..sql.type_api import _BindProcessorType 

105 from ..sql.type_api import _ResultProcessorType 

106 from ..sql.type_api import TypeEngine 

107 

108 

109# When we're handed literal SQL, ensure it's a SELECT query 

110SERVER_SIDE_CURSOR_RE = re.compile(r"\s*SELECT", re.I | re.UNICODE) 
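
# --- Editorial sketch (illustrative, not part of default.py) ---
# What the regex above accepts: matching is case-insensitive and leading
# whitespace is allowed, so only text starting with SELECT qualifies for the
# legacy server_side_cursors heuristic used in _use_server_side_cursor().

import re

_demo_re = re.compile(r"\s*SELECT", re.I | re.UNICODE)
assert _demo_re.match("   select * from some_table")
assert _demo_re.match("SELECT 1")
assert _demo_re.match("UPDATE some_table SET x = 1") is None

# --- end editorial sketch ---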

111 

112 

113( 

114 CACHE_HIT, 

115 CACHE_MISS, 

116 CACHING_DISABLED, 

117 NO_CACHE_KEY, 

118 NO_DIALECT_SUPPORT, 

119) = list(CacheStats) 

120 

121 

122class DefaultDialect(Dialect): 

123 """Default implementation of Dialect""" 

124 

125 statement_compiler = compiler.SQLCompiler 

126 ddl_compiler = compiler.DDLCompiler 

127 type_compiler_cls = compiler.GenericTypeCompiler 

128 

129 preparer = compiler.IdentifierPreparer 

130 supports_alter = True 

131 supports_comments = False 

132 supports_constraint_comments = False 

133 inline_comments = False 

134 supports_statement_cache = True 

135 

136 div_is_floordiv = True 

137 

138 bind_typing = interfaces.BindTyping.NONE 

139 

140 include_set_input_sizes: Optional[Set[Any]] = None 

141 exclude_set_input_sizes: Optional[Set[Any]] = None 

142 

143 # the first value we'd get for an autoincrement column. 

144 default_sequence_base = 1 

145 

146 # most DBAPIs are happy with this for execute().

147 # not cx_oracle. 

148 execute_sequence_format = tuple 

149 

150 supports_schemas = True 

151 supports_views = True 

152 supports_sequences = False 

153 sequences_optional = False 

154 preexecute_autoincrement_sequences = False 

155 supports_identity_columns = False 

156 postfetch_lastrowid = True 

157 favor_returning_over_lastrowid = False 

158 insert_null_pk_still_autoincrements = False 

159 update_returning = False 

160 delete_returning = False 

161 update_returning_multifrom = False 

162 delete_returning_multifrom = False 

163 insert_returning = False 

164 

165 cte_follows_insert = False 

166 

167 supports_native_enum = False 

168 supports_native_boolean = False 

169 supports_native_uuid = False 

170 returns_native_bytes = False 

171 

172 non_native_boolean_check_constraint = True 

173 

174 supports_simple_order_by_label = True 

175 

176 tuple_in_values = False 

177 

178 connection_characteristics = util.immutabledict( 

179 { 

180 "isolation_level": characteristics.IsolationLevelCharacteristic(), 

181 "logging_token": characteristics.LoggingTokenCharacteristic(), 

182 } 

183 ) 
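
# --- Editorial note: the keys above ("isolation_level", "logging_token")
# are the execution option names recognized by
# set_engine_execution_options() / set_connection_execution_options()
# further below, e.g. connection.execution_options(isolation_level="AUTOCOMMIT")
# is applied through the corresponding characteristic object.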

184 

185 engine_config_types: Mapping[str, Any] = util.immutabledict( 

186 { 

187 "pool_timeout": util.asint, 

188 "echo": util.bool_or_str("debug"), 

189 "echo_pool": util.bool_or_str("debug"), 

190 "pool_recycle": util.asint, 

191 "pool_size": util.asint, 

192 "max_overflow": util.asint, 

193 "future": util.asbool, 

194 } 

195 ) 

196 

197 # True if the NUMERIC type

198 # returns decimal.Decimal;

199 # this does *not* apply to the FLOAT type, however.

200 supports_native_decimal = False 

201 

202 name = "default" 

203 

204 # length at which to truncate 

205 # any identifier. 

206 max_identifier_length = 9999 

207 _user_defined_max_identifier_length: Optional[int] = None 

208 

209 isolation_level: Optional[str] = None 

210 

211 # sub-categories of max_identifier_length. 

212 # currently these accommodate MySQL, which allows alias names

213 # of 255 but DDL names of only 64.

214 max_index_name_length: Optional[int] = None 

215 max_constraint_name_length: Optional[int] = None 

216 

217 supports_sane_rowcount = True 

218 supports_sane_multi_rowcount = True 

219 colspecs: MutableMapping[Type[TypeEngine[Any]], Type[TypeEngine[Any]]] = {} 

220 default_paramstyle = "named" 

221 

222 supports_default_values = False 

223 """dialect supports INSERT... DEFAULT VALUES syntax""" 

224 

225 supports_default_metavalue = False 

226 """dialect supports INSERT... VALUES (DEFAULT) syntax""" 

227 

228 default_metavalue_token = "DEFAULT" 

229 """for INSERT... VALUES (DEFAULT) syntax, the token to put in the 

230 parenthesis.""" 

231 

232 # not sure if this is a real thing but the compiler will deliver it 

233 # if this is the only flag enabled. 

234 supports_empty_insert = True 

235 """dialect supports INSERT () VALUES ()""" 

236 

237 supports_multivalues_insert = False 

238 

239 use_insertmanyvalues: bool = False 

240 

241 use_insertmanyvalues_wo_returning: bool = False 

242 

243 insertmanyvalues_implicit_sentinel: InsertmanyvaluesSentinelOpts = ( 

244 InsertmanyvaluesSentinelOpts.NOT_SUPPORTED 

245 ) 

246 

247 insertmanyvalues_page_size: int = 1000 

248 insertmanyvalues_max_parameters = 32700 

249 

250 supports_is_distinct_from = True 

251 

252 supports_server_side_cursors = False 

253 

254 server_side_cursors = False 

255 

256 # extra record-level locking features (#4860) 

257 supports_for_update_of = False 

258 

259 server_version_info = None 

260 

261 default_schema_name: Optional[str] = None 

262 

263 # indicates symbol names are 

264 # UPPERCASED if they are case insensitive 

265 # within the database. 

266 # if this is True, the methods normalize_name() 

267 # and denormalize_name() must be provided. 

268 requires_name_normalize = False 

269 

270 is_async = False 

271 

272 has_terminate = False 

273 

274 # TODO: this is not to be part of 2.0. implement rudimentary binary 

275 # literals for SQLite, PostgreSQL, MySQL only within 

276 # _Binary.literal_processor 

277 _legacy_binary_type_literal_encoding = "utf-8" 

278 

279 @util.deprecated_params( 

280 empty_in_strategy=( 

281 "1.4", 

282 "The :paramref:`_sa.create_engine.empty_in_strategy` keyword is " 

283 "deprecated, and no longer has any effect. All IN expressions " 

284 "are now rendered using " 

285 'the "expanding parameter" strategy which renders a set of bound '

286 'expressions, or an "empty set" SELECT, at statement execution '

287 "time.", 

288 ), 

289 server_side_cursors=( 

290 "1.4", 

291 "The :paramref:`_sa.create_engine.server_side_cursors` parameter " 

292 "is deprecated and will be removed in a future release. Please " 

293 "use the " 

294 ":paramref:`_engine.Connection.execution_options.stream_results` " 

295 "parameter.", 

296 ), 

297 ) 

298 def __init__( 

299 self, 

300 paramstyle: Optional[_ParamStyle] = None, 

301 isolation_level: Optional[IsolationLevel] = None, 

302 dbapi: Optional[ModuleType] = None, 

303 implicit_returning: Literal[True] = True, 

304 supports_native_boolean: Optional[bool] = None, 

305 max_identifier_length: Optional[int] = None, 

306 label_length: Optional[int] = None, 

307 insertmanyvalues_page_size: Union[_NoArg, int] = _NoArg.NO_ARG, 

308 use_insertmanyvalues: Optional[bool] = None, 

309 # util.deprecated_params decorator cannot render the 

310 # Linting.NO_LINTING constant 

311 compiler_linting: Linting = int(compiler.NO_LINTING), # type: ignore 

312 server_side_cursors: bool = False, 

313 skip_autocommit_rollback: bool = False, 

314 **kwargs: Any, 

315 ): 

316 if server_side_cursors: 

317 if not self.supports_server_side_cursors: 

318 raise exc.ArgumentError( 

319 "Dialect %s does not support server side cursors" % self 

320 ) 

321 else: 

322 self.server_side_cursors = True 

323 

324 if getattr(self, "use_setinputsizes", False): 

325 util.warn_deprecated( 

326 "The dialect-level use_setinputsizes attribute is " 

327 "deprecated. Please use " 

328 "bind_typing = BindTyping.SETINPUTSIZES", 

329 "2.0", 

330 ) 

331 self.bind_typing = interfaces.BindTyping.SETINPUTSIZES 

332 

333 self.positional = False 

334 self._ischema = None 

335 

336 self.dbapi = dbapi 

337 

338 self.skip_autocommit_rollback = skip_autocommit_rollback 

339 

340 if paramstyle is not None: 

341 self.paramstyle = paramstyle 

342 elif self.dbapi is not None: 

343 self.paramstyle = self.dbapi.paramstyle 

344 else: 

345 self.paramstyle = self.default_paramstyle 

346 self.positional = self.paramstyle in ( 

347 "qmark", 

348 "format", 

349 "numeric", 

350 "numeric_dollar", 

351 ) 

352 self.identifier_preparer = self.preparer(self) 

353 self._on_connect_isolation_level = isolation_level 

354 

355 legacy_tt_callable = getattr(self, "type_compiler", None) 

356 if legacy_tt_callable is not None: 

357 tt_callable = cast( 

358 Type[compiler.GenericTypeCompiler], 

359 self.type_compiler, 

360 ) 

361 else: 

362 tt_callable = self.type_compiler_cls 

363 

364 self.type_compiler_instance = self.type_compiler = tt_callable(self) 

365 

366 if supports_native_boolean is not None: 

367 self.supports_native_boolean = supports_native_boolean 

368 

369 self._user_defined_max_identifier_length = max_identifier_length 

370 if self._user_defined_max_identifier_length: 

371 self.max_identifier_length = ( 

372 self._user_defined_max_identifier_length 

373 ) 

374 self.label_length = label_length 

375 self.compiler_linting = compiler_linting 

376 

377 if use_insertmanyvalues is not None: 

378 self.use_insertmanyvalues = use_insertmanyvalues 

379 

380 if insertmanyvalues_page_size is not _NoArg.NO_ARG: 

381 self.insertmanyvalues_page_size = insertmanyvalues_page_size 

382 

383 @property 

384 @util.deprecated( 

385 "2.0", 

386 "full_returning is deprecated, please use insert_returning, " 

387 "update_returning, delete_returning", 

388 ) 

389 def full_returning(self): 

390 return ( 

391 self.insert_returning 

392 and self.update_returning 

393 and self.delete_returning 

394 ) 

395 

396 @util.memoized_property 

397 def insert_executemany_returning(self): 

398 """Default implementation for insert_executemany_returning, if not 

399 otherwise overridden by the specific dialect. 

400 

401 The default dialect determines that "insert_executemany_returning" is

402 available if the dialect in use has opted into using the

403 "use_insertmanyvalues" feature. If it hasn't opted into that, then

404 this attribute is False, unless the dialect in question overrides this 

405 and provides some other implementation (such as the Oracle Database 

406 dialects). 

407 

408 """ 

409 return self.insert_returning and self.use_insertmanyvalues 
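
# --- Editorial sketch (illustrative, not part of default.py) ---
# A dialect opts into executemany-with-RETURNING by enabling the two flags
# combined in the property above; "ReturningDialect" is a hypothetical name.

from sqlalchemy.engine.default import DefaultDialect


class ReturningDialect(DefaultDialect):
    supports_statement_cache = True
    insert_returning = True
    use_insertmanyvalues = True


assert ReturningDialect().insert_executemany_returning

# --- end editorial sketch ---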

410 

411 @util.memoized_property 

412 def insert_executemany_returning_sort_by_parameter_order(self): 

413 """Default implementation for 

414 insert_executemany_returning_deterministic_order, if not otherwise 

415 overridden by the specific dialect. 

416 

417 The default dialect determines that "insert_executemany_returning" can have

418 deterministic order only if the dialect in use has opted into using the 

419 "use_insertmanyvalues" feature, which implements deterministic ordering 

420 using client side sentinel columns only by default. The 

421 "insertmanyvalues" feature also provides alternate forms that can

422 use server-generated PK values as "sentinels", but those are only 

423 used if the :attr:`.Dialect.insertmanyvalues_implicit_sentinel` 

424 bitflag enables those alternate SQL forms, which are disabled 

425 by default. 

426 

427 If the dialect in use hasn't opted into that, then this attribute is 

428 False, unless the dialect in question overrides this and provides some 

429 other implementation (such as the Oracle Database dialects). 

430 

431 """ 

432 return self.insert_returning and self.use_insertmanyvalues 

433 

434 update_executemany_returning = False 

435 delete_executemany_returning = False 

436 

437 @util.memoized_property 

438 def loaded_dbapi(self) -> DBAPIModule: 

439 if self.dbapi is None: 

440 raise exc.InvalidRequestError( 

441 f"Dialect {self} does not have a Python DBAPI established " 

442 "and cannot be used for actual database interaction" 

443 ) 

444 return self.dbapi 

445 

446 @util.memoized_property 

447 def _bind_typing_render_casts(self): 

448 return self.bind_typing is interfaces.BindTyping.RENDER_CASTS 

449 

450 def _ensure_has_table_connection(self, arg: Connection) -> None: 

451 if not isinstance(arg, Connection): 

452 raise exc.ArgumentError( 

453 "The argument passed to Dialect.has_table() should be a " 

454 "%s, got %s. " 

455 "Additionally, the Dialect.has_table() method is for " 

456 "internal dialect " 

457 "use only; please use " 

458 "``inspect(some_engine).has_table(<tablename>)`` "

459 "for public API use." % (Connection, type(arg)) 

460 ) 

461 

462 @util.memoized_property 

463 def _supports_statement_cache(self): 

464 ssc = self.__class__.__dict__.get("supports_statement_cache", None) 

465 if ssc is None: 

466 util.warn( 

467 "Dialect %s:%s will not make use of SQL compilation caching " 

468 "as it does not set the 'supports_statement_cache' attribute " 

469 "to ``True``. This can have " 

470 "significant performance implications including some " 

471 "performance degradations in comparison to prior SQLAlchemy " 

472 "versions. Dialect maintainers should seek to set this " 

473 "attribute to True after appropriate development and testing " 

474 "for SQLAlchemy 1.4 caching support. Alternatively, this " 

475 "attribute may be set to False which will disable this " 

476 "warning." % (self.name, self.driver), 

477 code="cprf", 

478 ) 

479 

480 return bool(ssc) 

481 

482 @util.memoized_property 

483 def _type_memos(self): 

484 return weakref.WeakKeyDictionary() 

485 

486 @property 

487 def dialect_description(self): # type: ignore[override] 

488 return self.name + "+" + self.driver 

489 

490 @property 

491 def supports_sane_rowcount_returning(self): 

492 """True if this dialect supports sane rowcount even if RETURNING is 

493 in use. 

494 

495 For dialects that don't support RETURNING, this is synonymous with 

496 ``supports_sane_rowcount``. 

497 

498 """ 

499 return self.supports_sane_rowcount 

500 

501 @classmethod 

502 def get_pool_class(cls, url: URL) -> Type[Pool]: 

503 default: Type[pool.Pool] 

504 if cls.is_async: 

505 default = pool.AsyncAdaptedQueuePool 

506 else: 

507 default = pool.QueuePool 

508 

509 return getattr(cls, "poolclass", default) 
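
# --- Editorial sketch (illustrative, not part of default.py) ---
# A dialect may pin a pool implementation via a "poolclass" attribute, which
# the getattr() lookup above prefers over the queue-pool defaults; the class
# name below is hypothetical.

from sqlalchemy import pool
from sqlalchemy.engine.default import DefaultDialect


class NullPoolDialect(DefaultDialect):
    supports_statement_cache = True
    poolclass = pool.NullPool

# NullPoolDialect.get_pool_class(some_url) would then return pool.NullPool.
# --- end editorial sketch ---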

510 

511 def get_dialect_pool_class(self, url: URL) -> Type[Pool]: 

512 return self.get_pool_class(url) 

513 

514 @classmethod 

515 def load_provisioning(cls): 

516 package = ".".join(cls.__module__.split(".")[0:-1]) 

517 try: 

518 __import__(package + ".provision") 

519 except ImportError: 

520 pass 

521 

522 def _builtin_onconnect(self) -> Optional[_ListenerFnType]: 

523 if self._on_connect_isolation_level is not None: 

524 

525 def builtin_connect(dbapi_conn, conn_rec): 

526 self._assert_and_set_isolation_level( 

527 dbapi_conn, self._on_connect_isolation_level 

528 ) 

529 

530 return builtin_connect 

531 else: 

532 return None 

533 

534 def initialize(self, connection: Connection) -> None: 

535 try: 

536 self.server_version_info = self._get_server_version_info( 

537 connection 

538 ) 

539 except NotImplementedError: 

540 self.server_version_info = None 

541 try: 

542 self.default_schema_name = self._get_default_schema_name( 

543 connection 

544 ) 

545 except NotImplementedError: 

546 self.default_schema_name = None 

547 

548 try: 

549 self.default_isolation_level = self.get_default_isolation_level( 

550 connection.connection.dbapi_connection 

551 ) 

552 except NotImplementedError: 

553 self.default_isolation_level = None 

554 

555 if not self._user_defined_max_identifier_length: 

556 max_ident_length = self._check_max_identifier_length(connection) 

557 if max_ident_length: 

558 self.max_identifier_length = max_ident_length 

559 

560 if ( 

561 self.label_length 

562 and self.label_length > self.max_identifier_length 

563 ): 

564 raise exc.ArgumentError( 

565 "Label length of %d is greater than this dialect's" 

566 " maximum identifier length of %d" 

567 % (self.label_length, self.max_identifier_length) 

568 ) 

569 

570 def on_connect(self) -> Optional[Callable[[Any], None]]: 

571 # inherits the docstring from interfaces.Dialect.on_connect 

572 return None 

573 

574 def _check_max_identifier_length(self, connection): 

575 """Perform a connection / server version specific check to determine 

576 the max_identifier_length. 

577 

578 If the dialect's class level max_identifier_length should be used, 

579 can return None. 

580 

581 """ 

582 return None 

583 

584 def get_default_isolation_level(self, dbapi_conn): 

585 """Given a DBAPI connection, return its isolation level, or 

586 a default isolation level if one cannot be retrieved. 

587 

588 May be overridden by subclasses in order to provide a 

589 "fallback" isolation level for databases that cannot reliably 

590 retrieve the actual isolation level. 

591 

592 By default, calls the :meth:`_engine.Dialect.get_isolation_level`

593 method, propagating any exceptions raised. 

594 

595 """ 

596 return self.get_isolation_level(dbapi_conn) 

597 

598 def type_descriptor(self, typeobj): 

599 """Provide a database-specific :class:`.TypeEngine` object, given 

600 the generic object which comes from the types module. 

601 

602 This method looks for a dictionary called 

603 ``colspecs`` as a class or instance-level variable, 

604 and passes on to :func:`_types.adapt_type`. 

605 

606 """ 

607 return type_api.adapt_type(typeobj, self.colspecs) 
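
# --- Editorial sketch (illustrative, not part of default.py) ---
# How colspecs feeds type_descriptor(): a dialect maps a generic type to its
# own subclass, and adapt_type() returns an instance of that subclass.  The
# names below are hypothetical.

import sqlalchemy.types as sqltypes
from sqlalchemy.engine.default import DefaultDialect


class MyNumeric(sqltypes.Numeric):
    """Hypothetical dialect-specific NUMERIC implementation."""


class ColspecDialect(DefaultDialect):
    supports_statement_cache = True
    colspecs = {sqltypes.Numeric: MyNumeric}


assert isinstance(
    ColspecDialect().type_descriptor(sqltypes.Numeric()), MyNumeric
)

# --- end editorial sketch ---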

608 

609 def has_index(self, connection, table_name, index_name, schema=None, **kw): 

610 if not self.has_table(connection, table_name, schema=schema, **kw): 

611 return False 

612 for idx in self.get_indexes( 

613 connection, table_name, schema=schema, **kw 

614 ): 

615 if idx["name"] == index_name: 

616 return True 

617 else: 

618 return False 

619 

620 def has_schema( 

621 self, connection: Connection, schema_name: str, **kw: Any 

622 ) -> bool: 

623 return schema_name in self.get_schema_names(connection, **kw) 

624 

625 def validate_identifier(self, ident: str) -> None: 

626 if len(ident) > self.max_identifier_length: 

627 raise exc.IdentifierError( 

628 "Identifier '%s' exceeds maximum length of %d characters" 

629 % (ident, self.max_identifier_length) 

630 ) 

631 

632 def connect(self, *cargs: Any, **cparams: Any) -> DBAPIConnection: 

633 # inherits the docstring from interfaces.Dialect.connect 

634 return self.loaded_dbapi.connect(*cargs, **cparams) # type: ignore[no-any-return] # NOQA: E501 

635 

636 def create_connect_args(self, url: URL) -> ConnectArgsType: 

637 # inherits the docstring from interfaces.Dialect.create_connect_args 

638 opts = url.translate_connect_args() 

639 opts.update(url.query) 

640 return ([], opts) 
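
# --- Editorial note: with this default implementation a URL such as
# "somedialect://user:pw@dbhost:5432/mydb?timeout=10" (hypothetical) yields
# cargs=[] and cparams={'username': 'user', 'password': 'pw', 'host': 'dbhost',
# 'port': 5432, 'database': 'mydb', 'timeout': '10'}; query-string values
# arrive as strings.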

641 

642 def set_engine_execution_options( 

643 self, engine: Engine, opts: Mapping[str, Any] 

644 ) -> None: 

645 supported_names = set(self.connection_characteristics).intersection( 

646 opts 

647 ) 

648 if supported_names: 

649 characteristics: Mapping[str, Any] = util.immutabledict( 

650 (name, opts[name]) for name in supported_names 

651 ) 

652 

653 @event.listens_for(engine, "engine_connect") 

654 def set_connection_characteristics(connection): 

655 self._set_connection_characteristics( 

656 connection, characteristics 

657 ) 

658 

659 def set_connection_execution_options( 

660 self, connection: Connection, opts: Mapping[str, Any] 

661 ) -> None: 

662 supported_names = set(self.connection_characteristics).intersection( 

663 opts 

664 ) 

665 if supported_names: 

666 characteristics: Mapping[str, Any] = util.immutabledict( 

667 (name, opts[name]) for name in supported_names 

668 ) 

669 self._set_connection_characteristics(connection, characteristics) 

670 

671 def _set_connection_characteristics(self, connection, characteristics): 

672 characteristic_values = [ 

673 (name, self.connection_characteristics[name], value) 

674 for name, value in characteristics.items() 

675 ] 

676 

677 if connection.in_transaction(): 

678 trans_objs = [ 

679 (name, obj) 

680 for name, obj, _ in characteristic_values 

681 if obj.transactional 

682 ] 

683 if trans_objs: 

684 raise exc.InvalidRequestError( 

685 "This connection has already initialized a SQLAlchemy " 

686 "Transaction() object via begin() or autobegin; " 

687 "%s may not be altered unless rollback() or commit() " 

688 "is called first." 

689 % (", ".join(name for name, obj in trans_objs)) 

690 ) 

691 

692 dbapi_connection = connection.connection.dbapi_connection 

693 for _, characteristic, value in characteristic_values: 

694 characteristic.set_connection_characteristic( 

695 self, connection, dbapi_connection, value 

696 ) 

697 connection.connection._connection_record.finalize_callback.append( 

698 functools.partial(self._reset_characteristics, characteristics) 

699 ) 

700 

701 def _reset_characteristics(self, characteristics, dbapi_connection): 

702 for characteristic_name in characteristics: 

703 characteristic = self.connection_characteristics[ 

704 characteristic_name 

705 ] 

706 characteristic.reset_characteristic(self, dbapi_connection) 

707 

708 def do_begin(self, dbapi_connection): 

709 pass 

710 

711 def do_rollback(self, dbapi_connection): 

712 if self.skip_autocommit_rollback and self.detect_autocommit_setting( 

713 dbapi_connection 

714 ): 

715 return 

716 dbapi_connection.rollback() 

717 

718 def do_commit(self, dbapi_connection): 

719 dbapi_connection.commit() 

720 

721 def do_terminate(self, dbapi_connection): 

722 self.do_close(dbapi_connection) 

723 

724 def do_close(self, dbapi_connection): 

725 dbapi_connection.close() 

726 

727 @util.memoized_property 

728 def _dialect_specific_select_one(self): 

729 return str(expression.select(1).compile(dialect=self)) 

730 

731 def _do_ping_w_event(self, dbapi_connection: DBAPIConnection) -> bool: 

732 try: 

733 return self.do_ping(dbapi_connection) 

734 except self.loaded_dbapi.Error as err: 

735 is_disconnect = self.is_disconnect(err, dbapi_connection, None) 

736 

737 if self._has_events: 

738 try: 

739 Connection._handle_dbapi_exception_noconnection( 

740 err, 

741 self, 

742 is_disconnect=is_disconnect, 

743 invalidate_pool_on_disconnect=False, 

744 is_pre_ping=True, 

745 ) 

746 except exc.StatementError as new_err: 

747 is_disconnect = new_err.connection_invalidated 

748 

749 if is_disconnect: 

750 return False 

751 else: 

752 raise 

753 

754 def do_ping(self, dbapi_connection: DBAPIConnection) -> bool: 

755 cursor = dbapi_connection.cursor() 

756 try: 

757 cursor.execute(self._dialect_specific_select_one) 

758 finally: 

759 cursor.close() 

760 return True 

761 

762 def create_xid(self): 

763 """Create a random two-phase transaction ID. 

764 

765 This id will be passed to do_begin_twophase(), do_rollback_twophase(), 

766 do_commit_twophase(). Its format is unspecified. 

767 """ 

768 

769 return "_sa_%032x" % random.randint(0, 2**128) 
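
# --- Editorial sketch (illustrative, not part of default.py) ---
# The xid produced above is "_sa_" followed by 32 zero-padded hex digits:

_demo_xid = "_sa_%032x" % 0xDEADBEEF
assert _demo_xid.startswith("_sa_") and len(_demo_xid) == len("_sa_") + 32

# --- end editorial sketch ---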

770 

771 def do_savepoint(self, connection, name): 

772 connection.execute(expression.SavepointClause(name)) 

773 

774 def do_rollback_to_savepoint(self, connection, name): 

775 connection.execute(expression.RollbackToSavepointClause(name)) 

776 

777 def do_release_savepoint(self, connection, name): 

778 connection.execute(expression.ReleaseSavepointClause(name)) 

779 

780 def _deliver_insertmanyvalues_batches( 

781 self, 

782 connection, 

783 cursor, 

784 statement, 

785 parameters, 

786 generic_setinputsizes, 

787 context, 

788 ): 

789 context = cast(DefaultExecutionContext, context) 

790 compiled = cast(SQLCompiler, context.compiled) 

791 

792 _composite_sentinel_proc: Sequence[ 

793 Optional[_ResultProcessorType[Any]] 

794 ] = () 

795 _scalar_sentinel_proc: Optional[_ResultProcessorType[Any]] = None 

796 _sentinel_proc_initialized: bool = False 

797 

798 compiled_parameters = context.compiled_parameters 

799 

800 imv = compiled._insertmanyvalues 

801 assert imv is not None 

802 

803 is_returning: Final[bool] = bool(compiled.effective_returning) 

804 batch_size = context.execution_options.get( 

805 "insertmanyvalues_page_size", self.insertmanyvalues_page_size 

806 ) 

807 

808 if compiled.schema_translate_map: 

809 schema_translate_map = context.execution_options.get( 

810 "schema_translate_map", {} 

811 ) 

812 else: 

813 schema_translate_map = None 

814 

815 if is_returning: 

816 result: Optional[List[Any]] = [] 

817 context._insertmanyvalues_rows = result 

818 

819 sort_by_parameter_order = imv.sort_by_parameter_order 

820 

821 else: 

822 sort_by_parameter_order = False 

823 result = None 

824 

825 for imv_batch in compiled._deliver_insertmanyvalues_batches( 

826 statement, 

827 parameters, 

828 compiled_parameters, 

829 generic_setinputsizes, 

830 batch_size, 

831 sort_by_parameter_order, 

832 schema_translate_map, 

833 ): 

834 yield imv_batch 

835 

836 if is_returning: 

837 

838 try: 

839 rows = context.fetchall_for_returning(cursor) 

840 except BaseException as be: 

841 connection._handle_dbapi_exception( 

842 be, 

843 sql_util._long_statement(imv_batch.replaced_statement), 

844 imv_batch.replaced_parameters, 

845 None, 

846 context, 

847 is_sub_exec=True, 

848 ) 

849 

850 # I would have thought "is_returning: Final[bool]" 

851 # would have assured this but pylance thinks not 

852 assert result is not None 

853 

854 if imv.num_sentinel_columns and not imv_batch.is_downgraded: 

855 composite_sentinel = imv.num_sentinel_columns > 1 

856 if imv.implicit_sentinel: 

857 # for implicit sentinel, which is currently single-col 

858 # integer autoincrement, do a simple sort. 

859 assert not composite_sentinel 

860 result.extend( 

861 sorted(rows, key=operator.itemgetter(-1)) 

862 ) 

863 continue 

864 

865 # otherwise, create dictionaries to match up batches 

866 # with parameters 

867 assert imv.sentinel_param_keys 

868 assert imv.sentinel_columns 

869 

870 _nsc = imv.num_sentinel_columns 

871 

872 if not _sentinel_proc_initialized: 

873 if composite_sentinel: 

874 _composite_sentinel_proc = [ 

875 col.type._cached_result_processor( 

876 self, cursor_desc[1] 

877 ) 

878 for col, cursor_desc in zip( 

879 imv.sentinel_columns, 

880 cursor.description[-_nsc:], 

881 ) 

882 ] 

883 else: 

884 _scalar_sentinel_proc = ( 

885 imv.sentinel_columns[0] 

886 ).type._cached_result_processor( 

887 self, cursor.description[-1][1] 

888 ) 

889 _sentinel_proc_initialized = True 

890 

891 rows_by_sentinel: Union[ 

892 Dict[Tuple[Any, ...], Any], 

893 Dict[Any, Any], 

894 ] 

895 if composite_sentinel: 

896 rows_by_sentinel = { 

897 tuple( 

898 (proc(val) if proc else val) 

899 for val, proc in zip( 

900 row[-_nsc:], _composite_sentinel_proc 

901 ) 

902 ): row 

903 for row in rows 

904 } 

905 elif _scalar_sentinel_proc: 

906 rows_by_sentinel = { 

907 _scalar_sentinel_proc(row[-1]): row for row in rows 

908 } 

909 else: 

910 rows_by_sentinel = {row[-1]: row for row in rows} 

911 

912 if len(rows_by_sentinel) != len(imv_batch.batch): 

913 # see test_insert_exec.py:: 

914 # IMVSentinelTest::test_sentinel_incorrect_rowcount 

915 # for coverage / demonstration 

916 raise exc.InvalidRequestError( 

917 f"Sentinel-keyed result set did not produce " 

918 f"correct number of rows {len(imv_batch.batch)}; " 

919 "produced " 

920 f"{len(rows_by_sentinel)}. Please ensure the " 

921 "sentinel column is fully unique and populated in " 

922 "all cases." 

923 ) 

924 

925 try: 

926 ordered_rows = [ 

927 rows_by_sentinel[sentinel_keys] 

928 for sentinel_keys in imv_batch.sentinel_values 

929 ] 

930 except KeyError as ke: 

931 # see test_insert_exec.py:: 

932 # IMVSentinelTest::test_sentinel_cant_match_keys 

933 # for coverage / demonstration 

934 raise exc.InvalidRequestError( 

935 f"Can't match sentinel values in result set to " 

936 f"parameter sets; key {ke.args[0]!r} was not " 

937 "found. " 

938 "There may be a mismatch between the datatype " 

939 "passed to the DBAPI driver vs. that which it " 

940 "returns in a result row. Ensure the given " 

941 "Python value matches the expected result type " 

942 "*exactly*, taking care to not rely upon implicit " 

943 "conversions which may occur such as when using " 

944 "strings in place of UUID or integer values, etc. " 

945 ) from ke 

946 

947 result.extend(ordered_rows) 

948 

949 else: 

950 result.extend(rows) 

951 

952 def do_executemany(self, cursor, statement, parameters, context=None): 

953 cursor.executemany(statement, parameters) 

954 

955 def do_execute(self, cursor, statement, parameters, context=None): 

956 cursor.execute(statement, parameters) 

957 

958 def do_execute_no_params(self, cursor, statement, context=None): 

959 cursor.execute(statement) 

960 

961 def is_disconnect( 

962 self, 

963 e: DBAPIModule.Error, 

964 connection: Union[ 

965 pool.PoolProxiedConnection, interfaces.DBAPIConnection, None 

966 ], 

967 cursor: Optional[interfaces.DBAPICursor], 

968 ) -> bool: 

969 return False 

970 

971 @util.memoized_instancemethod 

972 def _gen_allowed_isolation_levels(self, dbapi_conn): 

973 try: 

974 raw_levels = list(self.get_isolation_level_values(dbapi_conn)) 

975 except NotImplementedError: 

976 return None 

977 else: 

978 normalized_levels = [ 

979 level.replace("_", " ").upper() for level in raw_levels 

980 ] 

981 if raw_levels != normalized_levels: 

982 raise ValueError( 

983 f"Dialect {self.name!r} get_isolation_level_values() " 

984 f"method should return names as UPPERCASE using spaces, " 

985 f"not underscores; got " 

986 f"{sorted(set(raw_levels).difference(normalized_levels))}" 

987 ) 

988 return tuple(normalized_levels) 

989 

990 def _assert_and_set_isolation_level(self, dbapi_conn, level): 

991 level = level.replace("_", " ").upper() 

992 

993 _allowed_isolation_levels = self._gen_allowed_isolation_levels( 

994 dbapi_conn 

995 ) 

996 if ( 

997 _allowed_isolation_levels 

998 and level not in _allowed_isolation_levels 

999 ): 

1000 raise exc.ArgumentError( 

1001 f"Invalid value {level!r} for isolation_level. " 

1002 f"Valid isolation levels for {self.name!r} are " 

1003 f"{', '.join(_allowed_isolation_levels)}" 

1004 ) 

1005 

1006 self.set_isolation_level(dbapi_conn, level) 
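
# --- Editorial note: the normalization above makes user-facing spellings
# interchangeable, e.g. "repeatable_read".replace("_", " ").upper() yields
# "REPEATABLE READ", so that value and "REPEATABLE READ" are treated the
# same before validation against get_isolation_level_values().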

1007 

1008 def reset_isolation_level(self, dbapi_conn): 

1009 if self._on_connect_isolation_level is not None: 

1010 assert ( 

1011 self._on_connect_isolation_level == "AUTOCOMMIT" 

1012 or self._on_connect_isolation_level 

1013 == self.default_isolation_level 

1014 ) 

1015 self._assert_and_set_isolation_level( 

1016 dbapi_conn, self._on_connect_isolation_level 

1017 ) 

1018 else: 

1019 assert self.default_isolation_level is not None 

1020 self._assert_and_set_isolation_level( 

1021 dbapi_conn, 

1022 self.default_isolation_level, 

1023 ) 

1024 

1025 def normalize_name(self, name): 

1026 if name is None: 

1027 return None 

1028 

1029 name_lower = name.lower() 

1030 name_upper = name.upper() 

1031 

1032 if name_upper == name_lower: 

1033 # name has no upper/lower conversion, e.g. non-European characters.

1034 # return unchanged 

1035 return name 

1036 elif name_upper == name and not ( 

1037 self.identifier_preparer._requires_quotes 

1038 )(name_lower): 

1039 # name is all uppercase and doesn't require quoting; normalize 

1040 # to all lower case 

1041 return name_lower 

1042 elif name_lower == name: 

1043 # name is all lower case, which if denormalized means we need to 

1044 # force quoting on it 

1045 return quoted_name(name, quote=True) 

1046 else: 

1047 # name is mixed case, which means it will be quoted in SQL when used

1048 # later; no normalization is applied

1049 return name 

1050 

1051 def denormalize_name(self, name): 

1052 if name is None: 

1053 return None 

1054 

1055 name_lower = name.lower() 

1056 name_upper = name.upper() 

1057 

1058 if name_upper == name_lower: 

1059 # name has no upper/lower conversion, e.g. non-European characters.

1060 # return unchanged 

1061 return name 

1062 elif name_lower == name and not ( 

1063 self.identifier_preparer._requires_quotes 

1064 )(name_lower): 

1065 name = name_upper 

1066 return name 
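
# --- Editorial sketch (illustrative, not part of default.py) ---
# Round-tripping names through the two methods above on the default dialect:

from sqlalchemy.engine.default import DefaultDialect

_demo_dialect = DefaultDialect()
assert _demo_dialect.normalize_name("MY_TABLE") == "my_table"
assert _demo_dialect.denormalize_name("my_table") == "MY_TABLE"
assert _demo_dialect.normalize_name("MixedCase") == "MixedCase"

# --- end editorial sketch ---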

1067 

1068 def get_driver_connection(self, connection: DBAPIConnection) -> Any: 

1069 return connection 

1070 

1071 def _overrides_default(self, method): 

1072 return ( 

1073 getattr(type(self), method).__code__ 

1074 is not getattr(DefaultDialect, method).__code__ 

1075 ) 

1076 

1077 def _default_multi_reflect( 

1078 self, 

1079 single_tbl_method, 

1080 connection, 

1081 kind, 

1082 schema, 

1083 filter_names, 

1084 scope, 

1085 **kw, 

1086 ): 

1087 names_fns = [] 

1088 temp_names_fns = [] 

1089 if ObjectKind.TABLE in kind: 

1090 names_fns.append(self.get_table_names) 

1091 temp_names_fns.append(self.get_temp_table_names) 

1092 if ObjectKind.VIEW in kind: 

1093 names_fns.append(self.get_view_names) 

1094 temp_names_fns.append(self.get_temp_view_names) 

1095 if ObjectKind.MATERIALIZED_VIEW in kind: 

1096 names_fns.append(self.get_materialized_view_names) 

1097 # no temp materialized view at the moment 

1098 # temp_names_fns.append(self.get_temp_materialized_view_names) 

1099 

1100 unreflectable = kw.pop("unreflectable", {}) 

1101 

1102 if ( 

1103 filter_names 

1104 and scope is ObjectScope.ANY 

1105 and kind is ObjectKind.ANY 

1106 ): 

1107 # if names are given and no qualification on type of table 

1108 # (i.e. the Table(..., autoload) case), take the names as given, 

1109 # don't run names queries. If a table does not exist,

1110 # NoSuchTableError is raised and it's skipped 

1111 

1112 # this also suits the case for mssql where we can reflect 

1113 # individual temp tables but there's no temp_names_fn 

1114 names = filter_names 

1115 else: 

1116 names = [] 

1117 name_kw = {"schema": schema, **kw} 

1118 fns = [] 

1119 if ObjectScope.DEFAULT in scope: 

1120 fns.extend(names_fns) 

1121 if ObjectScope.TEMPORARY in scope: 

1122 fns.extend(temp_names_fns) 

1123 

1124 for fn in fns: 

1125 try: 

1126 names.extend(fn(connection, **name_kw)) 

1127 except NotImplementedError: 

1128 pass 

1129 

1130 if filter_names: 

1131 filter_names = set(filter_names) 

1132 

1133 # iterate over all the tables/views and call the single table method 

1134 for table in names: 

1135 if not filter_names or table in filter_names: 

1136 key = (schema, table) 

1137 try: 

1138 yield ( 

1139 key, 

1140 single_tbl_method( 

1141 connection, table, schema=schema, **kw 

1142 ), 

1143 ) 

1144 except exc.UnreflectableTableError as err: 

1145 if key not in unreflectable: 

1146 unreflectable[key] = err 

1147 except exc.NoSuchTableError: 

1148 pass 

1149 

1150 def get_multi_table_options(self, connection, **kw): 

1151 return self._default_multi_reflect( 

1152 self.get_table_options, connection, **kw 

1153 ) 

1154 

1155 def get_multi_columns(self, connection, **kw): 

1156 return self._default_multi_reflect(self.get_columns, connection, **kw) 

1157 

1158 def get_multi_pk_constraint(self, connection, **kw): 

1159 return self._default_multi_reflect( 

1160 self.get_pk_constraint, connection, **kw 

1161 ) 

1162 

1163 def get_multi_foreign_keys(self, connection, **kw): 

1164 return self._default_multi_reflect( 

1165 self.get_foreign_keys, connection, **kw 

1166 ) 

1167 

1168 def get_multi_indexes(self, connection, **kw): 

1169 return self._default_multi_reflect(self.get_indexes, connection, **kw) 

1170 

1171 def get_multi_unique_constraints(self, connection, **kw): 

1172 return self._default_multi_reflect( 

1173 self.get_unique_constraints, connection, **kw 

1174 ) 

1175 

1176 def get_multi_check_constraints(self, connection, **kw): 

1177 return self._default_multi_reflect( 

1178 self.get_check_constraints, connection, **kw 

1179 ) 

1180 

1181 def get_multi_table_comment(self, connection, **kw): 

1182 return self._default_multi_reflect( 

1183 self.get_table_comment, connection, **kw 

1184 ) 

1185 

1186 

1187class StrCompileDialect(DefaultDialect): 

1188 statement_compiler = compiler.StrSQLCompiler 

1189 ddl_compiler = compiler.DDLCompiler 

1190 type_compiler_cls = compiler.StrSQLTypeCompiler 

1191 preparer = compiler.IdentifierPreparer 

1192 

1193 insert_returning = True 

1194 update_returning = True 

1195 delete_returning = True 

1196 

1197 supports_statement_cache = True 

1198 

1199 supports_identity_columns = True 

1200 

1201 supports_sequences = True 

1202 sequences_optional = True 

1203 preexecute_autoincrement_sequences = False 

1204 

1205 supports_native_boolean = True 

1206 

1207 supports_multivalues_insert = True 

1208 supports_simple_order_by_label = True 

1209 

1210 

1211class DefaultExecutionContext(ExecutionContext): 

1212 isinsert = False 

1213 isupdate = False 

1214 isdelete = False 

1215 is_crud = False 

1216 is_text = False 

1217 isddl = False 

1218 

1219 execute_style: ExecuteStyle = ExecuteStyle.EXECUTE 

1220 

1221 compiled: Optional[Compiled] = None 

1222 result_column_struct: Optional[ 

1223 Tuple[List[ResultColumnsEntry], bool, bool, bool, bool] 

1224 ] = None 

1225 returned_default_rows: Optional[Sequence[Row[Unpack[TupleAny]]]] = None 

1226 

1227 execution_options: _ExecuteOptions = util.EMPTY_DICT 

1228 

1229 cursor_fetch_strategy = _cursor._DEFAULT_FETCH 

1230 

1231 invoked_statement: Optional[Executable] = None 

1232 

1233 _is_implicit_returning = False 

1234 _is_explicit_returning = False 

1235 _is_supplemental_returning = False 

1236 _is_server_side = False 

1237 

1238 _soft_closed = False 

1239 

1240 _rowcount: Optional[int] = None 

1241 

1242 # a hook for SQLite's translation of 

1243 # result column names 

1244 # NOTE: pyhive is using this hook, can't remove it :( 

1245 _translate_colname: Optional[Callable[[str], str]] = None 

1246 

1247 _expanded_parameters: Mapping[str, List[str]] = util.immutabledict() 

1248 """used by set_input_sizes(). 

1249 

1250 This collection comes from ``ExpandedState.parameter_expansion``. 

1251 

1252 """ 

1253 

1254 cache_hit = NO_CACHE_KEY 

1255 

1256 root_connection: Connection 

1257 _dbapi_connection: PoolProxiedConnection 

1258 dialect: Dialect 

1259 unicode_statement: str 

1260 cursor: DBAPICursor 

1261 compiled_parameters: List[_MutableCoreSingleExecuteParams] 

1262 parameters: _DBAPIMultiExecuteParams 

1263 extracted_parameters: Optional[Sequence[BindParameter[Any]]] 

1264 

1265 _empty_dict_params = cast("Mapping[str, Any]", util.EMPTY_DICT) 

1266 

1267 _insertmanyvalues_rows: Optional[List[Tuple[Any, ...]]] = None 

1268 _num_sentinel_cols: int = 0 

1269 

1270 @classmethod 

1271 def _init_ddl( 

1272 cls, 

1273 dialect: Dialect, 

1274 connection: Connection, 

1275 dbapi_connection: PoolProxiedConnection, 

1276 execution_options: _ExecuteOptions, 

1277 compiled_ddl: DDLCompiler, 

1278 ) -> ExecutionContext: 

1279 """Initialize execution context for an ExecutableDDLElement 

1280 construct.""" 

1281 

1282 self = cls.__new__(cls) 

1283 self.root_connection = connection 

1284 self._dbapi_connection = dbapi_connection 

1285 self.dialect = connection.dialect 

1286 

1287 self.compiled = compiled = compiled_ddl 

1288 self.isddl = True 

1289 

1290 self.execution_options = execution_options 

1291 

1292 self.unicode_statement = str(compiled) 

1293 if compiled.schema_translate_map: 

1294 schema_translate_map = self.execution_options.get( 

1295 "schema_translate_map", {} 

1296 ) 

1297 

1298 rst = compiled.preparer._render_schema_translates 

1299 self.unicode_statement = rst( 

1300 self.unicode_statement, schema_translate_map 

1301 ) 

1302 

1303 self.statement = self.unicode_statement 

1304 

1305 self.cursor = self.create_cursor() 

1306 self.compiled_parameters = [] 

1307 

1308 if dialect.positional: 

1309 self.parameters = [dialect.execute_sequence_format()] 

1310 else: 

1311 self.parameters = [self._empty_dict_params] 

1312 

1313 return self 

1314 

1315 @classmethod 

1316 def _init_compiled( 

1317 cls, 

1318 dialect: Dialect, 

1319 connection: Connection, 

1320 dbapi_connection: PoolProxiedConnection, 

1321 execution_options: _ExecuteOptions, 

1322 compiled: SQLCompiler, 

1323 parameters: _CoreMultiExecuteParams, 

1324 invoked_statement: Executable, 

1325 extracted_parameters: Optional[Sequence[BindParameter[Any]]], 

1326 cache_hit: CacheStats = CacheStats.CACHING_DISABLED, 

1327 ) -> ExecutionContext: 

1328 """Initialize execution context for a Compiled construct.""" 

1329 

1330 self = cls.__new__(cls) 

1331 self.root_connection = connection 

1332 self._dbapi_connection = dbapi_connection 

1333 self.dialect = connection.dialect 

1334 self.extracted_parameters = extracted_parameters 

1335 self.invoked_statement = invoked_statement 

1336 self.compiled = compiled 

1337 self.cache_hit = cache_hit 

1338 

1339 self.execution_options = execution_options 

1340 

1341 self.result_column_struct = ( 

1342 compiled._result_columns, 

1343 compiled._ordered_columns, 

1344 compiled._textual_ordered_columns, 

1345 compiled._ad_hoc_textual, 

1346 compiled._loose_column_name_matching, 

1347 ) 

1348 

1349 self.isinsert = ii = compiled.isinsert 

1350 self.isupdate = iu = compiled.isupdate 

1351 self.isdelete = id_ = compiled.isdelete 

1352 self.is_text = compiled.isplaintext 

1353 

1354 if ii or iu or id_: 

1355 dml_statement = compiled.compile_state.statement # type: ignore 

1356 if TYPE_CHECKING: 

1357 assert isinstance(dml_statement, UpdateBase) 

1358 self.is_crud = True 

1359 self._is_explicit_returning = ier = bool(dml_statement._returning) 

1360 self._is_implicit_returning = iir = bool( 

1361 compiled.implicit_returning 

1362 ) 

1363 if iir and dml_statement._supplemental_returning: 

1364 self._is_supplemental_returning = True 

1365 

1366 # don't mix implicit and explicit returning

1367 assert not (iir and ier) 

1368 

1369 if (ier or iir) and compiled.for_executemany: 

1370 if ii and not self.dialect.insert_executemany_returning: 

1371 raise exc.InvalidRequestError( 

1372 f"Dialect {self.dialect.dialect_description} with " 

1373 f"current server capabilities does not support " 

1374 "INSERT..RETURNING when executemany is used" 

1375 ) 

1376 elif ( 

1377 ii 

1378 and dml_statement._sort_by_parameter_order 

1379 and not self.dialect.insert_executemany_returning_sort_by_parameter_order # noqa: E501 

1380 ): 

1381 raise exc.InvalidRequestError( 

1382 f"Dialect {self.dialect.dialect_description} with " 

1383 f"current server capabilities does not support " 

1384 "INSERT..RETURNING with deterministic row ordering " 

1385 "when executemany is used" 

1386 ) 

1387 elif ( 

1388 ii 

1389 and self.dialect.use_insertmanyvalues 

1390 and not compiled._insertmanyvalues 

1391 ): 

1392 raise exc.InvalidRequestError( 

1393 'Statement does not have "insertmanyvalues" ' 

1394 "enabled, can't use INSERT..RETURNING with " 

1395 "executemany in this case." 

1396 ) 

1397 elif iu and not self.dialect.update_executemany_returning: 

1398 raise exc.InvalidRequestError( 

1399 f"Dialect {self.dialect.dialect_description} with " 

1400 f"current server capabilities does not support " 

1401 "UPDATE..RETURNING when executemany is used" 

1402 ) 

1403 elif id_ and not self.dialect.delete_executemany_returning: 

1404 raise exc.InvalidRequestError( 

1405 f"Dialect {self.dialect.dialect_description} with " 

1406 f"current server capabilities does not support " 

1407 "DELETE..RETURNING when executemany is used" 

1408 ) 

1409 

1410 if not parameters: 

1411 self.compiled_parameters = [ 

1412 compiled.construct_params( 

1413 extracted_parameters=extracted_parameters, 

1414 escape_names=False, 

1415 ) 

1416 ] 

1417 else: 

1418 self.compiled_parameters = [ 

1419 compiled.construct_params( 

1420 m, 

1421 escape_names=False, 

1422 _group_number=grp, 

1423 extracted_parameters=extracted_parameters, 

1424 ) 

1425 for grp, m in enumerate(parameters) 

1426 ] 

1427 

1428 if len(parameters) > 1: 

1429 if self.isinsert and compiled._insertmanyvalues: 

1430 self.execute_style = ExecuteStyle.INSERTMANYVALUES 

1431 

1432 imv = compiled._insertmanyvalues 

1433 if imv.sentinel_columns is not None: 

1434 self._num_sentinel_cols = imv.num_sentinel_columns 

1435 else: 

1436 self.execute_style = ExecuteStyle.EXECUTEMANY 

1437 

1438 self.unicode_statement = compiled.string 

1439 

1440 self.cursor = self.create_cursor() 

1441 

1442 if self.compiled.insert_prefetch or self.compiled.update_prefetch: 

1443 self._process_execute_defaults() 

1444 

1445 processors = compiled._bind_processors 

1446 

1447 flattened_processors: Mapping[ 

1448 str, _BindProcessorType[Any] 

1449 ] = processors # type: ignore[assignment] 

1450 

1451 if compiled.literal_execute_params or compiled.post_compile_params: 

1452 if self.executemany: 

1453 raise exc.InvalidRequestError( 

1454 "'literal_execute' or 'expanding' parameters can't be " 

1455 "used with executemany()" 

1456 ) 

1457 

1458 expanded_state = compiled._process_parameters_for_postcompile( 

1459 self.compiled_parameters[0] 

1460 ) 

1461 

1462 # re-assign self.unicode_statement 

1463 self.unicode_statement = expanded_state.statement 

1464 

1465 self._expanded_parameters = expanded_state.parameter_expansion 

1466 

1467 flattened_processors = dict(processors) # type: ignore 

1468 flattened_processors.update(expanded_state.processors) 

1469 positiontup = expanded_state.positiontup 

1470 elif compiled.positional: 

1471 positiontup = self.compiled.positiontup 

1472 else: 

1473 positiontup = None 

1474 

1475 if compiled.schema_translate_map: 

1476 schema_translate_map = self.execution_options.get( 

1477 "schema_translate_map", {} 

1478 ) 

1479 rst = compiled.preparer._render_schema_translates 

1480 self.unicode_statement = rst( 

1481 self.unicode_statement, schema_translate_map 

1482 ) 

1483 

1484 # final self.unicode_statement is now assigned, encode if needed 

1485 # by dialect 

1486 self.statement = self.unicode_statement 

1487 

1488 # Convert the dictionary of bind parameter values 

1489 # into a dict or list to be sent to the DBAPI's 

1490 # execute() or executemany() method. 
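
# --- Editorial note: e.g. a compiled parameter dict {"x": 5} is passed
# through as-is for a named paramstyle, while for a positional paramstyle
# such as "qmark" it is flattened, in positiontup order, into a sequence
# like (5,) via execute_sequence_format below.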

1491 

1492 if compiled.positional: 

1493 core_positional_parameters: MutableSequence[Sequence[Any]] = [] 

1494 assert positiontup is not None 

1495 for compiled_params in self.compiled_parameters: 

1496 l_param: List[Any] = [ 

1497 ( 

1498 flattened_processors[key](compiled_params[key]) 

1499 if key in flattened_processors 

1500 else compiled_params[key] 

1501 ) 

1502 for key in positiontup 

1503 ] 

1504 core_positional_parameters.append( 

1505 dialect.execute_sequence_format(l_param) 

1506 ) 

1507 

1508 self.parameters = core_positional_parameters 

1509 else: 

1510 core_dict_parameters: MutableSequence[Dict[str, Any]] = [] 

1511 escaped_names = compiled.escaped_bind_names 

1512 

1513 # note that currently, "expanded" parameters will be present 

1514 # in self.compiled_parameters in their quoted form. This is 

1515 # slightly inconsistent with the approach taken as of 

1516 # #8056 where self.compiled_parameters is meant to contain unquoted 

1517 # param names. 

1518 d_param: Dict[str, Any] 

1519 for compiled_params in self.compiled_parameters: 

1520 if escaped_names: 

1521 d_param = { 

1522 escaped_names.get(key, key): ( 

1523 flattened_processors[key](compiled_params[key]) 

1524 if key in flattened_processors 

1525 else compiled_params[key] 

1526 ) 

1527 for key in compiled_params 

1528 } 

1529 else: 

1530 d_param = { 

1531 key: ( 

1532 flattened_processors[key](compiled_params[key]) 

1533 if key in flattened_processors 

1534 else compiled_params[key] 

1535 ) 

1536 for key in compiled_params 

1537 } 

1538 

1539 core_dict_parameters.append(d_param) 

1540 

1541 self.parameters = core_dict_parameters 

1542 

1543 return self 

1544 

1545 @classmethod 

1546 def _init_statement( 

1547 cls, 

1548 dialect: Dialect, 

1549 connection: Connection, 

1550 dbapi_connection: PoolProxiedConnection, 

1551 execution_options: _ExecuteOptions, 

1552 statement: str, 

1553 parameters: _DBAPIMultiExecuteParams, 

1554 ) -> ExecutionContext: 

1555 """Initialize execution context for a string SQL statement.""" 

1556 

1557 self = cls.__new__(cls) 

1558 self.root_connection = connection 

1559 self._dbapi_connection = dbapi_connection 

1560 self.dialect = connection.dialect 

1561 self.is_text = True 

1562 

1563 self.execution_options = execution_options 

1564 

1565 if not parameters: 

1566 if self.dialect.positional: 

1567 self.parameters = [dialect.execute_sequence_format()] 

1568 else: 

1569 self.parameters = [self._empty_dict_params] 

1570 elif isinstance(parameters[0], dialect.execute_sequence_format): 

1571 self.parameters = parameters 

1572 elif isinstance(parameters[0], dict): 

1573 self.parameters = parameters 

1574 else: 

1575 self.parameters = [ 

1576 dialect.execute_sequence_format(p) for p in parameters 

1577 ] 

1578 

1579 if len(parameters) > 1: 

1580 self.execute_style = ExecuteStyle.EXECUTEMANY 

1581 

1582 self.statement = self.unicode_statement = statement 

1583 

1584 self.cursor = self.create_cursor() 

1585 return self 

1586 

1587 @classmethod 

1588 def _init_default( 

1589 cls, 

1590 dialect: Dialect, 

1591 connection: Connection, 

1592 dbapi_connection: PoolProxiedConnection, 

1593 execution_options: _ExecuteOptions, 

1594 ) -> ExecutionContext: 

1595 """Initialize execution context for a ColumnDefault construct.""" 

1596 

1597 self = cls.__new__(cls) 

1598 self.root_connection = connection 

1599 self._dbapi_connection = dbapi_connection 

1600 self.dialect = connection.dialect 

1601 

1602 self.execution_options = execution_options 

1603 

1604 self.cursor = self.create_cursor() 

1605 return self 

1606 

1607 def _get_cache_stats(self) -> str: 

1608 if self.compiled is None: 

1609 return "raw sql" 

1610 

1611 now = perf_counter() 

1612 

1613 ch = self.cache_hit 

1614 

1615 gen_time = self.compiled._gen_time 

1616 assert gen_time is not None 

1617 

1618 if ch is NO_CACHE_KEY: 

1619 return "no key %.5fs" % (now - gen_time,) 

1620 elif ch is CACHE_HIT: 

1621 return "cached since %.4gs ago" % (now - gen_time,) 

1622 elif ch is CACHE_MISS: 

1623 return "generated in %.5fs" % (now - gen_time,) 

1624 elif ch is CACHING_DISABLED: 

1625 if "_cache_disable_reason" in self.execution_options: 

1626 return "caching disabled (%s) %.5fs " % ( 

1627 self.execution_options["_cache_disable_reason"], 

1628 now - gen_time, 

1629 ) 

1630 else: 

1631 return "caching disabled %.5fs" % (now - gen_time,) 

1632 elif ch is NO_DIALECT_SUPPORT: 

1633 return "dialect %s+%s does not support caching %.5fs" % ( 

1634 self.dialect.name, 

1635 self.dialect.driver, 

1636 now - gen_time, 

1637 ) 

1638 else: 

1639 return "unknown" 

1640 

1641 @property 

1642 def executemany(self): # type: ignore[override] 

1643 return self.execute_style in ( 

1644 ExecuteStyle.EXECUTEMANY, 

1645 ExecuteStyle.INSERTMANYVALUES, 

1646 ) 

1647 

1648 @util.memoized_property 

1649 def identifier_preparer(self): 

1650 if self.compiled: 

1651 return self.compiled.preparer 

1652 elif "schema_translate_map" in self.execution_options: 

1653 return self.dialect.identifier_preparer._with_schema_translate( 

1654 self.execution_options["schema_translate_map"] 

1655 ) 

1656 else: 

1657 return self.dialect.identifier_preparer 

1658 

1659 @util.memoized_property 

1660 def engine(self): 

1661 return self.root_connection.engine 

1662 

1663 @util.memoized_property 

1664 def postfetch_cols(self) -> Optional[Sequence[Column[Any]]]: 

1665 if TYPE_CHECKING: 

1666 assert isinstance(self.compiled, SQLCompiler) 

1667 return self.compiled.postfetch 

1668 

1669 @util.memoized_property 

1670 def prefetch_cols(self) -> Optional[Sequence[Column[Any]]]: 

1671 if TYPE_CHECKING: 

1672 assert isinstance(self.compiled, SQLCompiler) 

1673 if self.isinsert: 

1674 return self.compiled.insert_prefetch 

1675 elif self.isupdate: 

1676 return self.compiled.update_prefetch 

1677 else: 

1678 return () 

1679 

1680 @util.memoized_property 

1681 def no_parameters(self): 

1682 return self.execution_options.get("no_parameters", False) 

1683 

1684 def _execute_scalar( 

1685 self, 

1686 stmt: str, 

1687 type_: Optional[TypeEngine[Any]], 

1688 parameters: Optional[_DBAPISingleExecuteParams] = None, 

1689 ) -> Any: 

1690 """Execute a string statement on the current cursor, returning a 

1691 scalar result. 

1692 

1693 Used to fire off sequences, default phrases, and "select lastrowid" 

1694 types of statements individually or in the context of a parent INSERT 

1695 or UPDATE statement. 

1696 

1697 """ 

1698 

1699 conn = self.root_connection 

1700 

1701 if "schema_translate_map" in self.execution_options: 

1702 schema_translate_map = self.execution_options.get( 

1703 "schema_translate_map", {} 

1704 ) 

1705 

1706 rst = self.identifier_preparer._render_schema_translates 

1707 stmt = rst(stmt, schema_translate_map) 

1708 

1709 if not parameters: 

1710 if self.dialect.positional: 

1711 parameters = self.dialect.execute_sequence_format() 

1712 else: 

1713 parameters = {} 

1714 

1715 conn._cursor_execute(self.cursor, stmt, parameters, context=self) 

1716 row = self.cursor.fetchone() 

1717 if row is not None: 

1718 r = row[0] 

1719 else: 

1720 r = None 

1721 if type_ is not None: 

1722 # apply type post processors to the result 

1723 proc = type_._cached_result_processor( 

1724 self.dialect, self.cursor.description[0][1] 

1725 ) 

1726 if proc: 

1727 return proc(r) 

1728 return r 

1729 
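
# Illustrative sketch, not part of this module: _execute_scalar() is the
# hook dialect contexts typically use for the small "side" statements the
# docstring above describes.  A rough, PostgreSQL-flavored fire_sequence()
# built on it; the method body is an assumption, not shipped dialect code:
from sqlalchemy.engine.default import DefaultExecutionContext

class _SequenceFiringContext(DefaultExecutionContext):
    def fire_sequence(self, seq, type_):
        # run "select nextval('<seq>')" on the current cursor and return the
        # scalar, with the sequence's type applied to the result
        return self._execute_scalar(
            "select nextval('%s')"
            % self.identifier_preparer.format_sequence(seq),
            type_,
        )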

1730 @util.memoized_property 

1731 def connection(self): 

1732 return self.root_connection 

1733 

1734 def _use_server_side_cursor(self): 

1735 if not self.dialect.supports_server_side_cursors: 

1736 return False 

1737 

1738 if self.dialect.server_side_cursors: 

1739 # this is deprecated 

1740 use_server_side = self.execution_options.get( 

1741 "stream_results", True 

1742 ) and ( 

1743 self.compiled 

1744 and isinstance(self.compiled.statement, expression.Selectable) 

1745 or ( 

1746 ( 

1747 not self.compiled 

1748 or isinstance( 

1749 self.compiled.statement, expression.TextClause 

1750 ) 

1751 ) 

1752 and self.unicode_statement 

1753 and SERVER_SIDE_CURSOR_RE.match(self.unicode_statement) 

1754 ) 

1755 ) 

1756 else: 

1757 use_server_side = self.execution_options.get( 

1758 "stream_results", False 

1759 ) 

1760 

1761 return use_server_side 

1762 

1763 def create_cursor(self) -> DBAPICursor: 

1764 if ( 

1765 # inlining initial preference checks for SS cursors 

1766 self.dialect.supports_server_side_cursors 

1767 and ( 

1768 self.execution_options.get("stream_results", False) 

1769 or ( 

1770 self.dialect.server_side_cursors 

1771 and self._use_server_side_cursor() 

1772 ) 

1773 ) 

1774 ): 

1775 self._is_server_side = True 

1776 return self.create_server_side_cursor() 

1777 else: 

1778 self._is_server_side = False 

1779 return self.create_default_cursor() 

1780 
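
# Illustrative sketch, not part of this module: create_cursor() above picks
# a server-side cursor when the dialect supports them and "stream_results"
# is set.  From the caller's side that looks roughly like this; the URL and
# table name are hypothetical:
from sqlalchemy import create_engine, text

engine = create_engine("postgresql+psycopg2://scott:tiger@localhost/test")
with engine.connect() as conn:
    result = conn.execution_options(stream_results=True).execute(
        text("SELECT * FROM big_table")
    )
    # rows are fetched from the server-side cursor in batches
    for batch in result.partitions(500):
        for row in batch:
            ...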

1781 def fetchall_for_returning(self, cursor): 

1782 return cursor.fetchall() 

1783 

1784 def create_default_cursor(self) -> DBAPICursor: 

1785 return self._dbapi_connection.cursor() 

1786 

1787 def create_server_side_cursor(self) -> DBAPICursor: 

1788 raise NotImplementedError() 

1789 

1790 def pre_exec(self): 

1791 pass 

1792 

1793 def get_out_parameter_values(self, names): 

1794 raise NotImplementedError( 

1795 "This dialect does not support OUT parameters" 

1796 ) 

1797 

1798 def post_exec(self): 

1799 pass 

1800 

1801 def get_result_processor(self, type_, colname, coltype): 

1802 """Return a 'result processor' for a given type as present in 

1803 cursor.description. 

1804 

1805 This has a default implementation that dialects can override 

1806 for context-sensitive result type handling. 

1807 

1808 """ 

1809 return type_._cached_result_processor(self.dialect, coltype) 

1810 

1811 def get_lastrowid(self): 

1812 """return self.cursor.lastrowid, or equivalent, after an INSERT. 

1813 

1814 This may involve calling special cursor functions, issuing a new SELECT 

1815 on the cursor (or a new one), or returning a stored value that was 

1816 calculated within post_exec(). 

1817 

1818 This function will only be called for dialects which support "implicit" 

1819 primary key generation, which keep preexecute_autoincrement_sequences set 

1820 to False, and only when no explicit id value was bound to the statement. 

1821 

1822 The function is called once for an INSERT statement that would need to 

1823 return the last inserted primary key for those dialects that make use 

1824 of the lastrowid concept. In these cases, it is called directly after 

1825 :meth:`.ExecutionContext.post_exec`. 

1826 

1827 """ 

1828 return self.cursor.lastrowid 

1829 
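
# Illustrative sketch, not part of this module: one pattern the docstring
# above alludes to is a dialect that computes the value during post_exec()
# and hands it back from get_lastrowid().  Everything below is hypothetical,
# not taken from any shipped dialect:
from sqlalchemy.engine.default import DefaultExecutionContext

class _SelectsIdentityContext(DefaultExecutionContext):
    def post_exec(self):
        if self.isinsert and self.compiled.postfetch_lastrowid:
            # hypothetical identity function for the target database
            self._stored_lastrowid = self._execute_scalar(
                "SELECT LAST_INSERT_ID()", None
            )

    def get_lastrowid(self):
        return self._stored_lastrowid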

1830 def handle_dbapi_exception(self, e): 

1831 pass 

1832 

1833 @util.non_memoized_property 

1834 def rowcount(self) -> int: 

1835 if self._rowcount is not None: 

1836 return self._rowcount 

1837 else: 

1838 return self.cursor.rowcount 

1839 

1840 @property 

1841 def _has_rowcount(self): 

1842 return self._rowcount is not None 

1843 

1844 def supports_sane_rowcount(self): 

1845 return self.dialect.supports_sane_rowcount 

1846 

1847 def supports_sane_multi_rowcount(self): 

1848 return self.dialect.supports_sane_multi_rowcount 

1849 

1850 def _setup_result_proxy(self): 

1851 exec_opt = self.execution_options 

1852 

1853 if self._rowcount is None and exec_opt.get("preserve_rowcount", False): 

1854 self._rowcount = self.cursor.rowcount 

1855 

1856 yp: Optional[Union[int, bool]] 

1857 if self.is_crud or self.is_text: 

1858 result = self._setup_dml_or_text_result() 

1859 yp = False 

1860 else: 

1861 yp = exec_opt.get("yield_per", None) 

1862 sr = self._is_server_side or exec_opt.get("stream_results", False) 

1863 strategy = self.cursor_fetch_strategy 

1864 if sr and strategy is _cursor._DEFAULT_FETCH: 

1865 strategy = _cursor.BufferedRowCursorFetchStrategy( 

1866 self.cursor, self.execution_options 

1867 ) 

1868 cursor_description: _DBAPICursorDescription = ( 

1869 strategy.alternate_cursor_description 

1870 or self.cursor.description 

1871 ) 

1872 if cursor_description is None: 

1873 strategy = _cursor._NO_CURSOR_DQL 

1874 

1875 result = _cursor.CursorResult(self, strategy, cursor_description) 

1876 

1877 compiled = self.compiled 

1878 

1879 if ( 

1880 compiled 

1881 and not self.isddl 

1882 and cast(SQLCompiler, compiled).has_out_parameters 

1883 ): 

1884 self._setup_out_parameters(result) 

1885 

1886 self._soft_closed = result._soft_closed 

1887 

1888 if yp: 

1889 result = result.yield_per(yp) 

1890 

1891 return result 

1892 
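
# Illustrative sketch, not part of this module: the "preserve_rowcount"
# option consulted at the top of _setup_result_proxy() captures
# cursor.rowcount up front so it remains available after the result is
# closed.  Self-contained example against SQLite:
from sqlalchemy import Column, Integer, MetaData, String, Table, create_engine

metadata = MetaData()
user_table = Table(
    "user_account",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("status", String(20)),
)
engine = create_engine("sqlite://")
metadata.create_all(engine)

with engine.begin() as conn:
    result = conn.execution_options(preserve_rowcount=True).execute(
        user_table.update().values(status="archived")
    )
    print(result.rowcount)  # number of rows matched by the UPDATE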

1893 def _setup_out_parameters(self, result): 

1894 compiled = cast(SQLCompiler, self.compiled) 

1895 

1896 out_bindparams = [ 

1897 (param, name) 

1898 for param, name in compiled.bind_names.items() 

1899 if param.isoutparam 

1900 ] 

1901 out_parameters = {} 

1902 

1903 for bindparam, raw_value in zip( 

1904 [param for param, name in out_bindparams], 

1905 self.get_out_parameter_values( 

1906 [name for param, name in out_bindparams] 

1907 ), 

1908 ): 

1909 type_ = bindparam.type 

1910 impl_type = type_.dialect_impl(self.dialect) 

1911 dbapi_type = impl_type.get_dbapi_type(self.dialect.loaded_dbapi) 

1912 result_processor = impl_type.result_processor( 

1913 self.dialect, dbapi_type 

1914 ) 

1915 if result_processor is not None: 

1916 raw_value = result_processor(raw_value) 

1917 out_parameters[bindparam.key] = raw_value 

1918 

1919 result.out_parameters = out_parameters 

1920 
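
# Illustrative sketch, not part of this module: _setup_out_parameters()
# populates CursorResult.out_parameters on backends whose dialect implements
# get_out_parameter_values() (Oracle being the usual case).  The procedure
# name and URL below are hypothetical:
from sqlalchemy import Integer, create_engine, outparam, text

engine = create_engine("oracle+oracledb://scott:tiger@localhost/?service_name=XEPDB1")
with engine.connect() as conn:
    stmt = text("BEGIN my_proc(:result_value); END;").bindparams(
        outparam("result_value", Integer)
    )
    result = conn.execute(stmt)
    print(result.out_parameters["result_value"])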

1921 def _setup_dml_or_text_result(self): 

1922 compiled = cast(SQLCompiler, self.compiled) 

1923 

1924 strategy: ResultFetchStrategy = self.cursor_fetch_strategy 

1925 

1926 if self.isinsert: 

1927 if ( 

1928 self.execute_style is ExecuteStyle.INSERTMANYVALUES 

1929 and compiled.effective_returning 

1930 ): 

1931 strategy = _cursor.FullyBufferedCursorFetchStrategy( 

1932 self.cursor, 

1933 initial_buffer=self._insertmanyvalues_rows, 

1934 # maintain alt cursor description if set by the 

1935 # dialect, e.g. mssql preserves it 

1936 alternate_description=( 

1937 strategy.alternate_cursor_description 

1938 ), 

1939 ) 

1940 

1941 if compiled.postfetch_lastrowid: 

1942 self.inserted_primary_key_rows = ( 

1943 self._setup_ins_pk_from_lastrowid() 

1944 ) 

1945 # else if not self._is_implicit_returning, 

1946 # the default inserted_primary_key_rows accessor will 

1947 # return an "empty" primary key collection when accessed. 

1948 

1949 if self._is_server_side and strategy is _cursor._DEFAULT_FETCH: 

1950 strategy = _cursor.BufferedRowCursorFetchStrategy( 

1951 self.cursor, self.execution_options 

1952 ) 

1953 

1954 if strategy is _cursor._NO_CURSOR_DML: 

1955 cursor_description = None 

1956 else: 

1957 cursor_description = ( 

1958 strategy.alternate_cursor_description 

1959 or self.cursor.description 

1960 ) 

1961 

1962 if cursor_description is None: 

1963 strategy = _cursor._NO_CURSOR_DML 

1964 elif self._num_sentinel_cols: 

1965 assert self.execute_style is ExecuteStyle.INSERTMANYVALUES 

1966 # the sentinel columns are handled in CursorResult._init_metadata 

1967 # using essentially _reduce 

1968 

1969 result: _cursor.CursorResult[Any] = _cursor.CursorResult( 

1970 self, strategy, cursor_description 

1971 ) 

1972 

1973 if self.isinsert: 

1974 if self._is_implicit_returning: 

1975 rows = result.all() 

1976 

1977 self.returned_default_rows = rows 

1978 

1979 self.inserted_primary_key_rows = ( 

1980 self._setup_ins_pk_from_implicit_returning(result, rows) 

1981 ) 

1982 

1983 # test that it has a cursor metadata that is accurate. the 

1984 # first row will have been fetched and current assumptions 

1985 # are that the result has only one row, until executemany() 

1986 # support is added here. 

1987 assert result._metadata.returns_rows 

1988 

1989 # Insert statement has both return_defaults() and 

1990 # returning(). rewind the result on the list of rows 

1991 # we just used. 

1992 if self._is_supplemental_returning: 

1993 result._rewind(rows) 

1994 else: 

1995 result._soft_close() 

1996 elif not self._is_explicit_returning: 

1997 result._soft_close() 

1998 

1999 # we assume here the result does not return any rows. 

2000 # *usually*, this will be true. However, some dialects 

2001 # such as that of MSSQL/pyodbc need to SELECT a post fetch 

2002 # function so this is not necessarily true. 

2003 # assert not result.returns_rows 

2004 

2005 elif self._is_implicit_returning: 

2006 rows = result.all() 

2007 

2008 if rows: 

2009 self.returned_default_rows = rows 

2010 self._rowcount = len(rows) 

2011 

2012 if self._is_supplemental_returning: 

2013 result._rewind(rows) 

2014 else: 

2015 result._soft_close() 

2016 

2017 # test that it has a cursor metadata that is accurate. 

2018 # the rows have all been fetched however. 

2019 assert result._metadata.returns_rows 

2020 

2021 elif not result._metadata.returns_rows: 

2022 # no results, get rowcount 

2023 # (which requires open cursor on some drivers) 

2024 if self._rowcount is None: 

2025 self._rowcount = self.cursor.rowcount 

2026 result._soft_close() 

2027 elif self.isupdate or self.isdelete: 

2028 if self._rowcount is None: 

2029 self._rowcount = self.cursor.rowcount 

2030 return result 

2031 
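
# Illustrative sketch, not part of this module: the inserted_primary_key_rows
# handling above is what backs the public CursorResult.inserted_primary_key
# accessor.  Self-contained example against SQLite, which uses the lastrowid
# path:
from sqlalchemy import Column, Integer, MetaData, String, Table, create_engine

metadata = MetaData()
users = Table(
    "users",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("name", String(50)),
)
engine = create_engine("sqlite://")
metadata.create_all(engine)

with engine.begin() as conn:
    result = conn.execute(users.insert(), {"name": "spongebob"})
    print(result.inserted_primary_key)  # e.g. (1,)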

2032 @util.memoized_property 

2033 def inserted_primary_key_rows(self): 

2034 # if no specific "get primary key" strategy was set up 

2035 # during execution, return a "default" primary key based 

2036 # on what's in the compiled_parameters and nothing else. 

2037 return self._setup_ins_pk_from_empty() 

2038 

2039 def _setup_ins_pk_from_lastrowid(self): 

2040 getter = cast( 

2041 SQLCompiler, self.compiled 

2042 )._inserted_primary_key_from_lastrowid_getter 

2043 lastrowid = self.get_lastrowid() 

2044 return [getter(lastrowid, self.compiled_parameters[0])] 

2045 

2046 def _setup_ins_pk_from_empty(self): 

2047 getter = cast( 

2048 SQLCompiler, self.compiled 

2049 )._inserted_primary_key_from_lastrowid_getter 

2050 return [getter(None, param) for param in self.compiled_parameters] 

2051 

2052 def _setup_ins_pk_from_implicit_returning(self, result, rows): 

2053 if not rows: 

2054 return [] 

2055 

2056 getter = cast( 

2057 SQLCompiler, self.compiled 

2058 )._inserted_primary_key_from_returning_getter 

2059 compiled_params = self.compiled_parameters 

2060 

2061 return [ 

2062 getter(row, param) for row, param in zip(rows, compiled_params) 

2063 ] 

2064 

2065 def lastrow_has_defaults(self): 

2066 return (self.isinsert or self.isupdate) and bool( 

2067 cast(SQLCompiler, self.compiled).postfetch 

2068 ) 

2069 

2070 def _prepare_set_input_sizes( 

2071 self, 

2072 ) -> Optional[List[Tuple[str, Any, TypeEngine[Any]]]]: 

2073 """Given a cursor and ClauseParameters, prepare arguments 

2074 in order to call the appropriate 

2075 style of ``setinputsizes()`` on the cursor, using DB-API types 

2076 from the bind parameter's ``TypeEngine`` objects. 

2077 

2078 This method is only called by those dialects which set the 

2079 :attr:`.Dialect.bind_typing` attribute to 

2080 :attr:`.BindTyping.SETINPUTSIZES`. Python-oracledb and cx_Oracle are 

2081 the only DBAPIs that require setinputsizes(); pyodbc offers it as an 

2082 option. 

2083 

2084 Prior to SQLAlchemy 2.0, the setinputsizes() approach was also used 

2085 for pg8000 and asyncpg, which have since been changed to use inline rendering 

2086 of casts. 

2087 

2088 """ 

2089 if self.isddl or self.is_text: 

2090 return None 

2091 

2092 compiled = cast(SQLCompiler, self.compiled) 

2093 

2094 inputsizes = compiled._get_set_input_sizes_lookup() 

2095 

2096 if inputsizes is None: 

2097 return None 

2098 

2099 dialect = self.dialect 

2100 

2101 # all of the rest of this... cython? 

2102 

2103 if dialect._has_events: 

2104 inputsizes = dict(inputsizes) 

2105 dialect.dispatch.do_setinputsizes( 

2106 inputsizes, self.cursor, self.statement, self.parameters, self 

2107 ) 

2108 

2109 if compiled.escaped_bind_names: 

2110 escaped_bind_names = compiled.escaped_bind_names 

2111 else: 

2112 escaped_bind_names = None 

2113 

2114 if dialect.positional: 

2115 items = [ 

2116 (key, compiled.binds[key]) 

2117 for key in compiled.positiontup or () 

2118 ] 

2119 else: 

2120 items = [ 

2121 (key, bindparam) 

2122 for bindparam, key in compiled.bind_names.items() 

2123 ] 

2124 

2125 generic_inputsizes: List[Tuple[str, Any, TypeEngine[Any]]] = [] 

2126 for key, bindparam in items: 

2127 if bindparam in compiled.literal_execute_params: 

2128 continue 

2129 

2130 if key in self._expanded_parameters: 

2131 if is_tuple_type(bindparam.type): 

2132 num = len(bindparam.type.types) 

2133 dbtypes = inputsizes[bindparam] 

2134 generic_inputsizes.extend( 

2135 ( 

2136 ( 

2137 escaped_bind_names.get(paramname, paramname) 

2138 if escaped_bind_names is not None 

2139 else paramname 

2140 ), 

2141 dbtypes[idx % num], 

2142 bindparam.type.types[idx % num], 

2143 ) 

2144 for idx, paramname in enumerate( 

2145 self._expanded_parameters[key] 

2146 ) 

2147 ) 

2148 else: 

2149 dbtype = inputsizes.get(bindparam, None) 

2150 generic_inputsizes.extend( 

2151 ( 

2152 ( 

2153 escaped_bind_names.get(paramname, paramname) 

2154 if escaped_bind_names is not None 

2155 else paramname 

2156 ), 

2157 dbtype, 

2158 bindparam.type, 

2159 ) 

2160 for paramname in self._expanded_parameters[key] 

2161 ) 

2162 else: 

2163 dbtype = inputsizes.get(bindparam, None) 

2164 

2165 escaped_name = ( 

2166 escaped_bind_names.get(key, key) 

2167 if escaped_bind_names is not None 

2168 else key 

2169 ) 

2170 

2171 generic_inputsizes.append( 

2172 (escaped_name, dbtype, bindparam.type) 

2173 ) 

2174 

2175 return generic_inputsizes 

2176 
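
# Illustrative sketch, not part of this module: the
# dialect.dispatch.do_setinputsizes() call above corresponds to the
# "do_setinputsizes" dialect event, which lets applications inspect or edit
# the inputsizes mapping before it reaches the driver.  The URL is
# hypothetical; the dialect must use BindTyping.SETINPUTSIZES (e.g. the
# oracledb driver):
from sqlalchemy import create_engine, event

engine = create_engine("oracle+oracledb://scott:tiger@localhost/?service_name=XEPDB1")

@event.listens_for(engine, "do_setinputsizes")
def _log_setinputsizes(inputsizes, cursor, statement, parameters, context):
    # keys are BindParameter objects, values are DBAPI type objects; entries
    # may be removed or replaced here before cursor.setinputsizes() is called
    for bindparam, dbapitype in inputsizes.items():
        print(bindparam.key, dbapitype)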

2177 def _exec_default(self, column, default, type_): 

2178 if default.is_sequence: 

2179 return self.fire_sequence(default, type_) 

2180 elif default.is_callable: 

2181 # this codepath is not normally used as it's inlined 

2182 # into _process_execute_defaults 

2183 self.current_column = column 

2184 return default.arg(self) 

2185 elif default.is_clause_element: 

2186 return self._exec_default_clause_element(column, default, type_) 

2187 else: 

2188 # this codepath is not normally used as it's inlined 

2189 # into _process_execute_defaults 

2190 return default.arg 

2191 

2192 def _exec_default_clause_element(self, column, default, type_): 

2193 # execute a default that's a complete clause element. Here, we have 

2194 # to re-implement a miniature version of the compile->parameters-> 

2195 # cursor.execute() sequence, since we don't want to modify the state 

2196 # of the connection / result in progress or create new connection/ 

2197 # result objects etc. 

2198 # .. versionchanged:: 1.4 

2199 

2200 if not default._arg_is_typed: 

2201 default_arg = expression.type_coerce(default.arg, type_) 

2202 else: 

2203 default_arg = default.arg 

2204 compiled = expression.select(default_arg).compile(dialect=self.dialect) 

2205 compiled_params = compiled.construct_params() 

2206 processors = compiled._bind_processors 

2207 if compiled.positional: 

2208 parameters = self.dialect.execute_sequence_format( 

2209 [ 

2210 ( 

2211 processors[key](compiled_params[key]) # type: ignore 

2212 if key in processors 

2213 else compiled_params[key] 

2214 ) 

2215 for key in compiled.positiontup or () 

2216 ] 

2217 ) 

2218 else: 

2219 parameters = { 

2220 key: ( 

2221 processors[key](compiled_params[key]) # type: ignore 

2222 if key in processors 

2223 else compiled_params[key] 

2224 ) 

2225 for key in compiled_params 

2226 } 

2227 return self._execute_scalar( 

2228 str(compiled), type_, parameters=parameters 

2229 ) 

2230 
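
# Illustrative sketch, not part of this module: _exec_default_clause_element()
# handles column defaults that are SQL expressions rather than plain Python
# values.  Declaring such defaults looks like this (table name hypothetical);
# when the expression cannot be rendered inline or RETURNed, the context
# pre-executes it through the path above:
from sqlalchemy import Column, DateTime, Integer, MetaData, Table, func

metadata = MetaData()
documents = Table(
    "documents",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("created_at", DateTime, default=func.now()),
    Column("updated_at", DateTime, onupdate=func.now()),
)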

2231 current_parameters: Optional[_CoreSingleExecuteParams] = None 

2232 """A dictionary of parameters applied to the current row. 

2233 

2234 This attribute is only available in the context of a user-defined default 

2235 generation function, e.g. as described at :ref:`context_default_functions`. 

2236 It consists of a dictionary which includes entries for each column/value 

2237 pair that is to be part of the INSERT or UPDATE statement. The keys of the 

2238 dictionary will be the key value of each :class:`_schema.Column`, 

2239 which is usually 

2240 synonymous with the name. 

2241 

2242 Note that the :attr:`.DefaultExecutionContext.current_parameters` attribute 

2243 does not accommodate the "multi-values" feature of the 

2244 :meth:`_expression.Insert.values` method. The 

2245 :meth:`.DefaultExecutionContext.get_current_parameters` method should be 

2246 preferred. 

2247 

2248 .. seealso:: 

2249 

2250 :meth:`.DefaultExecutionContext.get_current_parameters` 

2251 

2252 :ref:`context_default_functions` 

2253 

2254 """ 

2255 

2256 def get_current_parameters(self, isolate_multiinsert_groups=True): 

2257 """Return a dictionary of parameters applied to the current row. 

2258 

2259 This method can only be used in the context of a user-defined default 

2260 generation function, e.g. as described at 

2261 :ref:`context_default_functions`. When invoked, a dictionary is 

2262 returned which includes entries for each column/value pair that is part 

2263 of the INSERT or UPDATE statement. The keys of the dictionary will be 

2264 the key value of each :class:`_schema.Column`, 

2265 which is usually synonymous 

2266 with the name. 

2267 

2268 :param isolate_multiinsert_groups=True: indicates that multi-valued 

2269 INSERT constructs created using :meth:`_expression.Insert.values` 

2270 should be 

2271 handled by returning only the subset of parameters that are local 

2272 to the current column default invocation. When ``False``, the 

2273 raw parameters of the statement are returned including the 

2274 naming convention used in the case of multi-valued INSERT. 

2275 

2276 .. seealso:: 

2277 

2278 :attr:`.DefaultExecutionContext.current_parameters` 

2279 

2280 :ref:`context_default_functions` 

2281 

2282 """ 

2283 try: 

2284 parameters = self.current_parameters 

2285 column = self.current_column 

2286 except AttributeError: 

2287 raise exc.InvalidRequestError( 

2288 "get_current_parameters() can only be invoked in the " 

2289 "context of a Python side column default function" 

2290 ) 

2291 else: 

2292 assert column is not None 

2293 assert parameters is not None 

2294 compile_state = cast( 

2295 "DMLState", cast(SQLCompiler, self.compiled).compile_state 

2296 ) 

2297 assert compile_state is not None 

2298 if ( 

2299 isolate_multiinsert_groups 

2300 and dml.isinsert(compile_state) 

2301 and compile_state._has_multi_parameters 

2302 ): 

2303 if column._is_multiparam_column: 

2304 index = column.index + 1 

2305 d = {column.original.key: parameters[column.key]} 

2306 else: 

2307 d = {column.key: parameters[column.key]} 

2308 index = 0 

2309 assert compile_state._dict_parameters is not None 

2310 keys = compile_state._dict_parameters.keys() 

2311 d.update( 

2312 (key, parameters["%s_m%d" % (key, index)]) for key in keys 

2313 ) 

2314 return d 

2315 else: 

2316 return parameters 

2317 
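
# Illustrative sketch, not part of this module: get_current_parameters() is
# aimed at Python-side default functions that need to see the other values in
# the row being inserted.  Column and table names are hypothetical, and the
# function assumes both source columns are present in the INSERT:
from sqlalchemy import Column, Integer, MetaData, Table

def _derive_total(context):
    params = context.get_current_parameters()
    return params["unit_price"] * params["quantity"]

line_items = Table(
    "line_items",
    MetaData(),
    Column("id", Integer, primary_key=True),
    Column("unit_price", Integer),
    Column("quantity", Integer),
    Column("total", Integer, default=_derive_total),
)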

2318 def get_insert_default(self, column): 

2319 if column.default is None: 

2320 return None 

2321 else: 

2322 return self._exec_default(column, column.default, column.type) 

2323 

2324 def get_update_default(self, column): 

2325 if column.onupdate is None: 

2326 return None 

2327 else: 

2328 return self._exec_default(column, column.onupdate, column.type) 

2329 

2330 def _process_execute_defaults(self): 

2331 compiled = cast(SQLCompiler, self.compiled) 

2332 

2333 key_getter = compiled._within_exec_param_key_getter 

2334 

2335 sentinel_counter = 0 

2336 

2337 if compiled.insert_prefetch: 

2338 prefetch_recs = [ 

2339 ( 

2340 c, 

2341 key_getter(c), 

2342 c._default_description_tuple, 

2343 self.get_insert_default, 

2344 ) 

2345 for c in compiled.insert_prefetch 

2346 ] 

2347 elif compiled.update_prefetch: 

2348 prefetch_recs = [ 

2349 ( 

2350 c, 

2351 key_getter(c), 

2352 c._onupdate_description_tuple, 

2353 self.get_update_default, 

2354 ) 

2355 for c in compiled.update_prefetch 

2356 ] 

2357 else: 

2358 prefetch_recs = [] 

2359 

2360 for param in self.compiled_parameters: 

2361 self.current_parameters = param 

2362 

2363 for ( 

2364 c, 

2365 param_key, 

2366 (arg, is_scalar, is_callable, is_sentinel), 

2367 fallback, 

2368 ) in prefetch_recs: 

2369 if is_sentinel: 

2370 param[param_key] = sentinel_counter 

2371 sentinel_counter += 1 

2372 elif is_scalar: 

2373 param[param_key] = arg 

2374 elif is_callable: 

2375 self.current_column = c 

2376 param[param_key] = arg(self) 

2377 else: 

2378 val = fallback(c) 

2379 if val is not None: 

2380 param[param_key] = val 

2381 

2382 del self.current_parameters 

2383 

2384 

2385DefaultDialect.execution_ctx_cls = DefaultExecutionContext
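
# Illustrative sketch, not part of this module: third-party dialects follow
# the same pattern as the assignment above, subclassing
# DefaultExecutionContext and pointing the dialect's execution_ctx_cls at it.
# Class names and the cursor call are hypothetical, loosely modeled on
# drivers that create server-side cursors by name:
from sqlalchemy.engine import default

class MyExecutionContext(default.DefaultExecutionContext):
    def create_server_side_cursor(self):
        # hypothetical driver API: a named cursor acts as a server-side cursor
        return self._dbapi_connection.cursor("sqla_ss_cursor")

class MyDialect(default.DefaultDialect):
    supports_server_side_cursors = True
    execution_ctx_cls = MyExecutionContext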