Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/default.py: 48%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1043 statements  

1# engine/default.py 

2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7# mypy: allow-untyped-defs, allow-untyped-calls 

8 

9"""Default implementations of per-dialect sqlalchemy.engine classes. 

10 

11These are semi-private implementation classes which are only of importance 

12to database dialect authors; dialects will usually use the classes here 

13as the base class for their own corresponding classes. 

14 

15""" 

16 

17from __future__ import annotations 

18 

19import functools 

20import operator 

21import random 

22import re 

23from time import perf_counter 

24import typing 

25from typing import Any 

26from typing import Callable 

27from typing import cast 

28from typing import Dict 

29from typing import List 

30from typing import Mapping 

31from typing import MutableMapping 

32from typing import MutableSequence 

33from typing import Optional 

34from typing import Sequence 

35from typing import Set 

36from typing import Tuple 

37from typing import Type 

38from typing import TYPE_CHECKING 

39from typing import Union 

40import weakref 

41 

42from . import characteristics 

43from . import cursor as _cursor 

44from . import interfaces 

45from .base import Connection 

46from .interfaces import CacheStats 

47from .interfaces import DBAPICursor 

48from .interfaces import Dialect 

49from .interfaces import ExecuteStyle 

50from .interfaces import ExecutionContext 

51from .reflection import ObjectKind 

52from .reflection import ObjectScope 

53from .. import event 

54from .. import exc 

55from .. import pool 

56from .. import util 

57from ..sql import compiler 

58from ..sql import dml 

59from ..sql import expression 

60from ..sql import type_api 

61from ..sql import util as sql_util 

62from ..sql._typing import is_tuple_type 

63from ..sql.base import _NoArg 

64from ..sql.compiler import DDLCompiler 

65from ..sql.compiler import InsertmanyvaluesSentinelOpts 

66from ..sql.compiler import SQLCompiler 

67from ..sql.elements import quoted_name 

68from ..util.typing import Final 

69from ..util.typing import Literal 

70 

71if typing.TYPE_CHECKING: 

72 from types import ModuleType 

73 

74 from .base import Engine 

75 from .cursor import ResultFetchStrategy 

76 from .interfaces import _CoreMultiExecuteParams 

77 from .interfaces import _CoreSingleExecuteParams 

78 from .interfaces import _DBAPICursorDescription 

79 from .interfaces import _DBAPIMultiExecuteParams 

80 from .interfaces import _DBAPISingleExecuteParams 

81 from .interfaces import _ExecuteOptions 

82 from .interfaces import _MutableCoreSingleExecuteParams 

83 from .interfaces import _ParamStyle 

84 from .interfaces import ConnectArgsType 

85 from .interfaces import DBAPIConnection 

86 from .interfaces import IsolationLevel 

87 from .row import Row 

88 from .url import URL 

89 from ..event import _ListenerFnType 

90 from ..pool import Pool 

91 from ..pool import PoolProxiedConnection 

92 from ..sql import Executable 

93 from ..sql.compiler import Compiled 

94 from ..sql.compiler import Linting 

95 from ..sql.compiler import ResultColumnsEntry 

96 from ..sql.dml import DMLState 

97 from ..sql.dml import UpdateBase 

98 from ..sql.elements import BindParameter 

99 from ..sql.schema import Column 

100 from ..sql.type_api import _BindProcessorType 

101 from ..sql.type_api import _ResultProcessorType 

102 from ..sql.type_api import TypeEngine 

103 

104 

105# When we're handed literal SQL, ensure it's a SELECT query 

106SERVER_SIDE_CURSOR_RE = re.compile(r"\s*SELECT", re.I | re.UNICODE) 

107 

108 

109( 

110 CACHE_HIT, 

111 CACHE_MISS, 

112 CACHING_DISABLED, 

113 NO_CACHE_KEY, 

114 NO_DIALECT_SUPPORT, 

115) = list(CacheStats) 

116 

117 

118class DefaultDialect(Dialect): 

119 """Default implementation of Dialect""" 

120 

121 statement_compiler = compiler.SQLCompiler 

122 ddl_compiler = compiler.DDLCompiler 

123 type_compiler_cls = compiler.GenericTypeCompiler 

124 

125 preparer = compiler.IdentifierPreparer 

126 supports_alter = True 

127 supports_comments = False 

128 supports_constraint_comments = False 

129 inline_comments = False 

130 supports_statement_cache = True 

131 

132 div_is_floordiv = True 

133 

134 bind_typing = interfaces.BindTyping.NONE 

135 

136 include_set_input_sizes: Optional[Set[Any]] = None 

137 exclude_set_input_sizes: Optional[Set[Any]] = None 

138 

139 # the first value we'd get for an autoincrement column. 

140 default_sequence_base = 1 

141 

142 # most DBAPIs happy with this for execute(). 

143 # not cx_oracle. 

144 execute_sequence_format = tuple 

145 

146 supports_schemas = True 

147 supports_views = True 

148 supports_sequences = False 

149 sequences_optional = False 

150 preexecute_autoincrement_sequences = False 

151 supports_identity_columns = False 

152 postfetch_lastrowid = True 

153 favor_returning_over_lastrowid = False 

154 insert_null_pk_still_autoincrements = False 

155 update_returning = False 

156 delete_returning = False 

157 update_returning_multifrom = False 

158 delete_returning_multifrom = False 

159 insert_returning = False 

160 

161 cte_follows_insert = False 

162 

163 supports_native_enum = False 

164 supports_native_boolean = False 

165 supports_native_uuid = False 

166 returns_native_bytes = False 

167 

168 non_native_boolean_check_constraint = True 

169 

170 supports_simple_order_by_label = True 

171 

172 tuple_in_values = False 

173 

174 connection_characteristics = util.immutabledict( 

175 { 

176 "isolation_level": characteristics.IsolationLevelCharacteristic(), 

177 "logging_token": characteristics.LoggingTokenCharacteristic(), 

178 } 

179 ) 

180 

181 engine_config_types: Mapping[str, Any] = util.immutabledict( 

182 { 

183 "pool_timeout": util.asint, 

184 "echo": util.bool_or_str("debug"), 

185 "echo_pool": util.bool_or_str("debug"), 

186 "pool_recycle": util.asint, 

187 "pool_size": util.asint, 

188 "max_overflow": util.asint, 

189 "future": util.asbool, 

190 } 

191 ) 

192 

193 # if the NUMERIC type 

194 # returns decimal.Decimal. 

195 # *not* the FLOAT type however. 

196 supports_native_decimal = False 

197 

198 name = "default" 

199 

200 # length at which to truncate 

201 # any identifier. 

202 max_identifier_length = 9999 

203 _user_defined_max_identifier_length: Optional[int] = None 

204 

205 isolation_level: Optional[str] = None 

206 

207 # sub-categories of max_identifier_length. 

208 # currently these accommodate for MySQL which allows alias names 

209 # of 255 but DDL names only of 64. 

210 max_index_name_length: Optional[int] = None 

211 max_constraint_name_length: Optional[int] = None 

212 

213 supports_sane_rowcount = True 

214 supports_sane_multi_rowcount = True 

215 colspecs: MutableMapping[Type[TypeEngine[Any]], Type[TypeEngine[Any]]] = {} 

216 default_paramstyle = "named" 

217 

218 supports_default_values = False 

219 """dialect supports INSERT... DEFAULT VALUES syntax""" 

220 

221 supports_default_metavalue = False 

222 """dialect supports INSERT... VALUES (DEFAULT) syntax""" 

223 

224 default_metavalue_token = "DEFAULT" 

225 """for INSERT... VALUES (DEFAULT) syntax, the token to put in the 

226 parenthesis.""" 

227 

228 # not sure if this is a real thing but the compiler will deliver it 

229 # if this is the only flag enabled. 

230 supports_empty_insert = True 

231 """dialect supports INSERT () VALUES ()""" 

232 

233 supports_multivalues_insert = False 

234 

235 use_insertmanyvalues: bool = False 

236 

237 use_insertmanyvalues_wo_returning: bool = False 

238 

239 insertmanyvalues_implicit_sentinel: InsertmanyvaluesSentinelOpts = ( 

240 InsertmanyvaluesSentinelOpts.NOT_SUPPORTED 

241 ) 

242 

243 insertmanyvalues_page_size: int = 1000 

244 insertmanyvalues_max_parameters = 32700 

245 

246 supports_is_distinct_from = True 

247 

248 supports_server_side_cursors = False 

249 

250 server_side_cursors = False 

251 

252 # extra record-level locking features (#4860) 

253 supports_for_update_of = False 

254 

255 server_version_info = None 

256 

257 default_schema_name: Optional[str] = None 

258 

259 # indicates symbol names are 

260 # UPPERCASED if they are case insensitive 

261 # within the database. 

262 # if this is True, the methods normalize_name() 

263 # and denormalize_name() must be provided. 

264 requires_name_normalize = False 

265 

266 is_async = False 

267 

268 has_terminate = False 

269 

270 # TODO: this is not to be part of 2.0. implement rudimentary binary 

271 # literals for SQLite, PostgreSQL, MySQL only within 

272 # _Binary.literal_processor 

273 _legacy_binary_type_literal_encoding = "utf-8" 

274 

275 @util.deprecated_params( 

276 empty_in_strategy=( 

277 "1.4", 

278 "The :paramref:`_sa.create_engine.empty_in_strategy` keyword is " 

279 "deprecated, and no longer has any effect. All IN expressions " 

280 "are now rendered using " 

281 'the "expanding parameter" strategy which renders a set of bound' 

282 'expressions, or an "empty set" SELECT, at statement execution' 

283 "time.", 

284 ), 

285 server_side_cursors=( 

286 "1.4", 

287 "The :paramref:`_sa.create_engine.server_side_cursors` parameter " 

288 "is deprecated and will be removed in a future release. Please " 

289 "use the " 

290 ":paramref:`_engine.Connection.execution_options.stream_results` " 

291 "parameter.", 

292 ), 

293 ) 

294 def __init__( 

295 self, 

296 paramstyle: Optional[_ParamStyle] = None, 

297 isolation_level: Optional[IsolationLevel] = None, 

298 dbapi: Optional[ModuleType] = None, 

299 implicit_returning: Literal[True] = True, 

300 supports_native_boolean: Optional[bool] = None, 

301 max_identifier_length: Optional[int] = None, 

302 label_length: Optional[int] = None, 

303 insertmanyvalues_page_size: Union[_NoArg, int] = _NoArg.NO_ARG, 

304 use_insertmanyvalues: Optional[bool] = None, 

305 # util.deprecated_params decorator cannot render the 

306 # Linting.NO_LINTING constant 

307 compiler_linting: Linting = int(compiler.NO_LINTING), # type: ignore 

308 server_side_cursors: bool = False, 

309 **kwargs: Any, 

310 ): 

311 if server_side_cursors: 

312 if not self.supports_server_side_cursors: 

313 raise exc.ArgumentError( 

314 "Dialect %s does not support server side cursors" % self 

315 ) 

316 else: 

317 self.server_side_cursors = True 

318 

319 if getattr(self, "use_setinputsizes", False): 

320 util.warn_deprecated( 

321 "The dialect-level use_setinputsizes attribute is " 

322 "deprecated. Please use " 

323 "bind_typing = BindTyping.SETINPUTSIZES", 

324 "2.0", 

325 ) 

326 self.bind_typing = interfaces.BindTyping.SETINPUTSIZES 

327 

328 self.positional = False 

329 self._ischema = None 

330 

331 self.dbapi = dbapi 

332 

333 if paramstyle is not None: 

334 self.paramstyle = paramstyle 

335 elif self.dbapi is not None: 

336 self.paramstyle = self.dbapi.paramstyle 

337 else: 

338 self.paramstyle = self.default_paramstyle 

339 self.positional = self.paramstyle in ( 

340 "qmark", 

341 "format", 

342 "numeric", 

343 "numeric_dollar", 

344 ) 

345 self.identifier_preparer = self.preparer(self) 

346 self._on_connect_isolation_level = isolation_level 

347 

348 legacy_tt_callable = getattr(self, "type_compiler", None) 

349 if legacy_tt_callable is not None: 

350 tt_callable = cast( 

351 Type[compiler.GenericTypeCompiler], 

352 self.type_compiler, 

353 ) 

354 else: 

355 tt_callable = self.type_compiler_cls 

356 

357 self.type_compiler_instance = self.type_compiler = tt_callable(self) 

358 

359 if supports_native_boolean is not None: 

360 self.supports_native_boolean = supports_native_boolean 

361 

362 self._user_defined_max_identifier_length = max_identifier_length 

363 if self._user_defined_max_identifier_length: 

364 self.max_identifier_length = ( 

365 self._user_defined_max_identifier_length 

366 ) 

367 self.label_length = label_length 

368 self.compiler_linting = compiler_linting 

369 

370 if use_insertmanyvalues is not None: 

371 self.use_insertmanyvalues = use_insertmanyvalues 

372 

373 if insertmanyvalues_page_size is not _NoArg.NO_ARG: 

374 self.insertmanyvalues_page_size = insertmanyvalues_page_size 

375 

376 @property 

377 @util.deprecated( 

378 "2.0", 

379 "full_returning is deprecated, please use insert_returning, " 

380 "update_returning, delete_returning", 

381 ) 

382 def full_returning(self): 

383 return ( 

384 self.insert_returning 

385 and self.update_returning 

386 and self.delete_returning 

387 ) 

388 

389 @util.memoized_property 

390 def insert_executemany_returning(self): 

391 """Default implementation for insert_executemany_returning, if not 

392 otherwise overridden by the specific dialect. 

393 

394 The default dialect determines "insert_executemany_returning" is 

395 available if the dialect in use has opted into using the 

396 "use_insertmanyvalues" feature. If they haven't opted into that, then 

397 this attribute is False, unless the dialect in question overrides this 

398 and provides some other implementation (such as the Oracle Database 

399 dialects). 

400 

401 """ 

402 return self.insert_returning and self.use_insertmanyvalues 

403 

404 @util.memoized_property 

405 def insert_executemany_returning_sort_by_parameter_order(self): 

406 """Default implementation for 

407 insert_executemany_returning_deterministic_order, if not otherwise 

408 overridden by the specific dialect. 

409 

410 The default dialect determines "insert_executemany_returning" can have 

411 deterministic order only if the dialect in use has opted into using the 

412 "use_insertmanyvalues" feature, which implements deterministic ordering 

413 using client side sentinel columns only by default. The 

414 "insertmanyvalues" feature also features alternate forms that can 

415 use server-generated PK values as "sentinels", but those are only 

416 used if the :attr:`.Dialect.insertmanyvalues_implicit_sentinel` 

417 bitflag enables those alternate SQL forms, which are disabled 

418 by default. 

419 

420 If the dialect in use hasn't opted into that, then this attribute is 

421 False, unless the dialect in question overrides this and provides some 

422 other implementation (such as the Oracle Database dialects). 

423 

424 """ 

425 return self.insert_returning and self.use_insertmanyvalues 

426 

427 update_executemany_returning = False 

428 delete_executemany_returning = False 

429 

430 @util.memoized_property 

431 def loaded_dbapi(self) -> ModuleType: 

432 if self.dbapi is None: 

433 raise exc.InvalidRequestError( 

434 f"Dialect {self} does not have a Python DBAPI established " 

435 "and cannot be used for actual database interaction" 

436 ) 

437 return self.dbapi 

438 

439 @util.memoized_property 

440 def _bind_typing_render_casts(self): 

441 return self.bind_typing is interfaces.BindTyping.RENDER_CASTS 

442 

443 def _ensure_has_table_connection(self, arg: Connection) -> None: 

444 if not isinstance(arg, Connection): 

445 raise exc.ArgumentError( 

446 "The argument passed to Dialect.has_table() should be a " 

447 "%s, got %s. " 

448 "Additionally, the Dialect.has_table() method is for " 

449 "internal dialect " 

450 "use only; please use " 

451 "``inspect(some_engine).has_table(<tablename>>)`` " 

452 "for public API use." % (Connection, type(arg)) 

453 ) 

454 

455 @util.memoized_property 

456 def _supports_statement_cache(self): 

457 ssc = self.__class__.__dict__.get("supports_statement_cache", None) 

458 if ssc is None: 

459 util.warn( 

460 "Dialect %s:%s will not make use of SQL compilation caching " 

461 "as it does not set the 'supports_statement_cache' attribute " 

462 "to ``True``. This can have " 

463 "significant performance implications including some " 

464 "performance degradations in comparison to prior SQLAlchemy " 

465 "versions. Dialect maintainers should seek to set this " 

466 "attribute to True after appropriate development and testing " 

467 "for SQLAlchemy 1.4 caching support. Alternatively, this " 

468 "attribute may be set to False which will disable this " 

469 "warning." % (self.name, self.driver), 

470 code="cprf", 

471 ) 

472 

473 return bool(ssc) 

474 

475 @util.memoized_property 

476 def _type_memos(self): 

477 return weakref.WeakKeyDictionary() 

478 

479 @property 

480 def dialect_description(self): 

481 return self.name + "+" + self.driver 

482 

483 @property 

484 def supports_sane_rowcount_returning(self): 

485 """True if this dialect supports sane rowcount even if RETURNING is 

486 in use. 

487 

488 For dialects that don't support RETURNING, this is synonymous with 

489 ``supports_sane_rowcount``. 

490 

491 """ 

492 return self.supports_sane_rowcount 

493 

494 @classmethod 

495 def get_pool_class(cls, url: URL) -> Type[Pool]: 

496 return getattr(cls, "poolclass", pool.QueuePool) 

497 

498 def get_dialect_pool_class(self, url: URL) -> Type[Pool]: 

499 return self.get_pool_class(url) 

500 

501 @classmethod 

502 def load_provisioning(cls): 

503 package = ".".join(cls.__module__.split(".")[0:-1]) 

504 try: 

505 __import__(package + ".provision") 

506 except ImportError: 

507 pass 

508 

509 def _builtin_onconnect(self) -> Optional[_ListenerFnType]: 

510 if self._on_connect_isolation_level is not None: 

511 

512 def builtin_connect(dbapi_conn, conn_rec): 

513 self._assert_and_set_isolation_level( 

514 dbapi_conn, self._on_connect_isolation_level 

515 ) 

516 

517 return builtin_connect 

518 else: 

519 return None 

520 

521 def initialize(self, connection: Connection) -> None: 

522 try: 

523 self.server_version_info = self._get_server_version_info( 

524 connection 

525 ) 

526 except NotImplementedError: 

527 self.server_version_info = None 

528 try: 

529 self.default_schema_name = self._get_default_schema_name( 

530 connection 

531 ) 

532 except NotImplementedError: 

533 self.default_schema_name = None 

534 

535 try: 

536 self.default_isolation_level = self.get_default_isolation_level( 

537 connection.connection.dbapi_connection 

538 ) 

539 except NotImplementedError: 

540 self.default_isolation_level = None 

541 

542 if not self._user_defined_max_identifier_length: 

543 max_ident_length = self._check_max_identifier_length(connection) 

544 if max_ident_length: 

545 self.max_identifier_length = max_ident_length 

546 

547 if ( 

548 self.label_length 

549 and self.label_length > self.max_identifier_length 

550 ): 

551 raise exc.ArgumentError( 

552 "Label length of %d is greater than this dialect's" 

553 " maximum identifier length of %d" 

554 % (self.label_length, self.max_identifier_length) 

555 ) 

556 

557 def on_connect(self) -> Optional[Callable[[Any], Any]]: 

558 # inherits the docstring from interfaces.Dialect.on_connect 

559 return None 

560 

561 def _check_max_identifier_length(self, connection): 

562 """Perform a connection / server version specific check to determine 

563 the max_identifier_length. 

564 

565 If the dialect's class level max_identifier_length should be used, 

566 can return None. 

567 

568 .. versionadded:: 1.3.9 

569 

570 """ 

571 return None 

572 

573 def get_default_isolation_level(self, dbapi_conn): 

574 """Given a DBAPI connection, return its isolation level, or 

575 a default isolation level if one cannot be retrieved. 

576 

577 May be overridden by subclasses in order to provide a 

578 "fallback" isolation level for databases that cannot reliably 

579 retrieve the actual isolation level. 

580 

581 By default, calls the :meth:`_engine.Interfaces.get_isolation_level` 

582 method, propagating any exceptions raised. 

583 

584 .. versionadded:: 1.3.22 

585 

586 """ 

587 return self.get_isolation_level(dbapi_conn) 

588 

589 def type_descriptor(self, typeobj): 

590 """Provide a database-specific :class:`.TypeEngine` object, given 

591 the generic object which comes from the types module. 

592 

593 This method looks for a dictionary called 

594 ``colspecs`` as a class or instance-level variable, 

595 and passes on to :func:`_types.adapt_type`. 

596 

597 """ 

598 return type_api.adapt_type(typeobj, self.colspecs) 

599 

600 def has_index(self, connection, table_name, index_name, schema=None, **kw): 

601 if not self.has_table(connection, table_name, schema=schema, **kw): 

602 return False 

603 for idx in self.get_indexes( 

604 connection, table_name, schema=schema, **kw 

605 ): 

606 if idx["name"] == index_name: 

607 return True 

608 else: 

609 return False 

610 

611 def has_schema( 

612 self, connection: Connection, schema_name: str, **kw: Any 

613 ) -> bool: 

614 return schema_name in self.get_schema_names(connection, **kw) 

615 

616 def validate_identifier(self, ident: str) -> None: 

617 if len(ident) > self.max_identifier_length: 

618 raise exc.IdentifierError( 

619 "Identifier '%s' exceeds maximum length of %d characters" 

620 % (ident, self.max_identifier_length) 

621 ) 

622 

623 def connect(self, *cargs: Any, **cparams: Any) -> DBAPIConnection: 

624 # inherits the docstring from interfaces.Dialect.connect 

625 return self.loaded_dbapi.connect(*cargs, **cparams) # type: ignore[no-any-return] # NOQA: E501 

626 

627 def create_connect_args(self, url: URL) -> ConnectArgsType: 

628 # inherits the docstring from interfaces.Dialect.create_connect_args 

629 opts = url.translate_connect_args() 

630 opts.update(url.query) 

631 return ([], opts) 

632 

633 def set_engine_execution_options( 

634 self, engine: Engine, opts: Mapping[str, Any] 

635 ) -> None: 

636 supported_names = set(self.connection_characteristics).intersection( 

637 opts 

638 ) 

639 if supported_names: 

640 characteristics: Mapping[str, Any] = util.immutabledict( 

641 (name, opts[name]) for name in supported_names 

642 ) 

643 

644 @event.listens_for(engine, "engine_connect") 

645 def set_connection_characteristics(connection): 

646 self._set_connection_characteristics( 

647 connection, characteristics 

648 ) 

649 

650 def set_connection_execution_options( 

651 self, connection: Connection, opts: Mapping[str, Any] 

652 ) -> None: 

653 supported_names = set(self.connection_characteristics).intersection( 

654 opts 

655 ) 

656 if supported_names: 

657 characteristics: Mapping[str, Any] = util.immutabledict( 

658 (name, opts[name]) for name in supported_names 

659 ) 

660 self._set_connection_characteristics(connection, characteristics) 

661 

662 def _set_connection_characteristics(self, connection, characteristics): 

663 characteristic_values = [ 

664 (name, self.connection_characteristics[name], value) 

665 for name, value in characteristics.items() 

666 ] 

667 

668 if connection.in_transaction(): 

669 trans_objs = [ 

670 (name, obj) 

671 for name, obj, _ in characteristic_values 

672 if obj.transactional 

673 ] 

674 if trans_objs: 

675 raise exc.InvalidRequestError( 

676 "This connection has already initialized a SQLAlchemy " 

677 "Transaction() object via begin() or autobegin; " 

678 "%s may not be altered unless rollback() or commit() " 

679 "is called first." 

680 % (", ".join(name for name, obj in trans_objs)) 

681 ) 

682 

683 dbapi_connection = connection.connection.dbapi_connection 

684 for _, characteristic, value in characteristic_values: 

685 characteristic.set_connection_characteristic( 

686 self, connection, dbapi_connection, value 

687 ) 

688 connection.connection._connection_record.finalize_callback.append( 

689 functools.partial(self._reset_characteristics, characteristics) 

690 ) 

691 

692 def _reset_characteristics(self, characteristics, dbapi_connection): 

693 for characteristic_name in characteristics: 

694 characteristic = self.connection_characteristics[ 

695 characteristic_name 

696 ] 

697 characteristic.reset_characteristic(self, dbapi_connection) 

698 

699 def do_begin(self, dbapi_connection): 

700 pass 

701 

702 def do_rollback(self, dbapi_connection): 

703 dbapi_connection.rollback() 

704 

705 def do_commit(self, dbapi_connection): 

706 dbapi_connection.commit() 

707 

708 def do_terminate(self, dbapi_connection): 

709 self.do_close(dbapi_connection) 

710 

711 def do_close(self, dbapi_connection): 

712 dbapi_connection.close() 

713 

714 @util.memoized_property 

715 def _dialect_specific_select_one(self): 

716 return str(expression.select(1).compile(dialect=self)) 

717 

718 def _do_ping_w_event(self, dbapi_connection: DBAPIConnection) -> bool: 

719 try: 

720 return self.do_ping(dbapi_connection) 

721 except self.loaded_dbapi.Error as err: 

722 is_disconnect = self.is_disconnect(err, dbapi_connection, None) 

723 

724 if self._has_events: 

725 try: 

726 Connection._handle_dbapi_exception_noconnection( 

727 err, 

728 self, 

729 is_disconnect=is_disconnect, 

730 invalidate_pool_on_disconnect=False, 

731 is_pre_ping=True, 

732 ) 

733 except exc.StatementError as new_err: 

734 is_disconnect = new_err.connection_invalidated 

735 

736 if is_disconnect: 

737 return False 

738 else: 

739 raise 

740 

741 def do_ping(self, dbapi_connection: DBAPIConnection) -> bool: 

742 cursor = dbapi_connection.cursor() 

743 try: 

744 cursor.execute(self._dialect_specific_select_one) 

745 finally: 

746 cursor.close() 

747 return True 

748 

749 def create_xid(self): 

750 """Create a random two-phase transaction ID. 

751 

752 This id will be passed to do_begin_twophase(), do_rollback_twophase(), 

753 do_commit_twophase(). Its format is unspecified. 

754 """ 

755 

756 return "_sa_%032x" % random.randint(0, 2**128) 

757 

758 def do_savepoint(self, connection, name): 

759 connection.execute(expression.SavepointClause(name)) 

760 

761 def do_rollback_to_savepoint(self, connection, name): 

762 connection.execute(expression.RollbackToSavepointClause(name)) 

763 

764 def do_release_savepoint(self, connection, name): 

765 connection.execute(expression.ReleaseSavepointClause(name)) 

766 

767 def _deliver_insertmanyvalues_batches( 

768 self, 

769 connection, 

770 cursor, 

771 statement, 

772 parameters, 

773 generic_setinputsizes, 

774 context, 

775 ): 

776 context = cast(DefaultExecutionContext, context) 

777 compiled = cast(SQLCompiler, context.compiled) 

778 

779 _composite_sentinel_proc: Sequence[ 

780 Optional[_ResultProcessorType[Any]] 

781 ] = () 

782 _scalar_sentinel_proc: Optional[_ResultProcessorType[Any]] = None 

783 _sentinel_proc_initialized: bool = False 

784 

785 compiled_parameters = context.compiled_parameters 

786 

787 imv = compiled._insertmanyvalues 

788 assert imv is not None 

789 

790 is_returning: Final[bool] = bool(compiled.effective_returning) 

791 batch_size = context.execution_options.get( 

792 "insertmanyvalues_page_size", self.insertmanyvalues_page_size 

793 ) 

794 

795 if compiled.schema_translate_map: 

796 schema_translate_map = context.execution_options.get( 

797 "schema_translate_map", {} 

798 ) 

799 else: 

800 schema_translate_map = None 

801 

802 if is_returning: 

803 result: Optional[List[Any]] = [] 

804 context._insertmanyvalues_rows = result 

805 

806 sort_by_parameter_order = imv.sort_by_parameter_order 

807 

808 else: 

809 sort_by_parameter_order = False 

810 result = None 

811 

812 for imv_batch in compiled._deliver_insertmanyvalues_batches( 

813 statement, 

814 parameters, 

815 compiled_parameters, 

816 generic_setinputsizes, 

817 batch_size, 

818 sort_by_parameter_order, 

819 schema_translate_map, 

820 ): 

821 yield imv_batch 

822 

823 if is_returning: 

824 

825 try: 

826 rows = context.fetchall_for_returning(cursor) 

827 except BaseException as be: 

828 connection._handle_dbapi_exception( 

829 be, 

830 sql_util._long_statement(imv_batch.replaced_statement), 

831 imv_batch.replaced_parameters, 

832 None, 

833 context, 

834 is_sub_exec=True, 

835 ) 

836 

837 # I would have thought "is_returning: Final[bool]" 

838 # would have assured this but pylance thinks not 

839 assert result is not None 

840 

841 if imv.num_sentinel_columns and not imv_batch.is_downgraded: 

842 composite_sentinel = imv.num_sentinel_columns > 1 

843 if imv.implicit_sentinel: 

844 # for implicit sentinel, which is currently single-col 

845 # integer autoincrement, do a simple sort. 

846 assert not composite_sentinel 

847 result.extend( 

848 sorted(rows, key=operator.itemgetter(-1)) 

849 ) 

850 continue 

851 

852 # otherwise, create dictionaries to match up batches 

853 # with parameters 

854 assert imv.sentinel_param_keys 

855 assert imv.sentinel_columns 

856 

857 _nsc = imv.num_sentinel_columns 

858 

859 if not _sentinel_proc_initialized: 

860 if composite_sentinel: 

861 _composite_sentinel_proc = [ 

862 col.type._cached_result_processor( 

863 self, cursor_desc[1] 

864 ) 

865 for col, cursor_desc in zip( 

866 imv.sentinel_columns, 

867 cursor.description[-_nsc:], 

868 ) 

869 ] 

870 else: 

871 _scalar_sentinel_proc = ( 

872 imv.sentinel_columns[0] 

873 ).type._cached_result_processor( 

874 self, cursor.description[-1][1] 

875 ) 

876 _sentinel_proc_initialized = True 

877 

878 rows_by_sentinel: Union[ 

879 Dict[Tuple[Any, ...], Any], 

880 Dict[Any, Any], 

881 ] 

882 if composite_sentinel: 

883 rows_by_sentinel = { 

884 tuple( 

885 (proc(val) if proc else val) 

886 for val, proc in zip( 

887 row[-_nsc:], _composite_sentinel_proc 

888 ) 

889 ): row 

890 for row in rows 

891 } 

892 elif _scalar_sentinel_proc: 

893 rows_by_sentinel = { 

894 _scalar_sentinel_proc(row[-1]): row for row in rows 

895 } 

896 else: 

897 rows_by_sentinel = {row[-1]: row for row in rows} 

898 

899 if len(rows_by_sentinel) != len(imv_batch.batch): 

900 # see test_insert_exec.py:: 

901 # IMVSentinelTest::test_sentinel_incorrect_rowcount 

902 # for coverage / demonstration 

903 raise exc.InvalidRequestError( 

904 f"Sentinel-keyed result set did not produce " 

905 f"correct number of rows {len(imv_batch.batch)}; " 

906 "produced " 

907 f"{len(rows_by_sentinel)}. Please ensure the " 

908 "sentinel column is fully unique and populated in " 

909 "all cases." 

910 ) 

911 

912 try: 

913 ordered_rows = [ 

914 rows_by_sentinel[sentinel_keys] 

915 for sentinel_keys in imv_batch.sentinel_values 

916 ] 

917 except KeyError as ke: 

918 # see test_insert_exec.py:: 

919 # IMVSentinelTest::test_sentinel_cant_match_keys 

920 # for coverage / demonstration 

921 raise exc.InvalidRequestError( 

922 f"Can't match sentinel values in result set to " 

923 f"parameter sets; key {ke.args[0]!r} was not " 

924 "found. " 

925 "There may be a mismatch between the datatype " 

926 "passed to the DBAPI driver vs. that which it " 

927 "returns in a result row. Ensure the given " 

928 "Python value matches the expected result type " 

929 "*exactly*, taking care to not rely upon implicit " 

930 "conversions which may occur such as when using " 

931 "strings in place of UUID or integer values, etc. " 

932 ) from ke 

933 

934 result.extend(ordered_rows) 

935 

936 else: 

937 result.extend(rows) 

938 

939 def do_executemany(self, cursor, statement, parameters, context=None): 

940 cursor.executemany(statement, parameters) 

941 

942 def do_execute(self, cursor, statement, parameters, context=None): 

943 cursor.execute(statement, parameters) 

944 

945 def do_execute_no_params(self, cursor, statement, context=None): 

946 cursor.execute(statement) 

947 

948 def is_disconnect( 

949 self, 

950 e: Exception, 

951 connection: Union[ 

952 pool.PoolProxiedConnection, interfaces.DBAPIConnection, None 

953 ], 

954 cursor: Optional[interfaces.DBAPICursor], 

955 ) -> bool: 

956 return False 

957 

958 @util.memoized_instancemethod 

959 def _gen_allowed_isolation_levels(self, dbapi_conn): 

960 try: 

961 raw_levels = list(self.get_isolation_level_values(dbapi_conn)) 

962 except NotImplementedError: 

963 return None 

964 else: 

965 normalized_levels = [ 

966 level.replace("_", " ").upper() for level in raw_levels 

967 ] 

968 if raw_levels != normalized_levels: 

969 raise ValueError( 

970 f"Dialect {self.name!r} get_isolation_level_values() " 

971 f"method should return names as UPPERCASE using spaces, " 

972 f"not underscores; got " 

973 f"{sorted(set(raw_levels).difference(normalized_levels))}" 

974 ) 

975 return tuple(normalized_levels) 

976 

977 def _assert_and_set_isolation_level(self, dbapi_conn, level): 

978 level = level.replace("_", " ").upper() 

979 

980 _allowed_isolation_levels = self._gen_allowed_isolation_levels( 

981 dbapi_conn 

982 ) 

983 if ( 

984 _allowed_isolation_levels 

985 and level not in _allowed_isolation_levels 

986 ): 

987 raise exc.ArgumentError( 

988 f"Invalid value {level!r} for isolation_level. " 

989 f"Valid isolation levels for {self.name!r} are " 

990 f"{', '.join(_allowed_isolation_levels)}" 

991 ) 

992 

993 self.set_isolation_level(dbapi_conn, level) 

994 

995 def reset_isolation_level(self, dbapi_conn): 

996 if self._on_connect_isolation_level is not None: 

997 assert ( 

998 self._on_connect_isolation_level == "AUTOCOMMIT" 

999 or self._on_connect_isolation_level 

1000 == self.default_isolation_level 

1001 ) 

1002 self._assert_and_set_isolation_level( 

1003 dbapi_conn, self._on_connect_isolation_level 

1004 ) 

1005 else: 

1006 assert self.default_isolation_level is not None 

1007 self._assert_and_set_isolation_level( 

1008 dbapi_conn, 

1009 self.default_isolation_level, 

1010 ) 

1011 

1012 def normalize_name(self, name): 

1013 if name is None: 

1014 return None 

1015 

1016 name_lower = name.lower() 

1017 name_upper = name.upper() 

1018 

1019 if name_upper == name_lower: 

1020 # name has no upper/lower conversion, e.g. non-european characters. 

1021 # return unchanged 

1022 return name 

1023 elif name_upper == name and not ( 

1024 self.identifier_preparer._requires_quotes 

1025 )(name_lower): 

1026 # name is all uppercase and doesn't require quoting; normalize 

1027 # to all lower case 

1028 return name_lower 

1029 elif name_lower == name: 

1030 # name is all lower case, which if denormalized means we need to 

1031 # force quoting on it 

1032 return quoted_name(name, quote=True) 

1033 else: 

1034 # name is mixed case, means it will be quoted in SQL when used 

1035 # later, no normalizes 

1036 return name 

1037 

1038 def denormalize_name(self, name): 

1039 if name is None: 

1040 return None 

1041 

1042 name_lower = name.lower() 

1043 name_upper = name.upper() 

1044 

1045 if name_upper == name_lower: 

1046 # name has no upper/lower conversion, e.g. non-european characters. 

1047 # return unchanged 

1048 return name 

1049 elif name_lower == name and not ( 

1050 self.identifier_preparer._requires_quotes 

1051 )(name_lower): 

1052 name = name_upper 

1053 return name 

1054 

1055 def get_driver_connection(self, connection): 

1056 return connection 

1057 

1058 def _overrides_default(self, method): 

1059 return ( 

1060 getattr(type(self), method).__code__ 

1061 is not getattr(DefaultDialect, method).__code__ 

1062 ) 

1063 

1064 def _default_multi_reflect( 

1065 self, 

1066 single_tbl_method, 

1067 connection, 

1068 kind, 

1069 schema, 

1070 filter_names, 

1071 scope, 

1072 **kw, 

1073 ): 

1074 names_fns = [] 

1075 temp_names_fns = [] 

1076 if ObjectKind.TABLE in kind: 

1077 names_fns.append(self.get_table_names) 

1078 temp_names_fns.append(self.get_temp_table_names) 

1079 if ObjectKind.VIEW in kind: 

1080 names_fns.append(self.get_view_names) 

1081 temp_names_fns.append(self.get_temp_view_names) 

1082 if ObjectKind.MATERIALIZED_VIEW in kind: 

1083 names_fns.append(self.get_materialized_view_names) 

1084 # no temp materialized view at the moment 

1085 # temp_names_fns.append(self.get_temp_materialized_view_names) 

1086 

1087 unreflectable = kw.pop("unreflectable", {}) 

1088 

1089 if ( 

1090 filter_names 

1091 and scope is ObjectScope.ANY 

1092 and kind is ObjectKind.ANY 

1093 ): 

1094 # if names are given and no qualification on type of table 

1095 # (i.e. the Table(..., autoload) case), take the names as given, 

1096 # don't run names queries. If a table does not exit 

1097 # NoSuchTableError is raised and it's skipped 

1098 

1099 # this also suits the case for mssql where we can reflect 

1100 # individual temp tables but there's no temp_names_fn 

1101 names = filter_names 

1102 else: 

1103 names = [] 

1104 name_kw = {"schema": schema, **kw} 

1105 fns = [] 

1106 if ObjectScope.DEFAULT in scope: 

1107 fns.extend(names_fns) 

1108 if ObjectScope.TEMPORARY in scope: 

1109 fns.extend(temp_names_fns) 

1110 

1111 for fn in fns: 

1112 try: 

1113 names.extend(fn(connection, **name_kw)) 

1114 except NotImplementedError: 

1115 pass 

1116 

1117 if filter_names: 

1118 filter_names = set(filter_names) 

1119 

1120 # iterate over all the tables/views and call the single table method 

1121 for table in names: 

1122 if not filter_names or table in filter_names: 

1123 key = (schema, table) 

1124 try: 

1125 yield ( 

1126 key, 

1127 single_tbl_method( 

1128 connection, table, schema=schema, **kw 

1129 ), 

1130 ) 

1131 except exc.UnreflectableTableError as err: 

1132 if key not in unreflectable: 

1133 unreflectable[key] = err 

1134 except exc.NoSuchTableError: 

1135 pass 

1136 

1137 def get_multi_table_options(self, connection, **kw): 

1138 return self._default_multi_reflect( 

1139 self.get_table_options, connection, **kw 

1140 ) 

1141 

1142 def get_multi_columns(self, connection, **kw): 

1143 return self._default_multi_reflect(self.get_columns, connection, **kw) 

1144 

1145 def get_multi_pk_constraint(self, connection, **kw): 

1146 return self._default_multi_reflect( 

1147 self.get_pk_constraint, connection, **kw 

1148 ) 

1149 

1150 def get_multi_foreign_keys(self, connection, **kw): 

1151 return self._default_multi_reflect( 

1152 self.get_foreign_keys, connection, **kw 

1153 ) 

1154 

1155 def get_multi_indexes(self, connection, **kw): 

1156 return self._default_multi_reflect(self.get_indexes, connection, **kw) 

1157 

1158 def get_multi_unique_constraints(self, connection, **kw): 

1159 return self._default_multi_reflect( 

1160 self.get_unique_constraints, connection, **kw 

1161 ) 

1162 

1163 def get_multi_check_constraints(self, connection, **kw): 

1164 return self._default_multi_reflect( 

1165 self.get_check_constraints, connection, **kw 

1166 ) 

1167 

1168 def get_multi_table_comment(self, connection, **kw): 

1169 return self._default_multi_reflect( 

1170 self.get_table_comment, connection, **kw 

1171 ) 

1172 

1173 

1174class StrCompileDialect(DefaultDialect): 

1175 statement_compiler = compiler.StrSQLCompiler 

1176 ddl_compiler = compiler.DDLCompiler 

1177 type_compiler_cls = compiler.StrSQLTypeCompiler 

1178 preparer = compiler.IdentifierPreparer 

1179 

1180 insert_returning = True 

1181 update_returning = True 

1182 delete_returning = True 

1183 

1184 supports_statement_cache = True 

1185 

1186 supports_identity_columns = True 

1187 

1188 supports_sequences = True 

1189 sequences_optional = True 

1190 preexecute_autoincrement_sequences = False 

1191 

1192 supports_native_boolean = True 

1193 

1194 supports_multivalues_insert = True 

1195 supports_simple_order_by_label = True 

1196 

1197 

1198class DefaultExecutionContext(ExecutionContext): 

1199 isinsert = False 

1200 isupdate = False 

1201 isdelete = False 

1202 is_crud = False 

1203 is_text = False 

1204 isddl = False 

1205 

1206 execute_style: ExecuteStyle = ExecuteStyle.EXECUTE 

1207 

1208 compiled: Optional[Compiled] = None 

1209 result_column_struct: Optional[ 

1210 Tuple[List[ResultColumnsEntry], bool, bool, bool, bool] 

1211 ] = None 

1212 returned_default_rows: Optional[Sequence[Row[Any]]] = None 

1213 

1214 execution_options: _ExecuteOptions = util.EMPTY_DICT 

1215 

1216 cursor_fetch_strategy = _cursor._DEFAULT_FETCH 

1217 

1218 invoked_statement: Optional[Executable] = None 

1219 

1220 _is_implicit_returning = False 

1221 _is_explicit_returning = False 

1222 _is_supplemental_returning = False 

1223 _is_server_side = False 

1224 

1225 _soft_closed = False 

1226 

1227 _rowcount: Optional[int] = None 

1228 

1229 # a hook for SQLite's translation of 

1230 # result column names 

1231 # NOTE: pyhive is using this hook, can't remove it :( 

1232 _translate_colname: Optional[Callable[[str], str]] = None 

1233 

1234 _expanded_parameters: Mapping[str, List[str]] = util.immutabledict() 

1235 """used by set_input_sizes(). 

1236 

1237 This collection comes from ``ExpandedState.parameter_expansion``. 

1238 

1239 """ 

1240 

1241 cache_hit = NO_CACHE_KEY 

1242 

1243 root_connection: Connection 

1244 _dbapi_connection: PoolProxiedConnection 

1245 dialect: Dialect 

1246 unicode_statement: str 

1247 cursor: DBAPICursor 

1248 compiled_parameters: List[_MutableCoreSingleExecuteParams] 

1249 parameters: _DBAPIMultiExecuteParams 

1250 extracted_parameters: Optional[Sequence[BindParameter[Any]]] 

1251 

1252 _empty_dict_params = cast("Mapping[str, Any]", util.EMPTY_DICT) 

1253 

1254 _insertmanyvalues_rows: Optional[List[Tuple[Any, ...]]] = None 

1255 _num_sentinel_cols: int = 0 

1256 

1257 @classmethod 

1258 def _init_ddl( 

1259 cls, 

1260 dialect: Dialect, 

1261 connection: Connection, 

1262 dbapi_connection: PoolProxiedConnection, 

1263 execution_options: _ExecuteOptions, 

1264 compiled_ddl: DDLCompiler, 

1265 ) -> ExecutionContext: 

1266 """Initialize execution context for an ExecutableDDLElement 

1267 construct.""" 

1268 

1269 self = cls.__new__(cls) 

1270 self.root_connection = connection 

1271 self._dbapi_connection = dbapi_connection 

1272 self.dialect = connection.dialect 

1273 

1274 self.compiled = compiled = compiled_ddl 

1275 self.isddl = True 

1276 

1277 self.execution_options = execution_options 

1278 

1279 self.unicode_statement = str(compiled) 

1280 if compiled.schema_translate_map: 

1281 schema_translate_map = self.execution_options.get( 

1282 "schema_translate_map", {} 

1283 ) 

1284 

1285 rst = compiled.preparer._render_schema_translates 

1286 self.unicode_statement = rst( 

1287 self.unicode_statement, schema_translate_map 

1288 ) 

1289 

1290 self.statement = self.unicode_statement 

1291 

1292 self.cursor = self.create_cursor() 

1293 self.compiled_parameters = [] 

1294 

1295 if dialect.positional: 

1296 self.parameters = [dialect.execute_sequence_format()] 

1297 else: 

1298 self.parameters = [self._empty_dict_params] 

1299 

1300 return self 

1301 

1302 @classmethod 

1303 def _init_compiled( 

1304 cls, 

1305 dialect: Dialect, 

1306 connection: Connection, 

1307 dbapi_connection: PoolProxiedConnection, 

1308 execution_options: _ExecuteOptions, 

1309 compiled: SQLCompiler, 

1310 parameters: _CoreMultiExecuteParams, 

1311 invoked_statement: Executable, 

1312 extracted_parameters: Optional[Sequence[BindParameter[Any]]], 

1313 cache_hit: CacheStats = CacheStats.CACHING_DISABLED, 

1314 ) -> ExecutionContext: 

1315 """Initialize execution context for a Compiled construct.""" 

1316 

1317 self = cls.__new__(cls) 

1318 self.root_connection = connection 

1319 self._dbapi_connection = dbapi_connection 

1320 self.dialect = connection.dialect 

1321 self.extracted_parameters = extracted_parameters 

1322 self.invoked_statement = invoked_statement 

1323 self.compiled = compiled 

1324 self.cache_hit = cache_hit 

1325 

1326 self.execution_options = execution_options 

1327 

1328 self.result_column_struct = ( 

1329 compiled._result_columns, 

1330 compiled._ordered_columns, 

1331 compiled._textual_ordered_columns, 

1332 compiled._ad_hoc_textual, 

1333 compiled._loose_column_name_matching, 

1334 ) 

1335 

1336 self.isinsert = ii = compiled.isinsert 

1337 self.isupdate = iu = compiled.isupdate 

1338 self.isdelete = id_ = compiled.isdelete 

1339 self.is_text = compiled.isplaintext 

1340 

1341 if ii or iu or id_: 

1342 dml_statement = compiled.compile_state.statement # type: ignore 

1343 if TYPE_CHECKING: 

1344 assert isinstance(dml_statement, UpdateBase) 

1345 self.is_crud = True 

1346 self._is_explicit_returning = ier = bool(dml_statement._returning) 

1347 self._is_implicit_returning = iir = bool( 

1348 compiled.implicit_returning 

1349 ) 

1350 if iir and dml_statement._supplemental_returning: 

1351 self._is_supplemental_returning = True 

1352 

1353 # dont mix implicit and explicit returning 

1354 assert not (iir and ier) 

1355 

1356 if (ier or iir) and compiled.for_executemany: 

1357 if ii and not self.dialect.insert_executemany_returning: 

1358 raise exc.InvalidRequestError( 

1359 f"Dialect {self.dialect.dialect_description} with " 

1360 f"current server capabilities does not support " 

1361 "INSERT..RETURNING when executemany is used" 

1362 ) 

1363 elif ( 

1364 ii 

1365 and dml_statement._sort_by_parameter_order 

1366 and not self.dialect.insert_executemany_returning_sort_by_parameter_order # noqa: E501 

1367 ): 

1368 raise exc.InvalidRequestError( 

1369 f"Dialect {self.dialect.dialect_description} with " 

1370 f"current server capabilities does not support " 

1371 "INSERT..RETURNING with deterministic row ordering " 

1372 "when executemany is used" 

1373 ) 

1374 elif ( 

1375 ii 

1376 and self.dialect.use_insertmanyvalues 

1377 and not compiled._insertmanyvalues 

1378 ): 

1379 raise exc.InvalidRequestError( 

1380 'Statement does not have "insertmanyvalues" ' 

1381 "enabled, can't use INSERT..RETURNING with " 

1382 "executemany in this case." 

1383 ) 

1384 elif iu and not self.dialect.update_executemany_returning: 

1385 raise exc.InvalidRequestError( 

1386 f"Dialect {self.dialect.dialect_description} with " 

1387 f"current server capabilities does not support " 

1388 "UPDATE..RETURNING when executemany is used" 

1389 ) 

1390 elif id_ and not self.dialect.delete_executemany_returning: 

1391 raise exc.InvalidRequestError( 

1392 f"Dialect {self.dialect.dialect_description} with " 

1393 f"current server capabilities does not support " 

1394 "DELETE..RETURNING when executemany is used" 

1395 ) 

1396 

1397 if not parameters: 

1398 self.compiled_parameters = [ 

1399 compiled.construct_params( 

1400 extracted_parameters=extracted_parameters, 

1401 escape_names=False, 

1402 ) 

1403 ] 

1404 else: 

1405 self.compiled_parameters = [ 

1406 compiled.construct_params( 

1407 m, 

1408 escape_names=False, 

1409 _group_number=grp, 

1410 extracted_parameters=extracted_parameters, 

1411 ) 

1412 for grp, m in enumerate(parameters) 

1413 ] 

1414 

1415 if len(parameters) > 1: 

1416 if self.isinsert and compiled._insertmanyvalues: 

1417 self.execute_style = ExecuteStyle.INSERTMANYVALUES 

1418 

1419 imv = compiled._insertmanyvalues 

1420 if imv.sentinel_columns is not None: 

1421 self._num_sentinel_cols = imv.num_sentinel_columns 

1422 else: 

1423 self.execute_style = ExecuteStyle.EXECUTEMANY 

1424 

1425 self.unicode_statement = compiled.string 

1426 

1427 self.cursor = self.create_cursor() 

1428 

1429 if self.compiled.insert_prefetch or self.compiled.update_prefetch: 

1430 self._process_execute_defaults() 

1431 

1432 processors = compiled._bind_processors 

1433 

1434 flattened_processors: Mapping[ 

1435 str, _BindProcessorType[Any] 

1436 ] = processors # type: ignore[assignment] 

1437 

1438 if compiled.literal_execute_params or compiled.post_compile_params: 

1439 if self.executemany: 

1440 raise exc.InvalidRequestError( 

1441 "'literal_execute' or 'expanding' parameters can't be " 

1442 "used with executemany()" 

1443 ) 

1444 

1445 expanded_state = compiled._process_parameters_for_postcompile( 

1446 self.compiled_parameters[0] 

1447 ) 

1448 

1449 # re-assign self.unicode_statement 

1450 self.unicode_statement = expanded_state.statement 

1451 

1452 self._expanded_parameters = expanded_state.parameter_expansion 

1453 

1454 flattened_processors = dict(processors) # type: ignore 

1455 flattened_processors.update(expanded_state.processors) 

1456 positiontup = expanded_state.positiontup 

1457 elif compiled.positional: 

1458 positiontup = self.compiled.positiontup 

1459 else: 

1460 positiontup = None 

1461 

1462 if compiled.schema_translate_map: 

1463 schema_translate_map = self.execution_options.get( 

1464 "schema_translate_map", {} 

1465 ) 

1466 rst = compiled.preparer._render_schema_translates 

1467 self.unicode_statement = rst( 

1468 self.unicode_statement, schema_translate_map 

1469 ) 

1470 

1471 # final self.unicode_statement is now assigned, encode if needed 

1472 # by dialect 

1473 self.statement = self.unicode_statement 

1474 

1475 # Convert the dictionary of bind parameter values 

1476 # into a dict or list to be sent to the DBAPI's 

1477 # execute() or executemany() method. 

1478 

1479 if compiled.positional: 

1480 core_positional_parameters: MutableSequence[Sequence[Any]] = [] 

1481 assert positiontup is not None 

1482 for compiled_params in self.compiled_parameters: 

1483 l_param: List[Any] = [ 

1484 ( 

1485 flattened_processors[key](compiled_params[key]) 

1486 if key in flattened_processors 

1487 else compiled_params[key] 

1488 ) 

1489 for key in positiontup 

1490 ] 

1491 core_positional_parameters.append( 

1492 dialect.execute_sequence_format(l_param) 

1493 ) 

1494 

1495 self.parameters = core_positional_parameters 

1496 else: 

1497 core_dict_parameters: MutableSequence[Dict[str, Any]] = [] 

1498 escaped_names = compiled.escaped_bind_names 

1499 

1500 # note that currently, "expanded" parameters will be present 

1501 # in self.compiled_parameters in their quoted form. This is 

1502 # slightly inconsistent with the approach taken as of 

1503 # #8056 where self.compiled_parameters is meant to contain unquoted 

1504 # param names. 

1505 d_param: Dict[str, Any] 

1506 for compiled_params in self.compiled_parameters: 

1507 if escaped_names: 

1508 d_param = { 

1509 escaped_names.get(key, key): ( 

1510 flattened_processors[key](compiled_params[key]) 

1511 if key in flattened_processors 

1512 else compiled_params[key] 

1513 ) 

1514 for key in compiled_params 

1515 } 

1516 else: 

1517 d_param = { 

1518 key: ( 

1519 flattened_processors[key](compiled_params[key]) 

1520 if key in flattened_processors 

1521 else compiled_params[key] 

1522 ) 

1523 for key in compiled_params 

1524 } 

1525 

1526 core_dict_parameters.append(d_param) 

1527 

1528 self.parameters = core_dict_parameters 

1529 

1530 return self 

1531 

1532 @classmethod 

1533 def _init_statement( 

1534 cls, 

1535 dialect: Dialect, 

1536 connection: Connection, 

1537 dbapi_connection: PoolProxiedConnection, 

1538 execution_options: _ExecuteOptions, 

1539 statement: str, 

1540 parameters: _DBAPIMultiExecuteParams, 

1541 ) -> ExecutionContext: 

1542 """Initialize execution context for a string SQL statement.""" 

1543 

1544 self = cls.__new__(cls) 

1545 self.root_connection = connection 

1546 self._dbapi_connection = dbapi_connection 

1547 self.dialect = connection.dialect 

1548 self.is_text = True 

1549 

1550 self.execution_options = execution_options 

1551 

1552 if not parameters: 

1553 if self.dialect.positional: 

1554 self.parameters = [dialect.execute_sequence_format()] 

1555 else: 

1556 self.parameters = [self._empty_dict_params] 

1557 elif isinstance(parameters[0], dialect.execute_sequence_format): 

1558 self.parameters = parameters 

1559 elif isinstance(parameters[0], dict): 

1560 self.parameters = parameters 

1561 else: 

1562 self.parameters = [ 

1563 dialect.execute_sequence_format(p) for p in parameters 

1564 ] 

1565 

1566 if len(parameters) > 1: 

1567 self.execute_style = ExecuteStyle.EXECUTEMANY 

1568 

1569 self.statement = self.unicode_statement = statement 

1570 

1571 self.cursor = self.create_cursor() 

1572 return self 

1573 

1574 @classmethod 

1575 def _init_default( 

1576 cls, 

1577 dialect: Dialect, 

1578 connection: Connection, 

1579 dbapi_connection: PoolProxiedConnection, 

1580 execution_options: _ExecuteOptions, 

1581 ) -> ExecutionContext: 

1582 """Initialize execution context for a ColumnDefault construct.""" 

1583 

1584 self = cls.__new__(cls) 

1585 self.root_connection = connection 

1586 self._dbapi_connection = dbapi_connection 

1587 self.dialect = connection.dialect 

1588 

1589 self.execution_options = execution_options 

1590 

1591 self.cursor = self.create_cursor() 

1592 return self 

1593 

1594 def _get_cache_stats(self) -> str: 

1595 if self.compiled is None: 

1596 return "raw sql" 

1597 

1598 now = perf_counter() 

1599 

1600 ch = self.cache_hit 

1601 

1602 gen_time = self.compiled._gen_time 

1603 assert gen_time is not None 

1604 

1605 if ch is NO_CACHE_KEY: 

1606 return "no key %.5fs" % (now - gen_time,) 

1607 elif ch is CACHE_HIT: 

1608 return "cached since %.4gs ago" % (now - gen_time,) 

1609 elif ch is CACHE_MISS: 

1610 return "generated in %.5fs" % (now - gen_time,) 

1611 elif ch is CACHING_DISABLED: 

1612 if "_cache_disable_reason" in self.execution_options: 

1613 return "caching disabled (%s) %.5fs " % ( 

1614 self.execution_options["_cache_disable_reason"], 

1615 now - gen_time, 

1616 ) 

1617 else: 

1618 return "caching disabled %.5fs" % (now - gen_time,) 

1619 elif ch is NO_DIALECT_SUPPORT: 

1620 return "dialect %s+%s does not support caching %.5fs" % ( 

1621 self.dialect.name, 

1622 self.dialect.driver, 

1623 now - gen_time, 

1624 ) 

1625 else: 

1626 return "unknown" 

1627 

1628 @property 

1629 def executemany(self): 

1630 return self.execute_style in ( 

1631 ExecuteStyle.EXECUTEMANY, 

1632 ExecuteStyle.INSERTMANYVALUES, 

1633 ) 

1634 

1635 @util.memoized_property 

1636 def identifier_preparer(self): 

1637 if self.compiled: 

1638 return self.compiled.preparer 

1639 elif "schema_translate_map" in self.execution_options: 

1640 return self.dialect.identifier_preparer._with_schema_translate( 

1641 self.execution_options["schema_translate_map"] 

1642 ) 

1643 else: 

1644 return self.dialect.identifier_preparer 

1645 

1646 @util.memoized_property 

1647 def engine(self): 

1648 return self.root_connection.engine 

1649 

1650 @util.memoized_property 

1651 def postfetch_cols(self) -> Optional[Sequence[Column[Any]]]: 

1652 if TYPE_CHECKING: 

1653 assert isinstance(self.compiled, SQLCompiler) 

1654 return self.compiled.postfetch 

1655 

1656 @util.memoized_property 

1657 def prefetch_cols(self) -> Optional[Sequence[Column[Any]]]: 

1658 if TYPE_CHECKING: 

1659 assert isinstance(self.compiled, SQLCompiler) 

1660 if self.isinsert: 

1661 return self.compiled.insert_prefetch 

1662 elif self.isupdate: 

1663 return self.compiled.update_prefetch 

1664 else: 

1665 return () 

1666 

1667 @util.memoized_property 

1668 def no_parameters(self): 

1669 return self.execution_options.get("no_parameters", False) 

1670 

1671 def _execute_scalar( 

1672 self, 

1673 stmt: str, 

1674 type_: Optional[TypeEngine[Any]], 

1675 parameters: Optional[_DBAPISingleExecuteParams] = None, 

1676 ) -> Any: 

1677 """Execute a string statement on the current cursor, returning a 

1678 scalar result. 

1679 

1680 Used to fire off sequences, default phrases, and "select lastrowid" 

1681 types of statements individually or in the context of a parent INSERT 

1682 or UPDATE statement. 

1683 

1684 """ 

1685 

1686 conn = self.root_connection 

1687 

1688 if "schema_translate_map" in self.execution_options: 

1689 schema_translate_map = self.execution_options.get( 

1690 "schema_translate_map", {} 

1691 ) 

1692 

1693 rst = self.identifier_preparer._render_schema_translates 

1694 stmt = rst(stmt, schema_translate_map) 

1695 

1696 if not parameters: 

1697 if self.dialect.positional: 

1698 parameters = self.dialect.execute_sequence_format() 

1699 else: 

1700 parameters = {} 

1701 

1702 conn._cursor_execute(self.cursor, stmt, parameters, context=self) 

1703 row = self.cursor.fetchone() 

1704 if row is not None: 

1705 r = row[0] 

1706 else: 

1707 r = None 

1708 if type_ is not None: 

1709 # apply type post processors to the result 

1710 proc = type_._cached_result_processor( 

1711 self.dialect, self.cursor.description[0][1] 

1712 ) 

1713 if proc: 

1714 return proc(r) 

1715 return r 

1716 

1717 @util.memoized_property 

1718 def connection(self): 

1719 return self.root_connection 

1720 

1721 def _use_server_side_cursor(self): 

1722 if not self.dialect.supports_server_side_cursors: 

1723 return False 

1724 

1725 if self.dialect.server_side_cursors: 

1726 # this is deprecated 

1727 use_server_side = self.execution_options.get( 

1728 "stream_results", True 

1729 ) and ( 

1730 self.compiled 

1731 and isinstance(self.compiled.statement, expression.Selectable) 

1732 or ( 

1733 ( 

1734 not self.compiled 

1735 or isinstance( 

1736 self.compiled.statement, expression.TextClause 

1737 ) 

1738 ) 

1739 and self.unicode_statement 

1740 and SERVER_SIDE_CURSOR_RE.match(self.unicode_statement) 

1741 ) 

1742 ) 

1743 else: 

1744 use_server_side = self.execution_options.get( 

1745 "stream_results", False 

1746 ) 

1747 

1748 return use_server_side 

1749 

1750 def create_cursor(self) -> DBAPICursor: 

1751 if ( 

1752 # inlining initial preference checks for SS cursors 

1753 self.dialect.supports_server_side_cursors 

1754 and ( 

1755 self.execution_options.get("stream_results", False) 

1756 or ( 

1757 self.dialect.server_side_cursors 

1758 and self._use_server_side_cursor() 

1759 ) 

1760 ) 

1761 ): 

1762 self._is_server_side = True 

1763 return self.create_server_side_cursor() 

1764 else: 

1765 self._is_server_side = False 

1766 return self.create_default_cursor() 

1767 

1768 def fetchall_for_returning(self, cursor): 

1769 return cursor.fetchall() 

1770 

1771 def create_default_cursor(self) -> DBAPICursor: 

1772 return self._dbapi_connection.cursor() 

1773 

1774 def create_server_side_cursor(self) -> DBAPICursor: 

1775 raise NotImplementedError() 

1776 

1777 def pre_exec(self): 

1778 pass 

1779 

1780 def get_out_parameter_values(self, names): 

1781 raise NotImplementedError( 

1782 "This dialect does not support OUT parameters" 

1783 ) 

1784 

1785 def post_exec(self): 

1786 pass 

1787 

1788 def get_result_processor(self, type_, colname, coltype): 

1789 """Return a 'result processor' for a given type as present in 

1790 cursor.description. 

1791 

1792 This has a default implementation that dialects can override 

1793 for context-sensitive result type handling. 

1794 

1795 """ 

1796 return type_._cached_result_processor(self.dialect, coltype) 

1797 

1798 def get_lastrowid(self): 

1799 """return self.cursor.lastrowid, or equivalent, after an INSERT. 

1800 

1801 This may involve calling special cursor functions, issuing a new SELECT 

1802 on the cursor (or a new one), or returning a stored value that was 

1803 calculated within post_exec(). 

1804 

1805 This function will only be called for dialects which support "implicit" 

1806 primary key generation, keep preexecute_autoincrement_sequences set to 

1807 False, and when no explicit id value was bound to the statement. 

1808 

1809 The function is called once for an INSERT statement that would need to 

1810 return the last inserted primary key for those dialects that make use 

1811 of the lastrowid concept. In these cases, it is called directly after 

1812 :meth:`.ExecutionContext.post_exec`. 

1813 

1814 """ 

1815 return self.cursor.lastrowid 

1816 

1817 def handle_dbapi_exception(self, e): 

1818 pass 

1819 

1820 @util.non_memoized_property 

1821 def rowcount(self) -> int: 

1822 if self._rowcount is not None: 

1823 return self._rowcount 

1824 else: 

1825 return self.cursor.rowcount 

1826 

1827 @property 

1828 def _has_rowcount(self): 

1829 return self._rowcount is not None 

1830 

1831 def supports_sane_rowcount(self): 

1832 return self.dialect.supports_sane_rowcount 

1833 

1834 def supports_sane_multi_rowcount(self): 

1835 return self.dialect.supports_sane_multi_rowcount 

1836 

1837 def _setup_result_proxy(self): 

1838 exec_opt = self.execution_options 

1839 

1840 if self._rowcount is None and exec_opt.get("preserve_rowcount", False): 

1841 self._rowcount = self.cursor.rowcount 

1842 

1843 if self.is_crud or self.is_text: 

1844 result = self._setup_dml_or_text_result() 

1845 yp = False 

1846 else: 

1847 yp = exec_opt.get("yield_per", None) 

1848 sr = self._is_server_side or exec_opt.get("stream_results", False) 

1849 strategy = self.cursor_fetch_strategy 

1850 if sr and strategy is _cursor._DEFAULT_FETCH: 

1851 strategy = _cursor.BufferedRowCursorFetchStrategy( 

1852 self.cursor, self.execution_options 

1853 ) 

1854 cursor_description: _DBAPICursorDescription = ( 

1855 strategy.alternate_cursor_description 

1856 or self.cursor.description 

1857 ) 

1858 if cursor_description is None: 

1859 strategy = _cursor._NO_CURSOR_DQL 

1860 

1861 result = _cursor.CursorResult(self, strategy, cursor_description) 

1862 

1863 compiled = self.compiled 

1864 

1865 if ( 

1866 compiled 

1867 and not self.isddl 

1868 and cast(SQLCompiler, compiled).has_out_parameters 

1869 ): 

1870 self._setup_out_parameters(result) 

1871 

1872 self._soft_closed = result._soft_closed 

1873 

1874 if yp: 

1875 result = result.yield_per(yp) 

1876 

1877 return result 

1878 

1879 def _setup_out_parameters(self, result): 

1880 compiled = cast(SQLCompiler, self.compiled) 

1881 

1882 out_bindparams = [ 

1883 (param, name) 

1884 for param, name in compiled.bind_names.items() 

1885 if param.isoutparam 

1886 ] 

1887 out_parameters = {} 

1888 

1889 for bindparam, raw_value in zip( 

1890 [param for param, name in out_bindparams], 

1891 self.get_out_parameter_values( 

1892 [name for param, name in out_bindparams] 

1893 ), 

1894 ): 

1895 type_ = bindparam.type 

1896 impl_type = type_.dialect_impl(self.dialect) 

1897 dbapi_type = impl_type.get_dbapi_type(self.dialect.loaded_dbapi) 

1898 result_processor = impl_type.result_processor( 

1899 self.dialect, dbapi_type 

1900 ) 

1901 if result_processor is not None: 

1902 raw_value = result_processor(raw_value) 

1903 out_parameters[bindparam.key] = raw_value 

1904 

1905 result.out_parameters = out_parameters 

1906 

1907 def _setup_dml_or_text_result(self): 

1908 compiled = cast(SQLCompiler, self.compiled) 

1909 

1910 strategy: ResultFetchStrategy = self.cursor_fetch_strategy 

1911 

1912 if self.isinsert: 

1913 if ( 

1914 self.execute_style is ExecuteStyle.INSERTMANYVALUES 

1915 and compiled.effective_returning 

1916 ): 

1917 strategy = _cursor.FullyBufferedCursorFetchStrategy( 

1918 self.cursor, 

1919 initial_buffer=self._insertmanyvalues_rows, 

1920 # maintain alt cursor description if set by the 

1921 # dialect, e.g. mssql preserves it 

1922 alternate_description=( 

1923 strategy.alternate_cursor_description 

1924 ), 

1925 ) 

1926 

1927 if compiled.postfetch_lastrowid: 

1928 self.inserted_primary_key_rows = ( 

1929 self._setup_ins_pk_from_lastrowid() 

1930 ) 

1931 # else if not self._is_implicit_returning, 

1932 # the default inserted_primary_key_rows accessor will 

1933 # return an "empty" primary key collection when accessed. 

1934 

1935 if self._is_server_side and strategy is _cursor._DEFAULT_FETCH: 

1936 strategy = _cursor.BufferedRowCursorFetchStrategy( 

1937 self.cursor, self.execution_options 

1938 ) 

1939 

1940 if strategy is _cursor._NO_CURSOR_DML: 

1941 cursor_description = None 

1942 else: 

1943 cursor_description = ( 

1944 strategy.alternate_cursor_description 

1945 or self.cursor.description 

1946 ) 

1947 

1948 if cursor_description is None: 

1949 strategy = _cursor._NO_CURSOR_DML 

1950 elif self._num_sentinel_cols: 

1951 assert self.execute_style is ExecuteStyle.INSERTMANYVALUES 

1952 # strip out the sentinel columns from cursor description 

1953 # a similar logic is done to the rows only in CursorResult 

1954 cursor_description = cursor_description[ 

1955 0 : -self._num_sentinel_cols 

1956 ] 

1957 

1958 result: _cursor.CursorResult[Any] = _cursor.CursorResult( 

1959 self, strategy, cursor_description 

1960 ) 

1961 

1962 if self.isinsert: 

1963 if self._is_implicit_returning: 

1964 rows = result.all() 

1965 

1966 self.returned_default_rows = rows 

1967 

1968 self.inserted_primary_key_rows = ( 

1969 self._setup_ins_pk_from_implicit_returning(result, rows) 

1970 ) 

1971 

1972 # test that it has a cursor metadata that is accurate. the 

1973 # first row will have been fetched and current assumptions 

1974 # are that the result has only one row, until executemany() 

1975 # support is added here. 

1976 assert result._metadata.returns_rows 

1977 

1978 # Insert statement has both return_defaults() and 

1979 # returning(). rewind the result on the list of rows 

1980 # we just used. 

1981 if self._is_supplemental_returning: 

1982 result._rewind(rows) 

1983 else: 

1984 result._soft_close() 

1985 elif not self._is_explicit_returning: 

1986 result._soft_close() 

1987 

1988 # we assume here the result does not return any rows. 

1989 # *usually*, this will be true. However, some dialects 

1990 # such as that of MSSQL/pyodbc need to SELECT a post fetch 

1991 # function so this is not necessarily true. 

1992 # assert not result.returns_rows 

1993 

1994 elif self._is_implicit_returning: 

1995 rows = result.all() 

1996 

1997 if rows: 

1998 self.returned_default_rows = rows 

1999 self._rowcount = len(rows) 

2000 

2001 if self._is_supplemental_returning: 

2002 result._rewind(rows) 

2003 else: 

2004 result._soft_close() 

2005 

2006 # test that it has a cursor metadata that is accurate. 

2007 # the rows have all been fetched however. 

2008 assert result._metadata.returns_rows 

2009 

2010 elif not result._metadata.returns_rows: 

2011 # no results, get rowcount 

2012 # (which requires open cursor on some drivers) 

2013 if self._rowcount is None: 

2014 self._rowcount = self.cursor.rowcount 

2015 result._soft_close() 

2016 elif self.isupdate or self.isdelete: 

2017 if self._rowcount is None: 

2018 self._rowcount = self.cursor.rowcount 

2019 return result 

2020 

2021 @util.memoized_property 

2022 def inserted_primary_key_rows(self): 

2023 # if no specific "get primary key" strategy was set up 

2024 # during execution, return a "default" primary key based 

2025 # on what's in the compiled_parameters and nothing else. 

2026 return self._setup_ins_pk_from_empty() 

2027 

2028 def _setup_ins_pk_from_lastrowid(self): 

2029 getter = cast( 

2030 SQLCompiler, self.compiled 

2031 )._inserted_primary_key_from_lastrowid_getter 

2032 lastrowid = self.get_lastrowid() 

2033 return [getter(lastrowid, self.compiled_parameters[0])] 

2034 

2035 def _setup_ins_pk_from_empty(self): 

2036 getter = cast( 

2037 SQLCompiler, self.compiled 

2038 )._inserted_primary_key_from_lastrowid_getter 

2039 return [getter(None, param) for param in self.compiled_parameters] 

2040 

2041 def _setup_ins_pk_from_implicit_returning(self, result, rows): 

2042 if not rows: 

2043 return [] 

2044 

2045 getter = cast( 

2046 SQLCompiler, self.compiled 

2047 )._inserted_primary_key_from_returning_getter 

2048 compiled_params = self.compiled_parameters 

2049 

2050 return [ 

2051 getter(row, param) for row, param in zip(rows, compiled_params) 

2052 ] 

2053 

2054 def lastrow_has_defaults(self): 

2055 return (self.isinsert or self.isupdate) and bool( 

2056 cast(SQLCompiler, self.compiled).postfetch 

2057 ) 

2058 

2059 def _prepare_set_input_sizes( 

2060 self, 

2061 ) -> Optional[List[Tuple[str, Any, TypeEngine[Any]]]]: 

2062 """Given a cursor and ClauseParameters, prepare arguments 

2063 in order to call the appropriate 

2064 style of ``setinputsizes()`` on the cursor, using DB-API types 

2065 from the bind parameter's ``TypeEngine`` objects. 

2066 

2067 This method only called by those dialects which set the 

2068 :attr:`.Dialect.bind_typing` attribute to 

2069 :attr:`.BindTyping.SETINPUTSIZES`. Python-oracledb and cx_Oracle are 

2070 the only DBAPIs that requires setinputsizes(); pyodbc offers it as an 

2071 option. 

2072 

2073 Prior to SQLAlchemy 2.0, the setinputsizes() approach was also used 

2074 for pg8000 and asyncpg, which has been changed to inline rendering 

2075 of casts. 

2076 

2077 """ 

2078 if self.isddl or self.is_text: 

2079 return None 

2080 

2081 compiled = cast(SQLCompiler, self.compiled) 

2082 

2083 inputsizes = compiled._get_set_input_sizes_lookup() 

2084 

2085 if inputsizes is None: 

2086 return None 

2087 

2088 dialect = self.dialect 

2089 

2090 # all of the rest of this... cython? 

2091 

2092 if dialect._has_events: 

2093 inputsizes = dict(inputsizes) 

2094 dialect.dispatch.do_setinputsizes( 

2095 inputsizes, self.cursor, self.statement, self.parameters, self 

2096 ) 

2097 

2098 if compiled.escaped_bind_names: 

2099 escaped_bind_names = compiled.escaped_bind_names 

2100 else: 

2101 escaped_bind_names = None 

2102 

2103 if dialect.positional: 

2104 items = [ 

2105 (key, compiled.binds[key]) 

2106 for key in compiled.positiontup or () 

2107 ] 

2108 else: 

2109 items = [ 

2110 (key, bindparam) 

2111 for bindparam, key in compiled.bind_names.items() 

2112 ] 

2113 

2114 generic_inputsizes: List[Tuple[str, Any, TypeEngine[Any]]] = [] 

2115 for key, bindparam in items: 

2116 if bindparam in compiled.literal_execute_params: 

2117 continue 

2118 

2119 if key in self._expanded_parameters: 

2120 if is_tuple_type(bindparam.type): 

2121 num = len(bindparam.type.types) 

2122 dbtypes = inputsizes[bindparam] 

2123 generic_inputsizes.extend( 

2124 ( 

2125 ( 

2126 escaped_bind_names.get(paramname, paramname) 

2127 if escaped_bind_names is not None 

2128 else paramname 

2129 ), 

2130 dbtypes[idx % num], 

2131 bindparam.type.types[idx % num], 

2132 ) 

2133 for idx, paramname in enumerate( 

2134 self._expanded_parameters[key] 

2135 ) 

2136 ) 

2137 else: 

2138 dbtype = inputsizes.get(bindparam, None) 

2139 generic_inputsizes.extend( 

2140 ( 

2141 ( 

2142 escaped_bind_names.get(paramname, paramname) 

2143 if escaped_bind_names is not None 

2144 else paramname 

2145 ), 

2146 dbtype, 

2147 bindparam.type, 

2148 ) 

2149 for paramname in self._expanded_parameters[key] 

2150 ) 

2151 else: 

2152 dbtype = inputsizes.get(bindparam, None) 

2153 

2154 escaped_name = ( 

2155 escaped_bind_names.get(key, key) 

2156 if escaped_bind_names is not None 

2157 else key 

2158 ) 

2159 

2160 generic_inputsizes.append( 

2161 (escaped_name, dbtype, bindparam.type) 

2162 ) 

2163 

2164 return generic_inputsizes 

2165 

2166 def _exec_default(self, column, default, type_): 

2167 if default.is_sequence: 

2168 return self.fire_sequence(default, type_) 

2169 elif default.is_callable: 

2170 # this codepath is not normally used as it's inlined 

2171 # into _process_execute_defaults 

2172 self.current_column = column 

2173 return default.arg(self) 

2174 elif default.is_clause_element: 

2175 return self._exec_default_clause_element(column, default, type_) 

2176 else: 

2177 # this codepath is not normally used as it's inlined 

2178 # into _process_execute_defaults 

2179 return default.arg 

2180 

2181 def _exec_default_clause_element(self, column, default, type_): 

2182 # execute a default that's a complete clause element. Here, we have 

2183 # to re-implement a miniature version of the compile->parameters-> 

2184 # cursor.execute() sequence, since we don't want to modify the state 

2185 # of the connection / result in progress or create new connection/ 

2186 # result objects etc. 

2187 # .. versionchanged:: 1.4 

2188 

2189 if not default._arg_is_typed: 

2190 default_arg = expression.type_coerce(default.arg, type_) 

2191 else: 

2192 default_arg = default.arg 

2193 compiled = expression.select(default_arg).compile(dialect=self.dialect) 

2194 compiled_params = compiled.construct_params() 

2195 processors = compiled._bind_processors 

2196 if compiled.positional: 

2197 parameters = self.dialect.execute_sequence_format( 

2198 [ 

2199 ( 

2200 processors[key](compiled_params[key]) # type: ignore 

2201 if key in processors 

2202 else compiled_params[key] 

2203 ) 

2204 for key in compiled.positiontup or () 

2205 ] 

2206 ) 

2207 else: 

2208 parameters = { 

2209 key: ( 

2210 processors[key](compiled_params[key]) # type: ignore 

2211 if key in processors 

2212 else compiled_params[key] 

2213 ) 

2214 for key in compiled_params 

2215 } 

2216 return self._execute_scalar( 

2217 str(compiled), type_, parameters=parameters 

2218 ) 

2219 

2220 current_parameters: Optional[_CoreSingleExecuteParams] = None 

2221 """A dictionary of parameters applied to the current row. 

2222 

2223 This attribute is only available in the context of a user-defined default 

2224 generation function, e.g. as described at :ref:`context_default_functions`. 

2225 It consists of a dictionary which includes entries for each column/value 

2226 pair that is to be part of the INSERT or UPDATE statement. The keys of the 

2227 dictionary will be the key value of each :class:`_schema.Column`, 

2228 which is usually 

2229 synonymous with the name. 

2230 

2231 Note that the :attr:`.DefaultExecutionContext.current_parameters` attribute 

2232 does not accommodate for the "multi-values" feature of the 

2233 :meth:`_expression.Insert.values` method. The 

2234 :meth:`.DefaultExecutionContext.get_current_parameters` method should be 

2235 preferred. 

2236 

2237 .. seealso:: 

2238 

2239 :meth:`.DefaultExecutionContext.get_current_parameters` 

2240 

2241 :ref:`context_default_functions` 

2242 

2243 """ 

2244 

2245 def get_current_parameters(self, isolate_multiinsert_groups=True): 

2246 """Return a dictionary of parameters applied to the current row. 

2247 

2248 This method can only be used in the context of a user-defined default 

2249 generation function, e.g. as described at 

2250 :ref:`context_default_functions`. When invoked, a dictionary is 

2251 returned which includes entries for each column/value pair that is part 

2252 of the INSERT or UPDATE statement. The keys of the dictionary will be 

2253 the key value of each :class:`_schema.Column`, 

2254 which is usually synonymous 

2255 with the name. 

2256 

2257 :param isolate_multiinsert_groups=True: indicates that multi-valued 

2258 INSERT constructs created using :meth:`_expression.Insert.values` 

2259 should be 

2260 handled by returning only the subset of parameters that are local 

2261 to the current column default invocation. When ``False``, the 

2262 raw parameters of the statement are returned including the 

2263 naming convention used in the case of multi-valued INSERT. 

2264 

2265 .. versionadded:: 1.2 added 

2266 :meth:`.DefaultExecutionContext.get_current_parameters` 

2267 which provides more functionality over the existing 

2268 :attr:`.DefaultExecutionContext.current_parameters` 

2269 attribute. 

2270 

2271 .. seealso:: 

2272 

2273 :attr:`.DefaultExecutionContext.current_parameters` 

2274 

2275 :ref:`context_default_functions` 

2276 

2277 """ 

2278 try: 

2279 parameters = self.current_parameters 

2280 column = self.current_column 

2281 except AttributeError: 

2282 raise exc.InvalidRequestError( 

2283 "get_current_parameters() can only be invoked in the " 

2284 "context of a Python side column default function" 

2285 ) 

2286 else: 

2287 assert column is not None 

2288 assert parameters is not None 

2289 compile_state = cast( 

2290 "DMLState", cast(SQLCompiler, self.compiled).compile_state 

2291 ) 

2292 assert compile_state is not None 

2293 if ( 

2294 isolate_multiinsert_groups 

2295 and dml.isinsert(compile_state) 

2296 and compile_state._has_multi_parameters 

2297 ): 

2298 if column._is_multiparam_column: 

2299 index = column.index + 1 

2300 d = {column.original.key: parameters[column.key]} 

2301 else: 

2302 d = {column.key: parameters[column.key]} 

2303 index = 0 

2304 assert compile_state._dict_parameters is not None 

2305 keys = compile_state._dict_parameters.keys() 

2306 d.update( 

2307 (key, parameters["%s_m%d" % (key, index)]) for key in keys 

2308 ) 

2309 return d 

2310 else: 

2311 return parameters 

2312 

2313 def get_insert_default(self, column): 

2314 if column.default is None: 

2315 return None 

2316 else: 

2317 return self._exec_default(column, column.default, column.type) 

2318 

2319 def get_update_default(self, column): 

2320 if column.onupdate is None: 

2321 return None 

2322 else: 

2323 return self._exec_default(column, column.onupdate, column.type) 

2324 

2325 def _process_execute_defaults(self): 

2326 compiled = cast(SQLCompiler, self.compiled) 

2327 

2328 key_getter = compiled._within_exec_param_key_getter 

2329 

2330 sentinel_counter = 0 

2331 

2332 if compiled.insert_prefetch: 

2333 prefetch_recs = [ 

2334 ( 

2335 c, 

2336 key_getter(c), 

2337 c._default_description_tuple, 

2338 self.get_insert_default, 

2339 ) 

2340 for c in compiled.insert_prefetch 

2341 ] 

2342 elif compiled.update_prefetch: 

2343 prefetch_recs = [ 

2344 ( 

2345 c, 

2346 key_getter(c), 

2347 c._onupdate_description_tuple, 

2348 self.get_update_default, 

2349 ) 

2350 for c in compiled.update_prefetch 

2351 ] 

2352 else: 

2353 prefetch_recs = [] 

2354 

2355 for param in self.compiled_parameters: 

2356 self.current_parameters = param 

2357 

2358 for ( 

2359 c, 

2360 param_key, 

2361 (arg, is_scalar, is_callable, is_sentinel), 

2362 fallback, 

2363 ) in prefetch_recs: 

2364 if is_sentinel: 

2365 param[param_key] = sentinel_counter 

2366 sentinel_counter += 1 

2367 elif is_scalar: 

2368 param[param_key] = arg 

2369 elif is_callable: 

2370 self.current_column = c 

2371 param[param_key] = arg(self) 

2372 else: 

2373 val = fallback(c) 

2374 if val is not None: 

2375 param[param_key] = val 

2376 

2377 del self.current_parameters 

2378 

2379 

2380DefaultDialect.execution_ctx_cls = DefaultExecutionContext