Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/default.py: 46%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1049 statements  

1# engine/default.py 

2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7# mypy: allow-untyped-defs, allow-untyped-calls 

8 

9"""Default implementations of per-dialect sqlalchemy.engine classes. 

10 

11These are semi-private implementation classes which are only of importance 

12to database dialect authors; dialects will usually use the classes here 

13as the base class for their own corresponding classes. 

14 

15""" 

16 

17from __future__ import annotations 

18 

19import functools 

20import operator 

21import random 

22import re 

23from time import perf_counter 

24import typing 

25from typing import Any 

26from typing import Callable 

27from typing import cast 

28from typing import Dict 

29from typing import Final 

30from typing import List 

31from typing import Mapping 

32from typing import MutableMapping 

33from typing import MutableSequence 

34from typing import Optional 

35from typing import Sequence 

36from typing import Set 

37from typing import Tuple 

38from typing import Type 

39from typing import TYPE_CHECKING 

40from typing import Union 

41import weakref 

42 

43from . import characteristics 

44from . import cursor as _cursor 

45from . import interfaces 

46from .base import Connection 

47from .interfaces import CacheStats 

48from .interfaces import DBAPICursor 

49from .interfaces import Dialect 

50from .interfaces import ExecuteStyle 

51from .interfaces import ExecutionContext 

52from .reflection import ObjectKind 

53from .reflection import ObjectScope 

54from .. import event 

55from .. import exc 

56from .. import pool 

57from .. import util 

58from ..sql import compiler 

59from ..sql import dml 

60from ..sql import expression 

61from ..sql import type_api 

62from ..sql import util as sql_util 

63from ..sql._typing import is_tuple_type 

64from ..sql.base import _NoArg 

65from ..sql.compiler import DDLCompiler 

66from ..sql.compiler import InsertmanyvaluesSentinelOpts 

67from ..sql.compiler import SQLCompiler 

68from ..sql.elements import quoted_name 

69from ..util.typing import Literal 

70from ..util.typing import TupleAny 

71from ..util.typing import Unpack 

72 

73 

74if typing.TYPE_CHECKING: 

75 from types import ModuleType 

76 

77 from .base import Engine 

78 from .cursor import ResultFetchStrategy 

79 from .interfaces import _CoreMultiExecuteParams 

80 from .interfaces import _CoreSingleExecuteParams 

81 from .interfaces import _DBAPICursorDescription 

82 from .interfaces import _DBAPIMultiExecuteParams 

83 from .interfaces import _DBAPISingleExecuteParams 

84 from .interfaces import _ExecuteOptions 

85 from .interfaces import _MutableCoreSingleExecuteParams 

86 from .interfaces import _ParamStyle 

87 from .interfaces import ConnectArgsType 

88 from .interfaces import DBAPIConnection 

89 from .interfaces import DBAPIModule 

90 from .interfaces import IsolationLevel 

91 from .row import Row 

92 from .url import URL 

93 from ..event import _ListenerFnType 

94 from ..pool import Pool 

95 from ..pool import PoolProxiedConnection 

96 from ..sql import Executable 

97 from ..sql.compiler import Compiled 

98 from ..sql.compiler import Linting 

99 from ..sql.compiler import ResultColumnsEntry 

100 from ..sql.dml import DMLState 

101 from ..sql.dml import UpdateBase 

102 from ..sql.elements import BindParameter 

103 from ..sql.schema import Column 

104 from ..sql.type_api import _BindProcessorType 

105 from ..sql.type_api import _ResultProcessorType 

106 from ..sql.type_api import TypeEngine 

107 

108 

# Literal SQL handed to the dialect is matched against this pattern to
# confirm that it begins (after optional whitespace) with SELECT, in any case.
SERVER_SIDE_CURSOR_RE = re.compile(
    r"\s*SELECT", flags=re.IGNORECASE | re.UNICODE
)

111 

112 

# Module-level aliases for the five CacheStats members, unpacked in the
# enum's iteration order.
(
    CACHE_HIT,
    CACHE_MISS,
    CACHING_DISABLED,
    NO_CACHE_KEY,
    NO_DIALECT_SUPPORT,
) = tuple(CacheStats)

120 

121 

122class DefaultDialect(Dialect): 

123 """Default implementation of Dialect""" 

124 

125 statement_compiler = compiler.SQLCompiler 

126 ddl_compiler = compiler.DDLCompiler 

127 type_compiler_cls = compiler.GenericTypeCompiler 

128 

129 preparer = compiler.IdentifierPreparer 

130 supports_alter = True 

131 supports_comments = False 

132 supports_constraint_comments = False 

133 inline_comments = False 

134 supports_statement_cache = True 

135 

136 div_is_floordiv = True 

137 

138 bind_typing = interfaces.BindTyping.NONE 

139 

140 include_set_input_sizes: Optional[Set[Any]] = None 

141 exclude_set_input_sizes: Optional[Set[Any]] = None 

142 

143 # the first value we'd get for an autoincrement column. 

144 default_sequence_base = 1 

145 

146 # most DBAPIs happy with this for execute(). 

147 # not cx_oracle. 

148 execute_sequence_format = tuple 

149 

150 supports_schemas = True 

151 supports_views = True 

152 supports_sequences = False 

153 sequences_optional = False 

154 preexecute_autoincrement_sequences = False 

155 supports_identity_columns = False 

156 postfetch_lastrowid = True 

157 favor_returning_over_lastrowid = False 

158 insert_null_pk_still_autoincrements = False 

159 update_returning = False 

160 delete_returning = False 

161 update_returning_multifrom = False 

162 delete_returning_multifrom = False 

163 insert_returning = False 

164 

165 cte_follows_insert = False 

166 

167 supports_native_enum = False 

168 supports_native_boolean = False 

169 supports_native_uuid = False 

170 returns_native_bytes = False 

171 

172 non_native_boolean_check_constraint = True 

173 

174 supports_simple_order_by_label = True 

175 

176 tuple_in_values = False 

177 

178 connection_characteristics = util.immutabledict( 

179 { 

180 "isolation_level": characteristics.IsolationLevelCharacteristic(), 

181 "logging_token": characteristics.LoggingTokenCharacteristic(), 

182 } 

183 ) 

184 

185 engine_config_types: Mapping[str, Any] = util.immutabledict( 

186 { 

187 "pool_timeout": util.asint, 

188 "echo": util.bool_or_str("debug"), 

189 "echo_pool": util.bool_or_str("debug"), 

190 "pool_recycle": util.asint, 

191 "pool_size": util.asint, 

192 "max_overflow": util.asint, 

193 "future": util.asbool, 

194 } 

195 ) 

196 

197 # if the NUMERIC type 

198 # returns decimal.Decimal. 

199 # *not* the FLOAT type however. 

200 supports_native_decimal = False 

201 

202 name = "default" 

203 

204 # length at which to truncate 

205 # any identifier. 

206 max_identifier_length = 9999 

207 _user_defined_max_identifier_length: Optional[int] = None 

208 

209 isolation_level: Optional[str] = None 

210 

211 # sub-categories of max_identifier_length. 

212 # currently these accommodate for MySQL which allows alias names 

213 # of 255 but DDL names only of 64. 

214 max_index_name_length: Optional[int] = None 

215 max_constraint_name_length: Optional[int] = None 

216 

217 supports_sane_rowcount = True 

218 supports_sane_multi_rowcount = True 

219 colspecs: MutableMapping[Type[TypeEngine[Any]], Type[TypeEngine[Any]]] = {} 

220 default_paramstyle = "named" 

221 

222 supports_default_values = False 

223 """dialect supports INSERT... DEFAULT VALUES syntax""" 

224 

225 supports_default_metavalue = False 

226 """dialect supports INSERT... VALUES (DEFAULT) syntax""" 

227 

228 default_metavalue_token = "DEFAULT" 

229 """for INSERT... VALUES (DEFAULT) syntax, the token to put in the 

230 parenthesis.""" 

231 

232 # not sure if this is a real thing but the compiler will deliver it 

233 # if this is the only flag enabled. 

234 supports_empty_insert = True 

235 """dialect supports INSERT () VALUES ()""" 

236 

237 supports_multivalues_insert = False 

238 

239 use_insertmanyvalues: bool = False 

240 

241 use_insertmanyvalues_wo_returning: bool = False 

242 

243 insertmanyvalues_implicit_sentinel: InsertmanyvaluesSentinelOpts = ( 

244 InsertmanyvaluesSentinelOpts.NOT_SUPPORTED 

245 ) 

246 

247 insertmanyvalues_page_size: int = 1000 

248 insertmanyvalues_max_parameters = 32700 

249 

250 supports_is_distinct_from = True 

251 

252 supports_server_side_cursors = False 

253 

254 server_side_cursors = False 

255 

256 # extra record-level locking features (#4860) 

257 supports_for_update_of = False 

258 

259 server_version_info = None 

260 

261 default_schema_name: Optional[str] = None 

262 

263 # indicates symbol names are 

264 # UPPERCASED if they are case insensitive 

265 # within the database. 

266 # if this is True, the methods normalize_name() 

267 # and denormalize_name() must be provided. 

268 requires_name_normalize = False 

269 

270 is_async = False 

271 

272 has_terminate = False 

273 

274 # TODO: this is not to be part of 2.0. implement rudimentary binary 

275 # literals for SQLite, PostgreSQL, MySQL only within 

276 # _Binary.literal_processor 

277 _legacy_binary_type_literal_encoding = "utf-8" 

278 

@util.deprecated_params(
    empty_in_strategy=(
        "1.4",
        "The :paramref:`_sa.create_engine.empty_in_strategy` keyword is "
        "deprecated, and no longer has any effect. All IN expressions "
        "are now rendered using "
        'the "expanding parameter" strategy which renders a set of bound '
        'expressions, or an "empty set" SELECT, at statement execution '
        "time.",
    ),
    server_side_cursors=(
        "1.4",
        "The :paramref:`_sa.create_engine.server_side_cursors` parameter "
        "is deprecated and will be removed in a future release. Please "
        "use the "
        ":paramref:`_engine.Connection.execution_options.stream_results` "
        "parameter.",
    ),
)
def __init__(
    self,
    paramstyle: Optional[_ParamStyle] = None,
    isolation_level: Optional[IsolationLevel] = None,
    dbapi: Optional[ModuleType] = None,
    implicit_returning: Literal[True] = True,
    supports_native_boolean: Optional[bool] = None,
    max_identifier_length: Optional[int] = None,
    label_length: Optional[int] = None,
    insertmanyvalues_page_size: Union[_NoArg, int] = _NoArg.NO_ARG,
    use_insertmanyvalues: Optional[bool] = None,
    # util.deprecated_params decorator cannot render the
    # Linting.NO_LINTING constant
    compiler_linting: Linting = int(compiler.NO_LINTING),  # type: ignore
    server_side_cursors: bool = False,
    **kwargs: Any,
):
    # Configure per-instance dialect state from the constructor arguments.
    # Arguments left at their defaults keep the class-level attribute
    # values; ``implicit_returning`` and ``**kwargs`` are accepted but not
    # consulted here.

    # the deprecated flag is only honored by dialects that declare
    # supports_server_side_cursors
    if server_side_cursors:
        if not self.supports_server_side_cursors:
            raise exc.ArgumentError(
                "Dialect %s does not support server side cursors" % self
            )
        else:
            self.server_side_cursors = True

    # legacy dialect-level flag is translated to the bind_typing setting
    if getattr(self, "use_setinputsizes", False):
        util.warn_deprecated(
            "The dialect-level use_setinputsizes attribute is "
            "deprecated. Please use "
            "bind_typing = BindTyping.SETINPUTSIZES",
            "2.0",
        )
        self.bind_typing = interfaces.BindTyping.SETINPUTSIZES

    self.positional = False
    self._ischema = None

    self.dbapi = dbapi

    # paramstyle precedence: explicit argument, then the DBAPI module's
    # own paramstyle, then the dialect's class-level default
    if paramstyle is not None:
        self.paramstyle = paramstyle
    elif self.dbapi is not None:
        self.paramstyle = self.dbapi.paramstyle
    else:
        self.paramstyle = self.default_paramstyle
    self.positional = self.paramstyle in (
        "qmark",
        "format",
        "numeric",
        "numeric_dollar",
    )
    self.identifier_preparer = self.preparer(self)
    self._on_connect_isolation_level = isolation_level

    # a "type_compiler" attribute already present on the dialect takes
    # precedence over type_compiler_cls
    legacy_tt_callable = getattr(self, "type_compiler", None)
    if legacy_tt_callable is not None:
        tt_callable = cast(
            Type[compiler.GenericTypeCompiler],
            self.type_compiler,
        )
    else:
        tt_callable = self.type_compiler_cls

    self.type_compiler_instance = self.type_compiler = tt_callable(self)

    if supports_native_boolean is not None:
        self.supports_native_boolean = supports_native_boolean

    # a truthy user-supplied length overrides the class-level value
    self._user_defined_max_identifier_length = max_identifier_length
    if self._user_defined_max_identifier_length:
        self.max_identifier_length = (
            self._user_defined_max_identifier_length
        )
    self.label_length = label_length
    self.compiler_linting = compiler_linting

    if use_insertmanyvalues is not None:
        self.use_insertmanyvalues = use_insertmanyvalues

    if insertmanyvalues_page_size is not _NoArg.NO_ARG:
        self.insertmanyvalues_page_size = insertmanyvalues_page_size

379 

@property
@util.deprecated(
    "2.0",
    "full_returning is deprecated, please use insert_returning, "
    "update_returning, delete_returning",
)
def full_returning(self):
    # Equivalent to chaining the three flags with ``and``: the first falsy
    # flag is returned as-is, otherwise the value of delete_returning.
    combined = self.insert_returning
    if combined:
        combined = self.update_returning
    if combined:
        combined = self.delete_returning
    return combined

392 

@util.memoized_property
def insert_executemany_returning(self):
    """Default value for ``insert_executemany_returning``.

    Evaluates ``insert_returning and use_insertmanyvalues``: the feature
    is reported only when the dialect supports INSERT..RETURNING and has
    opted into "use_insertmanyvalues". Dialects may override this
    attribute with their own implementation.

    """
    supports_returning = self.insert_returning
    if not supports_returning:
        return supports_returning
    return self.use_insertmanyvalues

407 

@util.memoized_property
def insert_executemany_returning_sort_by_parameter_order(self):
    """Default value for
    ``insert_executemany_returning_sort_by_parameter_order``.

    Evaluates ``insert_returning and use_insertmanyvalues``, i.e. the
    same expression as ``insert_executemany_returning``. Dialects may
    override this attribute with their own implementation.

    """
    supports_returning = self.insert_returning
    if not supports_returning:
        return supports_returning
    return self.use_insertmanyvalues

430 

431 update_executemany_returning = False 

432 delete_executemany_returning = False 

433 

@util.memoized_property
def loaded_dbapi(self) -> DBAPIModule:
    """Return ``self.dbapi``, raising InvalidRequestError when it is None."""
    dbapi_module = self.dbapi
    if dbapi_module is not None:
        return dbapi_module
    raise exc.InvalidRequestError(
        f"Dialect {self} does not have a Python DBAPI established "
        "and cannot be used for actual database interaction"
    )

442 

@util.memoized_property
def _bind_typing_render_casts(self):
    """True when ``bind_typing`` is ``BindTyping.RENDER_CASTS``."""
    render_casts = interfaces.BindTyping.RENDER_CASTS
    return self.bind_typing is render_casts

446 

def _ensure_has_table_connection(self, arg: Connection) -> None:
    """Validate the object passed to ``Dialect.has_table()``.

    :param arg: the object supplied as the connection argument.
    :raises ArgumentError: if ``arg`` is not a :class:`.Connection`.

    """
    if isinstance(arg, Connection):
        return
    raise exc.ArgumentError(
        "The argument passed to Dialect.has_table() should be a "
        "%s, got %s. "
        "Additionally, the Dialect.has_table() method is for "
        "internal dialect "
        "use only; please use "
        "``inspect(some_engine).has_table(<tablename>)`` "
        "for public API use." % (Connection, type(arg))
    )

458 

@util.memoized_property
def _supports_statement_cache(self):
    """Return the boolean value of ``supports_statement_cache`` as set
    directly on this dialect's own class.

    Only the class ``__dict__`` is consulted, so a value inherited from a
    base class is not seen here; when the attribute is absent (or None) a
    warning is emitted and False is returned.

    """
    declared = self.__class__.__dict__.get("supports_statement_cache", None)
    if declared is not None:
        return bool(declared)

    util.warn(
        "Dialect %s:%s will not make use of SQL compilation caching "
        "as it does not set the 'supports_statement_cache' attribute "
        "to ``True``. This can have "
        "significant performance implications including some "
        "performance degradations in comparison to prior SQLAlchemy "
        "versions. Dialect maintainers should seek to set this "
        "attribute to True after appropriate development and testing "
        "for SQLAlchemy 1.4 caching support. Alternatively, this "
        "attribute may be set to False which will disable this "
        "warning." % (self.name, self.driver),
        code="cprf",
    )
    return bool(declared)

478 

@util.memoized_property
def _type_memos(self):
    """Return a new, empty ``weakref.WeakKeyDictionary``."""
    memo_registry = weakref.WeakKeyDictionary()
    return memo_registry

482 

@property
def dialect_description(self):  # type: ignore[override]
    """The dialect name and driver name joined as ``name+driver``."""
    return "+".join((self.name, self.driver))

486 

@property
def supports_sane_rowcount_returning(self):
    """Whether rowcount is dependable even when RETURNING is in use.

    This default implementation returns the value of
    ``supports_sane_rowcount`` unchanged.

    """
    sane_rowcount = self.supports_sane_rowcount
    return sane_rowcount

497 

@classmethod
def get_pool_class(cls, url: URL) -> Type[Pool]:
    """Return the class-level ``poolclass`` if one is set, otherwise
    ``AsyncAdaptedQueuePool`` for async dialects and ``QueuePool`` for
    all others. The ``url`` argument is not consulted here.
    """
    fallback: Type[pool.Pool] = (
        pool.AsyncAdaptedQueuePool if cls.is_async else pool.QueuePool
    )
    return getattr(cls, "poolclass", fallback)

507 

def get_dialect_pool_class(self, url: URL) -> Type[Pool]:
    """Instance-level entry point that delegates to ``get_pool_class``."""
    pool_cls = self.get_pool_class(url)
    return pool_cls

510 

@classmethod
def load_provisioning(cls):
    """Attempt to import the ``provision`` module that sits alongside the
    module defining this dialect class; a failed import is ignored.
    """
    # everything before the last dot of the defining module's name
    parent_package = cls.__module__.rpartition(".")[0]
    provision_module = parent_package + ".provision"
    try:
        __import__(provision_module)
    except ImportError:
        pass

518 

def _builtin_onconnect(self) -> Optional[_ListenerFnType]:
    """Return a connect-time hook that applies the isolation level passed
    to the constructor, or None when no such level was given.
    """
    if self._on_connect_isolation_level is None:
        return None

    def builtin_connect(dbapi_conn, conn_rec):
        # the level is read at call time, not captured at hook creation
        self._assert_and_set_isolation_level(
            dbapi_conn, self._on_connect_isolation_level
        )

    return builtin_connect

530 

def initialize(self, connection: Connection) -> None:
    """Populate connection-derived dialect attributes.

    Sets ``server_version_info``, ``default_schema_name`` and
    ``default_isolation_level``, each falling back to None when the
    corresponding lookup raises NotImplementedError. Unless the user
    supplied a max identifier length, it is then refreshed from
    ``_check_max_identifier_length()``. Finally ``label_length`` is
    validated against the resulting maximum.

    :raises ArgumentError: if ``label_length`` exceeds
     ``max_identifier_length``.

    """
    try:
        version_info = self._get_server_version_info(connection)
    except NotImplementedError:
        version_info = None
    self.server_version_info = version_info

    try:
        schema_name = self._get_default_schema_name(connection)
    except NotImplementedError:
        schema_name = None
    self.default_schema_name = schema_name

    try:
        iso_level = self.get_default_isolation_level(
            connection.connection.dbapi_connection
        )
    except NotImplementedError:
        iso_level = None
    self.default_isolation_level = iso_level

    if not self._user_defined_max_identifier_length:
        detected_length = self._check_max_identifier_length(connection)
        if detected_length:
            self.max_identifier_length = detected_length

    label_length = self.label_length
    if not label_length:
        return
    if label_length > self.max_identifier_length:
        raise exc.ArgumentError(
            "Label length of %d is greater than this dialect's"
            " maximum identifier length of %d"
            % (self.label_length, self.max_identifier_length)
        )

566 

def on_connect(self) -> Optional[Callable[[Any], None]]:
    # the docstring is inherited from interfaces.Dialect.on_connect;
    # this default implementation supplies no connect-time callable.
    connect_hook = None
    return connect_hook

570 

def _check_max_identifier_length(self, connection):
    """Hook for a connection / server-version specific determination of
    ``max_identifier_length``.

    Returning None means the dialect's class-level
    ``max_identifier_length`` should be used; this default implementation
    always returns None.

    """
    detected_length = None
    return detected_length

580 

def get_default_isolation_level(self, dbapi_conn):
    """Return the isolation level to treat as the default for the given
    DBAPI connection.

    Subclasses may override this to supply a "fallback" level for
    databases that cannot reliably report the actual one.

    This default implementation returns
    ``self.get_isolation_level(dbapi_conn)`` and lets any exception it
    raises propagate.

    """
    current_level = self.get_isolation_level(dbapi_conn)
    return current_level

594 

def type_descriptor(self, typeobj):
    """Return a database-specific :class:`.TypeEngine` for the given
    generic type object.

    The dialect's ``colspecs`` mapping (class or instance level) is handed
    to :func:`_types.adapt_type` together with ``typeobj``.

    """
    colspecs = self.colspecs
    return type_api.adapt_type(typeobj, colspecs)

605 

def has_index(self, connection, table_name, index_name, schema=None, **kw):
    """Return True if ``table_name`` exists and ``get_indexes()`` reports
    an index whose ``"name"`` equals ``index_name``; False otherwise.
    """
    table_exists = self.has_table(connection, table_name, schema=schema, **kw)
    if not table_exists:
        return False
    reflected = self.get_indexes(connection, table_name, schema=schema, **kw)
    return any(idx["name"] == index_name for idx in reflected)

616 

def has_schema(
    self, connection: Connection, schema_name: str, **kw: Any
) -> bool:
    """Return whether ``schema_name`` is among ``get_schema_names()``."""
    known_schemas = self.get_schema_names(connection, **kw)
    return schema_name in known_schemas

621 

def validate_identifier(self, ident: str) -> None:
    """Raise IdentifierError if ``ident`` is longer than
    ``max_identifier_length``; otherwise return None.
    """
    if len(ident) <= self.max_identifier_length:
        return
    raise exc.IdentifierError(
        "Identifier '%s' exceeds maximum length of %d characters"
        % (ident, self.max_identifier_length)
    )

628 

def connect(self, *cargs: Any, **cparams: Any) -> DBAPIConnection:
    # inherits the docstring from interfaces.Dialect.connect
    dbapi_module = self.loaded_dbapi
    return dbapi_module.connect(*cargs, **cparams)  # type: ignore[no-any-return]  # NOQA: E501

632 

def create_connect_args(self, url: URL) -> ConnectArgsType:
    # inherits the docstring from interfaces.Dialect.create_connect_args
    #
    # No positional arguments are produced; the keyword arguments are the
    # URL's translated connect args, updated with its query entries.
    connect_kwargs = url.translate_connect_args()
    connect_kwargs.update(url.query)
    positional_args: List[Any] = []
    return (positional_args, connect_kwargs)

638 

def set_engine_execution_options(
    self, engine: Engine, opts: Mapping[str, Any]
) -> None:
    """Register an ``engine_connect`` listener on ``engine`` that applies
    the options in ``opts`` naming a connection characteristic.

    Nothing is registered when no key of ``opts`` matches an entry of
    ``connection_characteristics``.

    """
    matched_names = set(self.connection_characteristics).intersection(opts)
    if not matched_names:
        return

    selected: Mapping[str, Any] = util.immutabledict(
        (name, opts[name]) for name in matched_names
    )

    @event.listens_for(engine, "engine_connect")
    def set_connection_characteristics(connection):
        self._set_connection_characteristics(connection, selected)

655 

def set_connection_execution_options(
    self, connection: Connection, opts: Mapping[str, Any]
) -> None:
    """Apply to ``connection`` the options in ``opts`` that name a
    connection characteristic; do nothing when none match.
    """
    matched_names = set(self.connection_characteristics).intersection(opts)
    if not matched_names:
        return

    selected: Mapping[str, Any] = util.immutabledict(
        (name, opts[name]) for name in matched_names
    )
    self._set_connection_characteristics(connection, selected)

667 

def _set_connection_characteristics(self, connection, characteristics):
    """Apply each named characteristic value to ``connection``.

    :param connection: the Connection being configured.
    :param characteristics: mapping of characteristic name to the value
     to set.
    :raises InvalidRequestError: if the connection is already in a
     transaction and any requested characteristic is flagged
     ``transactional``.

    After the values are applied, a callback is appended to the
    connection record's ``finalize_callback`` which invokes
    ``_reset_characteristics`` with the same mapping.

    """
    resolved = [
        (name, self.connection_characteristics[name], value)
        for name, value in characteristics.items()
    ]

    if connection.in_transaction():
        transactional_names = [
            name for name, obj, _ in resolved if obj.transactional
        ]
        if transactional_names:
            raise exc.InvalidRequestError(
                "This connection has already initialized a SQLAlchemy "
                "Transaction() object via begin() or autobegin; "
                "%s may not be altered unless rollback() or commit() "
                "is called first." % (", ".join(transactional_names))
            )

    dbapi_connection = connection.connection.dbapi_connection
    for _, characteristic, value in resolved:
        characteristic.set_connection_characteristic(
            self, connection, dbapi_connection, value
        )

    reset_callback = functools.partial(
        self._reset_characteristics, characteristics
    )
    connection.connection._connection_record.finalize_callback.append(
        reset_callback
    )

697 

def _reset_characteristics(self, characteristics, dbapi_connection):
    """Call ``reset_characteristic()`` on the characteristic object for
    every name in ``characteristics``, passing ``dbapi_connection``.
    """
    registry = self.connection_characteristics
    for name in characteristics:
        registry[name].reset_characteristic(self, dbapi_connection)

704 

def do_begin(self, dbapi_connection):
    """Begin-transaction hook; this default implementation does nothing."""
    return None

707 

def do_rollback(self, dbapi_connection):
    """Invoke ``rollback()`` on the given DBAPI connection."""
    rollback = dbapi_connection.rollback
    rollback()

710 

def do_commit(self, dbapi_connection):
    """Invoke ``commit()`` on the given DBAPI connection."""
    commit = dbapi_connection.commit
    commit()

713 

def do_terminate(self, dbapi_connection):
    """Terminate the connection; by default this defers to ``do_close()``."""
    close = self.do_close
    close(dbapi_connection)

716 

def do_close(self, dbapi_connection):
    """Invoke ``close()`` on the given DBAPI connection."""
    close = dbapi_connection.close
    close()

719 

@util.memoized_property
def _dialect_specific_select_one(self):
    """String form of ``SELECT 1`` compiled against this dialect."""
    select_one = expression.select(1)
    compiled = select_one.compile(dialect=self)
    return str(compiled)

723 

def _do_ping_w_event(self, dbapi_connection: DBAPIConnection) -> bool:
    """Run ``do_ping()`` and translate DBAPI errors into a boolean.

    Returns whatever ``do_ping()`` returns on success. When the DBAPI's
    ``Error`` is raised, ``is_disconnect()`` classifies it; if the dialect
    has events, the error is additionally passed through
    ``Connection._handle_dbapi_exception_noconnection`` and a resulting
    StatementError's ``connection_invalidated`` replaces that
    classification. A disconnect yields False; any other error is
    re-raised.

    """
    try:
        return self.do_ping(dbapi_connection)
    except self.loaded_dbapi.Error as err:
        disconnected = self.is_disconnect(err, dbapi_connection, None)

        if self._has_events:
            try:
                Connection._handle_dbapi_exception_noconnection(
                    err,
                    self,
                    is_disconnect=disconnected,
                    invalidate_pool_on_disconnect=False,
                    is_pre_ping=True,
                )
            except exc.StatementError as new_err:
                disconnected = new_err.connection_invalidated

        if not disconnected:
            raise
        return False

746 

def do_ping(self, dbapi_connection: DBAPIConnection) -> bool:
    """Execute the dialect's "select one" statement on a fresh cursor.

    The cursor is closed whether or not the execution raises. Returns
    True when the statement ran without error; exceptions propagate.

    """
    ping_cursor = dbapi_connection.cursor()
    try:
        ping_cursor.execute(self._dialect_specific_select_one)
    finally:
        ping_cursor.close()
    return True

754 

def create_xid(self):
    """Create a random two-phase transaction ID.

    This id will be passed to do_begin_twophase(), do_rollback_twophase(),
    do_commit_twophase(). Its format is unspecified.

    :return: the string ``"_sa_"`` followed by exactly 32 lowercase hex
     digits.
    """

    # getrandbits(128) is bounded to [0, 2**128 - 1], so the value always
    # fits in 32 hex digits; randint(0, 2**128) included 2**128 itself,
    # which formats to 33 digits.
    return "_sa_%032x" % random.getrandbits(128)

763 

def do_savepoint(self, connection, name):
    """Execute a ``SavepointClause`` for ``name`` on the connection."""
    clause = expression.SavepointClause(name)
    connection.execute(clause)

766 

def do_rollback_to_savepoint(self, connection, name):
    """Execute a ``RollbackToSavepointClause`` for ``name`` on the
    connection.
    """
    clause = expression.RollbackToSavepointClause(name)
    connection.execute(clause)

769 

def do_release_savepoint(self, connection, name):
    """Execute a ``ReleaseSavepointClause`` for ``name`` on the
    connection.
    """
    clause = expression.ReleaseSavepointClause(name)
    connection.execute(clause)

772 

def _deliver_insertmanyvalues_batches(
    self,
    connection,
    cursor,
    statement,
    parameters,
    generic_setinputsizes,
    context,
):
    """Generator yielding the "insertmanyvalues" batches produced by the
    compiled statement.

    Each batch is yielded to the caller; when the generator is resumed
    and ``compiled.effective_returning`` is truthy, the rows for that
    batch are fetched via ``context.fetchall_for_returning(cursor)`` and
    accumulated into ``context._insertmanyvalues_rows``. If sentinel
    columns are in use and the batch was not downgraded, the fetched rows
    are reordered to line up with the batch's parameter sets before being
    accumulated.

    :raises InvalidRequestError: if the sentinel-keyed rows do not match
     the batch in count, or a sentinel value of the batch is not found
     among the returned rows.

    """
    context = cast(DefaultExecutionContext, context)
    compiled = cast(SQLCompiler, context.compiled)

    # result processors for the sentinel column(s); filled in lazily by
    # the first batch that needs them
    _composite_sentinel_proc: Sequence[
        Optional[_ResultProcessorType[Any]]
    ] = ()
    _scalar_sentinel_proc: Optional[_ResultProcessorType[Any]] = None
    _sentinel_proc_initialized: bool = False

    compiled_parameters = context.compiled_parameters

    imv = compiled._insertmanyvalues
    assert imv is not None

    is_returning: Final[bool] = bool(compiled.effective_returning)
    batch_size = context.execution_options.get(
        "insertmanyvalues_page_size", self.insertmanyvalues_page_size
    )

    schema_translate_map = (
        context.execution_options.get("schema_translate_map", {})
        if compiled.schema_translate_map
        else None
    )

    result: Optional[List[Any]]
    if not is_returning:
        sort_by_parameter_order = False
        result = None
    else:
        result = []
        context._insertmanyvalues_rows = result
        sort_by_parameter_order = imv.sort_by_parameter_order

    batches = compiled._deliver_insertmanyvalues_batches(
        statement,
        parameters,
        compiled_parameters,
        generic_setinputsizes,
        batch_size,
        sort_by_parameter_order,
        schema_translate_map,
    )

    for imv_batch in batches:
        yield imv_batch

        if not is_returning:
            continue

        try:
            rows = context.fetchall_for_returning(cursor)
        except BaseException as be:
            connection._handle_dbapi_exception(
                be,
                sql_util._long_statement(imv_batch.replaced_statement),
                imv_batch.replaced_parameters,
                None,
                context,
                is_sub_exec=True,
            )

        # I would have thought "is_returning: Final[bool]"
        # would have assured this but pylance thinks not
        assert result is not None

        # no sentinel handling required: rows are taken in fetch order
        if not imv.num_sentinel_columns or imv_batch.is_downgraded:
            result.extend(rows)
            continue

        composite_sentinel = imv.num_sentinel_columns > 1
        if imv.implicit_sentinel:
            # for implicit sentinel, which is currently single-col
            # integer autoincrement, do a simple sort.
            assert not composite_sentinel
            result.extend(sorted(rows, key=operator.itemgetter(-1)))
            continue

        # otherwise, create dictionaries to match up batches
        # with parameters
        assert imv.sentinel_param_keys
        assert imv.sentinel_columns

        _nsc = imv.num_sentinel_columns

        if not _sentinel_proc_initialized:
            if composite_sentinel:
                _composite_sentinel_proc = [
                    col.type._cached_result_processor(self, cursor_desc[1])
                    for col, cursor_desc in zip(
                        imv.sentinel_columns,
                        cursor.description[-_nsc:],
                    )
                ]
            else:
                _scalar_sentinel_proc = (
                    imv.sentinel_columns[0]
                ).type._cached_result_processor(
                    self, cursor.description[-1][1]
                )
            _sentinel_proc_initialized = True

        # key each fetched row by its (processed) trailing sentinel value(s)
        rows_by_sentinel: Union[
            Dict[Tuple[Any, ...], Any],
            Dict[Any, Any],
        ]
        if composite_sentinel:
            rows_by_sentinel = {
                tuple(
                    (proc(val) if proc else val)
                    for val, proc in zip(
                        row[-_nsc:], _composite_sentinel_proc
                    )
                ): row
                for row in rows
            }
        elif _scalar_sentinel_proc:
            rows_by_sentinel = {
                _scalar_sentinel_proc(row[-1]): row for row in rows
            }
        else:
            rows_by_sentinel = {row[-1]: row for row in rows}

        if len(rows_by_sentinel) != len(imv_batch.batch):
            # see test_insert_exec.py::
            # IMVSentinelTest::test_sentinel_incorrect_rowcount
            # for coverage / demonstration
            raise exc.InvalidRequestError(
                f"Sentinel-keyed result set did not produce "
                f"correct number of rows {len(imv_batch.batch)}; "
                "produced "
                f"{len(rows_by_sentinel)}. Please ensure the "
                "sentinel column is fully unique and populated in "
                "all cases."
            )

        try:
            ordered_rows = [
                rows_by_sentinel[sentinel_keys]
                for sentinel_keys in imv_batch.sentinel_values
            ]
        except KeyError as ke:
            # see test_insert_exec.py::
            # IMVSentinelTest::test_sentinel_cant_match_keys
            # for coverage / demonstration
            raise exc.InvalidRequestError(
                f"Can't match sentinel values in result set to "
                f"parameter sets; key {ke.args[0]!r} was not "
                "found. "
                "There may be a mismatch between the datatype "
                "passed to the DBAPI driver vs. that which it "
                "returns in a result row. Ensure the given "
                "Python value matches the expected result type "
                "*exactly*, taking care to not rely upon implicit "
                "conversions which may occur such as when using "
                "strings in place of UUID or integer values, etc. "
            ) from ke

        result.extend(ordered_rows)

944 

def do_executemany(self, cursor, statement, parameters, context=None):
    """Pass ``statement`` and ``parameters`` to ``cursor.executemany()``;
    ``context`` is not used by this default implementation.
    """
    executemany = cursor.executemany
    executemany(statement, parameters)

947 

def do_execute(self, cursor, statement, parameters, context=None):
    """Hand ``statement`` and ``parameters`` to ``cursor.execute()``.

    ``context`` is accepted for interface compatibility; this default
    implementation does not use it.
    """
    cursor.execute(statement, parameters)

950 

def do_execute_no_params(self, cursor, statement, context=None):
    """Call ``cursor.execute()`` with ``statement`` only, passing no
    parameter collection at all.

    ``context`` is accepted for interface compatibility; this default
    implementation does not use it.
    """
    cursor.execute(statement)

953 

def is_disconnect(
    self,
    e: DBAPIModule.Error,
    connection: Union[
        pool.PoolProxiedConnection, interfaces.DBAPIConnection, None
    ],
    cursor: Optional[interfaces.DBAPICursor],
) -> bool:
    """Report whether ``e`` indicates a dropped database connection.

    This default implementation inspects none of its arguments and
    always answers ``False``.
    """
    return False

963 

@util.memoized_instancemethod
def _gen_allowed_isolation_levels(self, dbapi_conn):
    """Return the dialect's isolation level names as a tuple.

    Returns ``None`` when ``get_isolation_level_values()`` raises
    ``NotImplementedError``.

    :raises ValueError: if any reported name is not already written in
     UPPERCASE with spaces in place of underscores.
    """
    try:
        raw_levels = list(self.get_isolation_level_values(dbapi_conn))
    except NotImplementedError:
        return None

    normalized_levels = [
        level.replace("_", " ").upper() for level in raw_levels
    ]

    # the dialect must already report names in the normalized form
    if raw_levels == normalized_levels:
        return tuple(normalized_levels)

    raise ValueError(
        f"Dialect {self.name!r} get_isolation_level_values() "
        f"method should return names as UPPERCASE using spaces, "
        f"not underscores; got "
        f"{sorted(set(raw_levels).difference(normalized_levels))}"
    )

982 

983 def _assert_and_set_isolation_level(self, dbapi_conn, level): 

984 level = level.replace("_", " ").upper() 

985 

986 _allowed_isolation_levels = self._gen_allowed_isolation_levels( 

987 dbapi_conn 

988 ) 

989 if ( 

990 _allowed_isolation_levels 

991 and level not in _allowed_isolation_levels 

992 ): 

993 raise exc.ArgumentError( 

994 f"Invalid value {level!r} for isolation_level. " 

995 f"Valid isolation levels for {self.name!r} are " 

996 f"{', '.join(_allowed_isolation_levels)}" 

997 ) 

998 

999 self.set_isolation_level(dbapi_conn, level) 

1000 

def reset_isolation_level(self, dbapi_conn):
    """Re-apply the connect-time isolation level to ``dbapi_conn``.

    Uses ``_on_connect_isolation_level`` when it is set, otherwise
    ``default_isolation_level``.
    """
    on_connect_level = self._on_connect_isolation_level

    if on_connect_level is None:
        assert self.default_isolation_level is not None
        target_level = self.default_isolation_level
    else:
        # an explicit on-connect level is expected to be either
        # AUTOCOMMIT or the same as the detected default
        assert (
            on_connect_level == "AUTOCOMMIT"
            or on_connect_level == self.default_isolation_level
        )
        target_level = on_connect_level

    self._assert_and_set_isolation_level(dbapi_conn, target_level)

1017 

def normalize_name(self, name):
    """Convert an all-uppercase name into its lowercase form.

    * ``None`` is returned as ``None``.
    * A name with no distinct upper/lower form is returned unchanged.
    * An all-uppercase name whose lowercase form needs no quoting is
      returned lowercased.
    * An all-lowercase name is returned as a ``quoted_name`` with
      ``quote=True``.
    * Any other name is returned unchanged.
    """
    if name is None:
        return None

    name_lower = name.lower()
    name_upper = name.upper()

    if name_upper == name_lower:
        # name has no upper/lower conversion, e.g. non-european characters.
        # return unchanged
        return name

    requires_quotes = self.identifier_preparer._requires_quotes
    if name_upper == name and not requires_quotes(name_lower):
        # name is all uppercase and doesn't require quoting; normalize
        # to all lower case
        return name_lower

    if name_lower == name:
        # name is all lower case, which if denormalized means we need to
        # force quoting on it
        return quoted_name(name, quote=True)

    # name is mixed case, which means it will be quoted in SQL when used
    # later; no normalization is applied
    return name

1043 

def denormalize_name(self, name):
    """Convert an all-lowercase name into its uppercase form.

    ``None`` is returned as ``None``.  The uppercase form is returned
    only when the name is entirely lowercase and that lowercase form
    needs no quoting; every other name is returned unchanged.
    """
    if name is None:
        return None

    name_lower = name.lower()
    name_upper = name.upper()

    if name_upper == name_lower:
        # name has no upper/lower conversion, e.g. non-european characters.
        # return unchanged
        return name

    requires_quotes = self.identifier_preparer._requires_quotes
    if name_lower == name and not requires_quotes(name_lower):
        return name_upper

    return name

1060 

def get_driver_connection(self, connection: DBAPIConnection) -> Any:
    """Return the driver-level connection for ``connection``.

    This default implementation returns ``connection`` itself.
    """
    return connection

1063 

def _overrides_default(self, method):
    """Return True if this dialect's class provides its own ``method``.

    The decision compares code objects: the attribute named ``method``
    on ``type(self)`` versus the one on ``DefaultDialect``.
    """
    own_code = getattr(type(self), method).__code__
    default_code = getattr(DefaultDialect, method).__code__
    return own_code is not default_code

1069 

def _default_multi_reflect(
    self,
    single_tbl_method,
    connection,
    kind,
    schema,
    filter_names,
    scope,
    **kw,
):
    """Run a single-table reflection method over many tables.

    This is a generator yielding ``((schema, table), value)`` pairs,
    where ``value`` is whatever
    ``single_tbl_method(connection, table, schema=schema, **kw)``
    returns.

    An ``unreflectable`` entry in ``kw`` is removed before ``kw`` is
    forwarded; the first ``UnreflectableTableError`` seen for a key is
    recorded in it.  Tables raising ``NoSuchTableError`` are skipped.
    """
    # (object kind, attribute of the "names" method, attribute of the
    # temporary-object "names" method or None); attributes are resolved
    # lazily so only the requested kinds are looked up
    name_sources = (
        (ObjectKind.TABLE, "get_table_names", "get_temp_table_names"),
        (ObjectKind.VIEW, "get_view_names", "get_temp_view_names"),
        # no temp materialized view at the moment
        (
            ObjectKind.MATERIALIZED_VIEW,
            "get_materialized_view_names",
            None,
        ),
    )
    names_fns = []
    temp_names_fns = []
    for object_kind, names_attr, temp_names_attr in name_sources:
        if object_kind in kind:
            names_fns.append(getattr(self, names_attr))
            if temp_names_attr is not None:
                temp_names_fns.append(getattr(self, temp_names_attr))

    unreflectable = kw.pop("unreflectable", {})

    if (
        filter_names
        and scope is ObjectScope.ANY
        and kind is ObjectKind.ANY
    ):
        # if names are given and no qualification on type of table
        # (i.e. the Table(..., autoload) case), take the names as given,
        # don't run names queries. If a table does not exist
        # NoSuchTableError is raised and it's skipped

        # this also suits the case for mssql where we can reflect
        # individual temp tables but there's no temp_names_fn
        names = filter_names
    else:
        name_kw = {"schema": schema, **kw}

        fns = []
        if ObjectScope.DEFAULT in scope:
            fns.extend(names_fns)
        if ObjectScope.TEMPORARY in scope:
            fns.extend(temp_names_fns)

        names = []
        for fn in fns:
            try:
                names.extend(fn(connection, **name_kw))
            except NotImplementedError:
                # this dialect doesn't list that kind of object
                pass

    if filter_names:
        filter_names = set(filter_names)

    # iterate over all the tables/views and call the single table method
    for table in names:
        if filter_names and table not in filter_names:
            continue

        key = (schema, table)
        try:
            yield (
                key,
                single_tbl_method(connection, table, schema=schema, **kw),
            )
        except exc.UnreflectableTableError as err:
            # keep only the first error recorded for a given key
            if key not in unreflectable:
                unreflectable[key] = err
        except exc.NoSuchTableError:
            pass

1142 

def get_multi_table_options(self, connection, **kw):
    """Multi-table form of ``get_table_options``; delegates to
    ``_default_multi_reflect`` with ``kw`` forwarded unchanged."""
    reflect_one = self.get_table_options
    return self._default_multi_reflect(reflect_one, connection, **kw)

1147 

def get_multi_columns(self, connection, **kw):
    """Multi-table form of ``get_columns``; delegates to
    ``_default_multi_reflect`` with ``kw`` forwarded unchanged."""
    reflect_one = self.get_columns
    return self._default_multi_reflect(reflect_one, connection, **kw)

1150 

def get_multi_pk_constraint(self, connection, **kw):
    """Multi-table form of ``get_pk_constraint``; delegates to
    ``_default_multi_reflect`` with ``kw`` forwarded unchanged."""
    reflect_one = self.get_pk_constraint
    return self._default_multi_reflect(reflect_one, connection, **kw)

1155 

def get_multi_foreign_keys(self, connection, **kw):
    """Multi-table form of ``get_foreign_keys``; delegates to
    ``_default_multi_reflect`` with ``kw`` forwarded unchanged."""
    reflect_one = self.get_foreign_keys
    return self._default_multi_reflect(reflect_one, connection, **kw)

1160 

def get_multi_indexes(self, connection, **kw):
    """Multi-table form of ``get_indexes``; delegates to
    ``_default_multi_reflect`` with ``kw`` forwarded unchanged."""
    reflect_one = self.get_indexes
    return self._default_multi_reflect(reflect_one, connection, **kw)

1163 

def get_multi_unique_constraints(self, connection, **kw):
    """Multi-table form of ``get_unique_constraints``; delegates to
    ``_default_multi_reflect`` with ``kw`` forwarded unchanged."""
    reflect_one = self.get_unique_constraints
    return self._default_multi_reflect(reflect_one, connection, **kw)

1168 

def get_multi_check_constraints(self, connection, **kw):
    """Multi-table form of ``get_check_constraints``; delegates to
    ``_default_multi_reflect`` with ``kw`` forwarded unchanged."""
    reflect_one = self.get_check_constraints
    return self._default_multi_reflect(reflect_one, connection, **kw)

1173 

def get_multi_table_comment(self, connection, **kw):
    """Multi-table form of ``get_table_comment``; delegates to
    ``_default_multi_reflect`` with ``kw`` forwarded unchanged."""
    reflect_one = self.get_table_comment
    return self._default_multi_reflect(reflect_one, connection, **kw)

1178 

1179 

class StrCompileDialect(DefaultDialect):
    # A DefaultDialect subclass that only overrides class-level settings:
    # it plugs in the "Str" compiler classes and switches on the feature
    # flags listed below.  No methods are overridden.

    # compiler and identifier-preparer classes
    statement_compiler = compiler.StrSQLCompiler
    ddl_compiler = compiler.DDLCompiler
    type_compiler_cls = compiler.StrSQLTypeCompiler
    preparer = compiler.IdentifierPreparer

    # RETURNING is flagged as available for every DML statement type
    insert_returning = True
    update_returning = True
    delete_returning = True

    supports_statement_cache = True

    supports_identity_columns = True

    # sequence handling flags
    supports_sequences = True
    sequences_optional = True
    preexecute_autoincrement_sequences = False

    supports_native_boolean = True

    supports_multivalues_insert = True
    supports_simple_order_by_label = True

1202 

1203 

1204class DefaultExecutionContext(ExecutionContext): 

1205 isinsert = False 

1206 isupdate = False 

1207 isdelete = False 

1208 is_crud = False 

1209 is_text = False 

1210 isddl = False 

1211 

1212 execute_style: ExecuteStyle = ExecuteStyle.EXECUTE 

1213 

1214 compiled: Optional[Compiled] = None 

1215 result_column_struct: Optional[ 

1216 Tuple[List[ResultColumnsEntry], bool, bool, bool, bool] 

1217 ] = None 

1218 returned_default_rows: Optional[Sequence[Row[Unpack[TupleAny]]]] = None 

1219 

1220 execution_options: _ExecuteOptions = util.EMPTY_DICT 

1221 

1222 cursor_fetch_strategy = _cursor._DEFAULT_FETCH 

1223 

1224 invoked_statement: Optional[Executable] = None 

1225 

1226 _is_implicit_returning = False 

1227 _is_explicit_returning = False 

1228 _is_supplemental_returning = False 

1229 _is_server_side = False 

1230 

1231 _soft_closed = False 

1232 

1233 _rowcount: Optional[int] = None 

1234 

1235 # a hook for SQLite's translation of 

1236 # result column names 

1237 # NOTE: pyhive is using this hook, can't remove it :( 

1238 _translate_colname: Optional[Callable[[str], str]] = None 

1239 

1240 _expanded_parameters: Mapping[str, List[str]] = util.immutabledict() 

1241 """used by set_input_sizes(). 

1242 

1243 This collection comes from ``ExpandedState.parameter_expansion``. 

1244 

1245 """ 

1246 

1247 cache_hit = NO_CACHE_KEY 

1248 

1249 root_connection: Connection 

1250 _dbapi_connection: PoolProxiedConnection 

1251 dialect: Dialect 

1252 unicode_statement: str 

1253 cursor: DBAPICursor 

1254 compiled_parameters: List[_MutableCoreSingleExecuteParams] 

1255 parameters: _DBAPIMultiExecuteParams 

1256 extracted_parameters: Optional[Sequence[BindParameter[Any]]] 

1257 

1258 _empty_dict_params = cast("Mapping[str, Any]", util.EMPTY_DICT) 

1259 

1260 _insertmanyvalues_rows: Optional[List[Tuple[Any, ...]]] = None 

1261 _num_sentinel_cols: int = 0 

1262 

1263 @classmethod 

1264 def _init_ddl( 

1265 cls, 

1266 dialect: Dialect, 

1267 connection: Connection, 

1268 dbapi_connection: PoolProxiedConnection, 

1269 execution_options: _ExecuteOptions, 

1270 compiled_ddl: DDLCompiler, 

1271 ) -> ExecutionContext: 

1272 """Initialize execution context for an ExecutableDDLElement 

1273 construct.""" 

1274 

1275 self = cls.__new__(cls) 

1276 self.root_connection = connection 

1277 self._dbapi_connection = dbapi_connection 

1278 self.dialect = connection.dialect 

1279 

1280 self.compiled = compiled = compiled_ddl 

1281 self.isddl = True 

1282 

1283 self.execution_options = execution_options 

1284 

1285 self.unicode_statement = str(compiled) 

1286 if compiled.schema_translate_map: 

1287 schema_translate_map = self.execution_options.get( 

1288 "schema_translate_map", {} 

1289 ) 

1290 

1291 rst = compiled.preparer._render_schema_translates 

1292 self.unicode_statement = rst( 

1293 self.unicode_statement, schema_translate_map 

1294 ) 

1295 

1296 self.statement = self.unicode_statement 

1297 

1298 self.cursor = self.create_cursor() 

1299 self.compiled_parameters = [] 

1300 

1301 if dialect.positional: 

1302 self.parameters = [dialect.execute_sequence_format()] 

1303 else: 

1304 self.parameters = [self._empty_dict_params] 

1305 

1306 return self 

1307 

@classmethod
def _init_compiled(
    cls,
    dialect: Dialect,
    connection: Connection,
    dbapi_connection: PoolProxiedConnection,
    execution_options: _ExecuteOptions,
    compiled: SQLCompiler,
    parameters: _CoreMultiExecuteParams,
    invoked_statement: Executable,
    extracted_parameters: Optional[Sequence[BindParameter[Any]]],
    cache_hit: CacheStats = CacheStats.CACHING_DISABLED,
) -> ExecutionContext:
    """Initialize execution context for a Compiled construct.

    Steps, in order:

    1. copy state from the arguments and ``compiled`` onto a new
       instance created with ``cls.__new__``;
    2. for INSERT/UPDATE/DELETE, record the RETURNING flags and reject
       executemany + RETURNING combinations the dialect does not
       support;
    3. build ``compiled_parameters`` (one entry per parameter set) and
       choose the execute style when more than one set is given;
    4. create the cursor and run prefetch defaults if any;
    5. apply "literal execute" / post-compile expansion and schema
       translation to the statement text;
    6. run bind processors and build ``self.parameters`` in positional
       or dictionary form.

    :raises InvalidRequestError: for unsupported executemany/RETURNING
     combinations, or when literal-execute / expanding parameters are
     used together with executemany.
    """

    self = cls.__new__(cls)
    self.root_connection = connection
    self._dbapi_connection = dbapi_connection
    self.dialect = connection.dialect
    self.extracted_parameters = extracted_parameters
    self.invoked_statement = invoked_statement
    self.compiled = compiled
    self.cache_hit = cache_hit

    self.execution_options = execution_options

    self.result_column_struct = (
        compiled._result_columns,
        compiled._ordered_columns,
        compiled._textual_ordered_columns,
        compiled._ad_hoc_textual,
        compiled._loose_column_name_matching,
    )

    self.isinsert = is_insert = compiled.isinsert
    self.isupdate = is_update = compiled.isupdate
    self.isdelete = is_delete = compiled.isdelete
    self.is_text = compiled.isplaintext

    if is_insert or is_update or is_delete:
        dml_statement = compiled.compile_state.statement  # type: ignore
        if TYPE_CHECKING:
            assert isinstance(dml_statement, UpdateBase)
        self.is_crud = True
        self._is_explicit_returning = explicit_returning = bool(
            dml_statement._returning
        )
        self._is_implicit_returning = implicit_returning = bool(
            compiled.implicit_returning
        )
        if implicit_returning and dml_statement._supplemental_returning:
            self._is_supplemental_returning = True

        # dont mix implicit and explicit returning
        assert not (implicit_returning and explicit_returning)

        if (
            explicit_returning or implicit_returning
        ) and compiled.for_executemany:
            # every check below raises, so they are evaluated strictly
            # in order and the first failing one wins
            if is_insert and not self.dialect.insert_executemany_returning:
                raise exc.InvalidRequestError(
                    f"Dialect {self.dialect.dialect_description} with "
                    f"current server capabilities does not support "
                    "INSERT..RETURNING when executemany is used"
                )

            sorted_returning_ok = (
                self.dialect
            ).insert_executemany_returning_sort_by_parameter_order
            if (
                is_insert
                and dml_statement._sort_by_parameter_order
                and not sorted_returning_ok
            ):
                raise exc.InvalidRequestError(
                    f"Dialect {self.dialect.dialect_description} with "
                    f"current server capabilities does not support "
                    "INSERT..RETURNING with deterministic row ordering "
                    "when executemany is used"
                )

            if (
                is_insert
                and self.dialect.use_insertmanyvalues
                and not compiled._insertmanyvalues
            ):
                raise exc.InvalidRequestError(
                    'Statement does not have "insertmanyvalues" '
                    "enabled, can't use INSERT..RETURNING with "
                    "executemany in this case."
                )

            if is_update and not self.dialect.update_executemany_returning:
                raise exc.InvalidRequestError(
                    f"Dialect {self.dialect.dialect_description} with "
                    f"current server capabilities does not support "
                    "UPDATE..RETURNING when executemany is used"
                )

            if is_delete and not self.dialect.delete_executemany_returning:
                raise exc.InvalidRequestError(
                    f"Dialect {self.dialect.dialect_description} with "
                    f"current server capabilities does not support "
                    "DELETE..RETURNING when executemany is used"
                )

    if parameters:
        self.compiled_parameters = [
            compiled.construct_params(
                param_set,
                escape_names=False,
                _group_number=group_number,
                extracted_parameters=extracted_parameters,
            )
            for group_number, param_set in enumerate(parameters)
        ]

        if len(parameters) > 1:
            if self.isinsert and compiled._insertmanyvalues:
                self.execute_style = ExecuteStyle.INSERTMANYVALUES

                imv = compiled._insertmanyvalues
                if imv.sentinel_columns is not None:
                    self._num_sentinel_cols = imv.num_sentinel_columns
            else:
                self.execute_style = ExecuteStyle.EXECUTEMANY
    else:
        # no parameter sets given; build a single set from the
        # statement's own bound values
        self.compiled_parameters = [
            compiled.construct_params(
                extracted_parameters=extracted_parameters,
                escape_names=False,
            )
        ]

    self.unicode_statement = compiled.string

    self.cursor = self.create_cursor()

    if self.compiled.insert_prefetch or self.compiled.update_prefetch:
        self._process_execute_defaults()

    processors = compiled._bind_processors

    flattened_processors: Mapping[
        str, _BindProcessorType[Any]
    ] = processors  # type: ignore[assignment]

    if compiled.literal_execute_params or compiled.post_compile_params:
        if self.executemany:
            raise exc.InvalidRequestError(
                "'literal_execute' or 'expanding' parameters can't be "
                "used with executemany()"
            )

        expanded_state = compiled._process_parameters_for_postcompile(
            self.compiled_parameters[0]
        )

        # re-assign self.unicode_statement
        self.unicode_statement = expanded_state.statement

        self._expanded_parameters = expanded_state.parameter_expansion

        # work on a copy so the compiled object's processors are not
        # modified by the expansion-specific entries
        flattened_processors = dict(processors)  # type: ignore
        flattened_processors.update(expanded_state.processors)
        positiontup = expanded_state.positiontup
    elif compiled.positional:
        positiontup = self.compiled.positiontup
    else:
        positiontup = None

    if compiled.schema_translate_map:
        schema_translate_map = self.execution_options.get(
            "schema_translate_map", {}
        )
        render_translates = compiled.preparer._render_schema_translates
        self.unicode_statement = render_translates(
            self.unicode_statement, schema_translate_map
        )

    # final self.unicode_statement is now assigned, encode if needed
    # by dialect
    self.statement = self.unicode_statement

    # Convert the dictionary of bind parameter values
    # into a dict or list to be sent to the DBAPI's
    # execute() or executemany() method.

    if compiled.positional:
        assert positiontup is not None
        core_positional_parameters: MutableSequence[Sequence[Any]] = [
            dialect.execute_sequence_format(
                [
                    (
                        flattened_processors[key](compiled_params[key])
                        if key in flattened_processors
                        else compiled_params[key]
                    )
                    for key in positiontup
                ]
            )
            for compiled_params in self.compiled_parameters
        ]
        self.parameters = core_positional_parameters
        return self

    # note that currently, "expanded" parameters will be present
    # in self.compiled_parameters in their quoted form.   This is
    # slightly inconsistent with the approach taken as of
    # #8056 where self.compiled_parameters is meant to contain unquoted
    # param names.
    escaped_names = compiled.escaped_bind_names
    core_dict_parameters: MutableSequence[Dict[str, Any]]
    if escaped_names:
        core_dict_parameters = [
            {
                escaped_names.get(key, key): (
                    flattened_processors[key](compiled_params[key])
                    if key in flattened_processors
                    else compiled_params[key]
                )
                for key in compiled_params
            }
            for compiled_params in self.compiled_parameters
        ]
    else:
        core_dict_parameters = [
            {
                key: (
                    flattened_processors[key](compiled_params[key])
                    if key in flattened_processors
                    else compiled_params[key]
                )
                for key in compiled_params
            }
            for compiled_params in self.compiled_parameters
        ]

    self.parameters = core_dict_parameters

    return self

1537 

1538 @classmethod 

1539 def _init_statement( 

1540 cls, 

1541 dialect: Dialect, 

1542 connection: Connection, 

1543 dbapi_connection: PoolProxiedConnection, 

1544 execution_options: _ExecuteOptions, 

1545 statement: str, 

1546 parameters: _DBAPIMultiExecuteParams, 

1547 ) -> ExecutionContext: 

1548 """Initialize execution context for a string SQL statement.""" 

1549 

1550 self = cls.__new__(cls) 

1551 self.root_connection = connection 

1552 self._dbapi_connection = dbapi_connection 

1553 self.dialect = connection.dialect 

1554 self.is_text = True 

1555 

1556 self.execution_options = execution_options 

1557 

1558 if not parameters: 

1559 if self.dialect.positional: 

1560 self.parameters = [dialect.execute_sequence_format()] 

1561 else: 

1562 self.parameters = [self._empty_dict_params] 

1563 elif isinstance(parameters[0], dialect.execute_sequence_format): 

1564 self.parameters = parameters 

1565 elif isinstance(parameters[0], dict): 

1566 self.parameters = parameters 

1567 else: 

1568 self.parameters = [ 

1569 dialect.execute_sequence_format(p) for p in parameters 

1570 ] 

1571 

1572 if len(parameters) > 1: 

1573 self.execute_style = ExecuteStyle.EXECUTEMANY 

1574 

1575 self.statement = self.unicode_statement = statement 

1576 

1577 self.cursor = self.create_cursor() 

1578 return self 

1579 

1580 @classmethod 

1581 def _init_default( 

1582 cls, 

1583 dialect: Dialect, 

1584 connection: Connection, 

1585 dbapi_connection: PoolProxiedConnection, 

1586 execution_options: _ExecuteOptions, 

1587 ) -> ExecutionContext: 

1588 """Initialize execution context for a ColumnDefault construct.""" 

1589 

1590 self = cls.__new__(cls) 

1591 self.root_connection = connection 

1592 self._dbapi_connection = dbapi_connection 

1593 self.dialect = connection.dialect 

1594 

1595 self.execution_options = execution_options 

1596 

1597 self.cursor = self.create_cursor() 

1598 return self 

1599 

1600 def _get_cache_stats(self) -> str: 

1601 if self.compiled is None: 

1602 return "raw sql" 

1603 

1604 now = perf_counter() 

1605 

1606 ch = self.cache_hit 

1607 

1608 gen_time = self.compiled._gen_time 

1609 assert gen_time is not None 

1610 

1611 if ch is NO_CACHE_KEY: 

1612 return "no key %.5fs" % (now - gen_time,) 

1613 elif ch is CACHE_HIT: 

1614 return "cached since %.4gs ago" % (now - gen_time,) 

1615 elif ch is CACHE_MISS: 

1616 return "generated in %.5fs" % (now - gen_time,) 

1617 elif ch is CACHING_DISABLED: 

1618 if "_cache_disable_reason" in self.execution_options: 

1619 return "caching disabled (%s) %.5fs " % ( 

1620 self.execution_options["_cache_disable_reason"], 

1621 now - gen_time, 

1622 ) 

1623 else: 

1624 return "caching disabled %.5fs" % (now - gen_time,) 

1625 elif ch is NO_DIALECT_SUPPORT: 

1626 return "dialect %s+%s does not support caching %.5fs" % ( 

1627 self.dialect.name, 

1628 self.dialect.driver, 

1629 now - gen_time, 

1630 ) 

1631 else: 

1632 return "unknown" 

1633 

@property
def executemany(self):  # type: ignore[override]
    """True when the execute style is EXECUTEMANY or INSERTMANYVALUES."""
    multi_styles = (
        ExecuteStyle.EXECUTEMANY,
        ExecuteStyle.INSERTMANYVALUES,
    )
    return self.execute_style in multi_styles

1640 

@util.memoized_property
def identifier_preparer(self):
    """Return the identifier preparer to use for this execution.

    The compiled object's preparer is used when a compiled object is
    present.  Otherwise the dialect's preparer is used, adapted with
    the ``schema_translate_map`` execution option if that is set.
    """
    if self.compiled:
        return self.compiled.preparer

    if "schema_translate_map" not in self.execution_options:
        return self.dialect.identifier_preparer

    return self.dialect.identifier_preparer._with_schema_translate(
        self.execution_options["schema_translate_map"]
    )

1651 

@util.memoized_property
def engine(self):
    """The ``engine`` attribute of ``self.root_connection``."""
    root = self.root_connection
    return root.engine

1655 

@util.memoized_property
def postfetch_cols(self) -> Optional[Sequence[Column[Any]]]:
    """The ``postfetch`` collection of the compiled statement."""
    if TYPE_CHECKING:
        assert isinstance(self.compiled, SQLCompiler)
    compiled = self.compiled
    return compiled.postfetch

1661 

@util.memoized_property
def prefetch_cols(self) -> Optional[Sequence[Column[Any]]]:
    """The compiled statement's prefetch collection.

    ``insert_prefetch`` for an INSERT, ``update_prefetch`` for an
    UPDATE, and an empty tuple for anything else.
    """
    if TYPE_CHECKING:
        assert isinstance(self.compiled, SQLCompiler)

    if self.isinsert:
        return self.compiled.insert_prefetch
    if self.isupdate:
        return self.compiled.update_prefetch
    return ()

1672 

@util.memoized_property
def no_parameters(self):
    """Value of the ``no_parameters`` execution option; ``False`` when
    the option is not set."""
    options = self.execution_options
    return options.get("no_parameters", False)

1676 

1677 def _execute_scalar( 

1678 self, 

1679 stmt: str, 

1680 type_: Optional[TypeEngine[Any]], 

1681 parameters: Optional[_DBAPISingleExecuteParams] = None, 

1682 ) -> Any: 

1683 """Execute a string statement on the current cursor, returning a 

1684 scalar result. 

1685 

1686 Used to fire off sequences, default phrases, and "select lastrowid" 

1687 types of statements individually or in the context of a parent INSERT 

1688 or UPDATE statement. 

1689 

1690 """ 

1691 

1692 conn = self.root_connection 

1693 

1694 if "schema_translate_map" in self.execution_options: 

1695 schema_translate_map = self.execution_options.get( 

1696 "schema_translate_map", {} 

1697 ) 

1698 

1699 rst = self.identifier_preparer._render_schema_translates 

1700 stmt = rst(stmt, schema_translate_map) 

1701 

1702 if not parameters: 

1703 if self.dialect.positional: 

1704 parameters = self.dialect.execute_sequence_format() 

1705 else: 

1706 parameters = {} 

1707 

1708 conn._cursor_execute(self.cursor, stmt, parameters, context=self) 

1709 row = self.cursor.fetchone() 

1710 if row is not None: 

1711 r = row[0] 

1712 else: 

1713 r = None 

1714 if type_ is not None: 

1715 # apply type post processors to the result 

1716 proc = type_._cached_result_processor( 

1717 self.dialect, self.cursor.description[0][1] 

1718 ) 

1719 if proc: 

1720 return proc(r) 

1721 return r 

1722 

@util.memoized_property
def connection(self):
    """The connection this context executes on; this is
    ``self.root_connection``."""
    root = self.root_connection
    return root

1726 

1727 def _use_server_side_cursor(self): 

1728 if not self.dialect.supports_server_side_cursors: 

1729 return False 

1730 

1731 if self.dialect.server_side_cursors: 

1732 # this is deprecated 

1733 use_server_side = self.execution_options.get( 

1734 "stream_results", True 

1735 ) and ( 

1736 self.compiled 

1737 and isinstance(self.compiled.statement, expression.Selectable) 

1738 or ( 

1739 ( 

1740 not self.compiled 

1741 or isinstance( 

1742 self.compiled.statement, expression.TextClause 

1743 ) 

1744 ) 

1745 and self.unicode_statement 

1746 and SERVER_SIDE_CURSOR_RE.match(self.unicode_statement) 

1747 ) 

1748 ) 

1749 else: 

1750 use_server_side = self.execution_options.get( 

1751 "stream_results", False 

1752 ) 

1753 

1754 return use_server_side 

1755 

def create_cursor(self) -> DBAPICursor:
    """Create the cursor for this execution and record in
    ``_is_server_side`` which kind was chosen.

    A server side cursor is used when the dialect supports them and
    either the ``stream_results`` execution option is set, or the
    dialect-level ``server_side_cursors`` flag is on and
    ``_use_server_side_cursor()`` agrees.
    """
    dialect = self.dialect

    # inlining initial preference checks for SS cursors
    wants_server_side = dialect.supports_server_side_cursors and (
        self.execution_options.get("stream_results", False)
        or (dialect.server_side_cursors and self._use_server_side_cursor())
    )

    if wants_server_side:
        self._is_server_side = True
        return self.create_server_side_cursor()

    self._is_server_side = False
    return self.create_default_cursor()

1773 

def fetchall_for_returning(self, cursor):
    """Fetch every remaining row from ``cursor`` via ``fetchall()``."""
    rows = cursor.fetchall()
    return rows

1776 

def create_default_cursor(self) -> DBAPICursor:
    """Return a new cursor obtained from ``self._dbapi_connection``."""
    dbapi_connection = self._dbapi_connection
    return dbapi_connection.cursor()

1779 

def create_server_side_cursor(self) -> DBAPICursor:
    """Create a server side cursor.

    Not provided by this base implementation.

    :raises NotImplementedError: always.
    """
    raise NotImplementedError()

1782 

def pre_exec(self):
    """Hook that does nothing in this base implementation."""

1785 

def get_out_parameter_values(self, names):
    """Return OUT parameter values for ``names``.

    Not provided by this base implementation.

    :raises NotImplementedError: always.
    """
    message = "This dialect does not support OUT parameters"
    raise NotImplementedError(message)

1790 

def post_exec(self):
    """Hook that does nothing in this base implementation."""

1793 

def get_result_processor(self, type_, colname, coltype):
    """Return a 'result processor' for a given type as present in
    cursor.description.

    This has a default implementation that dialects can override
    for context-sensitive result type handling.  The default asks
    ``type_`` for its cached processor for this dialect and
    ``coltype``; ``colname`` is not consulted.

    """
    dialect = self.dialect
    return type_._cached_result_processor(dialect, coltype)

1803 

def get_lastrowid(self):
    """return self.cursor.lastrowid, or equivalent, after an INSERT.

    Implementations may call special cursor functions, issue a new
    SELECT on this cursor (or on a new one), or hand back a value that
    was stored during post_exec().

    This is only invoked for dialects that support "implicit" primary
    key generation and keep preexecute_autoincrement_sequences set to
    False, and only when no explicit id value was bound to the
    statement.

    It is called once per INSERT statement that needs to return the
    last inserted primary key, for dialects that make use of the
    lastrowid concept, directly after
    :meth:`.ExecutionContext.post_exec`.

    """
    cursor = self.cursor
    return cursor.lastrowid

1822 

def handle_dbapi_exception(self, e):
    """Hook receiving exception ``e``; does nothing in this base
    implementation."""

1825 

@util.non_memoized_property
def rowcount(self) -> int:
    """The stored ``_rowcount`` when one has been recorded, otherwise
    the cursor's current ``rowcount``."""
    stored = self._rowcount
    return self.cursor.rowcount if stored is None else stored

1832 

1833 @property 

1834 def _has_rowcount(self): 

1835 return self._rowcount is not None 

1836 

def supports_sane_rowcount(self):
    """Return the dialect's ``supports_sane_rowcount`` setting."""
    dialect = self.dialect
    return dialect.supports_sane_rowcount

1839 

def supports_sane_multi_rowcount(self):
    """Return the dialect's ``supports_sane_multi_rowcount`` setting."""
    dialect = self.dialect
    return dialect.supports_sane_multi_rowcount

1842 

1843 def _setup_result_proxy(self): 

1844 exec_opt = self.execution_options 

1845 

1846 if self._rowcount is None and exec_opt.get("preserve_rowcount", False): 

1847 self._rowcount = self.cursor.rowcount 

1848 

1849 yp: Optional[Union[int, bool]] 

1850 if self.is_crud or self.is_text: 

1851 result = self._setup_dml_or_text_result() 

1852 yp = False 

1853 else: 

1854 yp = exec_opt.get("yield_per", None) 

1855 sr = self._is_server_side or exec_opt.get("stream_results", False) 

1856 strategy = self.cursor_fetch_strategy 

1857 if sr and strategy is _cursor._DEFAULT_FETCH: 

1858 strategy = _cursor.BufferedRowCursorFetchStrategy( 

1859 self.cursor, self.execution_options 

1860 ) 

1861 cursor_description: _DBAPICursorDescription = ( 

1862 strategy.alternate_cursor_description 

1863 or self.cursor.description 

1864 ) 

1865 if cursor_description is None: 

1866 strategy = _cursor._NO_CURSOR_DQL 

1867 

1868 result = _cursor.CursorResult(self, strategy, cursor_description) 

1869 

1870 compiled = self.compiled 

1871 

1872 if ( 

1873 compiled 

1874 and not self.isddl 

1875 and cast(SQLCompiler, compiled).has_out_parameters 

1876 ): 

1877 self._setup_out_parameters(result) 

1878 

1879 self._soft_closed = result._soft_closed 

1880 

1881 if yp: 

1882 result = result.yield_per(yp) 

1883 

1884 return result 

1885 

1886 def _setup_out_parameters(self, result): 

1887 compiled = cast(SQLCompiler, self.compiled) 

1888 

1889 out_bindparams = [ 

1890 (param, name) 

1891 for param, name in compiled.bind_names.items() 

1892 if param.isoutparam 

1893 ] 

1894 out_parameters = {} 

1895 

1896 for bindparam, raw_value in zip( 

1897 [param for param, name in out_bindparams], 

1898 self.get_out_parameter_values( 

1899 [name for param, name in out_bindparams] 

1900 ), 

1901 ): 

1902 type_ = bindparam.type 

1903 impl_type = type_.dialect_impl(self.dialect) 

1904 dbapi_type = impl_type.get_dbapi_type(self.dialect.loaded_dbapi) 

1905 result_processor = impl_type.result_processor( 

1906 self.dialect, dbapi_type 

1907 ) 

1908 if result_processor is not None: 

1909 raw_value = result_processor(raw_value) 

1910 out_parameters[bindparam.key] = raw_value 

1911 

1912 result.out_parameters = out_parameters 

1913 

def _setup_dml_or_text_result(self):
    """Construct the ``CursorResult`` for a DML or textual statement.

    Selects the fetch strategy and cursor description, builds the result
    and then consumes, rewinds or soft-closes it depending on the kind of
    statement, recording ``inserted_primary_key_rows``,
    ``returned_default_rows`` and ``_rowcount`` on this context along the
    way.

    :return: the newly created :class:`.CursorResult`.

    """
    sql_compiler = cast(SQLCompiler, self.compiled)

    fetch_strategy: ResultFetchStrategy = self.cursor_fetch_strategy

    if self.isinsert:
        imv_with_returning = (
            self.execute_style is ExecuteStyle.INSERTMANYVALUES
            and sql_compiler.effective_returning
        )
        if imv_with_returning:
            # seed a fully buffered strategy with the rows held in
            # _insertmanyvalues_rows.  an alternate cursor description
            # set by the dialect is maintained, e.g. mssql preserves it
            fetch_strategy = _cursor.FullyBufferedCursorFetchStrategy(
                self.cursor,
                initial_buffer=self._insertmanyvalues_rows,
                alternate_description=(
                    fetch_strategy.alternate_cursor_description
                ),
            )

        if sql_compiler.postfetch_lastrowid:
            self.inserted_primary_key_rows = (
                self._setup_ins_pk_from_lastrowid()
            )
        # otherwise, if not self._is_implicit_returning, the default
        # inserted_primary_key_rows accessor will return an "empty"
        # primary key collection when accessed.

    if self._is_server_side and fetch_strategy is _cursor._DEFAULT_FETCH:
        fetch_strategy = _cursor.BufferedRowCursorFetchStrategy(
            self.cursor, self.execution_options
        )

    description = None
    if fetch_strategy is not _cursor._NO_CURSOR_DML:
        description = (
            fetch_strategy.alternate_cursor_description
            or self.cursor.description
        )

    if description is None:
        fetch_strategy = _cursor._NO_CURSOR_DML
    elif self._num_sentinel_cols:
        assert self.execute_style is ExecuteStyle.INSERTMANYVALUES
        # strip out the sentinel columns from the cursor description;
        # a similar logic is done to the rows only in CursorResult
        description = description[: -self._num_sentinel_cols]

    cursor_result: _cursor.CursorResult[Any] = _cursor.CursorResult(
        self, fetch_strategy, description
    )

    def _rewind_or_close(fetched):
        # a statement that has both return_defaults() and returning()
        # gets the result rewound onto the rows that were just consumed;
        # otherwise the result is soft-closed.
        if self._is_supplemental_returning:
            cursor_result._rewind(fetched)
        else:
            cursor_result._soft_close()

    if self.isinsert:
        if self._is_implicit_returning:
            fetched = cursor_result.all()

            self.returned_default_rows = fetched

            self.inserted_primary_key_rows = (
                self._setup_ins_pk_from_implicit_returning(
                    cursor_result, fetched
                )
            )

            # test that it has a cursor metadata that is accurate. the
            # first row will have been fetched and current assumptions
            # are that the result has only one row, until executemany()
            # support is added here.
            assert cursor_result._metadata.returns_rows

            _rewind_or_close(fetched)
        elif not self._is_explicit_returning:
            cursor_result._soft_close()

            # we assume here the result does not return any rows.
            # *usually*, this will be true.  However, some dialects
            # such as that of MSSQL/pyodbc need to SELECT a post fetch
            # function so this is not necessarily true.
            # assert not result.returns_rows

    elif self._is_implicit_returning:
        fetched = cursor_result.all()

        if fetched:
            self.returned_default_rows = fetched
        self._rowcount = len(fetched)

        _rewind_or_close(fetched)

        # test that it has a cursor metadata that is accurate.
        # the rows have all been fetched however.
        assert cursor_result._metadata.returns_rows

    elif not cursor_result._metadata.returns_rows:
        # no results, get rowcount
        # (which requires open cursor on some drivers)
        if self._rowcount is None:
            self._rowcount = self.cursor.rowcount
        cursor_result._soft_close()
    elif self.isupdate or self.isdelete:
        if self._rowcount is None:
            self._rowcount = self.cursor.rowcount

    return cursor_result

2027 

@util.memoized_property
def inserted_primary_key_rows(self):
    """Fallback primary key rows for an INSERT.

    Used when no specific "get primary key" strategy was set up during
    execution: the rows are then a "default" primary key derived from
    what is present in the compiled parameters and nothing else.

    """
    default_pk_rows = self._setup_ins_pk_from_empty()
    return default_pk_rows

2034 

def _setup_ins_pk_from_lastrowid(self):
    """Return a one-element list holding the primary key built from
    :meth:`get_lastrowid` and the first set of compiled parameters.

    """
    sql_compiler = cast(SQLCompiler, self.compiled)
    build_pk = sql_compiler._inserted_primary_key_from_lastrowid_getter

    rowid = self.get_lastrowid()
    pk_row = build_pk(rowid, self.compiled_parameters[0])
    return [pk_row]

2041 

def _setup_ins_pk_from_empty(self):
    """Return one primary key row per compiled parameter set, built by
    the compiler's "lastrowid" getter with ``None`` as the lastrowid.

    """
    sql_compiler = cast(SQLCompiler, self.compiled)
    build_pk = sql_compiler._inserted_primary_key_from_lastrowid_getter

    pk_rows = []
    for parameter_set in self.compiled_parameters:
        pk_rows.append(build_pk(None, parameter_set))
    return pk_rows

2047 

def _setup_ins_pk_from_implicit_returning(self, result, rows):
    """Return primary key rows derived from implicitly RETURNed rows.

    :param result: not used by this implementation.
    :param rows: rows fetched from the statement; each one is paired
     positionally with an entry of ``self.compiled_parameters``.
    :return: list of primary key rows; empty when ``rows`` is empty.

    """
    if not rows:
        return []

    sql_compiler = cast(SQLCompiler, self.compiled)
    build_pk = sql_compiler._inserted_primary_key_from_returning_getter

    # map() over two iterables stops at the shorter one, like zip()
    return list(map(build_pk, rows, self.compiled_parameters))

2060 

def lastrow_has_defaults(self):
    """Tell whether an INSERT or UPDATE has "postfetch" entries.

    For a statement that is neither an INSERT nor an UPDATE the falsy
    value of ``self.isinsert or self.isupdate`` is returned as is;
    otherwise the result is the boolean value of the compiled statement's
    ``postfetch`` collection.

    """
    insert_or_update = self.isinsert or self.isupdate
    if not insert_or_update:
        return insert_or_update
    return bool(cast(SQLCompiler, self.compiled).postfetch)

2065 

def _prepare_set_input_sizes(
    self,
) -> Optional[List[Tuple[str, Any, TypeEngine[Any]]]]:
    """Given a cursor and ClauseParameters, prepare the arguments needed
    to call the appropriate style of ``setinputsizes()`` on the cursor,
    using DB-API types from the bind parameter's ``TypeEngine`` objects.

    This method is only called by those dialects which set the
    :attr:`.Dialect.bind_typing` attribute to
    :attr:`.BindTyping.SETINPUTSIZES`.  Python-oracledb and cx_Oracle are
    the only DBAPIs that require setinputsizes(); pyodbc offers it as an
    option.

    Prior to SQLAlchemy 2.0, the setinputsizes() approach was also used
    for pg8000 and asyncpg, which has been changed to inline rendering
    of casts.

    :return: ``None`` for DDL and textual statements or when the compiler
     has no input sizes lookup; otherwise a list of
     ``(bind name, DBAPI type or None, TypeEngine)`` tuples.

    """
    if self.isddl or self.is_text:
        return None

    sql_compiler = cast(SQLCompiler, self.compiled)

    size_lookup = sql_compiler._get_set_input_sizes_lookup()
    if size_lookup is None:
        return None

    dialect = self.dialect

    # all of the rest of this... cython?

    if dialect._has_events:
        # the do_setinputsizes event receives a dict copy of the lookup
        size_lookup = dict(size_lookup)
        dialect.dispatch.do_setinputsizes(
            size_lookup, self.cursor, self.statement, self.parameters, self
        )

    escapes = sql_compiler.escaped_bind_names or None

    def _escaped(name):
        # translate a bind name to its escaped form, if it has one
        if escapes is None:
            return name
        return escapes.get(name, name)

    if dialect.positional:
        pairs = [
            (name, sql_compiler.binds[name])
            for name in sql_compiler.positiontup or ()
        ]
    else:
        pairs = [
            (name, bind) for bind, name in sql_compiler.bind_names.items()
        ]

    collected: List[Tuple[str, Any, TypeEngine[Any]]] = []
    for name, bind in pairs:
        if bind in sql_compiler.literal_execute_params:
            continue

        if name not in self._expanded_parameters:
            # plain, non-expanding parameter: exactly one entry
            dbapi_type = size_lookup.get(bind, None)
            collected.append((_escaped(name), dbapi_type, bind.type))
            continue

        if is_tuple_type(bind.type):
            # expanded tuple parameter: the member types repeat in a
            # cycle across the expanded names
            width = len(bind.type.types)
            member_dbapi_types = size_lookup[bind]
            for position, expanded in enumerate(
                self._expanded_parameters[name]
            ):
                collected.append(
                    (
                        _escaped(expanded),
                        member_dbapi_types[position % width],
                        bind.type.types[position % width],
                    )
                )
        else:
            # expanded scalar parameter: every name shares one type
            dbapi_type = size_lookup.get(bind, None)
            for expanded in self._expanded_parameters[name]:
                collected.append((_escaped(expanded), dbapi_type, bind.type))

    return collected

2172 

def _exec_default(self, column, default, type_):
    """Produce the value of ``default`` for ``column``.

    The default's flags are checked in order: a sequence is fired via
    :meth:`fire_sequence`, a callable is invoked with this context, a
    clause element is executed via
    :meth:`_exec_default_clause_element`, and anything else yields
    ``default.arg`` unchanged.

    """
    if default.is_sequence:
        return self.fire_sequence(default, type_)

    if default.is_callable:
        # this codepath is not normally used as it's inlined
        # into _process_execute_defaults
        self.current_column = column
        return default.arg(self)

    if default.is_clause_element:
        return self._exec_default_clause_element(column, default, type_)

    # plain scalar; this codepath is not normally used as it's inlined
    # into _process_execute_defaults
    return default.arg

2187 

def _exec_default_clause_element(self, column, default, type_):
    """Execute a default that is a complete clause element.

    A miniature version of the compile -> parameters -> cursor.execute()
    sequence is re-implemented here, since the state of the connection /
    result in progress must not be modified and no new connection /
    result objects are to be created.

    :param column: not used by this implementation.
    :param default: the default object; its ``arg`` is the expression
     that gets SELECTed.
    :param type_: type the expression is coerced to when the default's
     argument is not already typed; also passed to ``_execute_scalar``.
    :return: whatever ``self._execute_scalar`` returns.

    """
    if default._arg_is_typed:
        default_expr = default.arg
    else:
        default_expr = expression.type_coerce(default.arg, type_)

    select_compiled = expression.select(default_expr).compile(
        dialect=self.dialect
    )
    bound_values = select_compiled.construct_params()
    bind_processors = select_compiled._bind_processors

    def _processed(name):
        # apply the bind processor for this parameter, if it has one
        if name in bind_processors:
            return bind_processors[name](bound_values[name])  # type: ignore
        return bound_values[name]

    if select_compiled.positional:
        parameters = self.dialect.execute_sequence_format(
            [_processed(name) for name in select_compiled.positiontup or ()]
        )
    else:
        parameters = {name: _processed(name) for name in bound_values}

    return self._execute_scalar(
        str(select_compiled), type_, parameters=parameters
    )

2226 

current_parameters: Optional[_CoreSingleExecuteParams] = None
"""The dictionary of parameters being applied to the current row.

This attribute is only available while a user-defined default generation
function is running, e.g. as described at
:ref:`context_default_functions`.  The dictionary has an entry for each
column/value pair that is to be part of the INSERT or UPDATE statement;
its keys are the key value of each :class:`_schema.Column`, which is
usually synonymous with the name.

Note that the :attr:`.DefaultExecutionContext.current_parameters`
attribute does not accommodate the "multi-values" feature of the
:meth:`_expression.Insert.values` method.  The
:meth:`.DefaultExecutionContext.get_current_parameters` method should be
preferred.

.. seealso::

    :meth:`.DefaultExecutionContext.get_current_parameters`

    :ref:`context_default_functions`

"""

2251 

def get_current_parameters(self, isolate_multiinsert_groups=True):
    """Return a dictionary of parameters applied to the current row.

    This method can only be used in the context of a user-defined default
    generation function, e.g. as described at
    :ref:`context_default_functions`. When invoked, a dictionary is
    returned which includes entries for each column/value pair that is part
    of the INSERT or UPDATE statement. The keys of the dictionary will be
    the key value of each :class:`_schema.Column`,
    which is usually synonymous
    with the name.

    :param isolate_multiinsert_groups=True: indicates that multi-valued
     INSERT constructs created using :meth:`_expression.Insert.values`
     should be
     handled by returning only the subset of parameters that are local
     to the current column default invocation.   When ``False``, the
     raw parameters of the statement are returned including the
     naming convention used in the case of multi-valued INSERT.

    :raises InvalidRequestError: when there is no current parameter
     dictionary or no current column, i.e. the method is invoked outside
     of a Python side column default function.

    .. seealso::

        :attr:`.DefaultExecutionContext.current_parameters`

        :ref:`context_default_functions`

    """
    # ``current_parameters`` has a class-level default of None, so a
    # missing attribute alone does not identify "no default function is
    # running"; check both values explicitly rather than relying on
    # AttributeError plus assert statements (asserts vanish under -O).
    parameters = getattr(self, "current_parameters", None)
    column = getattr(self, "current_column", None)
    if parameters is None or column is None:
        raise exc.InvalidRequestError(
            "get_current_parameters() can only be invoked in the "
            "context of a Python side column default function"
        )

    compile_state = cast(
        "DMLState", cast(SQLCompiler, self.compiled).compile_state
    )
    assert compile_state is not None
    if (
        isolate_multiinsert_groups
        and dml.isinsert(compile_state)
        and compile_state._has_multi_parameters
    ):
        if column._is_multiparam_column:
            index = column.index + 1
            d = {column.original.key: parameters[column.key]}
        else:
            d = {column.key: parameters[column.key]}
            index = 0
        assert compile_state._dict_parameters is not None
        keys = compile_state._dict_parameters.keys()
        # pull the values of the multi-values group the current column
        # belongs to; they are stored under "<key>_m<index>" names
        d.update(
            (key, parameters["%s_m%d" % (key, index)]) for key in keys
        )
        return d
    else:
        return parameters

2313 

def get_insert_default(self, column):
    """Return the INSERT-time default value for ``column``.

    ``None`` is returned when the column has no ``default``; otherwise
    the default is evaluated through :meth:`_exec_default` using the
    column's type.

    """
    insert_default = column.default
    if insert_default is None:
        return None
    return self._exec_default(column, insert_default, column.type)

2319 

def get_update_default(self, column):
    """Return the UPDATE-time default value for ``column``.

    ``None`` is returned when the column has no ``onupdate``; otherwise
    the onupdate default is evaluated through :meth:`_exec_default` using
    the column's type.

    """
    update_default = column.onupdate
    if update_default is None:
        return None
    return self._exec_default(column, update_default, column.type)

2325 

def _process_execute_defaults(self):
    """Fill Python-side column defaults into every compiled parameter set.

    The compiler's ``insert_prefetch`` columns are used when present,
    else its ``update_prefetch`` columns.  For each parameter set and
    each such column the value comes from a running sentinel counter, the
    scalar default, the callable default invoked with this context, or
    :meth:`get_insert_default` / :meth:`get_update_default` as a last
    resort (whose ``None`` results are not stored).

    ``self.current_parameters`` refers to the parameter set being
    processed while the loop runs and is deleted at the end.

    """
    sql_compiler = cast(SQLCompiler, self.compiled)

    key_for = sql_compiler._within_exec_param_key_getter

    next_sentinel = 0

    # one record per prefetch column:
    # (column, parameter key, description tuple, fallback getter)
    prefetch_plan = []
    if sql_compiler.insert_prefetch:
        for col in sql_compiler.insert_prefetch:
            prefetch_plan.append(
                (
                    col,
                    key_for(col),
                    col._default_description_tuple,
                    self.get_insert_default,
                )
            )
    elif sql_compiler.update_prefetch:
        for col in sql_compiler.update_prefetch:
            prefetch_plan.append(
                (
                    col,
                    key_for(col),
                    col._onupdate_description_tuple,
                    self.get_update_default,
                )
            )

    for parameter_set in self.compiled_parameters:
        self.current_parameters = parameter_set

        for col, key, description, fallback in prefetch_plan:
            default_arg, is_scalar, is_callable, is_sentinel = description

            if is_sentinel:
                # the counter keeps running across all parameter sets
                parameter_set[key] = next_sentinel
                next_sentinel += 1
            elif is_scalar:
                parameter_set[key] = default_arg
            elif is_callable:
                self.current_column = col
                parameter_set[key] = default_arg(self)
            else:
                fetched = fallback(col)
                if fetched is not None:
                    parameter_set[key] = fetched

    del self.current_parameters

2379 

2380 

# Set DefaultExecutionContext as the ``execution_ctx_cls`` attribute of
# DefaultDialect.
# NOTE(review): presumably assigned here rather than in the class body
# because DefaultDialect is defined before this class -- confirm.
DefaultDialect.execution_ctx_cls = DefaultExecutionContext