Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/default.py: 46%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1054 statements  

1# engine/default.py 

2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7# mypy: allow-untyped-defs, allow-untyped-calls 

8 

9"""Default implementations of per-dialect sqlalchemy.engine classes. 

10 

11These are semi-private implementation classes which are only of importance 

12to database dialect authors; dialects will usually use the classes here 

13as the base class for their own corresponding classes. 

14 

15""" 

16 

17from __future__ import annotations 

18 

19import functools 

20import operator 

21import random 

22import re 

23from time import perf_counter 

24import typing 

25from typing import Any 

26from typing import Callable 

27from typing import cast 

28from typing import Dict 

29from typing import Final 

30from typing import List 

31from typing import Literal 

32from typing import Mapping 

33from typing import MutableMapping 

34from typing import MutableSequence 

35from typing import Optional 

36from typing import Sequence 

37from typing import Set 

38from typing import Tuple 

39from typing import Type 

40from typing import TYPE_CHECKING 

41from typing import Union 

42import weakref 

43 

44from . import characteristics 

45from . import cursor as _cursor 

46from . import interfaces 

47from .base import Connection 

48from .interfaces import CacheStats 

49from .interfaces import DBAPICursor 

50from .interfaces import Dialect 

51from .interfaces import ExecuteStyle 

52from .interfaces import ExecutionContext 

53from .reflection import ObjectKind 

54from .reflection import ObjectScope 

55from .. import event 

56from .. import exc 

57from .. import pool 

58from .. import util 

59from ..sql import compiler 

60from ..sql import dml 

61from ..sql import expression 

62from ..sql import type_api 

63from ..sql import util as sql_util 

64from ..sql._typing import is_tuple_type 

65from ..sql.base import _NoArg 

66from ..sql.compiler import AggregateOrderByStyle 

67from ..sql.compiler import DDLCompiler 

68from ..sql.compiler import InsertmanyvaluesSentinelOpts 

69from ..sql.compiler import SQLCompiler 

70from ..sql.elements import quoted_name 

71from ..util.typing import TupleAny 

72from ..util.typing import Unpack 

73 

74if typing.TYPE_CHECKING: 

75 from types import ModuleType 

76 

77 from .base import Engine 

78 from .cursor import ResultFetchStrategy 

79 from .interfaces import _CoreMultiExecuteParams 

80 from .interfaces import _CoreSingleExecuteParams 

81 from .interfaces import _DBAPICursorDescription 

82 from .interfaces import _DBAPIMultiExecuteParams 

83 from .interfaces import _DBAPISingleExecuteParams 

84 from .interfaces import _ExecuteOptions 

85 from .interfaces import _MutableCoreSingleExecuteParams 

86 from .interfaces import _ParamStyle 

87 from .interfaces import ConnectArgsType 

88 from .interfaces import DBAPIConnection 

89 from .interfaces import DBAPIModule 

90 from .interfaces import DBAPIType 

91 from .interfaces import IsolationLevel 

92 from .row import Row 

93 from .url import URL 

94 from ..event import _ListenerFnType 

95 from ..pool import Pool 

96 from ..pool import PoolProxiedConnection 

97 from ..sql import Executable 

98 from ..sql.compiler import Compiled 

99 from ..sql.compiler import Linting 

100 from ..sql.compiler import ResultColumnsEntry 

101 from ..sql.dml import DMLState 

102 from ..sql.dml import UpdateBase 

103 from ..sql.elements import BindParameter 

104 from ..sql.schema import Column 

105 from ..sql.type_api import _BindProcessorType 

106 from ..sql.type_api import _ResultProcessorType 

107 from ..sql.type_api import TypeEngine 

108 

109 

# Literal SQL handed to us qualifies for a server-side cursor only when it
# is a SELECT statement (optionally preceded by whitespace).
SERVER_SIDE_CURSOR_RE = re.compile(r"\s*SELECT", re.IGNORECASE | re.UNICODE)

112 

113 

# Unpack the CacheStats enum members into module-level constants, in enum
# declaration order, for cheap identity comparisons in the hot execution path.
(
    CACHE_HIT,
    CACHE_MISS,
    CACHING_DISABLED,
    NO_CACHE_KEY,
    NO_DIALECT_SUPPORT,
) = list(CacheStats)

121 

122 

class DefaultDialect(Dialect):
    """Default implementation of Dialect.

    Provides the baseline capability flags and behaviors that concrete
    dialects override as needed.
    """

    # compiler classes used to render SQL / DDL / type expressions
    statement_compiler = compiler.SQLCompiler
    ddl_compiler = compiler.DDLCompiler
    type_compiler_cls = compiler.GenericTypeCompiler

    preparer = compiler.IdentifierPreparer
    supports_alter = True
    supports_comments = False
    supports_constraint_comments = False
    inline_comments = False
    supports_statement_cache = True

    div_is_floordiv = True

    # how bound-parameter types are communicated to the DBAPI; NONE means
    # no special handling (see interfaces.BindTyping)
    bind_typing = interfaces.BindTyping.NONE

    include_set_input_sizes: Optional[Set[Any]] = None
    exclude_set_input_sizes: Optional[Set[Any]] = None

    # the first value we'd get for an autoincrement column.
    default_sequence_base = 1

    # most DBAPIs happy with this for execute().
    # not cx_oracle.
    execute_sequence_format = tuple

    supports_schemas = True
    supports_views = True
    supports_sequences = False
    sequences_optional = False
    preexecute_autoincrement_sequences = False
    supports_identity_columns = False
    postfetch_lastrowid = True
    favor_returning_over_lastrowid = False
    insert_null_pk_still_autoincrements = False
    update_returning = False
    delete_returning = False
    update_returning_multifrom = False
    delete_returning_multifrom = False
    insert_returning = False

    aggregate_order_by_style = AggregateOrderByStyle.INLINE

    cte_follows_insert = False

    supports_native_enum = False
    supports_native_boolean = False
    supports_native_uuid = False
    returns_native_bytes = False

    non_native_boolean_check_constraint = True

    supports_simple_order_by_label = True

    tuple_in_values = False

    # per-connection settable characteristics, applied via
    # set_*_execution_options() / _set_connection_characteristics()
    connection_characteristics = util.immutabledict(
        {
            "isolation_level": characteristics.IsolationLevelCharacteristic(),
            "logging_token": characteristics.LoggingTokenCharacteristic(),
        }
    )

    # coercion functions applied to create_engine() config strings
    engine_config_types: Mapping[str, Any] = util.immutabledict(
        {
            "pool_timeout": util.asint,
            "echo": util.bool_or_str("debug"),
            "echo_pool": util.bool_or_str("debug"),
            "pool_recycle": util.asint,
            "pool_size": util.asint,
            "max_overflow": util.asint,
            "future": util.asbool,
        }
    )

    # if the NUMERIC type
    # returns decimal.Decimal.
    # *not* the FLOAT type however.
    supports_native_decimal = False

    name = "default"

    # length at which to truncate
    # any identifier.
    max_identifier_length = 9999
    _user_defined_max_identifier_length: Optional[int] = None

    isolation_level: Optional[str] = None

    # sub-categories of max_identifier_length.
    # currently these accommodate for MySQL which allows alias names
    # of 255 but DDL names only of 64.
    max_index_name_length: Optional[int] = None
    max_constraint_name_length: Optional[int] = None

    supports_sane_rowcount = True
    supports_sane_multi_rowcount = True
    colspecs: MutableMapping[Type[TypeEngine[Any]], Type[TypeEngine[Any]]] = {}
    default_paramstyle = "named"

    supports_default_values = False
    """dialect supports INSERT... DEFAULT VALUES syntax"""

    supports_default_metavalue = False
    """dialect supports INSERT... VALUES (DEFAULT) syntax"""

    default_metavalue_token = "DEFAULT"
    """for INSERT... VALUES (DEFAULT) syntax, the token to put in the
    parenthesis."""

    # not sure if this is a real thing but the compiler will deliver it
    # if this is the only flag enabled.
    supports_empty_insert = True
    """dialect supports INSERT () VALUES ()"""

    supports_multivalues_insert = False

    use_insertmanyvalues: bool = False

    use_insertmanyvalues_wo_returning: bool = False

    insertmanyvalues_implicit_sentinel: InsertmanyvaluesSentinelOpts = (
        InsertmanyvaluesSentinelOpts.NOT_SUPPORTED
    )

    insertmanyvalues_page_size: int = 1000
    insertmanyvalues_max_parameters = 32700

    supports_is_distinct_from = True

    supports_server_side_cursors = False

    server_side_cursors = False

    # extra record-level locking features (#4860)
    supports_for_update_of = False

    server_version_info = None

    default_schema_name: Optional[str] = None

    # indicates symbol names are
    # UPPERCASED if they are case insensitive
    # within the database.
    # if this is True, the methods normalize_name()
    # and denormalize_name() must be provided.
    requires_name_normalize = False

    is_async = False

    has_terminate = False

    # TODO: this is not to be part of 2.0. implement rudimentary binary
    # literals for SQLite, PostgreSQL, MySQL only within
    # _Binary.literal_processor
    _legacy_binary_type_literal_encoding = "utf-8"

281 

    @util.deprecated_params(
        empty_in_strategy=(
            "1.4",
            "The :paramref:`_sa.create_engine.empty_in_strategy` keyword is "
            "deprecated, and no longer has any effect. All IN expressions "
            "are now rendered using "
            'the "expanding parameter" strategy which renders a set of bound'
            'expressions, or an "empty set" SELECT, at statement execution'
            "time.",
        ),
        server_side_cursors=(
            "1.4",
            "The :paramref:`_sa.create_engine.server_side_cursors` parameter "
            "is deprecated and will be removed in a future release. Please "
            "use the "
            ":paramref:`_engine.Connection.execution_options.stream_results` "
            "parameter.",
        ),
    )
    def __init__(
        self,
        paramstyle: Optional[_ParamStyle] = None,
        isolation_level: Optional[IsolationLevel] = None,
        dbapi: Optional[ModuleType] = None,
        implicit_returning: Literal[True] = True,
        supports_native_boolean: Optional[bool] = None,
        max_identifier_length: Optional[int] = None,
        label_length: Optional[int] = None,
        insertmanyvalues_page_size: Union[_NoArg, int] = _NoArg.NO_ARG,
        use_insertmanyvalues: Optional[bool] = None,
        # util.deprecated_params decorator cannot render the
        # Linting.NO_LINTING constant
        compiler_linting: Linting = int(compiler.NO_LINTING),  # type: ignore
        server_side_cursors: bool = False,
        skip_autocommit_rollback: bool = False,
        **kwargs: Any,
    ):
        """Construct the dialect, validating and normalizing configuration.

        Unknown keyword arguments are accepted (and ignored) via
        ``**kwargs`` so that create_engine() options shared across
        dialects don't raise here.
        """
        # server_side_cursors (deprecated) is only honored when the
        # dialect actually supports them
        if server_side_cursors:
            if not self.supports_server_side_cursors:
                raise exc.ArgumentError(
                    "Dialect %s does not support server side cursors" % self
                )
            else:
                self.server_side_cursors = True

        # legacy attribute migration: use_setinputsizes -> bind_typing
        if getattr(self, "use_setinputsizes", False):
            util.warn_deprecated(
                "The dialect-level use_setinputsizes attribute is "
                "deprecated. Please use "
                "bind_typing = BindTyping.SETINPUTSIZES",
                "2.0",
            )
            self.bind_typing = interfaces.BindTyping.SETINPUTSIZES

        self.positional = False
        self._ischema = None

        self.dbapi = dbapi

        self.skip_autocommit_rollback = skip_autocommit_rollback

        # paramstyle resolution order: explicit arg, DBAPI module
        # attribute, then the class-level default
        if paramstyle is not None:
            self.paramstyle = paramstyle
        elif self.dbapi is not None:
            self.paramstyle = self.dbapi.paramstyle
        else:
            self.paramstyle = self.default_paramstyle
        self.positional = self.paramstyle in (
            "qmark",
            "format",
            "numeric",
            "numeric_dollar",
        )
        self.identifier_preparer = self.preparer(self)
        self._on_connect_isolation_level = isolation_level

        # honor a legacy class-level "type_compiler" (the class itself)
        # if a dialect still declares one
        legacy_tt_callable = getattr(self, "type_compiler", None)
        if legacy_tt_callable is not None:
            tt_callable = cast(
                Type[compiler.GenericTypeCompiler],
                self.type_compiler,
            )
        else:
            tt_callable = self.type_compiler_cls

        self.type_compiler_instance = self.type_compiler = tt_callable(self)

        if supports_native_boolean is not None:
            self.supports_native_boolean = supports_native_boolean

        # a user-supplied identifier limit overrides both the class
        # default and any server-probed value (see initialize())
        self._user_defined_max_identifier_length = max_identifier_length
        if self._user_defined_max_identifier_length:
            self.max_identifier_length = (
                self._user_defined_max_identifier_length
            )
        self.label_length = label_length
        self.compiler_linting = compiler_linting

        if use_insertmanyvalues is not None:
            self.use_insertmanyvalues = use_insertmanyvalues

        if insertmanyvalues_page_size is not _NoArg.NO_ARG:
            self.insertmanyvalues_page_size = insertmanyvalues_page_size

385 

386 @property 

387 @util.deprecated( 

388 "2.0", 

389 "full_returning is deprecated, please use insert_returning, " 

390 "update_returning, delete_returning", 

391 ) 

392 def full_returning(self): 

393 return ( 

394 self.insert_returning 

395 and self.update_returning 

396 and self.delete_returning 

397 ) 

398 

399 @util.memoized_property 

400 def insert_executemany_returning(self): 

401 """Default implementation for insert_executemany_returning, if not 

402 otherwise overridden by the specific dialect. 

403 

404 The default dialect determines "insert_executemany_returning" is 

405 available if the dialect in use has opted into using the 

406 "use_insertmanyvalues" feature. If they haven't opted into that, then 

407 this attribute is False, unless the dialect in question overrides this 

408 and provides some other implementation (such as the Oracle Database 

409 dialects). 

410 

411 """ 

412 return self.insert_returning and self.use_insertmanyvalues 

413 

414 @util.memoized_property 

415 def insert_executemany_returning_sort_by_parameter_order(self): 

416 """Default implementation for 

417 insert_executemany_returning_deterministic_order, if not otherwise 

418 overridden by the specific dialect. 

419 

420 The default dialect determines "insert_executemany_returning" can have 

421 deterministic order only if the dialect in use has opted into using the 

422 "use_insertmanyvalues" feature, which implements deterministic ordering 

423 using client side sentinel columns only by default. The 

424 "insertmanyvalues" feature also features alternate forms that can 

425 use server-generated PK values as "sentinels", but those are only 

426 used if the :attr:`.Dialect.insertmanyvalues_implicit_sentinel` 

427 bitflag enables those alternate SQL forms, which are disabled 

428 by default. 

429 

430 If the dialect in use hasn't opted into that, then this attribute is 

431 False, unless the dialect in question overrides this and provides some 

432 other implementation (such as the Oracle Database dialects). 

433 

434 """ 

435 return self.insert_returning and self.use_insertmanyvalues 

436 

    # UPDATE / DELETE with RETURNING under executemany are off by default;
    # individual dialects opt in.
    update_executemany_returning = False
    delete_executemany_returning = False

439 

440 @util.memoized_property 

441 def loaded_dbapi(self) -> DBAPIModule: 

442 if self.dbapi is None: 

443 raise exc.InvalidRequestError( 

444 f"Dialect {self} does not have a Python DBAPI established " 

445 "and cannot be used for actual database interaction" 

446 ) 

447 return self.dbapi 

448 

449 @util.memoized_property 

450 def _bind_typing_render_casts(self): 

451 return self.bind_typing is interfaces.BindTyping.RENDER_CASTS 

452 

453 def _ensure_has_table_connection(self, arg: Connection) -> None: 

454 if not isinstance(arg, Connection): 

455 raise exc.ArgumentError( 

456 "The argument passed to Dialect.has_table() should be a " 

457 "%s, got %s. " 

458 "Additionally, the Dialect.has_table() method is for " 

459 "internal dialect " 

460 "use only; please use " 

461 "``inspect(some_engine).has_table(<tablename>>)`` " 

462 "for public API use." % (Connection, type(arg)) 

463 ) 

464 

    @util.memoized_property
    def _supports_statement_cache(self):
        # look only at this class' own __dict__, so that a third-party
        # dialect must *explicitly* declare supports_statement_cache
        # rather than silently inheriting the default; None means it was
        # never declared, which triggers the warning below
        ssc = self.__class__.__dict__.get("supports_statement_cache", None)
        if ssc is None:
            util.warn(
                "Dialect %s:%s will not make use of SQL compilation caching "
                "as it does not set the 'supports_statement_cache' attribute "
                "to ``True``. This can have "
                "significant performance implications including some "
                "performance degradations in comparison to prior SQLAlchemy "
                "versions. Dialect maintainers should seek to set this "
                "attribute to True after appropriate development and testing "
                "for SQLAlchemy 1.4 caching support. Alternatively, this "
                "attribute may be set to False which will disable this "
                "warning." % (self.name, self.driver),
                code="cprf",
            )

        return bool(ssc)

484 

485 @util.memoized_property 

486 def _type_memos(self): 

487 return weakref.WeakKeyDictionary() 

488 

489 @property 

490 def dialect_description(self): # type: ignore[override] 

491 return self.name + "+" + self.driver 

492 

493 @property 

494 def supports_sane_rowcount_returning(self): 

495 """True if this dialect supports sane rowcount even if RETURNING is 

496 in use. 

497 

498 For dialects that don't support RETURNING, this is synonymous with 

499 ``supports_sane_rowcount``. 

500 

501 """ 

502 return self.supports_sane_rowcount 

503 

504 @classmethod 

505 def get_pool_class(cls, url: URL) -> Type[Pool]: 

506 default: Type[pool.Pool] 

507 if cls.is_async: 

508 default = pool.AsyncAdaptedQueuePool 

509 else: 

510 default = pool.QueuePool 

511 

512 return getattr(cls, "poolclass", default) 

513 

    def get_dialect_pool_class(self, url: URL) -> Type[Pool]:
        # instance-level hook; defaults to the classmethod implementation
        return self.get_pool_class(url)

516 

517 @classmethod 

518 def load_provisioning(cls): 

519 package = ".".join(cls.__module__.split(".")[0:-1]) 

520 try: 

521 __import__(package + ".provision") 

522 except ImportError: 

523 pass 

524 

525 def _builtin_onconnect(self) -> Optional[_ListenerFnType]: 

526 if self._on_connect_isolation_level is not None: 

527 

528 def builtin_connect(dbapi_conn, conn_rec): 

529 self._assert_and_set_isolation_level( 

530 dbapi_conn, self._on_connect_isolation_level 

531 ) 

532 

533 return builtin_connect 

534 else: 

535 return None 

536 

    def initialize(self, connection: Connection) -> None:
        # First-connect probing of the database: version, default schema
        # and default isolation level, each tolerated as unimplemented
        # by a particular dialect (NotImplementedError -> None).
        try:
            self.server_version_info = self._get_server_version_info(
                connection
            )
        except NotImplementedError:
            self.server_version_info = None
        try:
            self.default_schema_name = self._get_default_schema_name(
                connection
            )
        except NotImplementedError:
            self.default_schema_name = None

        try:
            self.default_isolation_level = self.get_default_isolation_level(
                connection.connection.dbapi_connection
            )
        except NotImplementedError:
            self.default_isolation_level = None

        # a server-reported identifier limit wins only when the user did
        # not pass max_identifier_length to the dialect
        if not self._user_defined_max_identifier_length:
            max_ident_length = self._check_max_identifier_length(connection)
            if max_ident_length:
                self.max_identifier_length = max_ident_length

        # validate label_length against the (possibly just-updated) limit
        if (
            self.label_length
            and self.label_length > self.max_identifier_length
        ):
            raise exc.ArgumentError(
                "Label length of %d is greater than this dialect's"
                " maximum identifier length of %d"
                % (self.label_length, self.max_identifier_length)
            )

572 

    def on_connect(self) -> Optional[Callable[[Any], None]]:
        # inherits the docstring from interfaces.Dialect.on_connect
        # the default dialect installs no per-connection initializer
        return None

576 

    def _check_max_identifier_length(self, connection):
        """Perform a connection / server version specific check to determine
        the max_identifier_length.

        If the dialect's class level max_identifier_length should be used,
        can return None.

        """
        # default: no server-side probing; keep the class-level value
        return None

586 

    def get_default_isolation_level(self, dbapi_conn):
        """Given a DBAPI connection, return its isolation level, or
        a default isolation level if one cannot be retrieved.

        May be overridden by subclasses in order to provide a
        "fallback" isolation level for databases that cannot reliably
        retrieve the actual isolation level.

        By default, calls the :meth:`_engine.Interfaces.get_isolation_level`
        method, propagating any exceptions raised.

        """
        return self.get_isolation_level(dbapi_conn)

600 

    def type_descriptor(self, typeobj):
        """Provide a database-specific :class:`.TypeEngine` object, given
        the generic object which comes from the types module.

        This method looks for a dictionary called
        ``colspecs`` as a class or instance-level variable,
        and passes on to :func:`_types.adapt_type`.

        """
        # colspecs maps generic types to dialect-specific implementations
        return type_api.adapt_type(typeobj, self.colspecs)

611 

612 def has_index(self, connection, table_name, index_name, schema=None, **kw): 

613 if not self.has_table(connection, table_name, schema=schema, **kw): 

614 return False 

615 for idx in self.get_indexes( 

616 connection, table_name, schema=schema, **kw 

617 ): 

618 if idx["name"] == index_name: 

619 return True 

620 else: 

621 return False 

622 

623 def has_schema( 

624 self, connection: Connection, schema_name: str, **kw: Any 

625 ) -> bool: 

626 return schema_name in self.get_schema_names(connection, **kw) 

627 

628 def validate_identifier(self, ident: str) -> None: 

629 if len(ident) > self.max_identifier_length: 

630 raise exc.IdentifierError( 

631 "Identifier '%s' exceeds maximum length of %d characters" 

632 % (ident, self.max_identifier_length) 

633 ) 

634 

    def connect(self, *cargs: Any, **cparams: Any) -> DBAPIConnection:
        # inherits the docstring from interfaces.Dialect.connect
        # delegates straight to the DBAPI module's connect()
        return self.loaded_dbapi.connect(*cargs, **cparams)  # type: ignore[no-any-return]  # NOQA: E501

638 

639 def create_connect_args(self, url: URL) -> ConnectArgsType: 

640 # inherits the docstring from interfaces.Dialect.create_connect_args 

641 opts = url.translate_connect_args() 

642 opts.update(url.query) 

643 return ([], opts) 

644 

    def set_engine_execution_options(
        self, engine: Engine, opts: Mapping[str, Any]
    ) -> None:
        # snapshot only those options this dialect has connection
        # characteristics for; apply them to every new connection via an
        # engine_connect event hook
        supported_names = set(self.connection_characteristics).intersection(
            opts
        )
        if supported_names:
            characteristics: Mapping[str, Any] = util.immutabledict(
                (name, opts[name]) for name in supported_names
            )

            @event.listens_for(engine, "engine_connect")
            def set_connection_characteristics(connection):
                self._set_connection_characteristics(
                    connection, characteristics
                )

661 

662 def set_connection_execution_options( 

663 self, connection: Connection, opts: Mapping[str, Any] 

664 ) -> None: 

665 supported_names = set(self.connection_characteristics).intersection( 

666 opts 

667 ) 

668 if supported_names: 

669 characteristics: Mapping[str, Any] = util.immutabledict( 

670 (name, opts[name]) for name in supported_names 

671 ) 

672 self._set_connection_characteristics(connection, characteristics) 

673 

    def _set_connection_characteristics(self, connection, characteristics):
        # resolve each requested characteristic name to its handler object
        characteristic_values = [
            (name, self.connection_characteristics[name], value)
            for name, value in characteristics.items()
        ]

        # transactional characteristics may not be changed mid-transaction
        if connection.in_transaction():
            trans_objs = [
                (name, obj)
                for name, obj, _ in characteristic_values
                if obj.transactional
            ]
            if trans_objs:
                raise exc.InvalidRequestError(
                    "This connection has already initialized a SQLAlchemy "
                    "Transaction() object via begin() or autobegin; "
                    "%s may not be altered unless rollback() or commit() "
                    "is called first."
                    % (", ".join(name for name, obj in trans_objs))
                )

        dbapi_connection = connection.connection.dbapi_connection
        for _, characteristic, value in characteristic_values:
            characteristic.set_connection_characteristic(
                self, connection, dbapi_connection, value
            )
        # ensure the characteristics are reset when the connection is
        # returned to the pool
        connection.connection._connection_record.finalize_callback.append(
            functools.partial(self._reset_characteristics, characteristics)
        )

703 

704 def _reset_characteristics(self, characteristics, dbapi_connection): 

705 for characteristic_name in characteristics: 

706 characteristic = self.connection_characteristics[ 

707 characteristic_name 

708 ] 

709 characteristic.reset_characteristic(self, dbapi_connection) 

710 

    def do_begin(self, dbapi_connection):
        # no-op: DBAPI connections begin transactions implicitly
        pass

713 

714 def do_rollback(self, dbapi_connection): 

715 if self.skip_autocommit_rollback and self.detect_autocommit_setting( 

716 dbapi_connection 

717 ): 

718 return 

719 dbapi_connection.rollback() 

720 

    def do_commit(self, dbapi_connection):
        # straight pass-through to the DBAPI
        dbapi_connection.commit()

723 

    def do_terminate(self, dbapi_connection):
        # default "terminate" is an ordinary close; async/network dialects
        # override for a harder drop
        self.do_close(dbapi_connection)

726 

    def do_close(self, dbapi_connection):
        # straight pass-through to the DBAPI
        dbapi_connection.close()

729 

    @util.memoized_property
    def _dialect_specific_select_one(self):
        # "SELECT 1" as rendered by this dialect, cached for do_ping()
        return str(expression.select(1).compile(dialect=self))

733 

    def _do_ping_w_event(self, dbapi_connection: DBAPIConnection) -> bool:
        # run do_ping(), translating a DBAPI error into a True/False
        # "connection is alive" answer; event handlers get a chance to
        # reinterpret the failure before we decide
        try:
            return self.do_ping(dbapi_connection)
        except self.loaded_dbapi.Error as err:
            is_disconnect = self.is_disconnect(err, dbapi_connection, None)

            if self._has_events:
                try:
                    Connection._handle_dbapi_exception_noconnection(
                        err,
                        self,
                        is_disconnect=is_disconnect,
                        invalidate_pool_on_disconnect=False,
                        is_pre_ping=True,
                    )
                except exc.StatementError as new_err:
                    # a handler may have flipped the "invalidated" decision
                    is_disconnect = new_err.connection_invalidated

            if is_disconnect:
                # stale connection: report ping failure rather than raise
                return False
            else:
                raise

756 

    def do_ping(self, dbapi_connection: DBAPIConnection) -> bool:
        # execute the dialect's SELECT 1; any DBAPI error propagates to
        # _do_ping_w_event() which interprets it
        cursor = dbapi_connection.cursor()
        try:
            cursor.execute(self._dialect_specific_select_one)
        finally:
            cursor.close()
        return True

764 

765 def create_xid(self): 

766 """Create a random two-phase transaction ID. 

767 

768 This id will be passed to do_begin_twophase(), do_rollback_twophase(), 

769 do_commit_twophase(). Its format is unspecified. 

770 """ 

771 

772 return "_sa_%032x" % random.randint(0, 2**128) 

773 

    def do_savepoint(self, connection, name):
        # emit SAVEPOINT <name> via the compiled clause element
        connection.execute(expression.SavepointClause(name))

776 

    def do_rollback_to_savepoint(self, connection, name):
        # emit ROLLBACK TO SAVEPOINT <name>
        connection.execute(expression.RollbackToSavepointClause(name))

779 

    def do_release_savepoint(self, connection, name):
        # emit RELEASE SAVEPOINT <name>
        connection.execute(expression.ReleaseSavepointClause(name))

782 

    def _deliver_insertmanyvalues_batches(
        self,
        connection,
        cursor,
        statement,
        parameters,
        generic_setinputsizes,
        context,
    ):
        """Generator that yields paginated "insertmanyvalues" batches for
        execution, collecting RETURNING rows between yields.

        Each yielded batch is executed by the caller; after the yield,
        this generator fetches that batch's RETURNING rows (if any) and,
        when sentinel-based ordering is in effect, re-orders them to
        match the original parameter set order before accumulating them
        on ``context._insertmanyvalues_rows``.
        """
        context = cast(DefaultExecutionContext, context)
        compiled = cast(SQLCompiler, context.compiled)

        # lazily-initialized result processors for the sentinel column(s)
        _composite_sentinel_proc: Sequence[
            Optional[_ResultProcessorType[Any]]
        ] = ()
        _scalar_sentinel_proc: Optional[_ResultProcessorType[Any]] = None
        _sentinel_proc_initialized: bool = False

        compiled_parameters = context.compiled_parameters

        imv = compiled._insertmanyvalues
        assert imv is not None

        is_returning: Final[bool] = bool(compiled.effective_returning)
        # per-execution option may override the dialect-level page size
        batch_size = context.execution_options.get(
            "insertmanyvalues_page_size", self.insertmanyvalues_page_size
        )

        if compiled.schema_translate_map:
            schema_translate_map = context.execution_options.get(
                "schema_translate_map", {}
            )
        else:
            schema_translate_map = None

        if is_returning:
            # accumulated RETURNING rows, exposed on the context
            result: Optional[List[Any]] = []
            context._insertmanyvalues_rows = result

            sort_by_parameter_order = imv.sort_by_parameter_order

        else:
            sort_by_parameter_order = False
            result = None

        for imv_batch in compiled._deliver_insertmanyvalues_batches(
            statement,
            parameters,
            compiled_parameters,
            generic_setinputsizes,
            batch_size,
            sort_by_parameter_order,
            schema_translate_map,
        ):
            # caller executes the batch while we're suspended here
            yield imv_batch

            if is_returning:

                try:
                    rows = context.fetchall_for_returning(cursor)
                except BaseException as be:
                    # _handle_dbapi_exception re-raises; is_sub_exec
                    # marks this as part of a larger statement
                    connection._handle_dbapi_exception(
                        be,
                        sql_util._long_statement(imv_batch.replaced_statement),
                        imv_batch.replaced_parameters,
                        None,
                        context,
                        is_sub_exec=True,
                    )

                # I would have thought "is_returning: Final[bool]"
                # would have assured this but pylance thinks not
                assert result is not None

                if imv.num_sentinel_columns and not imv_batch.is_downgraded:
                    composite_sentinel = imv.num_sentinel_columns > 1
                    if imv.implicit_sentinel:
                        # for implicit sentinel, which is currently single-col
                        # integer autoincrement, do a simple sort.
                        assert not composite_sentinel
                        result.extend(
                            sorted(rows, key=operator.itemgetter(-1))
                        )
                        continue

                    # otherwise, create dictionaries to match up batches
                    # with parameters
                    assert imv.sentinel_param_keys
                    assert imv.sentinel_columns

                    _nsc = imv.num_sentinel_columns

                    # build sentinel result processors once, from the
                    # first batch's cursor description
                    if not _sentinel_proc_initialized:
                        if composite_sentinel:
                            _composite_sentinel_proc = [
                                col.type._cached_result_processor(
                                    self, cursor_desc[1]
                                )
                                for col, cursor_desc in zip(
                                    imv.sentinel_columns,
                                    cursor.description[-_nsc:],
                                )
                            ]
                        else:
                            _scalar_sentinel_proc = (
                                imv.sentinel_columns[0]
                            ).type._cached_result_processor(
                                self, cursor.description[-1][1]
                            )
                        _sentinel_proc_initialized = True

                    # key each returned row by its (processed) sentinel
                    # value(s), which occupy the trailing columns
                    rows_by_sentinel: Union[
                        Dict[Tuple[Any, ...], Any],
                        Dict[Any, Any],
                    ]
                    if composite_sentinel:
                        rows_by_sentinel = {
                            tuple(
                                (proc(val) if proc else val)
                                for val, proc in zip(
                                    row[-_nsc:], _composite_sentinel_proc
                                )
                            ): row
                            for row in rows
                        }
                    elif _scalar_sentinel_proc:
                        rows_by_sentinel = {
                            _scalar_sentinel_proc(row[-1]): row for row in rows
                        }
                    else:
                        rows_by_sentinel = {row[-1]: row for row in rows}

                    if len(rows_by_sentinel) != len(imv_batch.batch):
                        # see test_insert_exec.py::
                        # IMVSentinelTest::test_sentinel_incorrect_rowcount
                        # for coverage / demonstration
                        raise exc.InvalidRequestError(
                            f"Sentinel-keyed result set did not produce "
                            f"correct number of rows {len(imv_batch.batch)}; "
                            "produced "
                            f"{len(rows_by_sentinel)}. Please ensure the "
                            "sentinel column is fully unique and populated in "
                            "all cases."
                        )

                    try:
                        ordered_rows = [
                            rows_by_sentinel[sentinel_keys]
                            for sentinel_keys in imv_batch.sentinel_values
                        ]
                    except KeyError as ke:
                        # see test_insert_exec.py::
                        # IMVSentinelTest::test_sentinel_cant_match_keys
                        # for coverage / demonstration
                        raise exc.InvalidRequestError(
                            f"Can't match sentinel values in result set to "
                            f"parameter sets; key {ke.args[0]!r} was not "
                            "found. "
                            "There may be a mismatch between the datatype "
                            "passed to the DBAPI driver vs. that which it "
                            "returns in a result row. Ensure the given "
                            "Python value matches the expected result type "
                            "*exactly*, taking care to not rely upon implicit "
                            "conversions which may occur such as when using "
                            "strings in place of UUID or integer values, etc. "
                        ) from ke

                    result.extend(ordered_rows)

                else:
                    # no sentinel ordering: keep rows as delivered
                    result.extend(rows)

954 

    def do_executemany(self, cursor, statement, parameters, context=None):
        """Invoke ``cursor.executemany()`` with the given statement and
        a sequence of parameter sets; dialects override to customize."""
        cursor.executemany(statement, parameters)

957 

    def do_execute(self, cursor, statement, parameters, context=None):
        """Invoke ``cursor.execute()`` with a single parameter set;
        dialects override to customize."""
        cursor.execute(statement, parameters)

960 

    def do_execute_no_params(self, cursor, statement, context=None):
        """Invoke ``cursor.execute()`` passing no parameter collection at
        all, for drivers that disallow an empty parameters argument."""
        cursor.execute(statement)

963 

    def is_disconnect(
        self,
        e: DBAPIModule.Error,
        connection: Union[
            pool.PoolProxiedConnection, interfaces.DBAPIConnection, None
        ],
        cursor: Optional[interfaces.DBAPICursor],
    ) -> bool:
        """Return True if the given DBAPI error indicates the connection
        is no longer usable.

        Default implementation never reports a disconnect; dialects
        override with driver-specific error inspection.
        """
        return False

973 

974 @util.memoized_instancemethod 

975 def _gen_allowed_isolation_levels(self, dbapi_conn): 

976 try: 

977 raw_levels = list(self.get_isolation_level_values(dbapi_conn)) 

978 except NotImplementedError: 

979 return None 

980 else: 

981 normalized_levels = [ 

982 level.replace("_", " ").upper() for level in raw_levels 

983 ] 

984 if raw_levels != normalized_levels: 

985 raise ValueError( 

986 f"Dialect {self.name!r} get_isolation_level_values() " 

987 f"method should return names as UPPERCASE using spaces, " 

988 f"not underscores; got " 

989 f"{sorted(set(raw_levels).difference(normalized_levels))}" 

990 ) 

991 return tuple(normalized_levels) 

992 

993 def _assert_and_set_isolation_level(self, dbapi_conn, level): 

994 level = level.replace("_", " ").upper() 

995 

996 _allowed_isolation_levels = self._gen_allowed_isolation_levels( 

997 dbapi_conn 

998 ) 

999 if ( 

1000 _allowed_isolation_levels 

1001 and level not in _allowed_isolation_levels 

1002 ): 

1003 raise exc.ArgumentError( 

1004 f"Invalid value {level!r} for isolation_level. " 

1005 f"Valid isolation levels for {self.name!r} are " 

1006 f"{', '.join(_allowed_isolation_levels)}" 

1007 ) 

1008 

1009 self.set_isolation_level(dbapi_conn, level) 

1010 

1011 def reset_isolation_level(self, dbapi_conn): 

1012 if self._on_connect_isolation_level is not None: 

1013 assert ( 

1014 self._on_connect_isolation_level == "AUTOCOMMIT" 

1015 or self._on_connect_isolation_level 

1016 == self.default_isolation_level 

1017 ) 

1018 self._assert_and_set_isolation_level( 

1019 dbapi_conn, self._on_connect_isolation_level 

1020 ) 

1021 else: 

1022 assert self.default_isolation_level is not None 

1023 self._assert_and_set_isolation_level( 

1024 dbapi_conn, 

1025 self.default_isolation_level, 

1026 ) 

1027 

1028 def normalize_name(self, name): 

1029 if name is None: 

1030 return None 

1031 

1032 name_lower = name.lower() 

1033 name_upper = name.upper() 

1034 

1035 if name_upper == name_lower: 

1036 # name has no upper/lower conversion, e.g. non-european characters. 

1037 # return unchanged 

1038 return name 

1039 elif name_upper == name and not ( 

1040 self.identifier_preparer._requires_quotes 

1041 )(name_lower): 

1042 # name is all uppercase and doesn't require quoting; normalize 

1043 # to all lower case 

1044 return name_lower 

1045 elif name_lower == name: 

1046 # name is all lower case, which if denormalized means we need to 

1047 # force quoting on it 

1048 return quoted_name(name, quote=True) 

1049 else: 

1050 # name is mixed case, means it will be quoted in SQL when used 

1051 # later, no normalizes 

1052 return name 

1053 

1054 def denormalize_name(self, name): 

1055 if name is None: 

1056 return None 

1057 

1058 name_lower = name.lower() 

1059 name_upper = name.upper() 

1060 

1061 if name_upper == name_lower: 

1062 # name has no upper/lower conversion, e.g. non-european characters. 

1063 # return unchanged 

1064 return name 

1065 elif name_lower == name and not ( 

1066 self.identifier_preparer._requires_quotes 

1067 )(name_lower): 

1068 name = name_upper 

1069 return name 

1070 

    def get_driver_connection(self, connection: DBAPIConnection) -> Any:
        """Return the connection object as returned by the driver itself.

        Default implementation is a pass-through; dialects that wrap the
        driver-level connection override this.
        """
        return connection

1073 

1074 def _overrides_default(self, method): 

1075 return ( 

1076 getattr(type(self), method).__code__ 

1077 is not getattr(DefaultDialect, method).__code__ 

1078 ) 

1079 

    def _default_multi_reflect(
        self,
        single_tbl_method,
        connection,
        kind,
        schema,
        filter_names,
        scope,
        **kw,
    ):
        """Generic implementation of the ``get_multi_*`` reflection
        methods: gather candidate object names per *kind* and *scope*,
        then yield ``((schema, name), single_tbl_method(...))`` pairs.

        Unreflectable tables are recorded in the caller-supplied
        ``unreflectable`` mapping; missing tables are silently skipped.
        """
        # accumulate the name-listing callables that match the requested
        # object kinds, for both persistent and temporary scopes
        names_fns = []
        temp_names_fns = []
        if ObjectKind.TABLE in kind:
            names_fns.append(self.get_table_names)
            temp_names_fns.append(self.get_temp_table_names)
        if ObjectKind.VIEW in kind:
            names_fns.append(self.get_view_names)
            temp_names_fns.append(self.get_temp_view_names)
        if ObjectKind.MATERIALIZED_VIEW in kind:
            names_fns.append(self.get_materialized_view_names)
            # no temp materialized view at the moment
            # temp_names_fns.append(self.get_temp_materialized_view_names)

        unreflectable = kw.pop("unreflectable", {})

        if (
            filter_names
            and scope is ObjectScope.ANY
            and kind is ObjectKind.ANY
        ):
            # if names are given and no qualification on type of table
            # (i.e. the Table(..., autoload) case), take the names as given,
            # don't run names queries. If a table does not exit
            # NoSuchTableError is raised and it's skipped

            # this also suits the case for mssql where we can reflect
            # individual temp tables but there's no temp_names_fn
            names = filter_names
        else:
            names = []
            name_kw = {"schema": schema, **kw}
            fns = []
            if ObjectScope.DEFAULT in scope:
                fns.extend(names_fns)
            if ObjectScope.TEMPORARY in scope:
                fns.extend(temp_names_fns)

            # a dialect may not implement some of the name queries
            # (e.g. no temp view support); skip those quietly
            for fn in fns:
                try:
                    names.extend(fn(connection, **name_kw))
                except NotImplementedError:
                    pass

        if filter_names:
            # set for O(1) membership tests in the loop below
            filter_names = set(filter_names)

        # iterate over all the tables/views and call the single table method
        for table in names:
            if not filter_names or table in filter_names:
                key = (schema, table)
                try:
                    yield (
                        key,
                        single_tbl_method(
                            connection, table, schema=schema, **kw
                        ),
                    )
                except exc.UnreflectableTableError as err:
                    if key not in unreflectable:
                        unreflectable[key] = err
                except exc.NoSuchTableError:
                    pass

1152 

    def get_multi_table_options(self, connection, **kw):
        """Multi-table reflection of table options via the generic
        per-table fallback."""
        return self._default_multi_reflect(
            self.get_table_options, connection, **kw
        )

1157 

    def get_multi_columns(self, connection, **kw):
        """Multi-table reflection of columns via the generic per-table
        fallback."""
        return self._default_multi_reflect(self.get_columns, connection, **kw)

1160 

    def get_multi_pk_constraint(self, connection, **kw):
        """Multi-table reflection of primary key constraints via the
        generic per-table fallback."""
        return self._default_multi_reflect(
            self.get_pk_constraint, connection, **kw
        )

1165 

    def get_multi_foreign_keys(self, connection, **kw):
        """Multi-table reflection of foreign keys via the generic
        per-table fallback."""
        return self._default_multi_reflect(
            self.get_foreign_keys, connection, **kw
        )

1170 

    def get_multi_indexes(self, connection, **kw):
        """Multi-table reflection of indexes via the generic per-table
        fallback."""
        return self._default_multi_reflect(self.get_indexes, connection, **kw)

1173 

    def get_multi_unique_constraints(self, connection, **kw):
        """Multi-table reflection of unique constraints via the generic
        per-table fallback."""
        return self._default_multi_reflect(
            self.get_unique_constraints, connection, **kw
        )

1178 

    def get_multi_check_constraints(self, connection, **kw):
        """Multi-table reflection of check constraints via the generic
        per-table fallback."""
        return self._default_multi_reflect(
            self.get_check_constraints, connection, **kw
        )

1183 

    def get_multi_table_comment(self, connection, **kw):
        """Multi-table reflection of table comments via the generic
        per-table fallback."""
        return self._default_multi_reflect(
            self.get_table_comment, connection, **kw
        )

1188 

1189 

class StrCompileDialect(DefaultDialect):
    """Dialect used when compiling SQL constructs to plain strings
    (e.g. ``str(statement)``) with no DBAPI attached.

    Feature flags are set permissively so that most constructs render
    rather than raising for lack of backend support.
    """

    statement_compiler = compiler.StrSQLCompiler
    ddl_compiler = compiler.DDLCompiler
    type_compiler_cls = compiler.StrSQLTypeCompiler
    preparer = compiler.IdentifierPreparer

    # render RETURNING for all DML statement types
    insert_returning = True
    update_returning = True
    delete_returning = True

    supports_statement_cache = True

    supports_identity_columns = True

    # sequences render, but are treated as optional so they are not
    # required for primary key generation
    supports_sequences = True
    sequences_optional = True
    preexecute_autoincrement_sequences = False

    supports_native_boolean = True

    supports_multivalues_insert = True
    supports_simple_order_by_label = True

1212 

1213 

class DefaultExecutionContext(ExecutionContext):
    """Default implementation of :class:`.ExecutionContext`.

    Instances are constructed via the ``_init_*`` classmethods (which
    bypass ``__init__``), one per statement execution.
    """

    # statement-category flags, set by the _init_* constructors
    isinsert = False
    isupdate = False
    isdelete = False
    is_crud = False
    is_text = False
    isddl = False

    execute_style: ExecuteStyle = ExecuteStyle.EXECUTE

    compiled: Optional[Compiled] = None
    result_column_struct: Optional[
        Tuple[List[ResultColumnsEntry], bool, bool, bool, bool]
    ] = None
    returned_default_rows: Optional[Sequence[Row[Unpack[TupleAny]]]] = None

    execution_options: _ExecuteOptions = util.EMPTY_DICT

    cursor_fetch_strategy = _cursor._DEFAULT_FETCH

    invoked_statement: Optional[Executable] = None

    # RETURNING-related flags, refined in _init_compiled for DML
    _is_implicit_returning = False
    _is_explicit_returning = False
    _is_supplemental_returning = False
    _is_server_side = False

    _soft_closed = False

    # rowcount captured by the context itself; when None, the DBAPI
    # cursor's rowcount attribute is consulted instead
    _rowcount: Optional[int] = None

    # a hook for SQLite's translation of
    # result column names
    # NOTE: pyhive is using this hook, can't remove it :(
    _translate_colname: Optional[
        Callable[[str], Tuple[str, Optional[str]]]
    ] = None

    _expanded_parameters: Mapping[str, List[str]] = util.immutabledict()
    """used by set_input_sizes().

    This collection comes from ``ExpandedState.parameter_expansion``.

    """

    cache_hit = NO_CACHE_KEY

    # attributes assigned per-execution by the _init_* constructors
    root_connection: Connection
    _dbapi_connection: PoolProxiedConnection
    dialect: Dialect
    unicode_statement: str
    cursor: DBAPICursor
    compiled_parameters: List[_MutableCoreSingleExecuteParams]
    parameters: _DBAPIMultiExecuteParams
    extracted_parameters: Optional[Sequence[BindParameter[Any]]]

    _empty_dict_params = cast("Mapping[str, Any]", util.EMPTY_DICT)

    # rows buffered by an insertmanyvalues execution, plus the count of
    # trailing sentinel columns to be stripped from result metadata
    _insertmanyvalues_rows: Optional[List[Tuple[Any, ...]]] = None
    _num_sentinel_cols: int = 0

1274 

    @classmethod
    def _init_ddl(
        cls,
        dialect: Dialect,
        connection: Connection,
        dbapi_connection: PoolProxiedConnection,
        execution_options: _ExecuteOptions,
        compiled_ddl: DDLCompiler,
    ) -> ExecutionContext:
        """Initialize execution context for an ExecutableDDLElement
        construct."""

        # bypass __init__; all state is assigned explicitly here
        self = cls.__new__(cls)
        self.root_connection = connection
        self._dbapi_connection = dbapi_connection
        self.dialect = connection.dialect

        self.compiled = compiled = compiled_ddl
        self.isddl = True

        self.execution_options = execution_options

        self.unicode_statement = str(compiled)
        # apply schema_translate_map substitutions to the rendered DDL
        # string, if the compiled form deferred them to execution time
        if compiled.schema_translate_map:
            schema_translate_map = self.execution_options.get(
                "schema_translate_map", {}
            )

            rst = compiled.preparer._render_schema_translates
            self.unicode_statement = rst(
                self.unicode_statement, schema_translate_map
            )

        self.statement = self.unicode_statement

        self.cursor = self.create_cursor()
        self.compiled_parameters = []

        # DDL carries no bound parameters; use the dialect-appropriate
        # "empty" parameter collection
        if dialect.positional:
            self.parameters = [dialect.execute_sequence_format()]
        else:
            self.parameters = [self._empty_dict_params]

        return self

1319 

    @classmethod
    def _init_compiled(
        cls,
        dialect: Dialect,
        connection: Connection,
        dbapi_connection: PoolProxiedConnection,
        execution_options: _ExecuteOptions,
        compiled: SQLCompiler,
        parameters: _CoreMultiExecuteParams,
        invoked_statement: Executable,
        extracted_parameters: Optional[Sequence[BindParameter[Any]]],
        cache_hit: CacheStats = CacheStats.CACHING_DISABLED,
    ) -> ExecutionContext:
        """Initialize execution context for a Compiled construct.

        Determines execute style (single / executemany /
        insertmanyvalues), validates RETURNING support for executemany,
        constructs compiled parameters, applies post-compile parameter
        expansion and schema translation, and finally converts bound
        parameters into the positional or dict form the DBAPI expects.
        """

        # bypass __init__; all state is assigned explicitly here
        self = cls.__new__(cls)
        self.root_connection = connection
        self._dbapi_connection = dbapi_connection
        self.dialect = connection.dialect
        self.extracted_parameters = extracted_parameters
        self.invoked_statement = invoked_statement
        self.compiled = compiled
        self.cache_hit = cache_hit

        self.execution_options = execution_options

        self.result_column_struct = (
            compiled._result_columns,
            compiled._ordered_columns,
            compiled._textual_ordered_columns,
            compiled._ad_hoc_textual,
            compiled._loose_column_name_matching,
        )

        self.isinsert = ii = compiled.isinsert
        self.isupdate = iu = compiled.isupdate
        self.isdelete = id_ = compiled.isdelete
        self.is_text = compiled.isplaintext

        if ii or iu or id_:
            dml_statement = compiled.compile_state.statement  # type: ignore
            if TYPE_CHECKING:
                assert isinstance(dml_statement, UpdateBase)
            self.is_crud = True
            self._is_explicit_returning = ier = bool(dml_statement._returning)
            self._is_implicit_returning = iir = bool(
                compiled.implicit_returning
            )
            if iir and dml_statement._supplemental_returning:
                self._is_supplemental_returning = True

            # dont mix implicit and explicit returning
            assert not (iir and ier)

            # validate up front that the dialect can honor RETURNING
            # under executemany for this statement type
            if (ier or iir) and compiled.for_executemany:
                if ii and not self.dialect.insert_executemany_returning:
                    raise exc.InvalidRequestError(
                        f"Dialect {self.dialect.dialect_description} with "
                        f"current server capabilities does not support "
                        "INSERT..RETURNING when executemany is used"
                    )
                elif (
                    ii
                    and dml_statement._sort_by_parameter_order
                    and not self.dialect.insert_executemany_returning_sort_by_parameter_order  # noqa: E501
                ):
                    raise exc.InvalidRequestError(
                        f"Dialect {self.dialect.dialect_description} with "
                        f"current server capabilities does not support "
                        "INSERT..RETURNING with deterministic row ordering "
                        "when executemany is used"
                    )
                elif (
                    ii
                    and self.dialect.use_insertmanyvalues
                    and not compiled._insertmanyvalues
                ):
                    raise exc.InvalidRequestError(
                        'Statement does not have "insertmanyvalues" '
                        "enabled, can't use INSERT..RETURNING with "
                        "executemany in this case."
                    )
                elif iu and not self.dialect.update_executemany_returning:
                    raise exc.InvalidRequestError(
                        f"Dialect {self.dialect.dialect_description} with "
                        f"current server capabilities does not support "
                        "UPDATE..RETURNING when executemany is used"
                    )
                elif id_ and not self.dialect.delete_executemany_returning:
                    raise exc.InvalidRequestError(
                        f"Dialect {self.dialect.dialect_description} with "
                        f"current server capabilities does not support "
                        "DELETE..RETURNING when executemany is used"
                    )

        if not parameters:
            self.compiled_parameters = [
                compiled.construct_params(
                    extracted_parameters=extracted_parameters,
                    escape_names=False,
                )
            ]
        else:
            self.compiled_parameters = [
                compiled.construct_params(
                    m,
                    escape_names=False,
                    _group_number=grp,
                    extracted_parameters=extracted_parameters,
                )
                for grp, m in enumerate(parameters)
            ]

            # multiple parameter sets imply executemany; INSERT with an
            # insertmanyvalues structure uses the batched-VALUES path
            if len(parameters) > 1:
                if self.isinsert and compiled._insertmanyvalues:
                    self.execute_style = ExecuteStyle.INSERTMANYVALUES

                    imv = compiled._insertmanyvalues
                    if imv.sentinel_columns is not None:
                        self._num_sentinel_cols = imv.num_sentinel_columns
                else:
                    self.execute_style = ExecuteStyle.EXECUTEMANY

        self.unicode_statement = compiled.string

        self.cursor = self.create_cursor()

        if self.compiled.insert_prefetch or self.compiled.update_prefetch:
            self._process_execute_defaults()

        processors = compiled._bind_processors

        flattened_processors: Mapping[
            str, _BindProcessorType[Any]
        ] = processors  # type: ignore[assignment]

        if compiled.literal_execute_params or compiled.post_compile_params:
            if self.executemany:
                raise exc.InvalidRequestError(
                    "'literal_execute' or 'expanding' parameters can't be "
                    "used with executemany()"
                )

            # expand IN-parameters / render literal_execute values into
            # the statement text; this rewrites the statement string
            expanded_state = compiled._process_parameters_for_postcompile(
                self.compiled_parameters[0]
            )

            # re-assign self.unicode_statement
            self.unicode_statement = expanded_state.statement

            self._expanded_parameters = expanded_state.parameter_expansion

            flattened_processors = dict(processors)  # type: ignore
            flattened_processors.update(expanded_state.processors)
            positiontup = expanded_state.positiontup
        elif compiled.positional:
            positiontup = self.compiled.positiontup
        else:
            positiontup = None

        if compiled.schema_translate_map:
            schema_translate_map = self.execution_options.get(
                "schema_translate_map", {}
            )
            rst = compiled.preparer._render_schema_translates
            self.unicode_statement = rst(
                self.unicode_statement, schema_translate_map
            )

        # final self.unicode_statement is now assigned, encode if needed
        # by dialect
        self.statement = self.unicode_statement

        # Convert the dictionary of bind parameter values
        # into a dict or list to be sent to the DBAPI's
        # execute() or executemany() method.

        if compiled.positional:
            core_positional_parameters: MutableSequence[Sequence[Any]] = []
            assert positiontup is not None
            for compiled_params in self.compiled_parameters:
                l_param: List[Any] = [
                    (
                        flattened_processors[key](compiled_params[key])
                        if key in flattened_processors
                        else compiled_params[key]
                    )
                    for key in positiontup
                ]
                core_positional_parameters.append(
                    dialect.execute_sequence_format(l_param)
                )

            self.parameters = core_positional_parameters
        else:
            core_dict_parameters: MutableSequence[Dict[str, Any]] = []
            escaped_names = compiled.escaped_bind_names

            # note that currently, "expanded" parameters will be present
            # in self.compiled_parameters in their quoted form. This is
            # slightly inconsistent with the approach taken as of
            # #8056 where self.compiled_parameters is meant to contain unquoted
            # param names.
            d_param: Dict[str, Any]
            for compiled_params in self.compiled_parameters:
                if escaped_names:
                    d_param = {
                        escaped_names.get(key, key): (
                            flattened_processors[key](compiled_params[key])
                            if key in flattened_processors
                            else compiled_params[key]
                        )
                        for key in compiled_params
                    }
                else:
                    d_param = {
                        key: (
                            flattened_processors[key](compiled_params[key])
                            if key in flattened_processors
                            else compiled_params[key]
                        )
                        for key in compiled_params
                    }

                core_dict_parameters.append(d_param)

            self.parameters = core_dict_parameters

        return self

1549 

    @classmethod
    def _init_statement(
        cls,
        dialect: Dialect,
        connection: Connection,
        dbapi_connection: PoolProxiedConnection,
        execution_options: _ExecuteOptions,
        statement: str,
        parameters: _DBAPIMultiExecuteParams,
    ) -> ExecutionContext:
        """Initialize execution context for a string SQL statement."""

        # bypass __init__; all state is assigned explicitly here
        self = cls.__new__(cls)
        self.root_connection = connection
        self._dbapi_connection = dbapi_connection
        self.dialect = connection.dialect
        self.is_text = True

        self.execution_options = execution_options

        # coerce raw parameters into the collection type the DBAPI
        # expects; already-correct collections pass through unchanged
        if not parameters:
            if self.dialect.positional:
                self.parameters = [dialect.execute_sequence_format()]
            else:
                self.parameters = [self._empty_dict_params]
        elif isinstance(parameters[0], dialect.execute_sequence_format):
            self.parameters = parameters
        elif isinstance(parameters[0], dict):
            self.parameters = parameters
        else:
            self.parameters = [
                dialect.execute_sequence_format(p) for p in parameters
            ]

        # more than one parameter set means executemany() semantics
        if len(parameters) > 1:
            self.execute_style = ExecuteStyle.EXECUTEMANY

        self.statement = self.unicode_statement = statement

        self.cursor = self.create_cursor()
        return self

1591 

    @classmethod
    def _init_default(
        cls,
        dialect: Dialect,
        connection: Connection,
        dbapi_connection: PoolProxiedConnection,
        execution_options: _ExecuteOptions,
    ) -> ExecutionContext:
        """Initialize execution context for a ColumnDefault construct."""

        # bypass __init__; minimal state is needed to fire off a
        # standalone default-generation statement
        self = cls.__new__(cls)
        self.root_connection = connection
        self._dbapi_connection = dbapi_connection
        self.dialect = connection.dialect

        self.execution_options = execution_options

        self.cursor = self.create_cursor()
        return self

1611 

1612 def _get_cache_stats(self) -> str: 

1613 if self.compiled is None: 

1614 return "raw sql" 

1615 

1616 now = perf_counter() 

1617 

1618 ch = self.cache_hit 

1619 

1620 gen_time = self.compiled._gen_time 

1621 assert gen_time is not None 

1622 

1623 if ch is NO_CACHE_KEY: 

1624 return "no key %.5fs" % (now - gen_time,) 

1625 elif ch is CACHE_HIT: 

1626 return "cached since %.4gs ago" % (now - gen_time,) 

1627 elif ch is CACHE_MISS: 

1628 return "generated in %.5fs" % (now - gen_time,) 

1629 elif ch is CACHING_DISABLED: 

1630 if "_cache_disable_reason" in self.execution_options: 

1631 return "caching disabled (%s) %.5fs " % ( 

1632 self.execution_options["_cache_disable_reason"], 

1633 now - gen_time, 

1634 ) 

1635 else: 

1636 return "caching disabled %.5fs" % (now - gen_time,) 

1637 elif ch is NO_DIALECT_SUPPORT: 

1638 return "dialect %s+%s does not support caching %.5fs" % ( 

1639 self.dialect.name, 

1640 self.dialect.driver, 

1641 now - gen_time, 

1642 ) 

1643 else: 

1644 return "unknown" 

1645 

1646 @property 

1647 def executemany(self): # type: ignore[override] 

1648 return self.execute_style in ( 

1649 ExecuteStyle.EXECUTEMANY, 

1650 ExecuteStyle.INSERTMANYVALUES, 

1651 ) 

1652 

    @util.memoized_property
    def identifier_preparer(self):
        """The IdentifierPreparer in effect for this execution, carrying
        any schema translation state."""
        if self.compiled:
            # the compiled object's preparer already reflects compile-time
            # schema translate state
            return self.compiled.preparer
        elif "schema_translate_map" in self.execution_options:
            return self.dialect.identifier_preparer._with_schema_translate(
                self.execution_options["schema_translate_map"]
            )
        else:
            return self.dialect.identifier_preparer

1663 

    @util.memoized_property
    def engine(self):
        """The Engine associated with the root connection."""
        return self.root_connection.engine

1667 

    @util.memoized_property
    def postfetch_cols(self) -> Optional[Sequence[Column[Any]]]:
        """Columns whose server-generated values must be fetched after
        the statement executes."""
        if TYPE_CHECKING:
            assert isinstance(self.compiled, SQLCompiler)
        return self.compiled.postfetch

1673 

1674 @util.memoized_property 

1675 def prefetch_cols(self) -> Optional[Sequence[Column[Any]]]: 

1676 if TYPE_CHECKING: 

1677 assert isinstance(self.compiled, SQLCompiler) 

1678 if self.isinsert: 

1679 return self.compiled.insert_prefetch 

1680 elif self.isupdate: 

1681 return self.compiled.update_prefetch 

1682 else: 

1683 return () 

1684 

    @util.memoized_property
    def no_parameters(self):
        """True if the statement should be invoked with no parameter
        collection at all (the ``no_parameters`` execution option)."""
        return self.execution_options.get("no_parameters", False)

1688 

    def _execute_scalar(
        self,
        stmt: str,
        type_: Optional[TypeEngine[Any]],
        parameters: Optional[_DBAPISingleExecuteParams] = None,
    ) -> Any:
        """Execute a string statement on the current cursor, returning a
        scalar result.

        Used to fire off sequences, default phrases, and "select lastrowid"
        types of statements individually or in the context of a parent INSERT
        or UPDATE statement.

        :param stmt: SQL string to execute on the current cursor.
        :param type_: optional TypeEngine used to post-process the
         first column of the first row via its result processor.
        :param parameters: optional parameter collection; when absent, a
         dialect-appropriate empty collection is substituted.
        """

        conn = self.root_connection

        # apply schema_translate_map substitution to the ad-hoc string,
        # same as is done for compiled statements
        if "schema_translate_map" in self.execution_options:
            schema_translate_map = self.execution_options.get(
                "schema_translate_map", {}
            )

            rst = self.identifier_preparer._render_schema_translates
            stmt = rst(stmt, schema_translate_map)

        if not parameters:
            if self.dialect.positional:
                parameters = self.dialect.execute_sequence_format()
            else:
                parameters = {}

        conn._cursor_execute(self.cursor, stmt, parameters, context=self)
        row = self.cursor.fetchone()
        if row is not None:
            r = row[0]
        else:
            r = None
        if type_ is not None:
            # apply type post processors to the result
            proc = type_._cached_result_processor(
                self.dialect, self.cursor.description[0][1]
            )
            if proc:
                return proc(r)
        return r

1734 

    @util.memoized_property
    def connection(self):
        """The Connection on which this context is executing."""
        return self.root_connection

1738 

    def _use_server_side_cursor(self):
        """Decide whether a server-side cursor should be used, under the
        legacy ``server_side_cursors`` dialect flag or the
        ``stream_results`` execution option."""
        if not self.dialect.supports_server_side_cursors:
            return False

        if self.dialect.server_side_cursors:
            # this is deprecated
            # server-side is the default here unless stream_results is
            # explicitly disabled; applies to SELECT-like compiled
            # statements, or to raw/text statements that match the
            # SELECT-detecting regex.  NOTE: the and/or below relies on
            # Python precedence (A and B or C == (A and B) or C)
            use_server_side = self.execution_options.get(
                "stream_results", True
            ) and (
                self.compiled
                and isinstance(self.compiled.statement, expression.Selectable)
                or (
                    (
                        not self.compiled
                        or isinstance(
                            self.compiled.statement, expression.TextClause
                        )
                    )
                    and self.unicode_statement
                    and SERVER_SIDE_CURSOR_RE.match(self.unicode_statement)
                )
            )
        else:
            # modern path: opt-in only, via the stream_results option
            use_server_side = self.execution_options.get(
                "stream_results", False
            )

        return use_server_side

1767 

1768 def create_cursor(self) -> DBAPICursor: 

1769 if ( 

1770 # inlining initial preference checks for SS cursors 

1771 self.dialect.supports_server_side_cursors 

1772 and ( 

1773 self.execution_options.get("stream_results", False) 

1774 or ( 

1775 self.dialect.server_side_cursors 

1776 and self._use_server_side_cursor() 

1777 ) 

1778 ) 

1779 ): 

1780 self._is_server_side = True 

1781 return self.create_server_side_cursor() 

1782 else: 

1783 self._is_server_side = False 

1784 return self.create_default_cursor() 

1785 

    def fetchall_for_returning(self, cursor):
        """Fetch all rows produced by a RETURNING clause; dialects may
        override to use driver-specific fetch mechanics."""
        return cursor.fetchall()

1788 

    def create_default_cursor(self) -> DBAPICursor:
        """Create a plain client-side DBAPI cursor."""
        return self._dbapi_connection.cursor()

1791 

    def create_server_side_cursor(self) -> DBAPICursor:
        """Create a server-side cursor; must be implemented by dialects
        that declare ``supports_server_side_cursors``."""
        raise NotImplementedError()

1794 

    def pre_exec(self):
        """Hook invoked just before statement execution; no-op by
        default."""
        pass

1797 

    def get_out_parameter_values(self, names):
        """Return values for the given OUT parameter names; only
        dialects with OUT parameter support implement this."""
        raise NotImplementedError(
            "This dialect does not support OUT parameters"
        )

1802 

    def post_exec(self):
        """Hook invoked just after statement execution; no-op by
        default."""
        pass

1805 

    def get_result_processor(
        self, type_: TypeEngine[Any], colname: str, coltype: DBAPIType
    ) -> Optional[_ResultProcessorType[Any]]:
        """Return a 'result processor' for a given type as present in
        cursor.description.

        This has a default implementation that dialects can override
        for context-sensitive result type handling.

        :param type_: the SQLAlchemy type of the result column.
        :param colname: column name from cursor.description (unused by
         the default implementation).
        :param coltype: DBAPI type code from cursor.description.
        """
        return type_._cached_result_processor(self.dialect, coltype)

1817 

    def get_lastrowid(self) -> int:
        """return self.cursor.lastrowid, or equivalent, after an INSERT.

        This may involve calling special cursor functions, issuing a new SELECT
        on the cursor (or a new one), or returning a stored value that was
        calculated within post_exec().

        This function will only be called for dialects which support "implicit"
        primary key generation, keep preexecute_autoincrement_sequences set to
        False, and when no explicit id value was bound to the statement.

        The function is called once for an INSERT statement that would need to
        return the last inserted primary key for those dialects that make use
        of the lastrowid concept.  In these cases, it is called directly after
        :meth:`.ExecutionContext.post_exec`.

        :return: the integer primary key value of the most recently
         inserted row, per the DBAPI's ``lastrowid`` accessor.

        """
        return self.cursor.lastrowid

1836 

    def handle_dbapi_exception(self, e):
        """Hook invoked when a DBAPI exception is raised during
        execution; no-op by default."""
        pass

1839 

1840 @util.non_memoized_property 

1841 def rowcount(self) -> int: 

1842 if self._rowcount is not None: 

1843 return self._rowcount 

1844 else: 

1845 return self.cursor.rowcount 

1846 

    @property
    def _has_rowcount(self):
        """True if a rowcount was captured by the context itself."""
        return self._rowcount is not None

1850 

    def supports_sane_rowcount(self):
        """Proxy the dialect's flag for reliable single-execute
        rowcounts."""
        return self.dialect.supports_sane_rowcount

1853 

    def supports_sane_multi_rowcount(self):
        """Proxy the dialect's flag for reliable executemany()
        rowcounts."""
        return self.dialect.supports_sane_multi_rowcount

1856 

    def _setup_result_proxy(self):
        """Construct the CursorResult for this execution, choosing the
        fetch strategy and applying OUT parameters and yield_per."""
        exec_opt = self.execution_options

        # capture cursor.rowcount eagerly if requested, before any
        # fetches can invalidate it
        if self._rowcount is None and exec_opt.get("preserve_rowcount", False):
            self._rowcount = self.cursor.rowcount

        yp: Optional[Union[int, bool]]
        if self.is_crud or self.is_text:
            result = self._setup_dml_or_text_result()
            yp = False
        else:
            yp = exec_opt.get("yield_per", None)
            sr = self._is_server_side or exec_opt.get("stream_results", False)
            strategy = self.cursor_fetch_strategy
            # streaming requests upgrade the default strategy to a
            # buffered-row strategy
            if sr and strategy is _cursor._DEFAULT_FETCH:
                strategy = _cursor.BufferedRowCursorFetchStrategy(
                    self.cursor, self.execution_options
                )
            cursor_description: _DBAPICursorDescription = (
                strategy.alternate_cursor_description
                or self.cursor.description
            )
            # no description at all means there are no rows to fetch
            if cursor_description is None:
                strategy = _cursor._NO_CURSOR_DQL

            result = _cursor.CursorResult(self, strategy, cursor_description)

        compiled = self.compiled

        if (
            compiled
            and not self.isddl
            and cast(SQLCompiler, compiled).has_out_parameters
        ):
            self._setup_out_parameters(result)

        self._soft_closed = result._soft_closed

        if yp:
            result = result.yield_per(yp)

        return result

1899 

    def _setup_out_parameters(self, result):
        """Collect OUT parameter values from the dialect, run them
        through result processors, and attach them to the result as
        ``result.out_parameters``."""
        compiled = cast(SQLCompiler, self.compiled)

        # (BindParameter, rendered name) pairs for all OUT params
        out_bindparams = [
            (param, name)
            for param, name in compiled.bind_names.items()
            if param.isoutparam
        ]
        out_parameters = {}

        for bindparam, raw_value in zip(
            [param for param, name in out_bindparams],
            self.get_out_parameter_values(
                [name for param, name in out_bindparams]
            ),
        ):
            type_ = bindparam.type
            impl_type = type_.dialect_impl(self.dialect)
            dbapi_type = impl_type.get_dbapi_type(self.dialect.loaded_dbapi)
            result_processor = impl_type.result_processor(
                self.dialect, dbapi_type
            )
            # convert the raw driver value into its Python-side form
            if result_processor is not None:
                raw_value = result_processor(raw_value)
            out_parameters[bindparam.key] = raw_value

        result.out_parameters = out_parameters

1927 

def _setup_dml_or_text_result(self):
    """Build the :class:`._cursor.CursorResult` for an INSERT / UPDATE /
    DELETE or textual statement.

    Selects the cursor fetch strategy, collects inserted primary key
    rows (from lastrowid or implicit RETURNING), consumes implicit
    RETURNING rows into ``returned_default_rows``, and fills in
    ``_rowcount`` where the result returns no rows.
    """
    compiled = cast(SQLCompiler, self.compiled)

    strategy: ResultFetchStrategy = self.cursor_fetch_strategy

    if self.isinsert:
        if (
            self.execute_style is ExecuteStyle.INSERTMANYVALUES
            and compiled.effective_returning
        ):
            # rows were accumulated during the insertmanyvalues batches;
            # serve them from a fully buffered strategy
            strategy = _cursor.FullyBufferedCursorFetchStrategy(
                self.cursor,
                initial_buffer=self._insertmanyvalues_rows,
                # maintain alt cursor description if set by the
                # dialect, e.g. mssql preserves it
                alternate_description=(
                    strategy.alternate_cursor_description
                ),
            )

        if compiled.postfetch_lastrowid:
            self.inserted_primary_key_rows = (
                self._setup_ins_pk_from_lastrowid()
            )
        # else if not self._is_implicit_returning,
        # the default inserted_primary_key_rows accessor will
        # return an "empty" primary key collection when accessed.

    if self._is_server_side and strategy is _cursor._DEFAULT_FETCH:
        strategy = _cursor.BufferedRowCursorFetchStrategy(
            self.cursor, self.execution_options
        )

    if strategy is _cursor._NO_CURSOR_DML:
        cursor_description = None
    else:
        cursor_description = (
            strategy.alternate_cursor_description
            or self.cursor.description
        )

    if cursor_description is None:
        # no cursor description -> statement returns no rows
        strategy = _cursor._NO_CURSOR_DML
    elif self._num_sentinel_cols:
        assert self.execute_style is ExecuteStyle.INSERTMANYVALUES
        # the sentinel columns are handled in CursorResult._init_metadata
        # using essentially _reduce

    result: _cursor.CursorResult[Any] = _cursor.CursorResult(
        self, strategy, cursor_description
    )

    if self.isinsert:
        if self._is_implicit_returning:
            # RETURNING was added by SQLAlchemy itself (e.g. for
            # return_defaults()); consume all rows now
            rows = result.all()

            self.returned_default_rows = rows

            self.inserted_primary_key_rows = (
                self._setup_ins_pk_from_implicit_returning(result, rows)
            )

            # test that it has a cursor metadata that is accurate. the
            # first row will have been fetched and current assumptions
            # are that the result has only one row, until executemany()
            # support is added here.
            assert result._metadata.returns_rows

            # Insert statement has both return_defaults() and
            # returning(). rewind the result on the list of rows
            # we just used.
            if self._is_supplemental_returning:
                result._rewind(rows)
            else:
                result._soft_close()
        elif not self._is_explicit_returning:
            result._soft_close()

            # we assume here the result does not return any rows.
            # *usually*, this will be true. However, some dialects
            # such as that of MSSQL/pyodbc need to SELECT a post fetch
            # function so this is not necessarily true.
            # assert not result.returns_rows

    elif self._is_implicit_returning:
        # implicit RETURNING on an UPDATE/DELETE
        rows = result.all()

        if rows:
            self.returned_default_rows = rows
            self._rowcount = len(rows)

        if self._is_supplemental_returning:
            result._rewind(rows)
        else:
            result._soft_close()

        # test that it has a cursor metadata that is accurate.
        # the rows have all been fetched however.
        assert result._metadata.returns_rows

    elif not result._metadata.returns_rows:
        # no results, get rowcount
        # (which requires open cursor on some drivers)
        if self._rowcount is None:
            self._rowcount = self.cursor.rowcount
        result._soft_close()
    elif self.isupdate or self.isdelete:
        if self._rowcount is None:
            self._rowcount = self.cursor.rowcount
    return result
2038 

@util.memoized_property
def inserted_primary_key_rows(self):
    """Fallback collection of inserted primary key rows.

    Computed lazily only when the attribute was not assigned directly
    during execution (``_setup_dml_or_text_result`` assigns it from
    lastrowid or implicit RETURNING when those strategies apply).
    """
    # if no specific "get primary key" strategy was set up
    # during execution, return a "default" primary key based
    # on what's in the compiled_parameters and nothing else.
    return self._setup_ins_pk_from_empty()
2045 

def _setup_ins_pk_from_lastrowid(self):
    """Return a one-element list of primary key values derived from the
    DBAPI ``lastrowid`` combined with the first compiled parameter set.
    """
    compiled = cast(SQLCompiler, self.compiled)
    pk_getter = compiled._inserted_primary_key_from_lastrowid_getter
    lastrowid = self.get_lastrowid()
    return [pk_getter(lastrowid, self.compiled_parameters[0])]
2052 

def _setup_ins_pk_from_empty(self):
    """Return "default" primary key rows built purely from the compiled
    parameters, with no lastrowid available (passed as ``None``)."""
    compiled = cast(SQLCompiler, self.compiled)
    pk_getter = compiled._inserted_primary_key_from_lastrowid_getter
    return [
        pk_getter(None, params) for params in self.compiled_parameters
    ]
2058 

2059 def _setup_ins_pk_from_implicit_returning(self, result, rows): 

2060 if not rows: 

2061 return [] 

2062 

2063 getter = cast( 

2064 SQLCompiler, self.compiled 

2065 )._inserted_primary_key_from_returning_getter 

2066 compiled_params = self.compiled_parameters 

2067 

2068 return [ 

2069 getter(row, param) for row, param in zip(rows, compiled_params) 

2070 ] 

2071 

def lastrow_has_defaults(self) -> bool:
    """Return True if the last INSERT or UPDATE row contains columns
    whose values must be post-fetched (i.e. the compiled statement has
    a non-empty ``postfetch`` collection)."""
    if not (self.isinsert or self.isupdate):
        return False
    return bool(cast(SQLCompiler, self.compiled).postfetch)
2076 

def _prepare_set_input_sizes(
    self,
) -> Optional[List[Tuple[str, Any, TypeEngine[Any]]]]:
    """Given a cursor and ClauseParameters, prepare arguments
    in order to call the appropriate
    style of ``setinputsizes()`` on the cursor, using DB-API types
    from the bind parameter's ``TypeEngine`` objects.

    This method only called by those dialects which set the
    :attr:`.Dialect.bind_typing` attribute to
    :attr:`.BindTyping.SETINPUTSIZES`. Python-oracledb and cx_Oracle are
    the only DBAPIs that requires setinputsizes(); pyodbc offers it as an
    option.

    Prior to SQLAlchemy 2.0, the setinputsizes() approach was also used
    for pg8000 and asyncpg, which has been changed to inline rendering
    of casts.

    Returns a list of (escaped parameter name, DB-API type, TypeEngine)
    tuples, or None when setinputsizes does not apply to this statement.
    """
    # DDL and plain-text statements carry no compiled bind metadata
    if self.isddl or self.is_text:
        return None

    compiled = cast(SQLCompiler, self.compiled)

    inputsizes = compiled._get_set_input_sizes_lookup()

    if inputsizes is None:
        return None

    dialect = self.dialect

    # all of the rest of this... cython?

    if dialect._has_events:
        # copy before dispatch, presumably so that do_setinputsizes
        # handlers can mutate the dict without affecting the compiled
        # form's cached lookup -- TODO confirm
        inputsizes = dict(inputsizes)
        dialect.dispatch.do_setinputsizes(
            inputsizes, self.cursor, self.statement, self.parameters, self
        )

    if compiled.escaped_bind_names:
        escaped_bind_names = compiled.escaped_bind_names
    else:
        escaped_bind_names = None

    # build (param name, bind parameter) pairs in the order the DBAPI
    # will see them: positional order for positional dialects, bind-name
    # order otherwise
    if dialect.positional:
        items = [
            (key, compiled.binds[key])
            for key in compiled.positiontup or ()
        ]
    else:
        items = [
            (key, bindparam)
            for bindparam, key in compiled.bind_names.items()
        ]

    generic_inputsizes: List[Tuple[str, Any, TypeEngine[Any]]] = []
    for key, bindparam in items:
        # literal-execute parameters are rendered inline, not bound
        if bindparam in compiled.literal_execute_params:
            continue

        if key in self._expanded_parameters:
            # "expanded" parameters: one logical bind expanded into
            # several concrete DBAPI parameters
            if is_tuple_type(bindparam.type):
                # tuple type: cycle through the member types/dbtypes
                # for each expanded parameter name
                num = len(bindparam.type.types)
                dbtypes = inputsizes[bindparam]
                generic_inputsizes.extend(
                    (
                        (
                            escaped_bind_names.get(paramname, paramname)
                            if escaped_bind_names is not None
                            else paramname
                        ),
                        dbtypes[idx % num],
                        bindparam.type.types[idx % num],
                    )
                    for idx, paramname in enumerate(
                        self._expanded_parameters[key]
                    )
                )
            else:
                # scalar type: every expanded name shares one dbtype
                dbtype = inputsizes.get(bindparam, None)
                generic_inputsizes.extend(
                    (
                        (
                            escaped_bind_names.get(paramname, paramname)
                            if escaped_bind_names is not None
                            else paramname
                        ),
                        dbtype,
                        bindparam.type,
                    )
                    for paramname in self._expanded_parameters[key]
                )
        else:
            dbtype = inputsizes.get(bindparam, None)

            escaped_name = (
                escaped_bind_names.get(key, key)
                if escaped_bind_names is not None
                else key
            )

            generic_inputsizes.append(
                (escaped_name, dbtype, bindparam.type)
            )

    return generic_inputsizes
2183 

2184 def _exec_default(self, column, default, type_): 

2185 if default.is_sequence: 

2186 return self.fire_sequence(default, type_) 

2187 elif default.is_callable: 

2188 # this codepath is not normally used as it's inlined 

2189 # into _process_execute_defaults 

2190 self.current_column = column 

2191 return default.arg(self) 

2192 elif default.is_clause_element: 

2193 return self._exec_default_clause_element(column, default, type_) 

2194 else: 

2195 # this codepath is not normally used as it's inlined 

2196 # into _process_execute_defaults 

2197 return default.arg 

2198 

def _exec_default_clause_element(self, column, default, type_):
    """Execute a default that is a complete SQL clause element and
    return its scalar result.

    Re-implements a miniature compile -> construct-params ->
    cursor.execute() sequence so that the connection / result currently
    in progress is not modified and no new connection / result objects
    are created.

    .. versionchanged:: 1.4
    """
    if not default._arg_is_typed:
        default_arg = expression.type_coerce(default.arg, type_)
    else:
        default_arg = default.arg

    compiled = expression.select(default_arg).compile(dialect=self.dialect)
    compiled_params = compiled.construct_params()
    processors = compiled._bind_processors

    def _processed(key):
        # run the bind processor for this key, if one exists
        if key in processors:
            return processors[key](compiled_params[key])  # type: ignore
        return compiled_params[key]

    if compiled.positional:
        parameters = self.dialect.execute_sequence_format(
            [_processed(key) for key in compiled.positiontup or ()]
        )
    else:
        parameters = {key: _processed(key) for key in compiled_params}

    return self._execute_scalar(
        str(compiled), type_, parameters=parameters
    )
2237 

# Assigned by _process_execute_defaults for the duration of default
# evaluation on each parameter row, then deleted again; ``None`` at all
# other times.
current_parameters: Optional[_CoreSingleExecuteParams] = None
"""A dictionary of parameters applied to the current row.

This attribute is only available in the context of a user-defined default
generation function, e.g. as described at :ref:`context_default_functions`.
It consists of a dictionary which includes entries for each column/value
pair that is to be part of the INSERT or UPDATE statement. The keys of the
dictionary will be the key value of each :class:`_schema.Column`,
which is usually
synonymous with the name.

Note that the :attr:`.DefaultExecutionContext.current_parameters` attribute
does not accommodate for the "multi-values" feature of the
:meth:`_expression.Insert.values` method. The
:meth:`.DefaultExecutionContext.get_current_parameters` method should be
preferred.

.. seealso::

    :meth:`.DefaultExecutionContext.get_current_parameters`

    :ref:`context_default_functions`

"""
2262 

def get_current_parameters(self, isolate_multiinsert_groups=True):
    """Return a dictionary of parameters applied to the current row.

    This method can only be used in the context of a user-defined default
    generation function, e.g. as described at
    :ref:`context_default_functions`. When invoked, a dictionary is
    returned which includes entries for each column/value pair that is part
    of the INSERT or UPDATE statement. The keys of the dictionary will be
    the key value of each :class:`_schema.Column`,
    which is usually synonymous
    with the name.

    :param isolate_multiinsert_groups=True: indicates that multi-valued
     INSERT constructs created using :meth:`_expression.Insert.values`
     should be
     handled by returning only the subset of parameters that are local
     to the current column default invocation. When ``False``, the
     raw parameters of the statement are returned including the
     naming convention used in the case of multi-valued INSERT.

    :raises .InvalidRequestError: when called outside of a Python-side
     column default function (``current_parameters`` /
     ``current_column`` are not set).

    .. seealso::

        :attr:`.DefaultExecutionContext.current_parameters`

        :ref:`context_default_functions`

    """
    try:
        parameters = self.current_parameters
        column = self.current_column
    except AttributeError:
        raise exc.InvalidRequestError(
            "get_current_parameters() can only be invoked in the "
            "context of a Python side column default function"
        )
    else:
        assert column is not None
        assert parameters is not None
        compile_state = cast(
            "DMLState", cast(SQLCompiler, self.compiled).compile_state
        )
        assert compile_state is not None
        if (
            isolate_multiinsert_groups
            and dml.isinsert(compile_state)
            and compile_state._has_multi_parameters
        ):
            # multi-valued INSERT: raw parameter keys carry a "_m<N>"
            # suffix per VALUES group; rebuild a plain per-group dict
            if column._is_multiparam_column:
                index = column.index + 1
                d = {column.original.key: parameters[column.key]}
            else:
                # first VALUES group uses index 0
                d = {column.key: parameters[column.key]}
                index = 0
            assert compile_state._dict_parameters is not None
            keys = compile_state._dict_parameters.keys()
            d.update(
                (key, parameters["%s_m%d" % (key, index)]) for key in keys
            )
            return d
        else:
            return parameters
2324 

def get_insert_default(self, column):
    """Produce the INSERT-time default value for ``column``.

    Returns ``None`` when the column defines no default generator;
    otherwise evaluates ``column.default`` via ``_exec_default()``.
    """
    default = column.default
    if default is None:
        return None
    return self._exec_default(column, default, column.type)
2330 

def get_update_default(self, column):
    """Produce the UPDATE-time (onupdate) default value for ``column``.

    Returns ``None`` when the column defines no onupdate generator;
    otherwise evaluates ``column.onupdate`` via ``_exec_default()``.
    """
    onupdate = column.onupdate
    if onupdate is None:
        return None
    return self._exec_default(column, onupdate, column.type)
2336 

def _process_execute_defaults(self):
    """Evaluate prefetched column defaults and write the resulting
    values into each dictionary of ``self.compiled_parameters`` in
    place.

    For INSERT, uses ``compiled.insert_prefetch`` with each column's
    ``_default_description_tuple``; for UPDATE, uses
    ``compiled.update_prefetch`` with ``_onupdate_description_tuple``.
    ``current_parameters`` / ``current_column`` are set while callables
    run so user default functions can inspect the row.
    """
    compiled = cast(SQLCompiler, self.compiled)

    key_getter = compiled._within_exec_param_key_getter

    # monotonically increasing value assigned to "sentinel" columns
    # across all parameter rows of this execution
    sentinel_counter = 0

    if compiled.insert_prefetch:
        prefetch_recs = [
            (
                c,
                key_getter(c),
                c._default_description_tuple,
                self.get_insert_default,
            )
            for c in compiled.insert_prefetch
        ]
    elif compiled.update_prefetch:
        prefetch_recs = [
            (
                c,
                key_getter(c),
                c._onupdate_description_tuple,
                self.get_update_default,
            )
            for c in compiled.update_prefetch
        ]
    else:
        prefetch_recs = []

    for param in self.compiled_parameters:
        # expose the row being processed to user default functions
        self.current_parameters = param

        for (
            c,
            param_key,
            (arg, is_scalar, is_callable, is_sentinel),
            fallback,
        ) in prefetch_recs:
            if is_sentinel:
                param[param_key] = sentinel_counter
                sentinel_counter += 1
            elif is_scalar:
                # plain scalar default value, used as-is
                param[param_key] = arg
            elif is_callable:
                # Python callable default; receives this context
                self.current_column = c
                param[param_key] = arg(self)
            else:
                # fall back to the full get_insert_default /
                # get_update_default path (e.g. sequences, SQL
                # expression defaults); None results are not written
                val = fallback(c)
                if val is not None:
                    param[param_key] = val

    del self.current_parameters
2390 

2391 

# Wire DefaultExecutionContext in as the ExecutionContext class used by
# DefaultDialect.
DefaultDialect.execution_ctx_cls = DefaultExecutionContext