Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/default.py: 46%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1059 statements  

1# engine/default.py 

2# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7# mypy: allow-untyped-defs, allow-untyped-calls 

8 

9"""Default implementations of per-dialect sqlalchemy.engine classes. 

10 

11These are semi-private implementation classes which are only of importance 

12to database dialect authors; dialects will usually use the classes here 

13as the base class for their own corresponding classes. 

14 

15""" 

16 

17from __future__ import annotations 

18 

19import functools 

20import operator 

21import random 

22import re 

23from time import perf_counter 

24import typing 

25from typing import Any 

26from typing import Callable 

27from typing import cast 

28from typing import Dict 

29from typing import Final 

30from typing import List 

31from typing import Literal 

32from typing import Mapping 

33from typing import MutableMapping 

34from typing import MutableSequence 

35from typing import Optional 

36from typing import Sequence 

37from typing import Set 

38from typing import Tuple 

39from typing import Type 

40from typing import TYPE_CHECKING 

41from typing import Union 

42import weakref 

43 

44from . import characteristics 

45from . import cursor as _cursor 

46from . import interfaces 

47from .base import Connection 

48from .interfaces import CacheStats 

49from .interfaces import DBAPICursor 

50from .interfaces import Dialect 

51from .interfaces import ExecuteStyle 

52from .interfaces import ExecutionContext 

53from .reflection import ObjectKind 

54from .reflection import ObjectScope 

55from .. import event 

56from .. import exc 

57from .. import pool 

58from .. import util 

59from ..sql import compiler 

60from ..sql import dml 

61from ..sql import expression 

62from ..sql import type_api 

63from ..sql import util as sql_util 

64from ..sql._typing import is_tuple_type 

65from ..sql.base import _NoArg 

66from ..sql.compiler import AggregateOrderByStyle 

67from ..sql.compiler import DDLCompiler 

68from ..sql.compiler import InsertmanyvaluesSentinelOpts 

69from ..sql.compiler import SQLCompiler 

70from ..sql.elements import quoted_name 

71from ..util.typing import TupleAny 

72from ..util.typing import Unpack 

73 

74if typing.TYPE_CHECKING: 

75 from .base import Engine 

76 from .cursor import ResultFetchStrategy 

77 from .interfaces import _CoreMultiExecuteParams 

78 from .interfaces import _CoreSingleExecuteParams 

79 from .interfaces import _DBAPICursorDescription 

80 from .interfaces import _DBAPIMultiExecuteParams 

81 from .interfaces import _DBAPISingleExecuteParams 

82 from .interfaces import _ExecuteOptions 

83 from .interfaces import _MutableCoreSingleExecuteParams 

84 from .interfaces import _ParamStyle 

85 from .interfaces import ConnectArgsType 

86 from .interfaces import DBAPIConnection 

87 from .interfaces import DBAPIModule 

88 from .interfaces import DBAPIType 

89 from .interfaces import IsolationLevel 

90 from .row import Row 

91 from .url import URL 

92 from ..event import _ListenerFnType 

93 from ..pool import Pool 

94 from ..pool import PoolProxiedConnection 

95 from ..sql import Executable 

96 from ..sql.compiler import Compiled 

97 from ..sql.compiler import Linting 

98 from ..sql.compiler import ResultColumnsEntry 

99 from ..sql.dml import DMLState 

100 from ..sql.dml import UpdateBase 

101 from ..sql.elements import BindParameter 

102 from ..sql.schema import Column 

103 from ..sql.sqltypes import _JSON_VALUE 

104 from ..sql.type_api import _BindProcessorType 

105 from ..sql.type_api import _ResultProcessorType 

106 from ..sql.type_api import TypeEngine 

107 

108 

109# When we're handed literal SQL, ensure it's a SELECT query 

110SERVER_SIDE_CURSOR_RE = re.compile(r"\s*SELECT", re.I | re.UNICODE) 

111 

112 

113( 

114 CACHE_HIT, 

115 CACHE_MISS, 

116 CACHING_DISABLED, 

117 NO_CACHE_KEY, 

118 NO_DIALECT_SUPPORT, 

119) = list(CacheStats) 

120 

121 

122class DefaultDialect(Dialect): 

123 """Default implementation of Dialect""" 

124 

125 statement_compiler = compiler.SQLCompiler 

126 ddl_compiler = compiler.DDLCompiler 

127 type_compiler_cls = compiler.GenericTypeCompiler 

128 

129 preparer = compiler.IdentifierPreparer 

130 supports_alter = True 

131 supports_comments = False 

132 supports_constraint_comments = False 

133 inline_comments = False 

134 supports_statement_cache = True 

135 

136 div_is_floordiv = True 

137 

138 bind_typing = interfaces.BindTyping.NONE 

139 

140 include_set_input_sizes: Optional[Set[Any]] = None 

141 exclude_set_input_sizes: Optional[Set[Any]] = None 

142 

143 # the first value we'd get for an autoincrement column. 

144 default_sequence_base = 1 

145 

146 # most DBAPIs happy with this for execute(). 

147 # not cx_oracle. 

148 execute_sequence_format = tuple 

149 

150 supports_schemas = True 

151 supports_views = True 

152 supports_sequences = False 

153 sequences_optional = False 

154 preexecute_autoincrement_sequences = False 

155 supports_identity_columns = False 

156 postfetch_lastrowid = True 

157 favor_returning_over_lastrowid = False 

158 insert_null_pk_still_autoincrements = False 

159 update_returning = False 

160 delete_returning = False 

161 update_returning_multifrom = False 

162 delete_returning_multifrom = False 

163 insert_returning = False 

164 

165 aggregate_order_by_style = AggregateOrderByStyle.INLINE 

166 

167 cte_follows_insert = False 

168 

169 supports_native_enum = False 

170 supports_native_boolean = False 

171 supports_native_uuid = False 

172 returns_native_bytes = False 

173 

174 supports_native_json_serialization = False 

175 supports_native_json_deserialization = False 

176 dialect_injects_custom_json_deserializer = False 

177 _json_serializer: Callable[[_JSON_VALUE], str] | None = None 

178 

179 _json_deserializer: Callable[[str], _JSON_VALUE] | None = None 

180 

181 non_native_boolean_check_constraint = True 

182 

183 supports_simple_order_by_label = True 

184 

185 tuple_in_values = False 

186 

187 connection_characteristics = util.immutabledict( 

188 { 

189 "isolation_level": characteristics.IsolationLevelCharacteristic(), 

190 "logging_token": characteristics.LoggingTokenCharacteristic(), 

191 } 

192 ) 

193 

194 engine_config_types: Mapping[str, Any] = util.immutabledict( 

195 { 

196 "pool_timeout": util.asint, 

197 "echo": util.bool_or_str("debug"), 

198 "echo_pool": util.bool_or_str("debug"), 

199 "pool_recycle": util.asint, 

200 "pool_size": util.asint, 

201 "max_overflow": util.asint, 

202 "future": util.asbool, 

203 } 

204 ) 

205 

206 # if the NUMERIC type 

207 # returns decimal.Decimal. 

208 # *not* the FLOAT type however. 

209 supports_native_decimal = False 

210 

211 name = "default" 

212 

213 # length at which to truncate 

214 # any identifier. 

215 max_identifier_length = 9999 

216 _user_defined_max_identifier_length: Optional[int] = None 

217 

218 isolation_level: Optional[str] = None 

219 

220 # sub-categories of max_identifier_length. 

221 # currently these accommodate for MySQL which allows alias names 

222 # of 255 but DDL names only of 64. 

223 max_index_name_length: Optional[int] = None 

224 max_constraint_name_length: Optional[int] = None 

225 

226 supports_sane_rowcount = True 

227 supports_sane_multi_rowcount = True 

228 colspecs: MutableMapping[Type[TypeEngine[Any]], Type[TypeEngine[Any]]] = {} 

229 default_paramstyle = "named" 

230 

231 supports_default_values = False 

232 """dialect supports INSERT... DEFAULT VALUES syntax""" 

233 

234 supports_default_metavalue = False 

235 """dialect supports INSERT... VALUES (DEFAULT) syntax""" 

236 

237 default_metavalue_token = "DEFAULT" 

238 """for INSERT... VALUES (DEFAULT) syntax, the token to put in the 

239 parenthesis.""" 

240 

241 # not sure if this is a real thing but the compiler will deliver it 

242 # if this is the only flag enabled. 

243 supports_empty_insert = True 

244 """dialect supports INSERT () VALUES ()""" 

245 

246 supports_multivalues_insert = False 

247 

248 use_insertmanyvalues: bool = False 

249 

250 use_insertmanyvalues_wo_returning: bool = False 

251 

252 insertmanyvalues_implicit_sentinel: InsertmanyvaluesSentinelOpts = ( 

253 InsertmanyvaluesSentinelOpts.NOT_SUPPORTED 

254 ) 

255 

256 insertmanyvalues_page_size: int = 1000 

257 insertmanyvalues_max_parameters = 32700 

258 

259 supports_is_distinct_from = True 

260 

261 supports_server_side_cursors = False 

262 

263 server_side_cursors = False 

264 

265 # extra record-level locking features (#4860) 

266 supports_for_update_of = False 

267 

268 server_version_info = None 

269 

270 default_schema_name: Optional[str] = None 

271 

272 # indicates symbol names are 

273 # UPPERCASED if they are case insensitive 

274 # within the database. 

275 # if this is True, the methods normalize_name() 

276 # and denormalize_name() must be provided. 

277 requires_name_normalize = False 

278 

279 is_async = False 

280 

281 has_terminate = False 

282 

283 # TODO: this is not to be part of 2.0. implement rudimentary binary 

284 # literals for SQLite, PostgreSQL, MySQL only within 

285 # _Binary.literal_processor 

286 _legacy_binary_type_literal_encoding = "utf-8" 

287 

288 @util.deprecated_params( 

289 empty_in_strategy=( 

290 "1.4", 

291 "The :paramref:`_sa.create_engine.empty_in_strategy` keyword is " 

292 "deprecated, and no longer has any effect. All IN expressions " 

293 "are now rendered using " 

294 'the "expanding parameter" strategy which renders a set of bound' 

295 'expressions, or an "empty set" SELECT, at statement execution' 

296 "time.", 

297 ), 

298 server_side_cursors=( 

299 "1.4", 

300 "The :paramref:`_sa.create_engine.server_side_cursors` parameter " 

301 "is deprecated and will be removed in a future release. Please " 

302 "use the " 

303 ":paramref:`_engine.Connection.execution_options.stream_results` " 

304 "parameter.", 

305 ), 

306 ) 

307 def __init__( 

308 self, 

309 paramstyle: Optional[_ParamStyle] = None, 

310 isolation_level: Optional[IsolationLevel] = None, 

311 dbapi: Optional[DBAPIModule] = None, 

312 implicit_returning: Literal[True] = True, 

313 supports_native_boolean: Optional[bool] = None, 

314 max_identifier_length: Optional[int] = None, 

315 label_length: Optional[int] = None, 

316 insertmanyvalues_page_size: Union[_NoArg, int] = _NoArg.NO_ARG, 

317 use_insertmanyvalues: Optional[bool] = None, 

318 # util.deprecated_params decorator cannot render the 

319 # Linting.NO_LINTING constant 

320 compiler_linting: Linting = int(compiler.NO_LINTING), # type: ignore 

321 server_side_cursors: bool = False, 

322 skip_autocommit_rollback: bool = False, 

323 **kwargs: Any, 

324 ): 

325 if server_side_cursors: 

326 if not self.supports_server_side_cursors: 

327 raise exc.ArgumentError( 

328 "Dialect %s does not support server side cursors" % self 

329 ) 

330 else: 

331 self.server_side_cursors = True 

332 

333 if getattr(self, "use_setinputsizes", False): 

334 util.warn_deprecated( 

335 "The dialect-level use_setinputsizes attribute is " 

336 "deprecated. Please use " 

337 "bind_typing = BindTyping.SETINPUTSIZES", 

338 "2.0", 

339 ) 

340 self.bind_typing = interfaces.BindTyping.SETINPUTSIZES 

341 

342 self.positional = False 

343 self._ischema = None 

344 

345 self.dbapi = dbapi 

346 

347 self.skip_autocommit_rollback = skip_autocommit_rollback 

348 

349 if paramstyle is not None: 

350 self.paramstyle = paramstyle 

351 elif self.dbapi is not None: 

352 self.paramstyle = self.dbapi.paramstyle 

353 else: 

354 self.paramstyle = self.default_paramstyle 

355 self.positional = self.paramstyle in ( 

356 "qmark", 

357 "format", 

358 "numeric", 

359 "numeric_dollar", 

360 ) 

361 self.identifier_preparer = self.preparer(self) 

362 self._on_connect_isolation_level = isolation_level 

363 

364 legacy_tt_callable = getattr(self, "type_compiler", None) 

365 if legacy_tt_callable is not None: 

366 tt_callable = cast( 

367 Type[compiler.GenericTypeCompiler], 

368 self.type_compiler, 

369 ) 

370 else: 

371 tt_callable = self.type_compiler_cls 

372 

373 self.type_compiler_instance = self.type_compiler = tt_callable(self) 

374 

375 if supports_native_boolean is not None: 

376 self.supports_native_boolean = supports_native_boolean 

377 

378 self._user_defined_max_identifier_length = max_identifier_length 

379 if self._user_defined_max_identifier_length: 

380 self.max_identifier_length = ( 

381 self._user_defined_max_identifier_length 

382 ) 

383 self.label_length = label_length 

384 self.compiler_linting = compiler_linting 

385 

386 if use_insertmanyvalues is not None: 

387 self.use_insertmanyvalues = use_insertmanyvalues 

388 

389 if insertmanyvalues_page_size is not _NoArg.NO_ARG: 

390 self.insertmanyvalues_page_size = insertmanyvalues_page_size 

391 

392 @property 

393 @util.deprecated( 

394 "2.0", 

395 "full_returning is deprecated, please use insert_returning, " 

396 "update_returning, delete_returning", 

397 ) 

398 def full_returning(self): 

399 return ( 

400 self.insert_returning 

401 and self.update_returning 

402 and self.delete_returning 

403 ) 

404 

405 @util.memoized_property 

406 def insert_executemany_returning(self): 

407 """Default implementation for insert_executemany_returning, if not 

408 otherwise overridden by the specific dialect. 

409 

410 The default dialect determines "insert_executemany_returning" is 

411 available if the dialect in use has opted into using the 

412 "use_insertmanyvalues" feature. If they haven't opted into that, then 

413 this attribute is False, unless the dialect in question overrides this 

414 and provides some other implementation (such as the Oracle Database 

415 dialects). 

416 

417 """ 

418 return self.insert_returning and self.use_insertmanyvalues 

419 

420 @util.memoized_property 

421 def insert_executemany_returning_sort_by_parameter_order(self): 

422 """Default implementation for 

423 insert_executemany_returning_deterministic_order, if not otherwise 

424 overridden by the specific dialect. 

425 

426 The default dialect determines "insert_executemany_returning" can have 

427 deterministic order only if the dialect in use has opted into using the 

428 "use_insertmanyvalues" feature, which implements deterministic ordering 

429 using client side sentinel columns only by default. The 

430 "insertmanyvalues" feature also features alternate forms that can 

431 use server-generated PK values as "sentinels", but those are only 

432 used if the :attr:`.Dialect.insertmanyvalues_implicit_sentinel` 

433 bitflag enables those alternate SQL forms, which are disabled 

434 by default. 

435 

436 If the dialect in use hasn't opted into that, then this attribute is 

437 False, unless the dialect in question overrides this and provides some 

438 other implementation (such as the Oracle Database dialects). 

439 

440 """ 

441 return self.insert_returning and self.use_insertmanyvalues 

442 

443 update_executemany_returning = False 

444 delete_executemany_returning = False 

445 

446 @util.memoized_property 

447 def loaded_dbapi(self) -> DBAPIModule: 

448 if self.dbapi is None: 

449 raise exc.InvalidRequestError( 

450 f"Dialect {self} does not have a Python DBAPI established " 

451 "and cannot be used for actual database interaction" 

452 ) 

453 return self.dbapi 

454 

455 @util.memoized_property 

456 def _bind_typing_render_casts(self): 

457 return self.bind_typing is interfaces.BindTyping.RENDER_CASTS 

458 

459 def _ensure_has_table_connection(self, arg: Connection) -> None: 

460 if not isinstance(arg, Connection): 

461 raise exc.ArgumentError( 

462 "The argument passed to Dialect.has_table() should be a " 

463 "%s, got %s. " 

464 "Additionally, the Dialect.has_table() method is for " 

465 "internal dialect " 

466 "use only; please use " 

467 "``inspect(some_engine).has_table(<tablename>>)`` " 

468 "for public API use." % (Connection, type(arg)) 

469 ) 

470 

471 @util.memoized_property 

472 def _supports_statement_cache(self): 

473 ssc = self.__class__.__dict__.get("supports_statement_cache", None) 

474 if ssc is None: 

475 util.warn( 

476 "Dialect %s:%s will not make use of SQL compilation caching " 

477 "as it does not set the 'supports_statement_cache' attribute " 

478 "to ``True``. This can have " 

479 "significant performance implications including some " 

480 "performance degradations in comparison to prior SQLAlchemy " 

481 "versions. Dialect maintainers should seek to set this " 

482 "attribute to True after appropriate development and testing " 

483 "for SQLAlchemy 1.4 caching support. Alternatively, this " 

484 "attribute may be set to False which will disable this " 

485 "warning." % (self.name, self.driver), 

486 code="cprf", 

487 ) 

488 

489 return bool(ssc) 

490 

491 @util.memoized_property 

492 def _type_memos(self): 

493 return weakref.WeakKeyDictionary() 

494 

495 @property 

496 def dialect_description(self): # type: ignore[override] 

497 return self.name + "+" + self.driver 

498 

499 @property 

500 def supports_sane_rowcount_returning(self): 

501 """True if this dialect supports sane rowcount even if RETURNING is 

502 in use. 

503 

504 For dialects that don't support RETURNING, this is synonymous with 

505 ``supports_sane_rowcount``. 

506 

507 """ 

508 return self.supports_sane_rowcount 

509 

510 @classmethod 

511 def get_pool_class(cls, url: URL) -> Type[Pool]: 

512 default: Type[pool.Pool] 

513 if cls.is_async: 

514 default = pool.AsyncAdaptedQueuePool 

515 else: 

516 default = pool.QueuePool 

517 

518 return getattr(cls, "poolclass", default) 

519 

520 def get_dialect_pool_class(self, url: URL) -> Type[Pool]: 

521 return self.get_pool_class(url) 

522 

523 @classmethod 

524 def load_provisioning(cls): 

525 package = ".".join(cls.__module__.split(".")[0:-1]) 

526 try: 

527 __import__(package + ".provision") 

528 except ImportError: 

529 pass 

530 

531 def _builtin_onconnect(self) -> Optional[_ListenerFnType]: 

532 if self._on_connect_isolation_level is not None: 

533 

534 def builtin_connect(dbapi_conn, conn_rec): 

535 self._assert_and_set_isolation_level( 

536 dbapi_conn, self._on_connect_isolation_level 

537 ) 

538 

539 return builtin_connect 

540 else: 

541 return None 

542 

543 def initialize(self, connection: Connection) -> None: 

544 try: 

545 self.server_version_info = self._get_server_version_info( 

546 connection 

547 ) 

548 except NotImplementedError: 

549 self.server_version_info = None 

550 try: 

551 self.default_schema_name = self._get_default_schema_name( 

552 connection 

553 ) 

554 except NotImplementedError: 

555 self.default_schema_name = None 

556 

557 try: 

558 self.default_isolation_level = self.get_default_isolation_level( 

559 connection.connection.dbapi_connection 

560 ) 

561 except NotImplementedError: 

562 self.default_isolation_level = None 

563 

564 if not self._user_defined_max_identifier_length: 

565 max_ident_length = self._check_max_identifier_length(connection) 

566 if max_ident_length: 

567 self.max_identifier_length = max_ident_length 

568 

569 if ( 

570 self.label_length 

571 and self.label_length > self.max_identifier_length 

572 ): 

573 raise exc.ArgumentError( 

574 "Label length of %d is greater than this dialect's" 

575 " maximum identifier length of %d" 

576 % (self.label_length, self.max_identifier_length) 

577 ) 

578 

579 def on_connect(self) -> Optional[Callable[[Any], None]]: 

580 # inherits the docstring from interfaces.Dialect.on_connect 

581 return None 

582 

583 def _check_max_identifier_length(self, connection): 

584 """Perform a connection / server version specific check to determine 

585 the max_identifier_length. 

586 

587 If the dialect's class level max_identifier_length should be used, 

588 can return None. 

589 

590 """ 

591 return None 

592 

593 def get_default_isolation_level(self, dbapi_conn): 

594 """Given a DBAPI connection, return its isolation level, or 

595 a default isolation level if one cannot be retrieved. 

596 

597 May be overridden by subclasses in order to provide a 

598 "fallback" isolation level for databases that cannot reliably 

599 retrieve the actual isolation level. 

600 

601 By default, calls the :meth:`_engine.Interfaces.get_isolation_level` 

602 method, propagating any exceptions raised. 

603 

604 """ 

605 return self.get_isolation_level(dbapi_conn) 

606 

607 def type_descriptor(self, typeobj): 

608 """Provide a database-specific :class:`.TypeEngine` object, given 

609 the generic object which comes from the types module. 

610 

611 This method looks for a dictionary called 

612 ``colspecs`` as a class or instance-level variable, 

613 and passes on to :func:`_types.adapt_type`. 

614 

615 """ 

616 return type_api.adapt_type(typeobj, self.colspecs) 

617 

618 def has_index(self, connection, table_name, index_name, schema=None, **kw): 

619 if not self.has_table(connection, table_name, schema=schema, **kw): 

620 return False 

621 for idx in self.get_indexes( 

622 connection, table_name, schema=schema, **kw 

623 ): 

624 if idx["name"] == index_name: 

625 return True 

626 else: 

627 return False 

628 

629 def has_schema( 

630 self, connection: Connection, schema_name: str, **kw: Any 

631 ) -> bool: 

632 return schema_name in self.get_schema_names(connection, **kw) 

633 

634 def validate_identifier(self, ident: str) -> None: 

635 if len(ident) > self.max_identifier_length: 

636 raise exc.IdentifierError( 

637 "Identifier '%s' exceeds maximum length of %d characters" 

638 % (ident, self.max_identifier_length) 

639 ) 

640 

641 def connect(self, *cargs: Any, **cparams: Any) -> DBAPIConnection: 

642 # inherits the docstring from interfaces.Dialect.connect 

643 return self.loaded_dbapi.connect(*cargs, **cparams) # type: ignore[no-any-return] # NOQA: E501 

644 

645 def create_connect_args(self, url: URL) -> ConnectArgsType: 

646 # inherits the docstring from interfaces.Dialect.create_connect_args 

647 opts = url.translate_connect_args() 

648 opts.update(url.query) 

649 return ([], opts) 

650 

651 def set_engine_execution_options( 

652 self, engine: Engine, opts: Mapping[str, Any] 

653 ) -> None: 

654 supported_names = set(self.connection_characteristics).intersection( 

655 opts 

656 ) 

657 if supported_names: 

658 characteristics: Mapping[str, Any] = util.immutabledict( 

659 (name, opts[name]) for name in supported_names 

660 ) 

661 

662 @event.listens_for(engine, "engine_connect") 

663 def set_connection_characteristics(connection): 

664 self._set_connection_characteristics( 

665 connection, characteristics 

666 ) 

667 

668 def set_connection_execution_options( 

669 self, connection: Connection, opts: Mapping[str, Any] 

670 ) -> None: 

671 supported_names = set(self.connection_characteristics).intersection( 

672 opts 

673 ) 

674 if supported_names: 

675 characteristics: Mapping[str, Any] = util.immutabledict( 

676 (name, opts[name]) for name in supported_names 

677 ) 

678 self._set_connection_characteristics(connection, characteristics) 

679 

680 def _set_connection_characteristics(self, connection, characteristics): 

681 characteristic_values = [ 

682 (name, self.connection_characteristics[name], value) 

683 for name, value in characteristics.items() 

684 ] 

685 

686 if connection.in_transaction(): 

687 trans_objs = [ 

688 (name, obj) 

689 for name, obj, _ in characteristic_values 

690 if obj.transactional 

691 ] 

692 if trans_objs: 

693 raise exc.InvalidRequestError( 

694 "This connection has already initialized a SQLAlchemy " 

695 "Transaction() object via begin() or autobegin; " 

696 "%s may not be altered unless rollback() or commit() " 

697 "is called first." 

698 % (", ".join(name for name, obj in trans_objs)) 

699 ) 

700 

701 dbapi_connection = connection.connection.dbapi_connection 

702 for _, characteristic, value in characteristic_values: 

703 characteristic.set_connection_characteristic( 

704 self, connection, dbapi_connection, value 

705 ) 

706 connection.connection._connection_record.finalize_callback.append( 

707 functools.partial(self._reset_characteristics, characteristics) 

708 ) 

709 

710 def _reset_characteristics(self, characteristics, dbapi_connection): 

711 for characteristic_name in characteristics: 

712 characteristic = self.connection_characteristics[ 

713 characteristic_name 

714 ] 

715 characteristic.reset_characteristic(self, dbapi_connection) 

716 

717 def do_begin(self, dbapi_connection): 

718 pass 

719 

720 def do_rollback(self, dbapi_connection): 

721 if self.skip_autocommit_rollback and self.detect_autocommit_setting( 

722 dbapi_connection 

723 ): 

724 return 

725 dbapi_connection.rollback() 

726 

727 def do_commit(self, dbapi_connection): 

728 dbapi_connection.commit() 

729 

730 def do_terminate(self, dbapi_connection): 

731 self.do_close(dbapi_connection) 

732 

733 def do_close(self, dbapi_connection): 

734 dbapi_connection.close() 

735 

736 @util.memoized_property 

737 def _dialect_specific_select_one(self): 

738 return str(expression.select(1).compile(dialect=self)) 

739 

740 def _do_ping_w_event(self, dbapi_connection: DBAPIConnection) -> bool: 

741 try: 

742 return self.do_ping(dbapi_connection) 

743 except self.loaded_dbapi.Error as err: 

744 is_disconnect = self.is_disconnect(err, dbapi_connection, None) 

745 

746 if self._has_events: 

747 try: 

748 Connection._handle_dbapi_exception_noconnection( 

749 err, 

750 self, 

751 is_disconnect=is_disconnect, 

752 invalidate_pool_on_disconnect=False, 

753 is_pre_ping=True, 

754 ) 

755 except exc.StatementError as new_err: 

756 is_disconnect = new_err.connection_invalidated 

757 

758 if is_disconnect: 

759 return False 

760 else: 

761 raise 

762 

763 def do_ping(self, dbapi_connection: DBAPIConnection) -> bool: 

764 cursor = dbapi_connection.cursor() 

765 try: 

766 cursor.execute(self._dialect_specific_select_one) 

767 finally: 

768 cursor.close() 

769 return True 

770 

771 def create_xid(self): 

772 """Create a random two-phase transaction ID. 

773 

774 This id will be passed to do_begin_twophase(), do_rollback_twophase(), 

775 do_commit_twophase(). Its format is unspecified. 

776 """ 

777 

778 return "_sa_%032x" % random.randint(0, 2**128) 

779 

780 def do_savepoint(self, connection, name): 

781 connection.execute(expression.SavepointClause(name)) 

782 

783 def do_rollback_to_savepoint(self, connection, name): 

784 connection.execute(expression.RollbackToSavepointClause(name)) 

785 

786 def do_release_savepoint(self, connection, name): 

787 connection.execute(expression.ReleaseSavepointClause(name)) 

788 

789 def _deliver_insertmanyvalues_batches( 

790 self, 

791 connection, 

792 cursor, 

793 statement, 

794 parameters, 

795 generic_setinputsizes, 

796 context, 

797 ): 

798 context = cast(DefaultExecutionContext, context) 

799 compiled = cast(SQLCompiler, context.compiled) 

800 

801 _composite_sentinel_proc: Sequence[ 

802 Optional[_ResultProcessorType[Any]] 

803 ] = () 

804 _scalar_sentinel_proc: Optional[_ResultProcessorType[Any]] = None 

805 _sentinel_proc_initialized: bool = False 

806 

807 compiled_parameters = context.compiled_parameters 

808 

809 imv = compiled._insertmanyvalues 

810 assert imv is not None 

811 

812 is_returning: Final[bool] = bool(compiled.effective_returning) 

813 batch_size = context.execution_options.get( 

814 "insertmanyvalues_page_size", self.insertmanyvalues_page_size 

815 ) 

816 

817 if compiled.schema_translate_map: 

818 schema_translate_map = context.execution_options.get( 

819 "schema_translate_map", {} 

820 ) 

821 else: 

822 schema_translate_map = None 

823 

824 if is_returning: 

825 result: Optional[List[Any]] = [] 

826 context._insertmanyvalues_rows = result 

827 

828 sort_by_parameter_order = imv.sort_by_parameter_order 

829 

830 else: 

831 sort_by_parameter_order = False 

832 result = None 

833 

834 for imv_batch in compiled._deliver_insertmanyvalues_batches( 

835 statement, 

836 parameters, 

837 compiled_parameters, 

838 generic_setinputsizes, 

839 batch_size, 

840 sort_by_parameter_order, 

841 schema_translate_map, 

842 ): 

843 yield imv_batch 

844 

845 if is_returning: 

846 

847 try: 

848 rows = context.fetchall_for_returning(cursor) 

849 except BaseException as be: 

850 connection._handle_dbapi_exception( 

851 be, 

852 sql_util._long_statement(imv_batch.replaced_statement), 

853 imv_batch.replaced_parameters, 

854 None, 

855 context, 

856 is_sub_exec=True, 

857 ) 

858 

859 # I would have thought "is_returning: Final[bool]" 

860 # would have assured this but pylance thinks not 

861 assert result is not None 

862 

863 if imv.num_sentinel_columns and not imv_batch.is_downgraded: 

864 composite_sentinel = imv.num_sentinel_columns > 1 

865 if imv.implicit_sentinel: 

866 # for implicit sentinel, which is currently single-col 

867 # integer autoincrement, do a simple sort. 

868 assert not composite_sentinel 

869 result.extend( 

870 sorted(rows, key=operator.itemgetter(-1)) 

871 ) 

872 continue 

873 

874 # otherwise, create dictionaries to match up batches 

875 # with parameters 

876 assert imv.sentinel_param_keys 

877 assert imv.sentinel_columns 

878 

879 _nsc = imv.num_sentinel_columns 

880 

881 if not _sentinel_proc_initialized: 

882 if composite_sentinel: 

883 _composite_sentinel_proc = [ 

884 col.type._cached_result_processor( 

885 self, cursor_desc[1] 

886 ) 

887 for col, cursor_desc in zip( 

888 imv.sentinel_columns, 

889 cursor.description[-_nsc:], 

890 ) 

891 ] 

892 else: 

893 _scalar_sentinel_proc = ( 

894 imv.sentinel_columns[0] 

895 ).type._cached_result_processor( 

896 self, cursor.description[-1][1] 

897 ) 

898 _sentinel_proc_initialized = True 

899 

900 rows_by_sentinel: Union[ 

901 Dict[Tuple[Any, ...], Any], 

902 Dict[Any, Any], 

903 ] 

904 

905 if composite_sentinel: 

906 rows_by_sentinel = { 

907 tuple( 

908 (proc(val) if proc else val) 

909 for val, proc in zip( 

910 row[-_nsc:], _composite_sentinel_proc 

911 ) 

912 ): row 

913 for row in rows 

914 } 

915 elif _scalar_sentinel_proc: 

916 rows_by_sentinel = { 

917 _scalar_sentinel_proc(row[-1]): row for row in rows 

918 } 

919 else: 

920 rows_by_sentinel = {row[-1]: row for row in rows} 

921 

922 if len(rows_by_sentinel) != len(imv_batch.batch): 

923 # see test_insert_exec.py:: 

924 # IMVSentinelTest::test_sentinel_incorrect_rowcount 

925 # for coverage / demonstration 

926 raise exc.InvalidRequestError( 

927 f"Sentinel-keyed result set did not produce " 

928 f"correct number of rows {len(imv_batch.batch)}; " 

929 "produced " 

930 f"{len(rows_by_sentinel)}. Please ensure the " 

931 "sentinel column is fully unique and populated in " 

932 "all cases." 

933 ) 

934 

935 try: 

936 ordered_rows = [ 

937 rows_by_sentinel[sentinel_keys] 

938 for sentinel_keys in imv_batch.sentinel_values 

939 ] 

940 except KeyError as ke: 

941 # see test_insert_exec.py:: 

942 # IMVSentinelTest::test_sentinel_cant_match_keys 

943 # for coverage / demonstration 

944 raise exc.InvalidRequestError( 

945 f"Can't match sentinel values in result set to " 

946 f"parameter sets; key {ke.args[0]!r} was not " 

947 "found. " 

948 "There may be a mismatch between the datatype " 

949 "passed to the DBAPI driver vs. that which it " 

950 "returns in a result row. Ensure the given " 

951 "Python value matches the expected result type " 

952 "*exactly*, taking care to not rely upon implicit " 

953 "conversions which may occur such as when using " 

954 "strings in place of UUID or integer values, etc. " 

955 ) from ke 

956 

957 result.extend(ordered_rows) 

958 

959 else: 

960 result.extend(rows) 

961 

962 def do_executemany(self, cursor, statement, parameters, context=None): 

963 cursor.executemany(statement, parameters) 

964 

965 def do_execute(self, cursor, statement, parameters, context=None): 

966 cursor.execute(statement, parameters) 

967 

968 def do_execute_no_params(self, cursor, statement, context=None): 

969 cursor.execute(statement) 

970 

971 def is_disconnect( 

972 self, 

973 e: DBAPIModule.Error, 

974 connection: Union[ 

975 pool.PoolProxiedConnection, interfaces.DBAPIConnection, None 

976 ], 

977 cursor: Optional[interfaces.DBAPICursor], 

978 ) -> bool: 

979 return False 

980 

981 @util.memoized_instancemethod 

982 def _gen_allowed_isolation_levels(self, dbapi_conn): 

983 try: 

984 raw_levels = list(self.get_isolation_level_values(dbapi_conn)) 

985 except NotImplementedError: 

986 return None 

987 else: 

988 normalized_levels = [ 

989 level.replace("_", " ").upper() for level in raw_levels 

990 ] 

991 if raw_levels != normalized_levels: 

992 raise ValueError( 

993 f"Dialect {self.name!r} get_isolation_level_values() " 

994 f"method should return names as UPPERCASE using spaces, " 

995 f"not underscores; got " 

996 f"{sorted(set(raw_levels).difference(normalized_levels))}" 

997 ) 

998 return tuple(normalized_levels) 

999 

1000 def _assert_and_set_isolation_level(self, dbapi_conn, level): 

1001 level = level.replace("_", " ").upper() 

1002 

1003 _allowed_isolation_levels = self._gen_allowed_isolation_levels( 

1004 dbapi_conn 

1005 ) 

1006 if ( 

1007 _allowed_isolation_levels 

1008 and level not in _allowed_isolation_levels 

1009 ): 

1010 raise exc.ArgumentError( 

1011 f"Invalid value {level!r} for isolation_level. " 

1012 f"Valid isolation levels for {self.name!r} are " 

1013 f"{', '.join(_allowed_isolation_levels)}" 

1014 ) 

1015 

1016 self.set_isolation_level(dbapi_conn, level) 

1017 

1018 def reset_isolation_level(self, dbapi_conn): 

1019 if self._on_connect_isolation_level is not None: 

1020 assert ( 

1021 self._on_connect_isolation_level == "AUTOCOMMIT" 

1022 or self._on_connect_isolation_level 

1023 == self.default_isolation_level 

1024 ) 

1025 self._assert_and_set_isolation_level( 

1026 dbapi_conn, self._on_connect_isolation_level 

1027 ) 

1028 else: 

1029 assert self.default_isolation_level is not None 

1030 self._assert_and_set_isolation_level( 

1031 dbapi_conn, 

1032 self.default_isolation_level, 

1033 ) 

1034 

1035 def normalize_name(self, name): 

1036 if name is None: 

1037 return None 

1038 

1039 name_lower = name.lower() 

1040 name_upper = name.upper() 

1041 

1042 if name_upper == name_lower: 

1043 # name has no upper/lower conversion, e.g. non-european characters. 

1044 # return unchanged 

1045 return name 

1046 elif name_upper == name and not ( 

1047 self.identifier_preparer._requires_quotes 

1048 )(name_lower): 

1049 # name is all uppercase and doesn't require quoting; normalize 

1050 # to all lower case 

1051 return name_lower 

1052 elif name_lower == name: 

1053 # name is all lower case, which if denormalized means we need to 

1054 # force quoting on it 

1055 return quoted_name(name, quote=True) 

1056 else: 

1057 # name is mixed case, means it will be quoted in SQL when used 

1058 # later, no normalizes 

1059 return name 

1060 

1061 def denormalize_name(self, name): 

1062 if name is None: 

1063 return None 

1064 

1065 name_lower = name.lower() 

1066 name_upper = name.upper() 

1067 

1068 if name_upper == name_lower: 

1069 # name has no upper/lower conversion, e.g. non-european characters. 

1070 # return unchanged 

1071 return name 

1072 elif name_lower == name and not ( 

1073 self.identifier_preparer._requires_quotes 

1074 )(name_lower): 

1075 name = name_upper 

1076 return name 

1077 

1078 def get_driver_connection(self, connection: DBAPIConnection) -> Any: 

1079 return connection 

1080 

1081 def _overrides_default(self, method): 

1082 return ( 

1083 getattr(type(self), method).__code__ 

1084 is not getattr(DefaultDialect, method).__code__ 

1085 ) 

1086 

1087 def _default_multi_reflect( 

1088 self, 

1089 single_tbl_method, 

1090 connection, 

1091 kind, 

1092 schema, 

1093 filter_names, 

1094 scope, 

1095 **kw, 

1096 ): 

1097 names_fns = [] 

1098 temp_names_fns = [] 

1099 if ObjectKind.TABLE in kind: 

1100 names_fns.append(self.get_table_names) 

1101 temp_names_fns.append(self.get_temp_table_names) 

1102 if ObjectKind.VIEW in kind: 

1103 names_fns.append(self.get_view_names) 

1104 temp_names_fns.append(self.get_temp_view_names) 

1105 if ObjectKind.MATERIALIZED_VIEW in kind: 

1106 names_fns.append(self.get_materialized_view_names) 

1107 # no temp materialized view at the moment 

1108 # temp_names_fns.append(self.get_temp_materialized_view_names) 

1109 

1110 unreflectable = kw.pop("unreflectable", {}) 

1111 

1112 if ( 

1113 filter_names 

1114 and scope is ObjectScope.ANY 

1115 and kind is ObjectKind.ANY 

1116 ): 

1117 # if names are given and no qualification on type of table 

1118 # (i.e. the Table(..., autoload) case), take the names as given, 

1119 # don't run names queries. If a table does not exit 

1120 # NoSuchTableError is raised and it's skipped 

1121 

1122 # this also suits the case for mssql where we can reflect 

1123 # individual temp tables but there's no temp_names_fn 

1124 names = filter_names 

1125 else: 

1126 names = [] 

1127 name_kw = {"schema": schema, **kw} 

1128 fns = [] 

1129 if ObjectScope.DEFAULT in scope: 

1130 fns.extend(names_fns) 

1131 if ObjectScope.TEMPORARY in scope: 

1132 fns.extend(temp_names_fns) 

1133 

1134 for fn in fns: 

1135 try: 

1136 names.extend(fn(connection, **name_kw)) 

1137 except NotImplementedError: 

1138 pass 

1139 

1140 if filter_names: 

1141 filter_names = set(filter_names) 

1142 

1143 # iterate over all the tables/views and call the single table method 

1144 for table in names: 

1145 if not filter_names or table in filter_names: 

1146 key = (schema, table) 

1147 try: 

1148 yield ( 

1149 key, 

1150 single_tbl_method( 

1151 connection, table, schema=schema, **kw 

1152 ), 

1153 ) 

1154 except exc.UnreflectableTableError as err: 

1155 if key not in unreflectable: 

1156 unreflectable[key] = err 

1157 except exc.NoSuchTableError: 

1158 pass 

1159 

1160 def get_multi_table_options(self, connection, **kw): 

1161 return self._default_multi_reflect( 

1162 self.get_table_options, connection, **kw 

1163 ) 

1164 

1165 def get_multi_columns(self, connection, **kw): 

1166 return self._default_multi_reflect(self.get_columns, connection, **kw) 

1167 

1168 def get_multi_pk_constraint(self, connection, **kw): 

1169 return self._default_multi_reflect( 

1170 self.get_pk_constraint, connection, **kw 

1171 ) 

1172 

1173 def get_multi_foreign_keys(self, connection, **kw): 

1174 return self._default_multi_reflect( 

1175 self.get_foreign_keys, connection, **kw 

1176 ) 

1177 

1178 def get_multi_indexes(self, connection, **kw): 

1179 return self._default_multi_reflect(self.get_indexes, connection, **kw) 

1180 

1181 def get_multi_unique_constraints(self, connection, **kw): 

1182 return self._default_multi_reflect( 

1183 self.get_unique_constraints, connection, **kw 

1184 ) 

1185 

1186 def get_multi_check_constraints(self, connection, **kw): 

1187 return self._default_multi_reflect( 

1188 self.get_check_constraints, connection, **kw 

1189 ) 

1190 

1191 def get_multi_table_comment(self, connection, **kw): 

1192 return self._default_multi_reflect( 

1193 self.get_table_comment, connection, **kw 

1194 ) 

1195 

1196 

1197class StrCompileDialect(DefaultDialect): 

1198 statement_compiler = compiler.StrSQLCompiler 

1199 ddl_compiler = compiler.DDLCompiler 

1200 type_compiler_cls = compiler.StrSQLTypeCompiler 

1201 preparer = compiler.IdentifierPreparer 

1202 

1203 insert_returning = True 

1204 update_returning = True 

1205 delete_returning = True 

1206 

1207 supports_statement_cache = True 

1208 

1209 supports_identity_columns = True 

1210 

1211 supports_sequences = True 

1212 sequences_optional = True 

1213 preexecute_autoincrement_sequences = False 

1214 

1215 supports_native_boolean = True 

1216 

1217 supports_multivalues_insert = True 

1218 supports_simple_order_by_label = True 

1219 

1220 

1221class DefaultExecutionContext(ExecutionContext): 

1222 isinsert = False 

1223 isupdate = False 

1224 isdelete = False 

1225 is_crud = False 

1226 is_text = False 

1227 isddl = False 

1228 

1229 execute_style: ExecuteStyle = ExecuteStyle.EXECUTE 

1230 

1231 compiled: Optional[Compiled] = None 

1232 result_column_struct: Optional[ 

1233 Tuple[List[ResultColumnsEntry], bool, bool, bool, bool] 

1234 ] = None 

1235 returned_default_rows: Optional[Sequence[Row[Unpack[TupleAny]]]] = None 

1236 

1237 execution_options: _ExecuteOptions = util.EMPTY_DICT 

1238 

1239 cursor_fetch_strategy = _cursor._DEFAULT_FETCH 

1240 

1241 invoked_statement: Optional[Executable] = None 

1242 

1243 _is_implicit_returning = False 

1244 _is_explicit_returning = False 

1245 _is_supplemental_returning = False 

1246 _is_server_side = False 

1247 

1248 _soft_closed = False 

1249 

1250 _rowcount: Optional[int] = None 

1251 

1252 # a hook for SQLite's translation of 

1253 # result column names 

1254 # NOTE: pyhive is using this hook, can't remove it :( 

1255 _translate_colname: Optional[ 

1256 Callable[[str], Tuple[str, Optional[str]]] 

1257 ] = None 

1258 

1259 _expanded_parameters: Mapping[str, List[str]] = util.immutabledict() 

1260 """used by set_input_sizes(). 

1261 

1262 This collection comes from ``ExpandedState.parameter_expansion``. 

1263 

1264 """ 

1265 

1266 cache_hit = NO_CACHE_KEY 

1267 

1268 root_connection: Connection 

1269 _dbapi_connection: PoolProxiedConnection 

1270 dialect: Dialect 

1271 unicode_statement: str 

1272 cursor: DBAPICursor 

1273 compiled_parameters: List[_MutableCoreSingleExecuteParams] 

1274 parameters: _DBAPIMultiExecuteParams 

1275 extracted_parameters: Optional[Sequence[BindParameter[Any]]] 

1276 

1277 _empty_dict_params = cast("Mapping[str, Any]", util.EMPTY_DICT) 

1278 

1279 _insertmanyvalues_rows: Optional[List[Tuple[Any, ...]]] = None 

1280 _num_sentinel_cols: int = 0 

1281 

1282 @classmethod 

1283 def _init_ddl( 

1284 cls, 

1285 dialect: Dialect, 

1286 connection: Connection, 

1287 dbapi_connection: PoolProxiedConnection, 

1288 execution_options: _ExecuteOptions, 

1289 compiled_ddl: DDLCompiler, 

1290 ) -> ExecutionContext: 

1291 """Initialize execution context for an ExecutableDDLElement 

1292 construct.""" 

1293 

1294 self = cls.__new__(cls) 

1295 self.root_connection = connection 

1296 self._dbapi_connection = dbapi_connection 

1297 self.dialect = connection.dialect 

1298 

1299 self.compiled = compiled = compiled_ddl 

1300 self.isddl = True 

1301 

1302 self.execution_options = execution_options 

1303 

1304 self.unicode_statement = str(compiled) 

1305 if compiled.schema_translate_map: 

1306 schema_translate_map = self.execution_options.get( 

1307 "schema_translate_map", {} 

1308 ) 

1309 

1310 rst = compiled.preparer._render_schema_translates 

1311 self.unicode_statement = rst( 

1312 self.unicode_statement, schema_translate_map 

1313 ) 

1314 

1315 self.statement = self.unicode_statement 

1316 

1317 self.cursor = self.create_cursor() 

1318 self.compiled_parameters = [] 

1319 

1320 if dialect.positional: 

1321 self.parameters = [dialect.execute_sequence_format()] 

1322 else: 

1323 self.parameters = [self._empty_dict_params] 

1324 

1325 return self 

1326 

1327 @classmethod 

1328 def _init_compiled( 

1329 cls, 

1330 dialect: Dialect, 

1331 connection: Connection, 

1332 dbapi_connection: PoolProxiedConnection, 

1333 execution_options: _ExecuteOptions, 

1334 compiled: SQLCompiler, 

1335 parameters: _CoreMultiExecuteParams, 

1336 invoked_statement: Executable, 

1337 extracted_parameters: Optional[Sequence[BindParameter[Any]]], 

1338 cache_hit: CacheStats = CacheStats.CACHING_DISABLED, 

1339 param_dict: _CoreSingleExecuteParams | None = None, 

1340 ) -> ExecutionContext: 

1341 """Initialize execution context for a Compiled construct.""" 

1342 

1343 self = cls.__new__(cls) 

1344 self.root_connection = connection 

1345 self._dbapi_connection = dbapi_connection 

1346 self.dialect = connection.dialect 

1347 self.extracted_parameters = extracted_parameters 

1348 self.invoked_statement = invoked_statement 

1349 self.compiled = compiled 

1350 self.cache_hit = cache_hit 

1351 

1352 self.execution_options = execution_options 

1353 

1354 self.result_column_struct = ( 

1355 compiled._result_columns, 

1356 compiled._ordered_columns, 

1357 compiled._textual_ordered_columns, 

1358 compiled._ad_hoc_textual, 

1359 compiled._loose_column_name_matching, 

1360 ) 

1361 

1362 self.isinsert = ii = compiled.isinsert 

1363 self.isupdate = iu = compiled.isupdate 

1364 self.isdelete = id_ = compiled.isdelete 

1365 self.is_text = compiled.isplaintext 

1366 

1367 if ii or iu or id_: 

1368 dml_statement = compiled.compile_state.statement # type: ignore 

1369 if TYPE_CHECKING: 

1370 assert isinstance(dml_statement, UpdateBase) 

1371 self.is_crud = True 

1372 self._is_explicit_returning = ier = bool(dml_statement._returning) 

1373 self._is_implicit_returning = iir = bool( 

1374 compiled.implicit_returning 

1375 ) 

1376 if iir and dml_statement._supplemental_returning: 

1377 self._is_supplemental_returning = True 

1378 

1379 # dont mix implicit and explicit returning 

1380 assert not (iir and ier) 

1381 

1382 if (ier or iir) and compiled.for_executemany: 

1383 if ii and not self.dialect.insert_executemany_returning: 

1384 raise exc.InvalidRequestError( 

1385 f"Dialect {self.dialect.dialect_description} with " 

1386 f"current server capabilities does not support " 

1387 "INSERT..RETURNING when executemany is used" 

1388 ) 

1389 elif ( 

1390 ii 

1391 and dml_statement._sort_by_parameter_order 

1392 and not self.dialect.insert_executemany_returning_sort_by_parameter_order # noqa: E501 

1393 ): 

1394 raise exc.InvalidRequestError( 

1395 f"Dialect {self.dialect.dialect_description} with " 

1396 f"current server capabilities does not support " 

1397 "INSERT..RETURNING with deterministic row ordering " 

1398 "when executemany is used" 

1399 ) 

1400 elif ( 

1401 ii 

1402 and self.dialect.use_insertmanyvalues 

1403 and not compiled._insertmanyvalues 

1404 ): 

1405 raise exc.InvalidRequestError( 

1406 'Statement does not have "insertmanyvalues" ' 

1407 "enabled, can't use INSERT..RETURNING with " 

1408 "executemany in this case." 

1409 ) 

1410 elif iu and not self.dialect.update_executemany_returning: 

1411 raise exc.InvalidRequestError( 

1412 f"Dialect {self.dialect.dialect_description} with " 

1413 f"current server capabilities does not support " 

1414 "UPDATE..RETURNING when executemany is used" 

1415 ) 

1416 elif id_ and not self.dialect.delete_executemany_returning: 

1417 raise exc.InvalidRequestError( 

1418 f"Dialect {self.dialect.dialect_description} with " 

1419 f"current server capabilities does not support " 

1420 "DELETE..RETURNING when executemany is used" 

1421 ) 

1422 

1423 if not parameters: 

1424 self.compiled_parameters = [ 

1425 compiled.construct_params( 

1426 extracted_parameters=extracted_parameters, 

1427 escape_names=False, 

1428 _collected_params=param_dict, 

1429 ) 

1430 ] 

1431 else: 

1432 self.compiled_parameters = [ 

1433 compiled.construct_params( 

1434 m, 

1435 escape_names=False, 

1436 _group_number=grp, 

1437 extracted_parameters=extracted_parameters, 

1438 _collected_params=param_dict, 

1439 ) 

1440 for grp, m in enumerate(parameters) 

1441 ] 

1442 

1443 if len(parameters) > 1: 

1444 if self.isinsert and compiled._insertmanyvalues: 

1445 self.execute_style = ExecuteStyle.INSERTMANYVALUES 

1446 

1447 imv = compiled._insertmanyvalues 

1448 if imv.sentinel_columns is not None: 

1449 self._num_sentinel_cols = imv.num_sentinel_columns 

1450 else: 

1451 self.execute_style = ExecuteStyle.EXECUTEMANY 

1452 

1453 self.unicode_statement = compiled.string 

1454 

1455 self.cursor = self.create_cursor() 

1456 

1457 if self.compiled.insert_prefetch or self.compiled.update_prefetch: 

1458 self._process_execute_defaults() 

1459 

1460 processors = compiled._bind_processors 

1461 

1462 flattened_processors: Mapping[ 

1463 str, _BindProcessorType[Any] 

1464 ] = processors # type: ignore[assignment] 

1465 

1466 if compiled.literal_execute_params or compiled.post_compile_params: 

1467 if self.executemany: 

1468 raise exc.InvalidRequestError( 

1469 "'literal_execute' or 'expanding' parameters can't be " 

1470 "used with executemany()" 

1471 ) 

1472 

1473 expanded_state = compiled._process_parameters_for_postcompile( 

1474 self.compiled_parameters[0] 

1475 ) 

1476 

1477 # re-assign self.unicode_statement 

1478 self.unicode_statement = expanded_state.statement 

1479 

1480 self._expanded_parameters = expanded_state.parameter_expansion 

1481 

1482 flattened_processors = dict(processors) # type: ignore 

1483 flattened_processors.update(expanded_state.processors) 

1484 positiontup = expanded_state.positiontup 

1485 elif compiled.positional: 

1486 positiontup = self.compiled.positiontup 

1487 else: 

1488 positiontup = None 

1489 

1490 if compiled.schema_translate_map: 

1491 schema_translate_map = self.execution_options.get( 

1492 "schema_translate_map", {} 

1493 ) 

1494 rst = compiled.preparer._render_schema_translates 

1495 self.unicode_statement = rst( 

1496 self.unicode_statement, schema_translate_map 

1497 ) 

1498 

1499 # final self.unicode_statement is now assigned, encode if needed 

1500 # by dialect 

1501 self.statement = self.unicode_statement 

1502 

1503 # Convert the dictionary of bind parameter values 

1504 # into a dict or list to be sent to the DBAPI's 

1505 # execute() or executemany() method. 

1506 

1507 if compiled.positional: 

1508 core_positional_parameters: MutableSequence[Sequence[Any]] = [] 

1509 assert positiontup is not None 

1510 for compiled_params in self.compiled_parameters: 

1511 l_param: List[Any] = [ 

1512 ( 

1513 flattened_processors[key](compiled_params[key]) 

1514 if key in flattened_processors 

1515 else compiled_params[key] 

1516 ) 

1517 for key in positiontup 

1518 ] 

1519 core_positional_parameters.append( 

1520 dialect.execute_sequence_format(l_param) 

1521 ) 

1522 

1523 self.parameters = core_positional_parameters 

1524 else: 

1525 core_dict_parameters: MutableSequence[Dict[str, Any]] = [] 

1526 escaped_names = compiled.escaped_bind_names 

1527 

1528 # note that currently, "expanded" parameters will be present 

1529 # in self.compiled_parameters in their quoted form. This is 

1530 # slightly inconsistent with the approach taken as of 

1531 # #8056 where self.compiled_parameters is meant to contain unquoted 

1532 # param names. 

1533 d_param: Dict[str, Any] 

1534 for compiled_params in self.compiled_parameters: 

1535 if escaped_names: 

1536 d_param = { 

1537 escaped_names.get(key, key): ( 

1538 flattened_processors[key](compiled_params[key]) 

1539 if key in flattened_processors 

1540 else compiled_params[key] 

1541 ) 

1542 for key in compiled_params 

1543 } 

1544 else: 

1545 d_param = { 

1546 key: ( 

1547 flattened_processors[key](compiled_params[key]) 

1548 if key in flattened_processors 

1549 else compiled_params[key] 

1550 ) 

1551 for key in compiled_params 

1552 } 

1553 

1554 core_dict_parameters.append(d_param) 

1555 

1556 self.parameters = core_dict_parameters 

1557 

1558 return self 

1559 

1560 @classmethod 

1561 def _init_statement( 

1562 cls, 

1563 dialect: Dialect, 

1564 connection: Connection, 

1565 dbapi_connection: PoolProxiedConnection, 

1566 execution_options: _ExecuteOptions, 

1567 statement: str, 

1568 parameters: _DBAPIMultiExecuteParams, 

1569 ) -> ExecutionContext: 

1570 """Initialize execution context for a string SQL statement.""" 

1571 

1572 self = cls.__new__(cls) 

1573 self.root_connection = connection 

1574 self._dbapi_connection = dbapi_connection 

1575 self.dialect = connection.dialect 

1576 self.is_text = True 

1577 

1578 self.execution_options = execution_options 

1579 

1580 if not parameters: 

1581 if self.dialect.positional: 

1582 self.parameters = [dialect.execute_sequence_format()] 

1583 else: 

1584 self.parameters = [self._empty_dict_params] 

1585 elif isinstance(parameters[0], dialect.execute_sequence_format): 

1586 self.parameters = parameters 

1587 elif isinstance(parameters[0], dict): 

1588 self.parameters = parameters 

1589 else: 

1590 self.parameters = [ 

1591 dialect.execute_sequence_format(p) for p in parameters 

1592 ] 

1593 

1594 if len(parameters) > 1: 

1595 self.execute_style = ExecuteStyle.EXECUTEMANY 

1596 

1597 self.statement = self.unicode_statement = statement 

1598 

1599 self.cursor = self.create_cursor() 

1600 return self 

1601 

1602 @classmethod 

1603 def _init_default( 

1604 cls, 

1605 dialect: Dialect, 

1606 connection: Connection, 

1607 dbapi_connection: PoolProxiedConnection, 

1608 execution_options: _ExecuteOptions, 

1609 ) -> ExecutionContext: 

1610 """Initialize execution context for a ColumnDefault construct.""" 

1611 

1612 self = cls.__new__(cls) 

1613 self.root_connection = connection 

1614 self._dbapi_connection = dbapi_connection 

1615 self.dialect = connection.dialect 

1616 

1617 self.execution_options = execution_options 

1618 

1619 self.cursor = self.create_cursor() 

1620 return self 

1621 

1622 def _get_cache_stats(self) -> str: 

1623 if self.compiled is None: 

1624 return "raw sql" 

1625 

1626 now = perf_counter() 

1627 

1628 ch = self.cache_hit 

1629 

1630 gen_time = self.compiled._gen_time 

1631 assert gen_time is not None 

1632 

1633 if ch is NO_CACHE_KEY: 

1634 return "no key %.5fs" % (now - gen_time,) 

1635 elif ch is CACHE_HIT: 

1636 return "cached since %.4gs ago" % (now - gen_time,) 

1637 elif ch is CACHE_MISS: 

1638 return "generated in %.5fs" % (now - gen_time,) 

1639 elif ch is CACHING_DISABLED: 

1640 if "_cache_disable_reason" in self.execution_options: 

1641 return "caching disabled (%s) %.5fs " % ( 

1642 self.execution_options["_cache_disable_reason"], 

1643 now - gen_time, 

1644 ) 

1645 else: 

1646 return "caching disabled %.5fs" % (now - gen_time,) 

1647 elif ch is NO_DIALECT_SUPPORT: 

1648 return "dialect %s+%s does not support caching %.5fs" % ( 

1649 self.dialect.name, 

1650 self.dialect.driver, 

1651 now - gen_time, 

1652 ) 

1653 else: 

1654 return "unknown" 

1655 

1656 @property 

1657 def executemany(self): # type: ignore[override] 

1658 return self.execute_style in ( 

1659 ExecuteStyle.EXECUTEMANY, 

1660 ExecuteStyle.INSERTMANYVALUES, 

1661 ) 

1662 

1663 @util.memoized_property 

1664 def identifier_preparer(self): 

1665 if self.compiled: 

1666 return self.compiled.preparer 

1667 elif "schema_translate_map" in self.execution_options: 

1668 return self.dialect.identifier_preparer._with_schema_translate( 

1669 self.execution_options["schema_translate_map"] 

1670 ) 

1671 else: 

1672 return self.dialect.identifier_preparer 

1673 

1674 @util.memoized_property 

1675 def engine(self): 

1676 return self.root_connection.engine 

1677 

1678 @util.memoized_property 

1679 def postfetch_cols(self) -> Optional[Sequence[Column[Any]]]: 

1680 if TYPE_CHECKING: 

1681 assert isinstance(self.compiled, SQLCompiler) 

1682 return self.compiled.postfetch 

1683 

1684 @util.memoized_property 

1685 def prefetch_cols(self) -> Optional[Sequence[Column[Any]]]: 

1686 if TYPE_CHECKING: 

1687 assert isinstance(self.compiled, SQLCompiler) 

1688 if self.isinsert: 

1689 return self.compiled.insert_prefetch 

1690 elif self.isupdate: 

1691 return self.compiled.update_prefetch 

1692 else: 

1693 return () 

1694 

1695 @util.memoized_property 

1696 def no_parameters(self): 

1697 return self.execution_options.get("no_parameters", False) 

1698 

1699 def _execute_scalar( 

1700 self, 

1701 stmt: str, 

1702 type_: Optional[TypeEngine[Any]], 

1703 parameters: Optional[_DBAPISingleExecuteParams] = None, 

1704 ) -> Any: 

1705 """Execute a string statement on the current cursor, returning a 

1706 scalar result. 

1707 

1708 Used to fire off sequences, default phrases, and "select lastrowid" 

1709 types of statements individually or in the context of a parent INSERT 

1710 or UPDATE statement. 

1711 

1712 """ 

1713 

1714 conn = self.root_connection 

1715 

1716 if "schema_translate_map" in self.execution_options: 

1717 schema_translate_map = self.execution_options.get( 

1718 "schema_translate_map", {} 

1719 ) 

1720 

1721 rst = self.identifier_preparer._render_schema_translates 

1722 stmt = rst(stmt, schema_translate_map) 

1723 

1724 if not parameters: 

1725 if self.dialect.positional: 

1726 parameters = self.dialect.execute_sequence_format() 

1727 else: 

1728 parameters = {} 

1729 

1730 conn._cursor_execute(self.cursor, stmt, parameters, context=self) 

1731 row = self.cursor.fetchone() 

1732 if row is not None: 

1733 r = row[0] 

1734 else: 

1735 r = None 

1736 if type_ is not None: 

1737 # apply type post processors to the result 

1738 proc = type_._cached_result_processor( 

1739 self.dialect, self.cursor.description[0][1] 

1740 ) 

1741 if proc: 

1742 return proc(r) 

1743 return r 

1744 

1745 @util.memoized_property 

1746 def connection(self): 

1747 return self.root_connection 

1748 

1749 def _use_server_side_cursor(self): 

1750 if not self.dialect.supports_server_side_cursors: 

1751 return False 

1752 

1753 if self.dialect.server_side_cursors: 

1754 # this is deprecated 

1755 use_server_side = self.execution_options.get( 

1756 "stream_results", True 

1757 ) and ( 

1758 self.compiled 

1759 and isinstance(self.compiled.statement, expression.Selectable) 

1760 or ( 

1761 ( 

1762 not self.compiled 

1763 or isinstance( 

1764 self.compiled.statement, expression.TextClause 

1765 ) 

1766 ) 

1767 and self.unicode_statement 

1768 and SERVER_SIDE_CURSOR_RE.match(self.unicode_statement) 

1769 ) 

1770 ) 

1771 else: 

1772 use_server_side = self.execution_options.get( 

1773 "stream_results", False 

1774 ) 

1775 

1776 return use_server_side 

1777 

1778 def create_cursor(self) -> DBAPICursor: 

1779 if ( 

1780 # inlining initial preference checks for SS cursors 

1781 self.dialect.supports_server_side_cursors 

1782 and ( 

1783 self.execution_options.get("stream_results", False) 

1784 or ( 

1785 self.dialect.server_side_cursors 

1786 and self._use_server_side_cursor() 

1787 ) 

1788 ) 

1789 ): 

1790 self._is_server_side = True 

1791 return self.create_server_side_cursor() 

1792 else: 

1793 self._is_server_side = False 

1794 return self.create_default_cursor() 

1795 

1796 def fetchall_for_returning(self, cursor): 

1797 return cursor.fetchall() 

1798 

1799 def create_default_cursor(self) -> DBAPICursor: 

1800 return self._dbapi_connection.cursor() 

1801 

1802 def create_server_side_cursor(self) -> DBAPICursor: 

1803 raise NotImplementedError() 

1804 

1805 def pre_exec(self): 

1806 pass 

1807 

1808 def get_out_parameter_values(self, names): 

1809 raise NotImplementedError( 

1810 "This dialect does not support OUT parameters" 

1811 ) 

1812 

1813 def post_exec(self): 

1814 pass 

1815 

1816 def get_result_processor( 

1817 self, type_: TypeEngine[Any], colname: str, coltype: DBAPIType 

1818 ) -> Optional[_ResultProcessorType[Any]]: 

1819 """Return a 'result processor' for a given type as present in 

1820 cursor.description. 

1821 

1822 This has a default implementation that dialects can override 

1823 for context-sensitive result type handling. 

1824 

1825 """ 

1826 return type_._cached_result_processor(self.dialect, coltype) 

1827 

1828 def get_lastrowid(self) -> int: 

1829 """return self.cursor.lastrowid, or equivalent, after an INSERT. 

1830 

1831 This may involve calling special cursor functions, issuing a new SELECT 

1832 on the cursor (or a new one), or returning a stored value that was 

1833 calculated within post_exec(). 

1834 

1835 This function will only be called for dialects which support "implicit" 

1836 primary key generation, keep preexecute_autoincrement_sequences set to 

1837 False, and when no explicit id value was bound to the statement. 

1838 

1839 The function is called once for an INSERT statement that would need to 

1840 return the last inserted primary key for those dialects that make use 

1841 of the lastrowid concept. In these cases, it is called directly after 

1842 :meth:`.ExecutionContext.post_exec`. 

1843 

1844 """ 

1845 return self.cursor.lastrowid 

1846 

1847 def handle_dbapi_exception(self, e): 

1848 pass 

1849 

1850 @util.non_memoized_property 

1851 def rowcount(self) -> int: 

1852 if self._rowcount is not None: 

1853 return self._rowcount 

1854 else: 

1855 return self.cursor.rowcount 

1856 

1857 @property 

1858 def _has_rowcount(self): 

1859 return self._rowcount is not None 

1860 

1861 def supports_sane_rowcount(self): 

1862 return self.dialect.supports_sane_rowcount 

1863 

1864 def supports_sane_multi_rowcount(self): 

1865 return self.dialect.supports_sane_multi_rowcount 

1866 

1867 def _setup_result_proxy(self): 

1868 exec_opt = self.execution_options 

1869 

1870 if self._rowcount is None and exec_opt.get("preserve_rowcount", False): 

1871 self._rowcount = self.cursor.rowcount 

1872 

1873 yp: Optional[Union[int, bool]] 

1874 if self.is_crud or self.is_text: 

1875 result = self._setup_dml_or_text_result() 

1876 yp = False 

1877 else: 

1878 yp = exec_opt.get("yield_per", None) 

1879 sr = self._is_server_side or exec_opt.get("stream_results", False) 

1880 strategy = self.cursor_fetch_strategy 

1881 if sr and strategy is _cursor._DEFAULT_FETCH: 

1882 strategy = _cursor.BufferedRowCursorFetchStrategy( 

1883 self.cursor, self.execution_options 

1884 ) 

1885 cursor_description: _DBAPICursorDescription = ( 

1886 strategy.alternate_cursor_description 

1887 or self.cursor.description 

1888 ) 

1889 if cursor_description is None: 

1890 strategy = _cursor._NO_CURSOR_DQL 

1891 

1892 result = _cursor.CursorResult(self, strategy, cursor_description) 

1893 

1894 compiled = self.compiled 

1895 

1896 if ( 

1897 compiled 

1898 and not self.isddl 

1899 and cast(SQLCompiler, compiled).has_out_parameters 

1900 ): 

1901 self._setup_out_parameters(result) 

1902 

1903 self._soft_closed = result._soft_closed 

1904 

1905 if yp: 

1906 result = result.yield_per(yp) 

1907 

1908 return result 

1909 

1910 def _setup_out_parameters(self, result): 

1911 compiled = cast(SQLCompiler, self.compiled) 

1912 

1913 out_bindparams = [ 

1914 (param, name) 

1915 for param, name in compiled.bind_names.items() 

1916 if param.isoutparam 

1917 ] 

1918 out_parameters = {} 

1919 

1920 for bindparam, raw_value in zip( 

1921 [param for param, name in out_bindparams], 

1922 self.get_out_parameter_values( 

1923 [name for param, name in out_bindparams] 

1924 ), 

1925 ): 

1926 type_ = bindparam.type 

1927 impl_type = type_.dialect_impl(self.dialect) 

1928 dbapi_type = impl_type.get_dbapi_type(self.dialect.loaded_dbapi) 

1929 result_processor = impl_type.result_processor( 

1930 self.dialect, dbapi_type 

1931 ) 

1932 if result_processor is not None: 

1933 raw_value = result_processor(raw_value) 

1934 out_parameters[bindparam.key] = raw_value 

1935 

1936 result.out_parameters = out_parameters 

1937 

1938 def _setup_dml_or_text_result(self): 

1939 compiled = cast(SQLCompiler, self.compiled) 

1940 

1941 strategy: ResultFetchStrategy = self.cursor_fetch_strategy 

1942 

1943 if self.isinsert: 

1944 if ( 

1945 self.execute_style is ExecuteStyle.INSERTMANYVALUES 

1946 and compiled.effective_returning 

1947 ): 

1948 strategy = _cursor.FullyBufferedCursorFetchStrategy( 

1949 self.cursor, 

1950 initial_buffer=self._insertmanyvalues_rows, 

1951 # maintain alt cursor description if set by the 

1952 # dialect, e.g. mssql preserves it 

1953 alternate_description=( 

1954 strategy.alternate_cursor_description 

1955 ), 

1956 ) 

1957 

1958 if compiled.postfetch_lastrowid: 

1959 self.inserted_primary_key_rows = ( 

1960 self._setup_ins_pk_from_lastrowid() 

1961 ) 

1962 # else if not self._is_implicit_returning, 

1963 # the default inserted_primary_key_rows accessor will 

1964 # return an "empty" primary key collection when accessed. 

1965 

1966 if self._is_server_side and strategy is _cursor._DEFAULT_FETCH: 

1967 strategy = _cursor.BufferedRowCursorFetchStrategy( 

1968 self.cursor, self.execution_options 

1969 ) 

1970 

1971 if strategy is _cursor._NO_CURSOR_DML: 

1972 cursor_description = None 

1973 else: 

1974 cursor_description = ( 

1975 strategy.alternate_cursor_description 

1976 or self.cursor.description 

1977 ) 

1978 

1979 if cursor_description is None: 

1980 strategy = _cursor._NO_CURSOR_DML 

1981 elif self._num_sentinel_cols: 

1982 assert self.execute_style is ExecuteStyle.INSERTMANYVALUES 

1983 # the sentinel columns are handled in CursorResult._init_metadata 

1984 # using essentially _reduce 

1985 

1986 result: _cursor.CursorResult[Any] = _cursor.CursorResult( 

1987 self, strategy, cursor_description 

1988 ) 

1989 

1990 if self.isinsert: 

1991 if self._is_implicit_returning: 

1992 rows = result.all() 

1993 

1994 self.returned_default_rows = rows 

1995 

1996 self.inserted_primary_key_rows = ( 

1997 self._setup_ins_pk_from_implicit_returning(result, rows) 

1998 ) 

1999 

2000 # test that it has a cursor metadata that is accurate. the 

2001 # first row will have been fetched and current assumptions 

2002 # are that the result has only one row, until executemany() 

2003 # support is added here. 

2004 assert result._metadata.returns_rows 

2005 

2006 # Insert statement has both return_defaults() and 

2007 # returning(). rewind the result on the list of rows 

2008 # we just used. 

2009 if self._is_supplemental_returning: 

2010 result._rewind(rows) 

2011 else: 

2012 result._soft_close() 

2013 elif not self._is_explicit_returning: 

2014 result._soft_close() 

2015 

2016 # we assume here the result does not return any rows. 

2017 # *usually*, this will be true. However, some dialects 

2018 # such as that of MSSQL/pyodbc need to SELECT a post fetch 

2019 # function so this is not necessarily true. 

2020 # assert not result.returns_rows 

2021 

2022 elif self._is_implicit_returning: 

2023 rows = result.all() 

2024 

2025 if rows: 

2026 self.returned_default_rows = rows 

2027 self._rowcount = len(rows) 

2028 

2029 if self._is_supplemental_returning: 

2030 result._rewind(rows) 

2031 else: 

2032 result._soft_close() 

2033 

2034 # test that it has a cursor metadata that is accurate. 

2035 # the rows have all been fetched however. 

2036 assert result._metadata.returns_rows 

2037 

2038 elif not result._metadata.returns_rows: 

2039 # no results, get rowcount 

2040 # (which requires open cursor on some drivers) 

2041 if self._rowcount is None: 

2042 self._rowcount = self.cursor.rowcount 

2043 result._soft_close() 

2044 elif self.isupdate or self.isdelete: 

2045 if self._rowcount is None: 

2046 self._rowcount = self.cursor.rowcount 

2047 return result 

2048 

2049 @util.memoized_property 

2050 def inserted_primary_key_rows(self): 

2051 # if no specific "get primary key" strategy was set up 

2052 # during execution, return a "default" primary key based 

2053 # on what's in the compiled_parameters and nothing else. 

2054 return self._setup_ins_pk_from_empty() 

2055 

2056 def _setup_ins_pk_from_lastrowid(self): 

2057 getter = cast( 

2058 SQLCompiler, self.compiled 

2059 )._inserted_primary_key_from_lastrowid_getter 

2060 lastrowid = self.get_lastrowid() 

2061 return [getter(lastrowid, self.compiled_parameters[0])] 

2062 

2063 def _setup_ins_pk_from_empty(self): 

2064 getter = cast( 

2065 SQLCompiler, self.compiled 

2066 )._inserted_primary_key_from_lastrowid_getter 

2067 return [getter(None, param) for param in self.compiled_parameters] 

2068 

2069 def _setup_ins_pk_from_implicit_returning(self, result, rows): 

2070 if not rows: 

2071 return [] 

2072 

2073 getter = cast( 

2074 SQLCompiler, self.compiled 

2075 )._inserted_primary_key_from_returning_getter 

2076 compiled_params = self.compiled_parameters 

2077 

2078 return [ 

2079 getter(row, param) for row, param in zip(rows, compiled_params) 

2080 ] 

2081 

2082 def lastrow_has_defaults(self) -> bool: 

2083 return (self.isinsert or self.isupdate) and bool( 

2084 cast(SQLCompiler, self.compiled).postfetch 

2085 ) 

2086 

2087 def _prepare_set_input_sizes( 

2088 self, 

2089 ) -> Optional[List[Tuple[str, Any, TypeEngine[Any]]]]: 

2090 """Given a cursor and ClauseParameters, prepare arguments 

2091 in order to call the appropriate 

2092 style of ``setinputsizes()`` on the cursor, using DB-API types 

2093 from the bind parameter's ``TypeEngine`` objects. 

2094 

2095 This method only called by those dialects which set the 

2096 :attr:`.Dialect.bind_typing` attribute to 

2097 :attr:`.BindTyping.SETINPUTSIZES`. Python-oracledb and cx_Oracle are 

2098 the only DBAPIs that requires setinputsizes(); pyodbc offers it as an 

2099 option. 

2100 

2101 Prior to SQLAlchemy 2.0, the setinputsizes() approach was also used 

2102 for pg8000 and asyncpg, which has been changed to inline rendering 

2103 of casts. 

2104 

2105 """ 

2106 if self.isddl or self.is_text: 

2107 return None 

2108 

2109 compiled = cast(SQLCompiler, self.compiled) 

2110 

2111 inputsizes = compiled._get_set_input_sizes_lookup() 

2112 

2113 if inputsizes is None: 

2114 return None 

2115 

2116 dialect = self.dialect 

2117 

2118 # all of the rest of this... cython? 

2119 

2120 if dialect._has_events: 

2121 inputsizes = dict(inputsizes) 

2122 dialect.dispatch.do_setinputsizes( 

2123 inputsizes, self.cursor, self.statement, self.parameters, self 

2124 ) 

2125 

2126 if compiled.escaped_bind_names: 

2127 escaped_bind_names = compiled.escaped_bind_names 

2128 else: 

2129 escaped_bind_names = None 

2130 

2131 if dialect.positional: 

2132 items = [ 

2133 (key, compiled.binds[key]) 

2134 for key in compiled.positiontup or () 

2135 ] 

2136 else: 

2137 items = [ 

2138 (key, bindparam) 

2139 for bindparam, key in compiled.bind_names.items() 

2140 ] 

2141 

2142 generic_inputsizes: List[Tuple[str, Any, TypeEngine[Any]]] = [] 

2143 for key, bindparam in items: 

2144 if bindparam in compiled.literal_execute_params: 

2145 continue 

2146 

2147 if key in self._expanded_parameters: 

2148 if is_tuple_type(bindparam.type): 

2149 num = len(bindparam.type.types) 

2150 dbtypes = inputsizes[bindparam] 

2151 generic_inputsizes.extend( 

2152 ( 

2153 ( 

2154 escaped_bind_names.get(paramname, paramname) 

2155 if escaped_bind_names is not None 

2156 else paramname 

2157 ), 

2158 dbtypes[idx % num], 

2159 bindparam.type.types[idx % num], 

2160 ) 

2161 for idx, paramname in enumerate( 

2162 self._expanded_parameters[key] 

2163 ) 

2164 ) 

2165 else: 

2166 dbtype = inputsizes.get(bindparam, None) 

2167 generic_inputsizes.extend( 

2168 ( 

2169 ( 

2170 escaped_bind_names.get(paramname, paramname) 

2171 if escaped_bind_names is not None 

2172 else paramname 

2173 ), 

2174 dbtype, 

2175 bindparam.type, 

2176 ) 

2177 for paramname in self._expanded_parameters[key] 

2178 ) 

2179 else: 

2180 dbtype = inputsizes.get(bindparam, None) 

2181 

2182 escaped_name = ( 

2183 escaped_bind_names.get(key, key) 

2184 if escaped_bind_names is not None 

2185 else key 

2186 ) 

2187 

2188 generic_inputsizes.append( 

2189 (escaped_name, dbtype, bindparam.type) 

2190 ) 

2191 

2192 return generic_inputsizes 

2193 

2194 def _exec_default(self, column, default, type_): 

2195 if default.is_sequence: 

2196 return self.fire_sequence(default, type_) 

2197 elif default.is_callable: 

2198 # this codepath is not normally used as it's inlined 

2199 # into _process_execute_defaults 

2200 self.current_column = column 

2201 return default.arg(self) 

2202 elif default.is_clause_element: 

2203 return self._exec_default_clause_element(column, default, type_) 

2204 else: 

2205 # this codepath is not normally used as it's inlined 

2206 # into _process_execute_defaults 

2207 return default.arg 

2208 

2209 def _exec_default_clause_element(self, column, default, type_): 

2210 # execute a default that's a complete clause element. Here, we have 

2211 # to re-implement a miniature version of the compile->parameters-> 

2212 # cursor.execute() sequence, since we don't want to modify the state 

2213 # of the connection / result in progress or create new connection/ 

2214 # result objects etc. 

2215 # .. versionchanged:: 1.4 

2216 

2217 if not default._arg_is_typed: 

2218 default_arg = expression.type_coerce(default.arg, type_) 

2219 else: 

2220 default_arg = default.arg 

2221 compiled = expression.select(default_arg).compile(dialect=self.dialect) 

2222 compiled_params = compiled.construct_params() 

2223 processors = compiled._bind_processors 

2224 if compiled.positional: 

2225 parameters = self.dialect.execute_sequence_format( 

2226 [ 

2227 ( 

2228 processors[key](compiled_params[key]) # type: ignore 

2229 if key in processors 

2230 else compiled_params[key] 

2231 ) 

2232 for key in compiled.positiontup or () 

2233 ] 

2234 ) 

2235 else: 

2236 parameters = { 

2237 key: ( 

2238 processors[key](compiled_params[key]) # type: ignore 

2239 if key in processors 

2240 else compiled_params[key] 

2241 ) 

2242 for key in compiled_params 

2243 } 

2244 return self._execute_scalar( 

2245 str(compiled), type_, parameters=parameters 

2246 ) 

2247 

2248 current_parameters: Optional[_CoreSingleExecuteParams] = None 

2249 """A dictionary of parameters applied to the current row. 

2250 

2251 This attribute is only available in the context of a user-defined default 

2252 generation function, e.g. as described at :ref:`context_default_functions`. 

2253 It consists of a dictionary which includes entries for each column/value 

2254 pair that is to be part of the INSERT or UPDATE statement. The keys of the 

2255 dictionary will be the key value of each :class:`_schema.Column`, 

2256 which is usually 

2257 synonymous with the name. 

2258 

2259 Note that the :attr:`.DefaultExecutionContext.current_parameters` attribute 

2260 does not accommodate for the "multi-values" feature of the 

2261 :meth:`_expression.Insert.values` method. The 

2262 :meth:`.DefaultExecutionContext.get_current_parameters` method should be 

2263 preferred. 

2264 

2265 .. seealso:: 

2266 

2267 :meth:`.DefaultExecutionContext.get_current_parameters` 

2268 

2269 :ref:`context_default_functions` 

2270 

2271 """ 

2272 

2273 def get_current_parameters(self, isolate_multiinsert_groups=True): 

2274 """Return a dictionary of parameters applied to the current row. 

2275 

2276 This method can only be used in the context of a user-defined default 

2277 generation function, e.g. as described at 

2278 :ref:`context_default_functions`. When invoked, a dictionary is 

2279 returned which includes entries for each column/value pair that is part 

2280 of the INSERT or UPDATE statement. The keys of the dictionary will be 

2281 the key value of each :class:`_schema.Column`, 

2282 which is usually synonymous 

2283 with the name. 

2284 

2285 :param isolate_multiinsert_groups=True: indicates that multi-valued 

2286 INSERT constructs created using :meth:`_expression.Insert.values` 

2287 should be 

2288 handled by returning only the subset of parameters that are local 

2289 to the current column default invocation. When ``False``, the 

2290 raw parameters of the statement are returned including the 

2291 naming convention used in the case of multi-valued INSERT. 

2292 

2293 .. seealso:: 

2294 

2295 :attr:`.DefaultExecutionContext.current_parameters` 

2296 

2297 :ref:`context_default_functions` 

2298 

2299 """ 

2300 try: 

2301 parameters = self.current_parameters 

2302 column = self.current_column 

2303 except AttributeError: 

2304 raise exc.InvalidRequestError( 

2305 "get_current_parameters() can only be invoked in the " 

2306 "context of a Python side column default function" 

2307 ) 

2308 else: 

2309 assert column is not None 

2310 assert parameters is not None 

2311 compile_state = cast( 

2312 "DMLState", cast(SQLCompiler, self.compiled).compile_state 

2313 ) 

2314 assert compile_state is not None 

2315 if ( 

2316 isolate_multiinsert_groups 

2317 and dml.isinsert(compile_state) 

2318 and compile_state._has_multi_parameters 

2319 ): 

2320 if column._is_multiparam_column: 

2321 index = column.index + 1 

2322 d = {column.original.key: parameters[column.key]} 

2323 else: 

2324 d = {column.key: parameters[column.key]} 

2325 index = 0 

2326 assert compile_state._dict_parameters is not None 

2327 keys = compile_state._dict_parameters.keys() 

2328 d.update( 

2329 (key, parameters["%s_m%d" % (key, index)]) for key in keys 

2330 ) 

2331 return d 

2332 else: 

2333 return parameters 

2334 

2335 def get_insert_default(self, column): 

2336 if column.default is None: 

2337 return None 

2338 else: 

2339 return self._exec_default(column, column.default, column.type) 

2340 

2341 def get_update_default(self, column): 

2342 if column.onupdate is None: 

2343 return None 

2344 else: 

2345 return self._exec_default(column, column.onupdate, column.type) 

2346 

2347 def _process_execute_defaults(self): 

2348 compiled = cast(SQLCompiler, self.compiled) 

2349 

2350 key_getter = compiled._within_exec_param_key_getter 

2351 

2352 sentinel_counter = 0 

2353 

2354 if compiled.insert_prefetch: 

2355 prefetch_recs = [ 

2356 ( 

2357 c, 

2358 key_getter(c), 

2359 c._default_description_tuple, 

2360 self.get_insert_default, 

2361 ) 

2362 for c in compiled.insert_prefetch 

2363 ] 

2364 elif compiled.update_prefetch: 

2365 prefetch_recs = [ 

2366 ( 

2367 c, 

2368 key_getter(c), 

2369 c._onupdate_description_tuple, 

2370 self.get_update_default, 

2371 ) 

2372 for c in compiled.update_prefetch 

2373 ] 

2374 else: 

2375 prefetch_recs = [] 

2376 

2377 for param in self.compiled_parameters: 

2378 self.current_parameters = param 

2379 

2380 for ( 

2381 c, 

2382 param_key, 

2383 (arg, is_scalar, is_callable, is_sentinel), 

2384 fallback, 

2385 ) in prefetch_recs: 

2386 if is_sentinel: 

2387 param[param_key] = sentinel_counter 

2388 sentinel_counter += 1 

2389 elif is_scalar: 

2390 param[param_key] = arg 

2391 elif is_callable: 

2392 self.current_column = c 

2393 param[param_key] = arg(self) 

2394 else: 

2395 val = fallback(c) 

2396 if val is not None: 

2397 param[param_key] = val 

2398 

2399 del self.current_parameters 

2400 

2401 

2402DefaultDialect.execution_ctx_cls = DefaultExecutionContext