Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/default.py: 33%

1048 statements  

1# engine/default.py 

2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7# mypy: allow-untyped-defs, allow-untyped-calls 

8 

9"""Default implementations of per-dialect sqlalchemy.engine classes. 

10 

11These are semi-private implementation classes which are only of importance 

12to database dialect authors; dialects will usually use the classes here 

13as the base class for their own corresponding classes. 

14 

15""" 

16 

17from __future__ import annotations 

18 

19import functools 

20import operator 

21import random 

22import re 

23from time import perf_counter 

24import typing 

25from typing import Any 

26from typing import Callable 

27from typing import cast 

28from typing import Dict 

29from typing import List 

30from typing import Mapping 

31from typing import MutableMapping 

32from typing import MutableSequence 

33from typing import Optional 

34from typing import Sequence 

35from typing import Set 

36from typing import Tuple 

37from typing import Type 

38from typing import TYPE_CHECKING 

39from typing import Union 

40import weakref 

41 

42from . import characteristics 

43from . import cursor as _cursor 

44from . import interfaces 

45from .base import Connection 

46from .interfaces import CacheStats 

47from .interfaces import DBAPICursor 

48from .interfaces import Dialect 

49from .interfaces import ExecuteStyle 

50from .interfaces import ExecutionContext 

51from .reflection import ObjectKind 

52from .reflection import ObjectScope 

53from .. import event 

54from .. import exc 

55from .. import pool 

56from .. import util 

57from ..sql import compiler 

58from ..sql import dml 

59from ..sql import expression 

60from ..sql import type_api 

61from ..sql import util as sql_util 

62from ..sql._typing import is_tuple_type 

63from ..sql.base import _NoArg 

64from ..sql.compiler import DDLCompiler 

65from ..sql.compiler import InsertmanyvaluesSentinelOpts 

66from ..sql.compiler import SQLCompiler 

67from ..sql.elements import quoted_name 

68from ..util.typing import Final 

69from ..util.typing import Literal 

70 

71if typing.TYPE_CHECKING: 

72 from types import ModuleType 

73 

74 from .base import Engine 

75 from .cursor import ResultFetchStrategy 

76 from .interfaces import _CoreMultiExecuteParams 

77 from .interfaces import _CoreSingleExecuteParams 

78 from .interfaces import _DBAPICursorDescription 

79 from .interfaces import _DBAPIMultiExecuteParams 

80 from .interfaces import _DBAPISingleExecuteParams 

81 from .interfaces import _ExecuteOptions 

82 from .interfaces import _MutableCoreSingleExecuteParams 

83 from .interfaces import _ParamStyle 

84 from .interfaces import ConnectArgsType 

85 from .interfaces import DBAPIConnection 

86 from .interfaces import DBAPIModule 

87 from .interfaces import DBAPIType 

88 from .interfaces import IsolationLevel 

89 from .row import Row 

90 from .url import URL 

91 from ..event import _ListenerFnType 

92 from ..pool import Pool 

93 from ..pool import PoolProxiedConnection 

94 from ..sql import Executable 

95 from ..sql.compiler import Compiled 

96 from ..sql.compiler import Linting 

97 from ..sql.compiler import ResultColumnsEntry 

98 from ..sql.dml import DMLState 

99 from ..sql.dml import UpdateBase 

100 from ..sql.elements import BindParameter 

101 from ..sql.schema import Column 

102 from ..sql.type_api import _BindProcessorType 

103 from ..sql.type_api import _ResultProcessorType 

104 from ..sql.type_api import TypeEngine 

105 

106 

107# When we're handed literal SQL, ensure it's a SELECT query 

108SERVER_SIDE_CURSOR_RE = re.compile(r"\s*SELECT", re.I | re.UNICODE) 

109 

110 

111( 

112 CACHE_HIT, 

113 CACHE_MISS, 

114 CACHING_DISABLED, 

115 NO_CACHE_KEY, 

116 NO_DIALECT_SUPPORT, 

117) = list(CacheStats) 

118 

119 

120class DefaultDialect(Dialect): 

121 """Default implementation of Dialect""" 

122 

123 statement_compiler = compiler.SQLCompiler 

124 ddl_compiler = compiler.DDLCompiler 

125 type_compiler_cls = compiler.GenericTypeCompiler 

126 

127 preparer = compiler.IdentifierPreparer 

128 supports_alter = True 

129 supports_comments = False 

130 supports_constraint_comments = False 

131 inline_comments = False 

132 supports_statement_cache = True 

133 

134 div_is_floordiv = True 

135 

136 bind_typing = interfaces.BindTyping.NONE 

137 

138 include_set_input_sizes: Optional[Set[Any]] = None 

139 exclude_set_input_sizes: Optional[Set[Any]] = None 

140 

141 # the first value we'd get for an autoincrement column. 

142 default_sequence_base = 1 

143 

144 # most DBAPIs happy with this for execute(). 

145 # not cx_oracle. 

146 execute_sequence_format = tuple 

147 

148 supports_schemas = True 

149 supports_views = True 

150 supports_sequences = False 

151 sequences_optional = False 

152 preexecute_autoincrement_sequences = False 

153 supports_identity_columns = False 

154 postfetch_lastrowid = True 

155 favor_returning_over_lastrowid = False 

156 insert_null_pk_still_autoincrements = False 

157 update_returning = False 

158 delete_returning = False 

159 update_returning_multifrom = False 

160 delete_returning_multifrom = False 

161 insert_returning = False 

162 

163 cte_follows_insert = False 

164 

165 supports_native_enum = False 

166 supports_native_boolean = False 

167 supports_native_uuid = False 

168 returns_native_bytes = False 

169 

170 non_native_boolean_check_constraint = True 

171 

172 supports_simple_order_by_label = True 

173 

174 tuple_in_values = False 

175 

176 connection_characteristics = util.immutabledict( 

177 { 

178 "isolation_level": characteristics.IsolationLevelCharacteristic(), 

179 "logging_token": characteristics.LoggingTokenCharacteristic(), 

180 } 

181 ) 

182 

183 engine_config_types: Mapping[str, Any] = util.immutabledict( 

184 { 

185 "pool_timeout": util.asint, 

186 "echo": util.bool_or_str("debug"), 

187 "echo_pool": util.bool_or_str("debug"), 

188 "pool_recycle": util.asint, 

189 "pool_size": util.asint, 

190 "max_overflow": util.asint, 

191 "future": util.asbool, 

192 } 

193 ) 

194 

195 # if the NUMERIC type 

196 # returns decimal.Decimal. 

197 # *not* the FLOAT type however. 

198 supports_native_decimal = False 

199 

200 name = "default" 

201 

202 # length at which to truncate 

203 # any identifier. 

204 max_identifier_length = 9999 

205 _user_defined_max_identifier_length: Optional[int] = None 

206 

207 isolation_level: Optional[str] = None 

208 

209 # sub-categories of max_identifier_length. 

210 # currently these accommodate MySQL, which allows alias names

211 # of 255 but DDL names only of 64. 

212 max_index_name_length: Optional[int] = None 

213 max_constraint_name_length: Optional[int] = None 

214 

215 supports_sane_rowcount = True 

216 supports_sane_multi_rowcount = True 

217 colspecs: MutableMapping[Type[TypeEngine[Any]], Type[TypeEngine[Any]]] = {} 

218 default_paramstyle = "named" 

219 

220 supports_default_values = False 

221 """dialect supports INSERT... DEFAULT VALUES syntax""" 

222 

223 supports_default_metavalue = False 

224 """dialect supports INSERT... VALUES (DEFAULT) syntax""" 

225 

226 default_metavalue_token = "DEFAULT" 

227 """for INSERT... VALUES (DEFAULT) syntax, the token to put in the 

228 parenthesis.""" 

229 

230 # not sure if this is a real thing but the compiler will deliver it 

231 # if this is the only flag enabled. 

232 supports_empty_insert = True 

233 """dialect supports INSERT () VALUES ()""" 

234 

235 supports_multivalues_insert = False 

236 

237 use_insertmanyvalues: bool = False 

238 

239 use_insertmanyvalues_wo_returning: bool = False 

240 

241 insertmanyvalues_implicit_sentinel: InsertmanyvaluesSentinelOpts = ( 

242 InsertmanyvaluesSentinelOpts.NOT_SUPPORTED 

243 ) 

244 

245 insertmanyvalues_page_size: int = 1000 

246 insertmanyvalues_max_parameters = 32700 

247 

248 supports_is_distinct_from = True 

249 

250 supports_server_side_cursors = False 

251 

252 server_side_cursors = False 

253 

254 # extra record-level locking features (#4860) 

255 supports_for_update_of = False 

256 

257 server_version_info = None 

258 

259 default_schema_name: Optional[str] = None 

260 

261 # indicates symbol names are 

262 # UPPERCASED if they are case insensitive 

263 # within the database. 

264 # if this is True, the methods normalize_name() 

265 # and denormalize_name() must be provided. 

266 requires_name_normalize = False 

267 

268 is_async = False 

269 

270 has_terminate = False 

271 

272 # TODO: this is not to be part of 2.0. implement rudimentary binary 

273 # literals for SQLite, PostgreSQL, MySQL only within 

274 # _Binary.literal_processor 

275 _legacy_binary_type_literal_encoding = "utf-8" 

276 

277 @util.deprecated_params( 

278 empty_in_strategy=( 

279 "1.4", 

280 "The :paramref:`_sa.create_engine.empty_in_strategy` keyword is " 

281 "deprecated, and no longer has any effect. All IN expressions " 

282 "are now rendered using " 

283 'the "expanding parameter" strategy which renders a set of bound' 

284 'expressions, or an "empty set" SELECT, at statement execution' 

285 "time.", 

286 ), 

287 server_side_cursors=( 

288 "1.4", 

289 "The :paramref:`_sa.create_engine.server_side_cursors` parameter " 

290 "is deprecated and will be removed in a future release. Please " 

291 "use the " 

292 ":paramref:`_engine.Connection.execution_options.stream_results` " 

293 "parameter.", 

294 ), 

295 ) 

296 def __init__( 

297 self, 

298 paramstyle: Optional[_ParamStyle] = None, 

299 isolation_level: Optional[IsolationLevel] = None, 

300 dbapi: Optional[ModuleType] = None, 

301 implicit_returning: Literal[True] = True, 

302 supports_native_boolean: Optional[bool] = None, 

303 max_identifier_length: Optional[int] = None, 

304 label_length: Optional[int] = None, 

305 insertmanyvalues_page_size: Union[_NoArg, int] = _NoArg.NO_ARG, 

306 use_insertmanyvalues: Optional[bool] = None, 

307 # util.deprecated_params decorator cannot render the 

308 # Linting.NO_LINTING constant 

309 compiler_linting: Linting = int(compiler.NO_LINTING), # type: ignore 

310 server_side_cursors: bool = False, 

311 skip_autocommit_rollback: bool = False, 

312 **kwargs: Any, 

313 ): 

314 if server_side_cursors: 

315 if not self.supports_server_side_cursors: 

316 raise exc.ArgumentError( 

317 "Dialect %s does not support server side cursors" % self 

318 ) 

319 else: 

320 self.server_side_cursors = True 

321 

322 if getattr(self, "use_setinputsizes", False): 

323 util.warn_deprecated( 

324 "The dialect-level use_setinputsizes attribute is " 

325 "deprecated. Please use " 

326 "bind_typing = BindTyping.SETINPUTSIZES", 

327 "2.0", 

328 ) 

329 self.bind_typing = interfaces.BindTyping.SETINPUTSIZES 

330 

331 self.positional = False 

332 self._ischema = None 

333 

334 self.dbapi = dbapi 

335 

336 self.skip_autocommit_rollback = skip_autocommit_rollback 

337 

338 if paramstyle is not None: 

339 self.paramstyle = paramstyle 

340 elif self.dbapi is not None: 

341 self.paramstyle = self.dbapi.paramstyle 

342 else: 

343 self.paramstyle = self.default_paramstyle 

344 self.positional = self.paramstyle in ( 

345 "qmark", 

346 "format", 

347 "numeric", 

348 "numeric_dollar", 

349 ) 

350 self.identifier_preparer = self.preparer(self) 

351 self._on_connect_isolation_level = isolation_level 

352 

353 legacy_tt_callable = getattr(self, "type_compiler", None) 

354 if legacy_tt_callable is not None: 

355 tt_callable = cast( 

356 Type[compiler.GenericTypeCompiler], 

357 self.type_compiler, 

358 ) 

359 else: 

360 tt_callable = self.type_compiler_cls 

361 

362 self.type_compiler_instance = self.type_compiler = tt_callable(self) 

363 

364 if supports_native_boolean is not None: 

365 self.supports_native_boolean = supports_native_boolean 

366 

367 self._user_defined_max_identifier_length = max_identifier_length 

368 if self._user_defined_max_identifier_length: 

369 self.max_identifier_length = ( 

370 self._user_defined_max_identifier_length 

371 ) 

372 self.label_length = label_length 

373 self.compiler_linting = compiler_linting 

374 

375 if use_insertmanyvalues is not None: 

376 self.use_insertmanyvalues = use_insertmanyvalues 

377 

378 if insertmanyvalues_page_size is not _NoArg.NO_ARG: 

379 self.insertmanyvalues_page_size = insertmanyvalues_page_size 

380 
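These constructor arguments are not normally passed by hand; create_engine() forwards dialect-level keyword arguments to the dialect in use. A minimal sketch (the URL and values here are illustrative, not taken from this module):

from sqlalchemy import create_engine

engine = create_engine(
    "postgresql+psycopg2://scott:tiger@localhost/test",
    isolation_level="READ COMMITTED",    # stored as _on_connect_isolation_level
    insertmanyvalues_page_size=500,      # overrides the class-level default of 1000
)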

381 @property 

382 @util.deprecated( 

383 "2.0", 

384 "full_returning is deprecated, please use insert_returning, " 

385 "update_returning, delete_returning", 

386 ) 

387 def full_returning(self): 

388 return ( 

389 self.insert_returning 

390 and self.update_returning 

391 and self.delete_returning 

392 ) 

393 

394 @util.memoized_property 

395 def insert_executemany_returning(self): 

396 """Default implementation for insert_executemany_returning, if not 

397 otherwise overridden by the specific dialect. 

398 

399 The default dialect determines "insert_executemany_returning" is 

400 available if the dialect in use has opted into using the 

401 "use_insertmanyvalues" feature. If they haven't opted into that, then 

402 this attribute is False, unless the dialect in question overrides this 

403 and provides some other implementation (such as the Oracle Database 

404 dialects). 

405 

406 """ 

407 return self.insert_returning and self.use_insertmanyvalues 

408 

409 @util.memoized_property 

410 def insert_executemany_returning_sort_by_parameter_order(self): 

411 """Default implementation for 

412 insert_executemany_returning_deterministic_order, if not otherwise 

413 overridden by the specific dialect. 

414 

415 The default dialect determines "insert_executemany_returning" can have 

416 deterministic order only if the dialect in use has opted into using the 

417 "use_insertmanyvalues" feature, which implements deterministic ordering 

418 using client side sentinel columns only by default. The 

419 "insertmanyvalues" feature also features alternate forms that can 

420 use server-generated PK values as "sentinels", but those are only 

421 used if the :attr:`.Dialect.insertmanyvalues_implicit_sentinel` 

422 bitflag enables those alternate SQL forms, which are disabled 

423 by default. 

424 

425 If the dialect in use hasn't opted into that, then this attribute is 

426 False, unless the dialect in question overrides this and provides some 

427 other implementation (such as the Oracle Database dialects). 

428 

429 """ 

430 return self.insert_returning and self.use_insertmanyvalues 

431 

432 update_executemany_returning = False 

433 delete_executemany_returning = False 

434 
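The two memoized properties above simply derive from the insert_returning and use_insertmanyvalues flags, so a dialect enables executemany-with-RETURNING by opting into both. A minimal sketch using a hypothetical dialect class:

from sqlalchemy.engine.default import DefaultDialect

class ExampleDialect(DefaultDialect):
    # hypothetical dialect that renders INSERT..RETURNING and opts into
    # the "insertmanyvalues" batching feature
    name = "exampledialect"
    supports_statement_cache = True
    insert_returning = True
    use_insertmanyvalues = True

# ExampleDialect().insert_executemany_returning and
# ExampleDialect().insert_executemany_returning_sort_by_parameter_order
# both evaluate to True through the defaults above.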

435 @util.memoized_property 

436 def loaded_dbapi(self) -> DBAPIModule: 

437 if self.dbapi is None: 

438 raise exc.InvalidRequestError( 

439 f"Dialect {self} does not have a Python DBAPI established " 

440 "and cannot be used for actual database interaction" 

441 ) 

442 return self.dbapi 

443 

444 @util.memoized_property 

445 def _bind_typing_render_casts(self): 

446 return self.bind_typing is interfaces.BindTyping.RENDER_CASTS 

447 

448 def _ensure_has_table_connection(self, arg: Connection) -> None: 

449 if not isinstance(arg, Connection): 

450 raise exc.ArgumentError( 

451 "The argument passed to Dialect.has_table() should be a " 

452 "%s, got %s. " 

453 "Additionally, the Dialect.has_table() method is for " 

454 "internal dialect " 

455 "use only; please use " 

456 "``inspect(some_engine).has_table(<tablename>)`` " 

457 "for public API use." % (Connection, type(arg)) 

458 ) 

459 

460 @util.memoized_property 

461 def _supports_statement_cache(self): 

462 ssc = self.__class__.__dict__.get("supports_statement_cache", None) 

463 if ssc is None: 

464 util.warn( 

465 "Dialect %s:%s will not make use of SQL compilation caching " 

466 "as it does not set the 'supports_statement_cache' attribute " 

467 "to ``True``. This can have " 

468 "significant performance implications including some " 

469 "performance degradations in comparison to prior SQLAlchemy " 

470 "versions. Dialect maintainers should seek to set this " 

471 "attribute to True after appropriate development and testing " 

472 "for SQLAlchemy 1.4 caching support. Alternatively, this " 

473 "attribute may be set to False which will disable this " 

474 "warning." % (self.name, self.driver), 

475 code="cprf", 

476 ) 

477 

478 return bool(ssc) 

479 

480 @util.memoized_property 

481 def _type_memos(self): 

482 return weakref.WeakKeyDictionary() 

483 

484 @property 

485 def dialect_description(self): # type: ignore[override] 

486 return self.name + "+" + self.driver 

487 

488 @property 

489 def supports_sane_rowcount_returning(self): 

490 """True if this dialect supports sane rowcount even if RETURNING is 

491 in use. 

492 

493 For dialects that don't support RETURNING, this is synonymous with 

494 ``supports_sane_rowcount``. 

495 

496 """ 

497 return self.supports_sane_rowcount 

498 

499 @classmethod 

500 def get_pool_class(cls, url: URL) -> Type[Pool]: 

501 return getattr(cls, "poolclass", pool.QueuePool) 

502 

503 def get_dialect_pool_class(self, url: URL) -> Type[Pool]: 

504 return self.get_pool_class(url) 

505 
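get_pool_class() looks for a poolclass attribute on the dialect class and falls back to QueuePool, so a dialect can choose its default pool with a single class attribute. Sketch (hypothetical dialect):

from sqlalchemy import pool
from sqlalchemy.engine.default import DefaultDialect

class ExampleDialect(DefaultDialect):
    # hypothetical: engines built on this dialect default to NullPool
    poolclass = pool.NullPool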

506 @classmethod 

507 def load_provisioning(cls): 

508 package = ".".join(cls.__module__.split(".")[0:-1]) 

509 try: 

510 __import__(package + ".provision") 

511 except ImportError: 

512 pass 

513 

514 def _builtin_onconnect(self) -> Optional[_ListenerFnType]: 

515 if self._on_connect_isolation_level is not None: 

516 

517 def builtin_connect(dbapi_conn, conn_rec): 

518 self._assert_and_set_isolation_level( 

519 dbapi_conn, self._on_connect_isolation_level 

520 ) 

521 

522 return builtin_connect 

523 else: 

524 return None 

525 

526 def initialize(self, connection: Connection) -> None: 

527 try: 

528 self.server_version_info = self._get_server_version_info( 

529 connection 

530 ) 

531 except NotImplementedError: 

532 self.server_version_info = None 

533 try: 

534 self.default_schema_name = self._get_default_schema_name( 

535 connection 

536 ) 

537 except NotImplementedError: 

538 self.default_schema_name = None 

539 

540 try: 

541 self.default_isolation_level = self.get_default_isolation_level( 

542 connection.connection.dbapi_connection 

543 ) 

544 except NotImplementedError: 

545 self.default_isolation_level = None 

546 

547 if not self._user_defined_max_identifier_length: 

548 max_ident_length = self._check_max_identifier_length(connection) 

549 if max_ident_length: 

550 self.max_identifier_length = max_ident_length 

551 

552 if ( 

553 self.label_length 

554 and self.label_length > self.max_identifier_length 

555 ): 

556 raise exc.ArgumentError( 

557 "Label length of %d is greater than this dialect's" 

558 " maximum identifier length of %d" 

559 % (self.label_length, self.max_identifier_length) 

560 ) 

561 

562 def on_connect(self) -> Optional[Callable[[Any], None]]: 

563 # inherits the docstring from interfaces.Dialect.on_connect 

564 return None 

565 

566 def _check_max_identifier_length(self, connection): 

567 """Perform a connection / server version specific check to determine 

568 the max_identifier_length. 

569 

570 If the dialect's class level max_identifier_length should be used, 

571 can return None. 

572 

573 .. versionadded:: 1.3.9 

574 

575 """ 

576 return None 

577 

578 def get_default_isolation_level(self, dbapi_conn): 

579 """Given a DBAPI connection, return its isolation level, or 

580 a default isolation level if one cannot be retrieved. 

581 

582 May be overridden by subclasses in order to provide a 

583 "fallback" isolation level for databases that cannot reliably 

584 retrieve the actual isolation level. 

585 

586 By default, calls the :meth:`_engine.Interfaces.get_isolation_level` 

587 method, propagating any exceptions raised. 

588 

589 .. versionadded:: 1.3.22 

590 

591 """ 

592 return self.get_isolation_level(dbapi_conn) 

593 
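Since the default implementation propagates any exception raised by get_isolation_level(), a dialect for a backend that cannot reliably report its level may override it with a fallback, as the docstring describes. A sketch (hypothetical dialect; the fallback value is illustrative):

from sqlalchemy.engine.default import DefaultDialect

class ExampleDialect(DefaultDialect):
    def get_default_isolation_level(self, dbapi_conn):
        try:
            return self.get_isolation_level(dbapi_conn)
        except NotImplementedError:
            raise
        except Exception:
            # assume a fixed server default when the driver can't tell us
            return "READ COMMITTED"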

594 def type_descriptor(self, typeobj): 

595 """Provide a database-specific :class:`.TypeEngine` object, given 

596 the generic object which comes from the types module. 

597 

598 This method looks for a dictionary called 

599 ``colspecs`` as a class or instance-level variable, 

600 and passes on to :func:`_types.adapt_type`. 

601 

602 """ 

603 return type_api.adapt_type(typeobj, self.colspecs) 

604 
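type_descriptor() routes a generic type through the dialect's colspecs mapping via adapt_type(), so a dialect substitutes its own type implementation by adding an entry to that dictionary. Sketch (hypothetical type and dialect):

from sqlalchemy import types as sqltypes
from sqlalchemy.engine.default import DefaultDialect

class ExampleDateTime(sqltypes.DateTime):
    # hypothetical dialect-specific DateTime variant
    pass

class ExampleDialect(DefaultDialect):
    colspecs = {sqltypes.DateTime: ExampleDateTime}

# ExampleDialect().type_descriptor(sqltypes.DateTime()) now returns an
# ExampleDateTime instance.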

605 def has_index(self, connection, table_name, index_name, schema=None, **kw): 

606 if not self.has_table(connection, table_name, schema=schema, **kw): 

607 return False 

608 for idx in self.get_indexes( 

609 connection, table_name, schema=schema, **kw 

610 ): 

611 if idx["name"] == index_name: 

612 return True 

613 else: 

614 return False 

615 

616 def has_schema( 

617 self, connection: Connection, schema_name: str, **kw: Any 

618 ) -> bool: 

619 return schema_name in self.get_schema_names(connection, **kw) 

620 

621 def validate_identifier(self, ident: str) -> None: 

622 if len(ident) > self.max_identifier_length: 

623 raise exc.IdentifierError( 

624 "Identifier '%s' exceeds maximum length of %d characters" 

625 % (ident, self.max_identifier_length) 

626 ) 

627 

628 def connect(self, *cargs: Any, **cparams: Any) -> DBAPIConnection: 

629 # inherits the docstring from interfaces.Dialect.connect 

630 return self.loaded_dbapi.connect(*cargs, **cparams) # type: ignore[no-any-return] # NOQA: E501 

631 

632 def create_connect_args(self, url: URL) -> ConnectArgsType: 

633 # inherits the docstring from interfaces.Dialect.create_connect_args 

634 opts = url.translate_connect_args() 

635 opts.update(url.query) 

636 return ([], opts) 

637 
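The default create_connect_args() passes no positional arguments and hands the translated URL fields plus query string to the DBAPI as keyword arguments. A dialect whose driver expects, say, a DSN string first overrides it; a rough sketch (the DSN format and driver signature are hypothetical):

from sqlalchemy.engine.default import DefaultDialect

class ExampleDialect(DefaultDialect):
    def create_connect_args(self, url):
        opts = url.translate_connect_args()
        opts.update(url.query)
        # hypothetical driver signature: connect(dsn, **kwargs)
        dsn = "%s:%s" % (opts.pop("host", "localhost"), opts.pop("port", "5432"))
        return ([dsn], opts)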

638 def set_engine_execution_options( 

639 self, engine: Engine, opts: Mapping[str, Any] 

640 ) -> None: 

641 supported_names = set(self.connection_characteristics).intersection( 

642 opts 

643 ) 

644 if supported_names: 

645 characteristics: Mapping[str, Any] = util.immutabledict( 

646 (name, opts[name]) for name in supported_names 

647 ) 

648 

649 @event.listens_for(engine, "engine_connect") 

650 def set_connection_characteristics(connection): 

651 self._set_connection_characteristics( 

652 connection, characteristics 

653 ) 

654 

655 def set_connection_execution_options( 

656 self, connection: Connection, opts: Mapping[str, Any] 

657 ) -> None: 

658 supported_names = set(self.connection_characteristics).intersection( 

659 opts 

660 ) 

661 if supported_names: 

662 characteristics: Mapping[str, Any] = util.immutabledict( 

663 (name, opts[name]) for name in supported_names 

664 ) 

665 self._set_connection_characteristics(connection, characteristics) 

666 

667 def _set_connection_characteristics(self, connection, characteristics): 

668 characteristic_values = [ 

669 (name, self.connection_characteristics[name], value) 

670 for name, value in characteristics.items() 

671 ] 

672 

673 if connection.in_transaction(): 

674 trans_objs = [ 

675 (name, obj) 

676 for name, obj, _ in characteristic_values 

677 if obj.transactional 

678 ] 

679 if trans_objs: 

680 raise exc.InvalidRequestError( 

681 "This connection has already initialized a SQLAlchemy " 

682 "Transaction() object via begin() or autobegin; " 

683 "%s may not be altered unless rollback() or commit() " 

684 "is called first." 

685 % (", ".join(name for name, obj in trans_objs)) 

686 ) 

687 

688 dbapi_connection = connection.connection.dbapi_connection 

689 for _, characteristic, value in characteristic_values: 

690 characteristic.set_connection_characteristic( 

691 self, connection, dbapi_connection, value 

692 ) 

693 connection.connection._connection_record.finalize_callback.append( 

694 functools.partial(self._reset_characteristics, characteristics) 

695 ) 

696 

697 def _reset_characteristics(self, characteristics, dbapi_connection): 

698 for characteristic_name in characteristics: 

699 characteristic = self.connection_characteristics[ 

700 characteristic_name 

701 ] 

702 characteristic.reset_characteristic(self, dbapi_connection) 

703 

704 def do_begin(self, dbapi_connection): 

705 pass 

706 

707 def do_rollback(self, dbapi_connection): 

708 if self.skip_autocommit_rollback and self.detect_autocommit_setting( 

709 dbapi_connection 

710 ): 

711 return 

712 dbapi_connection.rollback() 

713 

714 def do_commit(self, dbapi_connection): 

715 dbapi_connection.commit() 

716 

717 def do_terminate(self, dbapi_connection): 

718 self.do_close(dbapi_connection) 

719 

720 def do_close(self, dbapi_connection): 

721 dbapi_connection.close() 

722 

723 @util.memoized_property 

724 def _dialect_specific_select_one(self): 

725 return str(expression.select(1).compile(dialect=self)) 

726 

727 def _do_ping_w_event(self, dbapi_connection: DBAPIConnection) -> bool: 

728 try: 

729 return self.do_ping(dbapi_connection) 

730 except self.loaded_dbapi.Error as err: 

731 is_disconnect = self.is_disconnect(err, dbapi_connection, None) 

732 

733 if self._has_events: 

734 try: 

735 Connection._handle_dbapi_exception_noconnection( 

736 err, 

737 self, 

738 is_disconnect=is_disconnect, 

739 invalidate_pool_on_disconnect=False, 

740 is_pre_ping=True, 

741 ) 

742 except exc.StatementError as new_err: 

743 is_disconnect = new_err.connection_invalidated 

744 

745 if is_disconnect: 

746 return False 

747 else: 

748 raise 

749 

750 def do_ping(self, dbapi_connection: DBAPIConnection) -> bool: 

751 cursor = dbapi_connection.cursor() 

752 try: 

753 cursor.execute(self._dialect_specific_select_one) 

754 finally: 

755 cursor.close() 

756 return True 

757 
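do_ping() above runs the dialect's "SELECT 1" equivalent and reports success by returning True; disconnect detection happens in _do_ping_w_event(), which routes DBAPI errors through is_disconnect(). A driver with a native ping can override just do_ping(); a sketch, assuming the DBAPI connection exposes a ping() method:

from sqlalchemy.engine.default import DefaultDialect

class ExampleDialect(DefaultDialect):
    supports_statement_cache = True

    def do_ping(self, dbapi_connection):
        # hypothetical driver-level ping; errors still propagate to
        # _do_ping_w_event() for disconnect handling
        dbapi_connection.ping()
        return True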

758 def create_xid(self): 

759 """Create a random two-phase transaction ID. 

760 

761 This id will be passed to do_begin_twophase(), do_rollback_twophase(), 

762 do_commit_twophase(). Its format is unspecified. 

763 """ 

764 

765 return "_sa_%032x" % random.randint(0, 2**128) 

766 

767 def do_savepoint(self, connection, name): 

768 connection.execute(expression.SavepointClause(name)) 

769 

770 def do_rollback_to_savepoint(self, connection, name): 

771 connection.execute(expression.RollbackToSavepointClause(name)) 

772 

773 def do_release_savepoint(self, connection, name): 

774 connection.execute(expression.ReleaseSavepointClause(name)) 

775 

776 def _deliver_insertmanyvalues_batches( 

777 self, 

778 connection, 

779 cursor, 

780 statement, 

781 parameters, 

782 generic_setinputsizes, 

783 context, 

784 ): 

785 context = cast(DefaultExecutionContext, context) 

786 compiled = cast(SQLCompiler, context.compiled) 

787 

788 _composite_sentinel_proc: Sequence[ 

789 Optional[_ResultProcessorType[Any]] 

790 ] = () 

791 _scalar_sentinel_proc: Optional[_ResultProcessorType[Any]] = None 

792 _sentinel_proc_initialized: bool = False 

793 

794 compiled_parameters = context.compiled_parameters 

795 

796 imv = compiled._insertmanyvalues 

797 assert imv is not None 

798 

799 is_returning: Final[bool] = bool(compiled.effective_returning) 

800 batch_size = context.execution_options.get( 

801 "insertmanyvalues_page_size", self.insertmanyvalues_page_size 

802 ) 

803 

804 if compiled.schema_translate_map: 

805 schema_translate_map = context.execution_options.get( 

806 "schema_translate_map", {} 

807 ) 

808 else: 

809 schema_translate_map = None 

810 

811 if is_returning: 

812 result: Optional[List[Any]] = [] 

813 context._insertmanyvalues_rows = result 

814 

815 sort_by_parameter_order = imv.sort_by_parameter_order 

816 

817 else: 

818 sort_by_parameter_order = False 

819 result = None 

820 

821 for imv_batch in compiled._deliver_insertmanyvalues_batches( 

822 statement, 

823 parameters, 

824 compiled_parameters, 

825 generic_setinputsizes, 

826 batch_size, 

827 sort_by_parameter_order, 

828 schema_translate_map, 

829 ): 

830 yield imv_batch 

831 

832 if is_returning: 

833 

834 try: 

835 rows = context.fetchall_for_returning(cursor) 

836 except BaseException as be: 

837 connection._handle_dbapi_exception( 

838 be, 

839 sql_util._long_statement(imv_batch.replaced_statement), 

840 imv_batch.replaced_parameters, 

841 None, 

842 context, 

843 is_sub_exec=True, 

844 ) 

845 

846 # I would have thought "is_returning: Final[bool]" 

847 # would have assured this but pylance thinks not 

848 assert result is not None 

849 

850 if imv.num_sentinel_columns and not imv_batch.is_downgraded: 

851 composite_sentinel = imv.num_sentinel_columns > 1 

852 if imv.implicit_sentinel: 

853 # for implicit sentinel, which is currently single-col 

854 # integer autoincrement, do a simple sort. 

855 assert not composite_sentinel 

856 result.extend( 

857 sorted(rows, key=operator.itemgetter(-1)) 

858 ) 

859 continue 

860 

861 # otherwise, create dictionaries to match up batches 

862 # with parameters 

863 assert imv.sentinel_param_keys 

864 assert imv.sentinel_columns 

865 

866 _nsc = imv.num_sentinel_columns 

867 

868 if not _sentinel_proc_initialized: 

869 if composite_sentinel: 

870 _composite_sentinel_proc = [ 

871 col.type._cached_result_processor( 

872 self, cursor_desc[1] 

873 ) 

874 for col, cursor_desc in zip( 

875 imv.sentinel_columns, 

876 cursor.description[-_nsc:], 

877 ) 

878 ] 

879 else: 

880 _scalar_sentinel_proc = ( 

881 imv.sentinel_columns[0] 

882 ).type._cached_result_processor( 

883 self, cursor.description[-1][1] 

884 ) 

885 _sentinel_proc_initialized = True 

886 

887 rows_by_sentinel: Union[ 

888 Dict[Tuple[Any, ...], Any], 

889 Dict[Any, Any], 

890 ] 

891 if composite_sentinel: 

892 rows_by_sentinel = { 

893 tuple( 

894 (proc(val) if proc else val) 

895 for val, proc in zip( 

896 row[-_nsc:], _composite_sentinel_proc 

897 ) 

898 ): row 

899 for row in rows 

900 } 

901 elif _scalar_sentinel_proc: 

902 rows_by_sentinel = { 

903 _scalar_sentinel_proc(row[-1]): row for row in rows 

904 } 

905 else: 

906 rows_by_sentinel = {row[-1]: row for row in rows} 

907 

908 if len(rows_by_sentinel) != len(imv_batch.batch): 

909 # see test_insert_exec.py:: 

910 # IMVSentinelTest::test_sentinel_incorrect_rowcount 

911 # for coverage / demonstration 

912 raise exc.InvalidRequestError( 

913 f"Sentinel-keyed result set did not produce " 

914 f"correct number of rows {len(imv_batch.batch)}; " 

915 "produced " 

916 f"{len(rows_by_sentinel)}. Please ensure the " 

917 "sentinel column is fully unique and populated in " 

918 "all cases." 

919 ) 

920 

921 try: 

922 ordered_rows = [ 

923 rows_by_sentinel[sentinel_keys] 

924 for sentinel_keys in imv_batch.sentinel_values 

925 ] 

926 except KeyError as ke: 

927 # see test_insert_exec.py:: 

928 # IMVSentinelTest::test_sentinel_cant_match_keys 

929 # for coverage / demonstration 

930 raise exc.InvalidRequestError( 

931 f"Can't match sentinel values in result set to " 

932 f"parameter sets; key {ke.args[0]!r} was not " 

933 "found. " 

934 "There may be a mismatch between the datatype " 

935 "passed to the DBAPI driver vs. that which it " 

936 "returns in a result row. Ensure the given " 

937 "Python value matches the expected result type " 

938 "*exactly*, taking care to not rely upon implicit " 

939 "conversions which may occur such as when using " 

940 "strings in place of UUID or integer values, etc. " 

941 ) from ke 

942 

943 result.extend(ordered_rows) 

944 

945 else: 

946 result.extend(rows) 

947 

948 def do_executemany(self, cursor, statement, parameters, context=None): 

949 cursor.executemany(statement, parameters) 

950 

951 def do_execute(self, cursor, statement, parameters, context=None): 

952 cursor.execute(statement, parameters) 

953 

954 def do_execute_no_params(self, cursor, statement, context=None): 

955 cursor.execute(statement) 

956 

957 def is_disconnect( 

958 self, 

959 e: DBAPIModule.Error, 

960 connection: Union[ 

961 pool.PoolProxiedConnection, interfaces.DBAPIConnection, None 

962 ], 

963 cursor: Optional[interfaces.DBAPICursor], 

964 ) -> bool: 

965 return False 

966 

967 @util.memoized_instancemethod 

968 def _gen_allowed_isolation_levels(self, dbapi_conn): 

969 try: 

970 raw_levels = list(self.get_isolation_level_values(dbapi_conn)) 

971 except NotImplementedError: 

972 return None 

973 else: 

974 normalized_levels = [ 

975 level.replace("_", " ").upper() for level in raw_levels 

976 ] 

977 if raw_levels != normalized_levels: 

978 raise ValueError( 

979 f"Dialect {self.name!r} get_isolation_level_values() " 

980 f"method should return names as UPPERCASE using spaces, " 

981 f"not underscores; got " 

982 f"{sorted(set(raw_levels).difference(normalized_levels))}" 

983 ) 

984 return tuple(normalized_levels) 

985 

986 def _assert_and_set_isolation_level(self, dbapi_conn, level): 

987 level = level.replace("_", " ").upper() 

988 

989 _allowed_isolation_levels = self._gen_allowed_isolation_levels( 

990 dbapi_conn 

991 ) 

992 if ( 

993 _allowed_isolation_levels 

994 and level not in _allowed_isolation_levels 

995 ): 

996 raise exc.ArgumentError( 

997 f"Invalid value {level!r} for isolation_level. " 

998 f"Valid isolation levels for {self.name!r} are " 

999 f"{', '.join(_allowed_isolation_levels)}" 

1000 ) 

1001 

1002 self.set_isolation_level(dbapi_conn, level) 

1003 

1004 def reset_isolation_level(self, dbapi_conn): 

1005 if self._on_connect_isolation_level is not None: 

1006 assert ( 

1007 self._on_connect_isolation_level == "AUTOCOMMIT" 

1008 or self._on_connect_isolation_level 

1009 == self.default_isolation_level 

1010 ) 

1011 self._assert_and_set_isolation_level( 

1012 dbapi_conn, self._on_connect_isolation_level 

1013 ) 

1014 else: 

1015 assert self.default_isolation_level is not None 

1016 self._assert_and_set_isolation_level( 

1017 dbapi_conn, 

1018 self.default_isolation_level, 

1019 ) 

1020 

1021 def normalize_name(self, name): 

1022 if name is None: 

1023 return None 

1024 

1025 name_lower = name.lower() 

1026 name_upper = name.upper() 

1027 

1028 if name_upper == name_lower: 

1029 # name has no upper/lower conversion, e.g. non-european characters. 

1030 # return unchanged 

1031 return name 

1032 elif name_upper == name and not ( 

1033 self.identifier_preparer._requires_quotes 

1034 )(name_lower): 

1035 # name is all uppercase and doesn't require quoting; normalize 

1036 # to all lower case 

1037 return name_lower 

1038 elif name_lower == name: 

1039 # name is all lower case, which if denormalized means we need to 

1040 # force quoting on it 

1041 return quoted_name(name, quote=True) 

1042 else: 

1043 # name is mixed case, meaning it will be quoted in SQL when used 

1044 # later; no normalization is applied 

1045 return name 

1046 

1047 def denormalize_name(self, name): 

1048 if name is None: 

1049 return None 

1050 

1051 name_lower = name.lower() 

1052 name_upper = name.upper() 

1053 

1054 if name_upper == name_lower: 

1055 # name has no upper/lower conversion, e.g. non-european characters. 

1056 # return unchanged 

1057 return name 

1058 elif name_lower == name and not ( 

1059 self.identifier_preparer._requires_quotes 

1060 )(name_lower): 

1061 name = name_upper 

1062 return name 

1063 
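Together these two methods translate between a backend's uppercase storage convention and SQLAlchemy's lowercase convention when requires_name_normalize is True. Illustrative behavior, assuming an uppercase-default backend and names that need no quoting:

dialect.normalize_name("MY_TABLE")     # -> "my_table"
dialect.denormalize_name("my_table")   # -> "MY_TABLE"
dialect.normalize_name("MixedCase")    # -> "MixedCase" (kept as-is; quoted when rendered)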

1064 def get_driver_connection(self, connection: DBAPIConnection) -> Any: 

1065 return connection 

1066 

1067 def _overrides_default(self, method): 

1068 return ( 

1069 getattr(type(self), method).__code__ 

1070 is not getattr(DefaultDialect, method).__code__ 

1071 ) 

1072 

1073 def _default_multi_reflect( 

1074 self, 

1075 single_tbl_method, 

1076 connection, 

1077 kind, 

1078 schema, 

1079 filter_names, 

1080 scope, 

1081 **kw, 

1082 ): 

1083 names_fns = [] 

1084 temp_names_fns = [] 

1085 if ObjectKind.TABLE in kind: 

1086 names_fns.append(self.get_table_names) 

1087 temp_names_fns.append(self.get_temp_table_names) 

1088 if ObjectKind.VIEW in kind: 

1089 names_fns.append(self.get_view_names) 

1090 temp_names_fns.append(self.get_temp_view_names) 

1091 if ObjectKind.MATERIALIZED_VIEW in kind: 

1092 names_fns.append(self.get_materialized_view_names) 

1093 # no temp materialized view at the moment 

1094 # temp_names_fns.append(self.get_temp_materialized_view_names) 

1095 

1096 unreflectable = kw.pop("unreflectable", {}) 

1097 

1098 if ( 

1099 filter_names 

1100 and scope is ObjectScope.ANY 

1101 and kind is ObjectKind.ANY 

1102 ): 

1103 # if names are given and no qualification on type of table 

1104 # (i.e. the Table(..., autoload) case), take the names as given, 

1105 # don't run names queries. If a table does not exist, 

1106 # NoSuchTableError is raised and it's skipped 

1107 

1108 # this also suits the case for mssql where we can reflect 

1109 # individual temp tables but there's no temp_names_fn 

1110 names = filter_names 

1111 else: 

1112 names = [] 

1113 name_kw = {"schema": schema, **kw} 

1114 fns = [] 

1115 if ObjectScope.DEFAULT in scope: 

1116 fns.extend(names_fns) 

1117 if ObjectScope.TEMPORARY in scope: 

1118 fns.extend(temp_names_fns) 

1119 

1120 for fn in fns: 

1121 try: 

1122 names.extend(fn(connection, **name_kw)) 

1123 except NotImplementedError: 

1124 pass 

1125 

1126 if filter_names: 

1127 filter_names = set(filter_names) 

1128 

1129 # iterate over all the tables/views and call the single table method 

1130 for table in names: 

1131 if not filter_names or table in filter_names: 

1132 key = (schema, table) 

1133 try: 

1134 yield ( 

1135 key, 

1136 single_tbl_method( 

1137 connection, table, schema=schema, **kw 

1138 ), 

1139 ) 

1140 except exc.UnreflectableTableError as err: 

1141 if key not in unreflectable: 

1142 unreflectable[key] = err 

1143 except exc.NoSuchTableError: 

1144 pass 

1145 

1146 def get_multi_table_options(self, connection, **kw): 

1147 return self._default_multi_reflect( 

1148 self.get_table_options, connection, **kw 

1149 ) 

1150 

1151 def get_multi_columns(self, connection, **kw): 

1152 return self._default_multi_reflect(self.get_columns, connection, **kw) 

1153 

1154 def get_multi_pk_constraint(self, connection, **kw): 

1155 return self._default_multi_reflect( 

1156 self.get_pk_constraint, connection, **kw 

1157 ) 

1158 

1159 def get_multi_foreign_keys(self, connection, **kw): 

1160 return self._default_multi_reflect( 

1161 self.get_foreign_keys, connection, **kw 

1162 ) 

1163 

1164 def get_multi_indexes(self, connection, **kw): 

1165 return self._default_multi_reflect(self.get_indexes, connection, **kw) 

1166 

1167 def get_multi_unique_constraints(self, connection, **kw): 

1168 return self._default_multi_reflect( 

1169 self.get_unique_constraints, connection, **kw 

1170 ) 

1171 

1172 def get_multi_check_constraints(self, connection, **kw): 

1173 return self._default_multi_reflect( 

1174 self.get_check_constraints, connection, **kw 

1175 ) 

1176 

1177 def get_multi_table_comment(self, connection, **kw): 

1178 return self._default_multi_reflect( 

1179 self.get_table_comment, connection, **kw 

1180 ) 

1181 
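Each get_multi_*() method above delegates to _default_multi_reflect(), which collects candidate names for the requested kind and scope and then calls the corresponding single-table method once per name, yielding ((schema, name), result) pairs. A consumption sketch, assuming connection and dialect are an existing Connection and dialect instance:

from sqlalchemy.engine.reflection import ObjectKind, ObjectScope

columns = dict(
    dialect.get_multi_columns(
        connection,
        schema=None,
        filter_names=None,
        kind=ObjectKind.TABLE,
        scope=ObjectScope.DEFAULT,
    )
)
# columns maps (schema, table_name) -> list of column dictionaries

Dialects that can gather the same information in one batched catalog query typically override the get_multi_*() methods directly rather than relying on this per-table loop.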

1182 

1183class StrCompileDialect(DefaultDialect): 

1184 statement_compiler = compiler.StrSQLCompiler 

1185 ddl_compiler = compiler.DDLCompiler 

1186 type_compiler_cls = compiler.StrSQLTypeCompiler 

1187 preparer = compiler.IdentifierPreparer 

1188 

1189 insert_returning = True 

1190 update_returning = True 

1191 delete_returning = True 

1192 

1193 supports_statement_cache = True 

1194 

1195 supports_identity_columns = True 

1196 

1197 supports_sequences = True 

1198 sequences_optional = True 

1199 preexecute_autoincrement_sequences = False 

1200 

1201 supports_native_boolean = True 

1202 

1203 supports_multivalues_insert = True 

1204 supports_simple_order_by_label = True 

1205 

1206 

1207class DefaultExecutionContext(ExecutionContext): 

1208 isinsert = False 

1209 isupdate = False 

1210 isdelete = False 

1211 is_crud = False 

1212 is_text = False 

1213 isddl = False 

1214 

1215 execute_style: ExecuteStyle = ExecuteStyle.EXECUTE 

1216 

1217 compiled: Optional[Compiled] = None 

1218 result_column_struct: Optional[ 

1219 Tuple[List[ResultColumnsEntry], bool, bool, bool, bool] 

1220 ] = None 

1221 returned_default_rows: Optional[Sequence[Row[Any]]] = None 

1222 

1223 execution_options: _ExecuteOptions = util.EMPTY_DICT 

1224 

1225 cursor_fetch_strategy = _cursor._DEFAULT_FETCH 

1226 

1227 invoked_statement: Optional[Executable] = None 

1228 

1229 _is_implicit_returning = False 

1230 _is_explicit_returning = False 

1231 _is_supplemental_returning = False 

1232 _is_server_side = False 

1233 

1234 _soft_closed = False 

1235 

1236 _rowcount: Optional[int] = None 

1237 

1238 # a hook for SQLite's translation of 

1239 # result column names 

1240 # NOTE: pyhive is using this hook, can't remove it :( 

1241 _translate_colname: Optional[ 

1242 Callable[[str], Tuple[str, Optional[str]]] 

1243 ] = None 

1244 

1245 _expanded_parameters: Mapping[str, List[str]] = util.immutabledict() 

1246 """used by set_input_sizes(). 

1247 

1248 This collection comes from ``ExpandedState.parameter_expansion``. 

1249 

1250 """ 

1251 

1252 cache_hit = NO_CACHE_KEY 

1253 

1254 root_connection: Connection 

1255 _dbapi_connection: PoolProxiedConnection 

1256 dialect: Dialect 

1257 unicode_statement: str 

1258 cursor: DBAPICursor 

1259 compiled_parameters: List[_MutableCoreSingleExecuteParams] 

1260 parameters: _DBAPIMultiExecuteParams 

1261 extracted_parameters: Optional[Sequence[BindParameter[Any]]] 

1262 

1263 _empty_dict_params = cast("Mapping[str, Any]", util.EMPTY_DICT) 

1264 

1265 _insertmanyvalues_rows: Optional[List[Tuple[Any, ...]]] = None 

1266 _num_sentinel_cols: int = 0 

1267 

1268 @classmethod 

1269 def _init_ddl( 

1270 cls, 

1271 dialect: Dialect, 

1272 connection: Connection, 

1273 dbapi_connection: PoolProxiedConnection, 

1274 execution_options: _ExecuteOptions, 

1275 compiled_ddl: DDLCompiler, 

1276 ) -> ExecutionContext: 

1277 """Initialize execution context for an ExecutableDDLElement 

1278 construct.""" 

1279 

1280 self = cls.__new__(cls) 

1281 self.root_connection = connection 

1282 self._dbapi_connection = dbapi_connection 

1283 self.dialect = connection.dialect 

1284 

1285 self.compiled = compiled = compiled_ddl 

1286 self.isddl = True 

1287 

1288 self.execution_options = execution_options 

1289 

1290 self.unicode_statement = str(compiled) 

1291 if compiled.schema_translate_map: 

1292 schema_translate_map = self.execution_options.get( 

1293 "schema_translate_map", {} 

1294 ) 

1295 

1296 rst = compiled.preparer._render_schema_translates 

1297 self.unicode_statement = rst( 

1298 self.unicode_statement, schema_translate_map 

1299 ) 

1300 

1301 self.statement = self.unicode_statement 

1302 

1303 self.cursor = self.create_cursor() 

1304 self.compiled_parameters = [] 

1305 

1306 if dialect.positional: 

1307 self.parameters = [dialect.execute_sequence_format()] 

1308 else: 

1309 self.parameters = [self._empty_dict_params] 

1310 

1311 return self 

1312 

1313 @classmethod 

1314 def _init_compiled( 

1315 cls, 

1316 dialect: Dialect, 

1317 connection: Connection, 

1318 dbapi_connection: PoolProxiedConnection, 

1319 execution_options: _ExecuteOptions, 

1320 compiled: SQLCompiler, 

1321 parameters: _CoreMultiExecuteParams, 

1322 invoked_statement: Executable, 

1323 extracted_parameters: Optional[Sequence[BindParameter[Any]]], 

1324 cache_hit: CacheStats = CacheStats.CACHING_DISABLED, 

1325 ) -> ExecutionContext: 

1326 """Initialize execution context for a Compiled construct.""" 

1327 

1328 self = cls.__new__(cls) 

1329 self.root_connection = connection 

1330 self._dbapi_connection = dbapi_connection 

1331 self.dialect = connection.dialect 

1332 self.extracted_parameters = extracted_parameters 

1333 self.invoked_statement = invoked_statement 

1334 self.compiled = compiled 

1335 self.cache_hit = cache_hit 

1336 

1337 self.execution_options = execution_options 

1338 

1339 self.result_column_struct = ( 

1340 compiled._result_columns, 

1341 compiled._ordered_columns, 

1342 compiled._textual_ordered_columns, 

1343 compiled._ad_hoc_textual, 

1344 compiled._loose_column_name_matching, 

1345 ) 

1346 

1347 self.isinsert = ii = compiled.isinsert 

1348 self.isupdate = iu = compiled.isupdate 

1349 self.isdelete = id_ = compiled.isdelete 

1350 self.is_text = compiled.isplaintext 

1351 

1352 if ii or iu or id_: 

1353 dml_statement = compiled.compile_state.statement # type: ignore 

1354 if TYPE_CHECKING: 

1355 assert isinstance(dml_statement, UpdateBase) 

1356 self.is_crud = True 

1357 self._is_explicit_returning = ier = bool(dml_statement._returning) 

1358 self._is_implicit_returning = iir = bool( 

1359 compiled.implicit_returning 

1360 ) 

1361 if iir and dml_statement._supplemental_returning: 

1362 self._is_supplemental_returning = True 

1363 

1364 # don't mix implicit and explicit returning 

1365 assert not (iir and ier) 

1366 

1367 if (ier or iir) and compiled.for_executemany: 

1368 if ii and not self.dialect.insert_executemany_returning: 

1369 raise exc.InvalidRequestError( 

1370 f"Dialect {self.dialect.dialect_description} with " 

1371 f"current server capabilities does not support " 

1372 "INSERT..RETURNING when executemany is used" 

1373 ) 

1374 elif ( 

1375 ii 

1376 and dml_statement._sort_by_parameter_order 

1377 and not self.dialect.insert_executemany_returning_sort_by_parameter_order # noqa: E501 

1378 ): 

1379 raise exc.InvalidRequestError( 

1380 f"Dialect {self.dialect.dialect_description} with " 

1381 f"current server capabilities does not support " 

1382 "INSERT..RETURNING with deterministic row ordering " 

1383 "when executemany is used" 

1384 ) 

1385 elif ( 

1386 ii 

1387 and self.dialect.use_insertmanyvalues 

1388 and not compiled._insertmanyvalues 

1389 ): 

1390 raise exc.InvalidRequestError( 

1391 'Statement does not have "insertmanyvalues" ' 

1392 "enabled, can't use INSERT..RETURNING with " 

1393 "executemany in this case." 

1394 ) 

1395 elif iu and not self.dialect.update_executemany_returning: 

1396 raise exc.InvalidRequestError( 

1397 f"Dialect {self.dialect.dialect_description} with " 

1398 f"current server capabilities does not support " 

1399 "UPDATE..RETURNING when executemany is used" 

1400 ) 

1401 elif id_ and not self.dialect.delete_executemany_returning: 

1402 raise exc.InvalidRequestError( 

1403 f"Dialect {self.dialect.dialect_description} with " 

1404 f"current server capabilities does not support " 

1405 "DELETE..RETURNING when executemany is used" 

1406 ) 

1407 

1408 if not parameters: 

1409 self.compiled_parameters = [ 

1410 compiled.construct_params( 

1411 extracted_parameters=extracted_parameters, 

1412 escape_names=False, 

1413 ) 

1414 ] 

1415 else: 

1416 self.compiled_parameters = [ 

1417 compiled.construct_params( 

1418 m, 

1419 escape_names=False, 

1420 _group_number=grp, 

1421 extracted_parameters=extracted_parameters, 

1422 ) 

1423 for grp, m in enumerate(parameters) 

1424 ] 

1425 

1426 if len(parameters) > 1: 

1427 if self.isinsert and compiled._insertmanyvalues: 

1428 self.execute_style = ExecuteStyle.INSERTMANYVALUES 

1429 

1430 imv = compiled._insertmanyvalues 

1431 if imv.sentinel_columns is not None: 

1432 self._num_sentinel_cols = imv.num_sentinel_columns 

1433 else: 

1434 self.execute_style = ExecuteStyle.EXECUTEMANY 

1435 

1436 self.unicode_statement = compiled.string 

1437 

1438 self.cursor = self.create_cursor() 

1439 

1440 if self.compiled.insert_prefetch or self.compiled.update_prefetch: 

1441 self._process_execute_defaults() 

1442 

1443 processors = compiled._bind_processors 

1444 

1445 flattened_processors: Mapping[ 

1446 str, _BindProcessorType[Any] 

1447 ] = processors # type: ignore[assignment] 

1448 

1449 if compiled.literal_execute_params or compiled.post_compile_params: 

1450 if self.executemany: 

1451 raise exc.InvalidRequestError( 

1452 "'literal_execute' or 'expanding' parameters can't be " 

1453 "used with executemany()" 

1454 ) 

1455 

1456 expanded_state = compiled._process_parameters_for_postcompile( 

1457 self.compiled_parameters[0] 

1458 ) 

1459 

1460 # re-assign self.unicode_statement 

1461 self.unicode_statement = expanded_state.statement 

1462 

1463 self._expanded_parameters = expanded_state.parameter_expansion 

1464 

1465 flattened_processors = dict(processors) # type: ignore 

1466 flattened_processors.update(expanded_state.processors) 

1467 positiontup = expanded_state.positiontup 

1468 elif compiled.positional: 

1469 positiontup = self.compiled.positiontup 

1470 else: 

1471 positiontup = None 

1472 

1473 if compiled.schema_translate_map: 

1474 schema_translate_map = self.execution_options.get( 

1475 "schema_translate_map", {} 

1476 ) 

1477 rst = compiled.preparer._render_schema_translates 

1478 self.unicode_statement = rst( 

1479 self.unicode_statement, schema_translate_map 

1480 ) 

1481 

1482 # final self.unicode_statement is now assigned, encode if needed 

1483 # by dialect 

1484 self.statement = self.unicode_statement 

1485 

1486 # Convert the dictionary of bind parameter values 

1487 # into a dict or list to be sent to the DBAPI's 

1488 # execute() or executemany() method. 

1489 

1490 if compiled.positional: 

1491 core_positional_parameters: MutableSequence[Sequence[Any]] = [] 

1492 assert positiontup is not None 

1493 for compiled_params in self.compiled_parameters: 

1494 l_param: List[Any] = [ 

1495 ( 

1496 flattened_processors[key](compiled_params[key]) 

1497 if key in flattened_processors 

1498 else compiled_params[key] 

1499 ) 

1500 for key in positiontup 

1501 ] 

1502 core_positional_parameters.append( 

1503 dialect.execute_sequence_format(l_param) 

1504 ) 

1505 

1506 self.parameters = core_positional_parameters 

1507 else: 

1508 core_dict_parameters: MutableSequence[Dict[str, Any]] = [] 

1509 escaped_names = compiled.escaped_bind_names 

1510 

1511 # note that currently, "expanded" parameters will be present 

1512 # in self.compiled_parameters in their quoted form. This is 

1513 # slightly inconsistent with the approach taken as of 

1514 # #8056 where self.compiled_parameters is meant to contain unquoted 

1515 # param names. 

1516 d_param: Dict[str, Any] 

1517 for compiled_params in self.compiled_parameters: 

1518 if escaped_names: 

1519 d_param = { 

1520 escaped_names.get(key, key): ( 

1521 flattened_processors[key](compiled_params[key]) 

1522 if key in flattened_processors 

1523 else compiled_params[key] 

1524 ) 

1525 for key in compiled_params 

1526 } 

1527 else: 

1528 d_param = { 

1529 key: ( 

1530 flattened_processors[key](compiled_params[key]) 

1531 if key in flattened_processors 

1532 else compiled_params[key] 

1533 ) 

1534 for key in compiled_params 

1535 } 

1536 

1537 core_dict_parameters.append(d_param) 

1538 

1539 self.parameters = core_dict_parameters 

1540 

1541 return self 

1542 

1543 @classmethod 

1544 def _init_statement( 

1545 cls, 

1546 dialect: Dialect, 

1547 connection: Connection, 

1548 dbapi_connection: PoolProxiedConnection, 

1549 execution_options: _ExecuteOptions, 

1550 statement: str, 

1551 parameters: _DBAPIMultiExecuteParams, 

1552 ) -> ExecutionContext: 

1553 """Initialize execution context for a string SQL statement.""" 

1554 

1555 self = cls.__new__(cls) 

1556 self.root_connection = connection 

1557 self._dbapi_connection = dbapi_connection 

1558 self.dialect = connection.dialect 

1559 self.is_text = True 

1560 

1561 self.execution_options = execution_options 

1562 

1563 if not parameters: 

1564 if self.dialect.positional: 

1565 self.parameters = [dialect.execute_sequence_format()] 

1566 else: 

1567 self.parameters = [self._empty_dict_params] 

1568 elif isinstance(parameters[0], dialect.execute_sequence_format): 

1569 self.parameters = parameters 

1570 elif isinstance(parameters[0], dict): 

1571 self.parameters = parameters 

1572 else: 

1573 self.parameters = [ 

1574 dialect.execute_sequence_format(p) for p in parameters 

1575 ] 

1576 

1577 if len(parameters) > 1: 

1578 self.execute_style = ExecuteStyle.EXECUTEMANY 

1579 

1580 self.statement = self.unicode_statement = statement 

1581 

1582 self.cursor = self.create_cursor() 

1583 return self 

1584 

1585 @classmethod 

1586 def _init_default( 

1587 cls, 

1588 dialect: Dialect, 

1589 connection: Connection, 

1590 dbapi_connection: PoolProxiedConnection, 

1591 execution_options: _ExecuteOptions, 

1592 ) -> ExecutionContext: 

1593 """Initialize execution context for a ColumnDefault construct.""" 

1594 

1595 self = cls.__new__(cls) 

1596 self.root_connection = connection 

1597 self._dbapi_connection = dbapi_connection 

1598 self.dialect = connection.dialect 

1599 

1600 self.execution_options = execution_options 

1601 

1602 self.cursor = self.create_cursor() 

1603 return self 

1604 

1605 def _get_cache_stats(self) -> str: 

1606 if self.compiled is None: 

1607 return "raw sql" 

1608 

1609 now = perf_counter() 

1610 

1611 ch = self.cache_hit 

1612 

1613 gen_time = self.compiled._gen_time 

1614 assert gen_time is not None 

1615 

1616 if ch is NO_CACHE_KEY: 

1617 return "no key %.5fs" % (now - gen_time,) 

1618 elif ch is CACHE_HIT: 

1619 return "cached since %.4gs ago" % (now - gen_time,) 

1620 elif ch is CACHE_MISS: 

1621 return "generated in %.5fs" % (now - gen_time,) 

1622 elif ch is CACHING_DISABLED: 

1623 if "_cache_disable_reason" in self.execution_options: 

1624 return "caching disabled (%s) %.5fs " % ( 

1625 self.execution_options["_cache_disable_reason"], 

1626 now - gen_time, 

1627 ) 

1628 else: 

1629 return "caching disabled %.5fs" % (now - gen_time,) 

1630 elif ch is NO_DIALECT_SUPPORT: 

1631 return "dialect %s+%s does not support caching %.5fs" % ( 

1632 self.dialect.name, 

1633 self.dialect.driver, 

1634 now - gen_time, 

1635 ) 

1636 else: 

1637 return "unknown" 

1638 

1639 @property 

1640 def executemany(self): # type: ignore[override] 

1641 return self.execute_style in ( 

1642 ExecuteStyle.EXECUTEMANY, 

1643 ExecuteStyle.INSERTMANYVALUES, 

1644 ) 

1645 

1646 @util.memoized_property 

1647 def identifier_preparer(self): 

1648 if self.compiled: 

1649 return self.compiled.preparer 

1650 elif "schema_translate_map" in self.execution_options: 

1651 return self.dialect.identifier_preparer._with_schema_translate( 

1652 self.execution_options["schema_translate_map"] 

1653 ) 

1654 else: 

1655 return self.dialect.identifier_preparer 

1656 

1657 @util.memoized_property 

1658 def engine(self): 

1659 return self.root_connection.engine 

1660 

1661 @util.memoized_property 

1662 def postfetch_cols(self) -> Optional[Sequence[Column[Any]]]: 

1663 if TYPE_CHECKING: 

1664 assert isinstance(self.compiled, SQLCompiler) 

1665 return self.compiled.postfetch 

1666 

1667 @util.memoized_property 

1668 def prefetch_cols(self) -> Optional[Sequence[Column[Any]]]: 

1669 if TYPE_CHECKING: 

1670 assert isinstance(self.compiled, SQLCompiler) 

1671 if self.isinsert: 

1672 return self.compiled.insert_prefetch 

1673 elif self.isupdate: 

1674 return self.compiled.update_prefetch 

1675 else: 

1676 return () 

1677 

1678 @util.memoized_property 

1679 def no_parameters(self): 

1680 return self.execution_options.get("no_parameters", False) 

1681 

1682 def _execute_scalar( 

1683 self, 

1684 stmt: str, 

1685 type_: Optional[TypeEngine[Any]], 

1686 parameters: Optional[_DBAPISingleExecuteParams] = None, 

1687 ) -> Any: 

1688 """Execute a string statement on the current cursor, returning a 

1689 scalar result. 

1690 

1691 Used to fire off sequences, default phrases, and "select lastrowid" 

1692 types of statements individually or in the context of a parent INSERT 

1693 or UPDATE statement. 

1694 

1695 """ 

1696 

1697 conn = self.root_connection 

1698 

1699 if "schema_translate_map" in self.execution_options: 

1700 schema_translate_map = self.execution_options.get( 

1701 "schema_translate_map", {} 

1702 ) 

1703 

1704 rst = self.identifier_preparer._render_schema_translates 

1705 stmt = rst(stmt, schema_translate_map) 

1706 

1707 if not parameters: 

1708 if self.dialect.positional: 

1709 parameters = self.dialect.execute_sequence_format() 

1710 else: 

1711 parameters = {} 

1712 

1713 conn._cursor_execute(self.cursor, stmt, parameters, context=self) 

1714 row = self.cursor.fetchone() 

1715 if row is not None: 

1716 r = row[0] 

1717 else: 

1718 r = None 

1719 if type_ is not None: 

1720 # apply type post processors to the result 

1721 proc = type_._cached_result_processor( 

1722 self.dialect, self.cursor.description[0][1] 

1723 ) 

1724 if proc: 

1725 return proc(r) 

1726 return r 
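
# --- editor's sketch (not part of default.py) -------------------------
# A minimal sketch of how a sequence-supporting dialect's execution
# context might use _execute_scalar() to pre-fetch a sequence value,
# modeled loosely on the approach taken by PostgreSQL-style dialects.
# The class name is hypothetical and this is not the implementation of
# any shipped dialect:
from sqlalchemy.engine.default import DefaultExecutionContext


class HypotheticalExecutionContext(DefaultExecutionContext):
    def fire_sequence(self, seq, type_):
        # run a one-off statement on the same cursor; the single scalar
        # that comes back is coerced through the column type's result
        # processor by _execute_scalar()
        return self._execute_scalar(
            "select nextval('%s')"
            % self.identifier_preparer.format_sequence(seq),
            type_,
        )
# -----------------------------------------------------------------------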

1727 

1728 @util.memoized_property 

1729 def connection(self): 

1730 return self.root_connection 

1731 

1732 def _use_server_side_cursor(self): 

1733 if not self.dialect.supports_server_side_cursors: 

1734 return False 

1735 

1736 if self.dialect.server_side_cursors: 

1737 # this is deprecated 

1738 use_server_side = self.execution_options.get( 

1739 "stream_results", True 

1740 ) and ( 

1741 self.compiled 

1742 and isinstance(self.compiled.statement, expression.Selectable) 

1743 or ( 

1744 ( 

1745 not self.compiled 

1746 or isinstance( 

1747 self.compiled.statement, expression.TextClause 

1748 ) 

1749 ) 

1750 and self.unicode_statement 

1751 and SERVER_SIDE_CURSOR_RE.match(self.unicode_statement) 

1752 ) 

1753 ) 

1754 else: 

1755 use_server_side = self.execution_options.get( 

1756 "stream_results", False 

1757 ) 

1758 

1759 return use_server_side 

1760 

1761 def create_cursor(self) -> DBAPICursor: 

1762 if ( 

1763 # inlining initial preference checks for SS cursors 

1764 self.dialect.supports_server_side_cursors 

1765 and ( 

1766 self.execution_options.get("stream_results", False) 

1767 or ( 

1768 self.dialect.server_side_cursors 

1769 and self._use_server_side_cursor() 

1770 ) 

1771 ) 

1772 ): 

1773 self._is_server_side = True 

1774 return self.create_server_side_cursor() 

1775 else: 

1776 self._is_server_side = False 

1777 return self.create_default_cursor() 
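
# --- editor's sketch (not part of default.py) -------------------------
# The server-side cursor decision above is driven by the public
# "stream_results" (and "yield_per") execution options.  A hedged sketch:
# the URL and table name are placeholders, and the server-side path only
# takes effect on drivers that support it (e.g. psycopg2):
from sqlalchemy import create_engine, text

engine = create_engine("postgresql+psycopg2://user:pass@localhost/demo")

with engine.connect() as conn:
    result = conn.execution_options(stream_results=True, yield_per=500).execute(
        text("SELECT * FROM big_table")
    )
    for partition in result.partitions():
        for row in partition:
            ...  # process rows in chunks of 500 without buffering them all
# -----------------------------------------------------------------------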

1778 

1779 def fetchall_for_returning(self, cursor): 

1780 return cursor.fetchall() 

1781 

1782 def create_default_cursor(self) -> DBAPICursor: 

1783 return self._dbapi_connection.cursor() 

1784 

1785 def create_server_side_cursor(self) -> DBAPICursor: 

1786 raise NotImplementedError() 

1787 

1788 def pre_exec(self): 

1789 pass 

1790 

1791 def get_out_parameter_values(self, names): 

1792 raise NotImplementedError( 

1793 "This dialect does not support OUT parameters" 

1794 ) 

1795 

1796 def post_exec(self): 

1797 pass 

1798 

1799 def get_result_processor( 

1800 self, type_: TypeEngine[Any], colname: str, coltype: DBAPIType 

1801 ) -> Optional[_ResultProcessorType[Any]]: 

1802 """Return a 'result processor' for a given type as present in 

1803 cursor.description. 

1804 

1805 This has a default implementation that dialects can override 

1806 for context-sensitive result type handling. 

1807 

1808 """ 

1809 return type_._cached_result_processor(self.dialect, coltype) 

1810 

1811 def get_lastrowid(self) -> int: 

1812 """return self.cursor.lastrowid, or equivalent, after an INSERT. 

1813 

1814 This may involve calling special cursor functions, issuing a new SELECT 

1815 on the cursor (or a new one), or returning a stored value that was 

1816 calculated within post_exec(). 

1817 

1818 This function will only be called for dialects which support "implicit" 

1819 primary key generation and keep preexecute_autoincrement_sequences set to 

1820 False, and only when no explicit id value was bound to the statement. 

1821 

1822 The function is called once for an INSERT statement that would need to 

1823 return the last inserted primary key for those dialects that make use 

1824 of the lastrowid concept. In these cases, it is called directly after 

1825 :meth:`.ExecutionContext.post_exec`. 

1826 

1827 """ 

1828 return self.cursor.lastrowid 
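
# --- editor's sketch (not part of default.py) -------------------------
# The value returned by get_lastrowid() above is what feeds
# CursorResult.inserted_primary_key on lastrowid-style backends.  A
# runnable SQLite example with example-only names:
from sqlalchemy import Column, Integer, MetaData, String, Table, create_engine, insert

metadata = MetaData()
items = Table(
    "items",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("name", String(50)),
)

engine = create_engine("sqlite://")
metadata.create_all(engine)

with engine.begin() as conn:
    result = conn.execute(insert(items), {"name": "widget"})
    # on lastrowid-style backends this comes from cursor.lastrowid via
    # get_lastrowid(); backends using implicit RETURNING obtain it from
    # the RETURNING clause instead
    print(result.inserted_primary_key)  # (1,)
# -----------------------------------------------------------------------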

1829 

1830 def handle_dbapi_exception(self, e): 

1831 pass 

1832 

1833 @util.non_memoized_property 

1834 def rowcount(self) -> int: 

1835 if self._rowcount is not None: 

1836 return self._rowcount 

1837 else: 

1838 return self.cursor.rowcount 

1839 

1840 @property 

1841 def _has_rowcount(self): 

1842 return self._rowcount is not None 

1843 

1844 def supports_sane_rowcount(self): 

1845 return self.dialect.supports_sane_rowcount 

1846 

1847 def supports_sane_multi_rowcount(self): 

1848 return self.dialect.supports_sane_multi_rowcount 
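
# --- editor's sketch (not part of default.py) -------------------------
# The rowcount accessor above backs CursorResult.rowcount, i.e. the
# matched-row count of an UPDATE or DELETE (subject to
# dialect.supports_sane_rowcount); the "preserve_rowcount" execution
# option checked in _setup_result_proxy() below captures it for other
# statement types as well.  A runnable SQLite example with example-only
# names:
from sqlalchemy import Column, Integer, MetaData, String, Table, create_engine, insert, update

metadata = MetaData()
tags = Table(
    "tags",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("label", String(20)),
)

engine = create_engine("sqlite://")
metadata.create_all(engine)

with engine.begin() as conn:
    conn.execute(insert(tags), [{"label": "a"}, {"label": "a"}, {"label": "b"}])
    result = conn.execute(update(tags).where(tags.c.label == "a").values(label="c"))
    print(result.rowcount)  # 2
# -----------------------------------------------------------------------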

1849 

1850 def _setup_result_proxy(self): 

1851 exec_opt = self.execution_options 

1852 

1853 if self._rowcount is None and exec_opt.get("preserve_rowcount", False): 

1854 self._rowcount = self.cursor.rowcount 

1855 

1856 yp: Optional[Union[int, bool]] 

1857 if self.is_crud or self.is_text: 

1858 result = self._setup_dml_or_text_result() 

1859 yp = False 

1860 else: 

1861 yp = exec_opt.get("yield_per", None) 

1862 sr = self._is_server_side or exec_opt.get("stream_results", False) 

1863 strategy = self.cursor_fetch_strategy 

1864 if sr and strategy is _cursor._DEFAULT_FETCH: 

1865 strategy = _cursor.BufferedRowCursorFetchStrategy( 

1866 self.cursor, self.execution_options 

1867 ) 

1868 cursor_description: _DBAPICursorDescription = ( 

1869 strategy.alternate_cursor_description 

1870 or self.cursor.description 

1871 ) 

1872 if cursor_description is None: 

1873 strategy = _cursor._NO_CURSOR_DQL 

1874 

1875 result = _cursor.CursorResult(self, strategy, cursor_description) 

1876 

1877 compiled = self.compiled 

1878 

1879 if ( 

1880 compiled 

1881 and not self.isddl 

1882 and cast(SQLCompiler, compiled).has_out_parameters 

1883 ): 

1884 self._setup_out_parameters(result) 

1885 

1886 self._soft_closed = result._soft_closed 

1887 

1888 if yp: 

1889 result = result.yield_per(yp) 

1890 

1891 return result 

1892 

1893 def _setup_out_parameters(self, result): 

1894 compiled = cast(SQLCompiler, self.compiled) 

1895 

1896 out_bindparams = [ 

1897 (param, name) 

1898 for param, name in compiled.bind_names.items() 

1899 if param.isoutparam 

1900 ] 

1901 out_parameters = {} 

1902 

1903 for bindparam, raw_value in zip( 

1904 [param for param, name in out_bindparams], 

1905 self.get_out_parameter_values( 

1906 [name for param, name in out_bindparams] 

1907 ), 

1908 ): 

1909 type_ = bindparam.type 

1910 impl_type = type_.dialect_impl(self.dialect) 

1911 dbapi_type = impl_type.get_dbapi_type(self.dialect.loaded_dbapi) 

1912 result_processor = impl_type.result_processor( 

1913 self.dialect, dbapi_type 

1914 ) 

1915 if result_processor is not None: 

1916 raw_value = result_processor(raw_value) 

1917 out_parameters[bindparam.key] = raw_value 

1918 

1919 result.out_parameters = out_parameters 
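
# --- editor's sketch (not part of default.py) -------------------------
# A heavily hedged sketch of how OUT parameters surface to user code.
# This only applies to dialects that implement get_out_parameter_values()
# (primarily the Oracle dialects); the URL, procedure name and parameter
# names below are placeholders, not tested code:
from sqlalchemy import Integer, bindparam, create_engine, outparam, text

engine = create_engine("oracle+oracledb://user:pass@host/?service_name=demo")

with engine.connect() as conn:
    stmt = text("BEGIN demo_proc(:x_in, :y_out); END;").bindparams(
        bindparam("x_in", value=5),
        outparam("y_out", Integer),
    )
    result = conn.execute(stmt)
    # populated by _setup_out_parameters() above, after result
    # processors have been applied to the raw DBAPI values
    print(result.out_parameters["y_out"])
# -----------------------------------------------------------------------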

1920 

1921 def _setup_dml_or_text_result(self): 

1922 compiled = cast(SQLCompiler, self.compiled) 

1923 

1924 strategy: ResultFetchStrategy = self.cursor_fetch_strategy 

1925 

1926 if self.isinsert: 

1927 if ( 

1928 self.execute_style is ExecuteStyle.INSERTMANYVALUES 

1929 and compiled.effective_returning 

1930 ): 

1931 strategy = _cursor.FullyBufferedCursorFetchStrategy( 

1932 self.cursor, 

1933 initial_buffer=self._insertmanyvalues_rows, 

1934 # maintain alt cursor description if set by the 

1935 # dialect, e.g. mssql preserves it 

1936 alternate_description=( 

1937 strategy.alternate_cursor_description 

1938 ), 

1939 ) 

1940 

1941 if compiled.postfetch_lastrowid: 

1942 self.inserted_primary_key_rows = ( 

1943 self._setup_ins_pk_from_lastrowid() 

1944 ) 

1945 # else if not self._is_implicit_returning, 

1946 # the default inserted_primary_key_rows accessor will 

1947 # return an "empty" primary key collection when accessed. 

1948 

1949 if self._is_server_side and strategy is _cursor._DEFAULT_FETCH: 

1950 strategy = _cursor.BufferedRowCursorFetchStrategy( 

1951 self.cursor, self.execution_options 

1952 ) 

1953 

1954 if strategy is _cursor._NO_CURSOR_DML: 

1955 cursor_description = None 

1956 else: 

1957 cursor_description = ( 

1958 strategy.alternate_cursor_description 

1959 or self.cursor.description 

1960 ) 

1961 

1962 if cursor_description is None: 

1963 strategy = _cursor._NO_CURSOR_DML 

1964 elif self._num_sentinel_cols: 

1965 assert self.execute_style is ExecuteStyle.INSERTMANYVALUES 

1966 # strip out the sentinel columns from cursor description 

1967 # a similar logic is done to the rows only in CursorResult 

1968 cursor_description = cursor_description[ 

1969 0 : -self._num_sentinel_cols 

1970 ] 

1971 

1972 result: _cursor.CursorResult[Any] = _cursor.CursorResult( 

1973 self, strategy, cursor_description 

1974 ) 

1975 

1976 if self.isinsert: 

1977 if self._is_implicit_returning: 

1978 rows = result.all() 

1979 

1980 self.returned_default_rows = rows 

1981 

1982 self.inserted_primary_key_rows = ( 

1983 self._setup_ins_pk_from_implicit_returning(result, rows) 

1984 ) 

1985 

1986 # test that it has a cursor metadata that is accurate. the 

1987 # first row will have been fetched and current assumptions 

1988 # are that the result has only one row, until executemany() 

1989 # support is added here. 

1990 assert result._metadata.returns_rows 

1991 

1992 # Insert statement has both return_defaults() and 

1993 # returning(). rewind the result on the list of rows 

1994 # we just used. 

1995 if self._is_supplemental_returning: 

1996 result._rewind(rows) 

1997 else: 

1998 result._soft_close() 

1999 elif not self._is_explicit_returning: 

2000 result._soft_close() 

2001 

2002 # we assume here the result does not return any rows. 

2003 # *usually*, this will be true. However, some dialects 

2004 # such as that of MSSQL/pyodbc need to SELECT a post fetch 

2005 # function so this is not necessarily true. 

2006 # assert not result.returns_rows 

2007 

2008 elif self._is_implicit_returning: 

2009 rows = result.all() 

2010 

2011 if rows: 

2012 self.returned_default_rows = rows 

2013 self._rowcount = len(rows) 

2014 

2015 if self._is_supplemental_returning: 

2016 result._rewind(rows) 

2017 else: 

2018 result._soft_close() 

2019 

2020 # test that it has a cursor metadata that is accurate. 

2021 # the rows have all been fetched however. 

2022 assert result._metadata.returns_rows 

2023 

2024 elif not result._metadata.returns_rows: 

2025 # no results, get rowcount 

2026 # (which requires open cursor on some drivers) 

2027 if self._rowcount is None: 

2028 self._rowcount = self.cursor.rowcount 

2029 result._soft_close() 

2030 elif self.isupdate or self.isdelete: 

2031 if self._rowcount is None: 

2032 self._rowcount = self.cursor.rowcount 

2033 return result 
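
# --- editor's sketch (not part of default.py) -------------------------
# The implicit-returning branches above are what populate
# CursorResult.inserted_primary_key and CursorResult.returned_defaults
# when Insert.return_defaults() is used.  A runnable SQLite example with
# example-only names; which server-generated values come back depends on
# whether the backend uses RETURNING or falls back to lastrowid plus a
# post-fetch:
from sqlalchemy import (
    Column,
    DateTime,
    Integer,
    MetaData,
    String,
    Table,
    create_engine,
    func,
    insert,
)

metadata = MetaData()
notes = Table(
    "notes",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("body", String(200)),
    Column("created", DateTime, server_default=func.now()),
)

engine = create_engine("sqlite://")
metadata.create_all(engine)

with engine.begin() as conn:
    result = conn.execute(insert(notes).return_defaults(), {"body": "hello"})
    print(result.inserted_primary_key)
    print(result.returned_defaults)
# -----------------------------------------------------------------------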

2034 

2035 @util.memoized_property 

2036 def inserted_primary_key_rows(self): 

2037 # if no specific "get primary key" strategy was set up 

2038 # during execution, return a "default" primary key based 

2039 # on what's in the compiled_parameters and nothing else. 

2040 return self._setup_ins_pk_from_empty() 

2041 

2042 def _setup_ins_pk_from_lastrowid(self): 

2043 getter = cast( 

2044 SQLCompiler, self.compiled 

2045 )._inserted_primary_key_from_lastrowid_getter 

2046 lastrowid = self.get_lastrowid() 

2047 return [getter(lastrowid, self.compiled_parameters[0])] 

2048 

2049 def _setup_ins_pk_from_empty(self): 

2050 getter = cast( 

2051 SQLCompiler, self.compiled 

2052 )._inserted_primary_key_from_lastrowid_getter 

2053 return [getter(None, param) for param in self.compiled_parameters] 

2054 

2055 def _setup_ins_pk_from_implicit_returning(self, result, rows): 

2056 if not rows: 

2057 return [] 

2058 

2059 getter = cast( 

2060 SQLCompiler, self.compiled 

2061 )._inserted_primary_key_from_returning_getter 

2062 compiled_params = self.compiled_parameters 

2063 

2064 return [ 

2065 getter(row, param) for row, param in zip(rows, compiled_params) 

2066 ] 

2067 

2068 def lastrow_has_defaults(self) -> bool: 

2069 return (self.isinsert or self.isupdate) and bool( 

2070 cast(SQLCompiler, self.compiled).postfetch 

2071 ) 

2072 

2073 def _prepare_set_input_sizes( 

2074 self, 

2075 ) -> Optional[List[Tuple[str, Any, TypeEngine[Any]]]]: 

2076 """Given a cursor and ClauseParameters, prepare arguments 

2077 in order to call the appropriate 

2078 style of ``setinputsizes()`` on the cursor, using DB-API types 

2079 from the bind parameter's ``TypeEngine`` objects. 

2080 

2081 This method is only called by those dialects which set the 

2082 :attr:`.Dialect.bind_typing` attribute to 

2083 :attr:`.BindTyping.SETINPUTSIZES`. Python-oracledb and cx_Oracle are 

2084 the only DBAPIs that require setinputsizes(); pyodbc offers it as an 

2085 option. 

2086 

2087 Prior to SQLAlchemy 2.0, the setinputsizes() approach was also used 

2088 for pg8000 and asyncpg, which have since been changed to use inline 

2089 rendering of casts. 

2090 

2091 """ 

2092 if self.isddl or self.is_text: 

2093 return None 

2094 

2095 compiled = cast(SQLCompiler, self.compiled) 

2096 

2097 inputsizes = compiled._get_set_input_sizes_lookup() 

2098 

2099 if inputsizes is None: 

2100 return None 

2101 

2102 dialect = self.dialect 

2103 

2104 # all of the rest of this... cython? 

2105 

2106 if dialect._has_events: 

2107 inputsizes = dict(inputsizes) 

2108 dialect.dispatch.do_setinputsizes( 

2109 inputsizes, self.cursor, self.statement, self.parameters, self 

2110 ) 

2111 

2112 if compiled.escaped_bind_names: 

2113 escaped_bind_names = compiled.escaped_bind_names 

2114 else: 

2115 escaped_bind_names = None 

2116 

2117 if dialect.positional: 

2118 items = [ 

2119 (key, compiled.binds[key]) 

2120 for key in compiled.positiontup or () 

2121 ] 

2122 else: 

2123 items = [ 

2124 (key, bindparam) 

2125 for bindparam, key in compiled.bind_names.items() 

2126 ] 

2127 

2128 generic_inputsizes: List[Tuple[str, Any, TypeEngine[Any]]] = [] 

2129 for key, bindparam in items: 

2130 if bindparam in compiled.literal_execute_params: 

2131 continue 

2132 

2133 if key in self._expanded_parameters: 

2134 if is_tuple_type(bindparam.type): 

2135 num = len(bindparam.type.types) 

2136 dbtypes = inputsizes[bindparam] 

2137 generic_inputsizes.extend( 

2138 ( 

2139 ( 

2140 escaped_bind_names.get(paramname, paramname) 

2141 if escaped_bind_names is not None 

2142 else paramname 

2143 ), 

2144 dbtypes[idx % num], 

2145 bindparam.type.types[idx % num], 

2146 ) 

2147 for idx, paramname in enumerate( 

2148 self._expanded_parameters[key] 

2149 ) 

2150 ) 

2151 else: 

2152 dbtype = inputsizes.get(bindparam, None) 

2153 generic_inputsizes.extend( 

2154 ( 

2155 ( 

2156 escaped_bind_names.get(paramname, paramname) 

2157 if escaped_bind_names is not None 

2158 else paramname 

2159 ), 

2160 dbtype, 

2161 bindparam.type, 

2162 ) 

2163 for paramname in self._expanded_parameters[key] 

2164 ) 

2165 else: 

2166 dbtype = inputsizes.get(bindparam, None) 

2167 

2168 escaped_name = ( 

2169 escaped_bind_names.get(key, key) 

2170 if escaped_bind_names is not None 

2171 else key 

2172 ) 

2173 

2174 generic_inputsizes.append( 

2175 (escaped_name, dbtype, bindparam.type) 

2176 ) 

2177 

2178 return generic_inputsizes 
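
# --- editor's sketch (not part of default.py) -------------------------
# The dialect.dispatch.do_setinputsizes() call above corresponds to the
# public "do_setinputsizes" dialect event, which lets an application
# inspect or adjust the DBAPI types before cursor.setinputsizes() is
# invoked (relevant mainly to python-oracledb / cx_Oracle).  The engine
# URL is a placeholder:
import logging

from sqlalchemy import create_engine, event

log = logging.getLogger(__name__)

engine = create_engine("oracle+oracledb://user:pass@host/?service_name=demo")


@event.listens_for(engine, "do_setinputsizes")
def log_setinputsizes(inputsizes, cursor, statement, parameters, context):
    # inputsizes maps BindParameter objects to DBAPI type objects;
    # entries may be removed or replaced before they are passed on
    for bindparam, dbapitype in inputsizes.items():
        log.debug("binding %s with DBAPI type %s", bindparam.key, dbapitype)
# -----------------------------------------------------------------------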

2179 

2180 def _exec_default(self, column, default, type_): 

2181 if default.is_sequence: 

2182 return self.fire_sequence(default, type_) 

2183 elif default.is_callable: 

2184 # this codepath is not normally used as it's inlined 

2185 # into _process_execute_defaults 

2186 self.current_column = column 

2187 return default.arg(self) 

2188 elif default.is_clause_element: 

2189 return self._exec_default_clause_element(column, default, type_) 

2190 else: 

2191 # this codepath is not normally used as it's inlined 

2192 # into _process_execute_defaults 

2193 return default.arg 
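
# --- editor's sketch (not part of default.py) -------------------------
# The branches of _exec_default() above map onto the kinds of defaults a
# Column can carry.  An illustrative table definition (names are
# example-only) showing one column per branch:
from datetime import datetime, timezone

from sqlalchemy import Column, DateTime, Integer, MetaData, Sequence, String, Table, func

metadata = MetaData()
orders = Table(
    "orders",
    metadata,
    # Sequence default -> fire_sequence(), on sequence-supporting backends
    Column("id", Integer, Sequence("order_id_seq"), primary_key=True),
    # plain scalar default -> the "default.arg" branch
    Column("status", String(20), default="new"),
    # Python callable default -> the "is_callable" branch
    Column("created", DateTime, default=lambda: datetime.now(timezone.utc)),
    # SQL expression default -> _exec_default_clause_element()
    Column("updated", DateTime, default=func.now()),
)
# -----------------------------------------------------------------------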

2194 

2195 def _exec_default_clause_element(self, column, default, type_): 

2196 # execute a default that's a complete clause element. Here, we have 

2197 # to re-implement a miniature version of the compile->parameters-> 

2198 # cursor.execute() sequence, since we don't want to modify the state 

2199 # of the connection / result in progress or create new connection/ 

2200 # result objects etc. 

2201 # .. versionchanged:: 1.4 

2202 

2203 if not default._arg_is_typed: 

2204 default_arg = expression.type_coerce(default.arg, type_) 

2205 else: 

2206 default_arg = default.arg 

2207 compiled = expression.select(default_arg).compile(dialect=self.dialect) 

2208 compiled_params = compiled.construct_params() 

2209 processors = compiled._bind_processors 

2210 if compiled.positional: 

2211 parameters = self.dialect.execute_sequence_format( 

2212 [ 

2213 ( 

2214 processors[key](compiled_params[key]) # type: ignore 

2215 if key in processors 

2216 else compiled_params[key] 

2217 ) 

2218 for key in compiled.positiontup or () 

2219 ] 

2220 ) 

2221 else: 

2222 parameters = { 

2223 key: ( 

2224 processors[key](compiled_params[key]) # type: ignore 

2225 if key in processors 

2226 else compiled_params[key] 

2227 ) 

2228 for key in compiled_params 

2229 } 

2230 return self._execute_scalar( 

2231 str(compiled), type_, parameters=parameters 

2232 ) 

2233 

2234 current_parameters: Optional[_CoreSingleExecuteParams] = None 

2235 """A dictionary of parameters applied to the current row. 

2236 

2237 This attribute is only available in the context of a user-defined default 

2238 generation function, e.g. as described at :ref:`context_default_functions`. 

2239 It consists of a dictionary which includes entries for each column/value 

2240 pair that is to be part of the INSERT or UPDATE statement. The keys of the 

2241 dictionary will be the key value of each :class:`_schema.Column`, 

2242 which is usually 

2243 synonymous with the name. 

2244 

2245 Note that the :attr:`.DefaultExecutionContext.current_parameters` attribute 

2246 does not accommodate the "multi-values" feature of the 

2247 :meth:`_expression.Insert.values` method. The 

2248 :meth:`.DefaultExecutionContext.get_current_parameters` method should be 

2249 preferred. 

2250 

2251 .. seealso:: 

2252 

2253 :meth:`.DefaultExecutionContext.get_current_parameters` 

2254 

2255 :ref:`context_default_functions` 

2256 

2257 """ 

2258 

2259 def get_current_parameters(self, isolate_multiinsert_groups=True): 

2260 """Return a dictionary of parameters applied to the current row. 

2261 

2262 This method can only be used in the context of a user-defined default 

2263 generation function, e.g. as described at 

2264 :ref:`context_default_functions`. When invoked, a dictionary is 

2265 returned which includes entries for each column/value pair that is part 

2266 of the INSERT or UPDATE statement. The keys of the dictionary will be 

2267 the key value of each :class:`_schema.Column`, 

2268 which is usually synonymous 

2269 with the name. 

2270 

2271 :param isolate_multiinsert_groups=True: indicates that multi-valued 

2272 INSERT constructs created using :meth:`_expression.Insert.values` 

2273 should be 

2274 handled by returning only the subset of parameters that are local 

2275 to the current column default invocation. When ``False``, the 

2276 raw parameters of the statement are returned including the 

2277 naming convention used in the case of multi-valued INSERT. 

2278 

2279 .. versionadded:: 1.2 added 

2280 :meth:`.DefaultExecutionContext.get_current_parameters` 

2281 which provides more functionality than the existing 

2282 :attr:`.DefaultExecutionContext.current_parameters` 

2283 attribute. 

2284 

2285 .. seealso:: 

2286 

2287 :attr:`.DefaultExecutionContext.current_parameters` 

2288 

2289 :ref:`context_default_functions` 

2290 

2291 """ 

2292 try: 

2293 parameters = self.current_parameters 

2294 column = self.current_column 

2295 except AttributeError: 

2296 raise exc.InvalidRequestError( 

2297 "get_current_parameters() can only be invoked in the " 

2298 "context of a Python side column default function" 

2299 ) 

2300 else: 

2301 assert column is not None 

2302 assert parameters is not None 

2303 compile_state = cast( 

2304 "DMLState", cast(SQLCompiler, self.compiled).compile_state 

2305 ) 

2306 assert compile_state is not None 

2307 if ( 

2308 isolate_multiinsert_groups 

2309 and dml.isinsert(compile_state) 

2310 and compile_state._has_multi_parameters 

2311 ): 

2312 if column._is_multiparam_column: 

2313 index = column.index + 1 

2314 d = {column.original.key: parameters[column.key]} 

2315 else: 

2316 d = {column.key: parameters[column.key]} 

2317 index = 0 

2318 assert compile_state._dict_parameters is not None 

2319 keys = compile_state._dict_parameters.keys() 

2320 d.update( 

2321 (key, parameters["%s_m%d" % (key, index)]) for key in keys 

2322 ) 

2323 return d 

2324 else: 

2325 return parameters 
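
# --- editor's sketch (not part of default.py) -------------------------
# get_current_parameters() above is the supported way for a
# context-sensitive default function to read the other values of the row
# being inserted or updated.  A runnable SQLite example with example-only
# names:
from sqlalchemy import Column, Integer, MetaData, Table, create_engine, insert


def plus_twelve(context):
    # "context" is the DefaultExecutionContext for the statement
    return context.get_current_parameters()["counter"] + 12


metadata = MetaData()
counters = Table(
    "counters",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("counter", Integer),
    Column("counter_plus_twelve", Integer, default=plus_twelve, onupdate=plus_twelve),
)

engine = create_engine("sqlite://")
metadata.create_all(engine)

with engine.begin() as conn:
    conn.execute(insert(counters), [{"counter": 1}, {"counter": 2}])
# -----------------------------------------------------------------------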

2326 

2327 def get_insert_default(self, column): 

2328 if column.default is None: 

2329 return None 

2330 else: 

2331 return self._exec_default(column, column.default, column.type) 

2332 

2333 def get_update_default(self, column): 

2334 if column.onupdate is None: 

2335 return None 

2336 else: 

2337 return self._exec_default(column, column.onupdate, column.type) 

2338 

2339 def _process_execute_defaults(self): 

2340 compiled = cast(SQLCompiler, self.compiled) 

2341 

2342 key_getter = compiled._within_exec_param_key_getter 

2343 

2344 sentinel_counter = 0 

2345 

2346 if compiled.insert_prefetch: 

2347 prefetch_recs = [ 

2348 ( 

2349 c, 

2350 key_getter(c), 

2351 c._default_description_tuple, 

2352 self.get_insert_default, 

2353 ) 

2354 for c in compiled.insert_prefetch 

2355 ] 

2356 elif compiled.update_prefetch: 

2357 prefetch_recs = [ 

2358 ( 

2359 c, 

2360 key_getter(c), 

2361 c._onupdate_description_tuple, 

2362 self.get_update_default, 

2363 ) 

2364 for c in compiled.update_prefetch 

2365 ] 

2366 else: 

2367 prefetch_recs = [] 

2368 

2369 for param in self.compiled_parameters: 

2370 self.current_parameters = param 

2371 

2372 for ( 

2373 c, 

2374 param_key, 

2375 (arg, is_scalar, is_callable, is_sentinel), 

2376 fallback, 

2377 ) in prefetch_recs: 

2378 if is_sentinel: 

2379 param[param_key] = sentinel_counter 

2380 sentinel_counter += 1 

2381 elif is_scalar: 

2382 param[param_key] = arg 

2383 elif is_callable: 

2384 self.current_column = c 

2385 param[param_key] = arg(self) 

2386 else: 

2387 val = fallback(c) 

2388 if val is not None: 

2389 param[param_key] = val 

2390 

2391 del self.current_parameters 

2392 

2393 

2394DefaultDialect.execution_ctx_cls = DefaultExecutionContext