Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/default.py: 46%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1040 statements  

1# engine/default.py 

2# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7# mypy: allow-untyped-defs, allow-untyped-calls 

8 

9"""Default implementations of per-dialect sqlalchemy.engine classes. 

10 

11These are semi-private implementation classes which are only of importance 

12to database dialect authors; dialects will usually use the classes here 

13as the base class for their own corresponding classes. 

14 

15""" 

16 

17from __future__ import annotations 

18 

19import functools 

20import operator 

21import random 

22import re 

23from time import perf_counter 

24import typing 

25from typing import Any 

26from typing import Callable 

27from typing import cast 

28from typing import Dict 

29from typing import Final 

30from typing import List 

31from typing import Mapping 

32from typing import MutableMapping 

33from typing import MutableSequence 

34from typing import Optional 

35from typing import Sequence 

36from typing import Set 

37from typing import Tuple 

38from typing import Type 

39from typing import TYPE_CHECKING 

40from typing import Union 

41import weakref 

42 

43from . import characteristics 

44from . import cursor as _cursor 

45from . import interfaces 

46from .base import Connection 

47from .interfaces import CacheStats 

48from .interfaces import DBAPICursor 

49from .interfaces import Dialect 

50from .interfaces import ExecuteStyle 

51from .interfaces import ExecutionContext 

52from .reflection import ObjectKind 

53from .reflection import ObjectScope 

54from .. import event 

55from .. import exc 

56from .. import pool 

57from .. import util 

58from ..sql import compiler 

59from ..sql import dml 

60from ..sql import expression 

61from ..sql import type_api 

62from ..sql import util as sql_util 

63from ..sql._typing import is_tuple_type 

64from ..sql.base import _NoArg 

65from ..sql.compiler import DDLCompiler 

66from ..sql.compiler import InsertmanyvaluesSentinelOpts 

67from ..sql.compiler import SQLCompiler 

68from ..sql.elements import quoted_name 

69from ..util.typing import Literal 

70from ..util.typing import TupleAny 

71from ..util.typing import Unpack 

72 

73 

74if typing.TYPE_CHECKING: 

75 from types import ModuleType 

76 

77 from .base import Engine 

78 from .cursor import ResultFetchStrategy 

79 from .interfaces import _CoreMultiExecuteParams 

80 from .interfaces import _CoreSingleExecuteParams 

81 from .interfaces import _DBAPICursorDescription 

82 from .interfaces import _DBAPIMultiExecuteParams 

83 from .interfaces import _ExecuteOptions 

84 from .interfaces import _MutableCoreSingleExecuteParams 

85 from .interfaces import _ParamStyle 

86 from .interfaces import DBAPIConnection 

87 from .interfaces import IsolationLevel 

88 from .row import Row 

89 from .url import URL 

90 from ..event import _ListenerFnType 

91 from ..pool import Pool 

92 from ..pool import PoolProxiedConnection 

93 from ..sql import Executable 

94 from ..sql.compiler import Compiled 

95 from ..sql.compiler import Linting 

96 from ..sql.compiler import ResultColumnsEntry 

97 from ..sql.dml import DMLState 

98 from ..sql.dml import UpdateBase 

99 from ..sql.elements import BindParameter 

100 from ..sql.schema import Column 

101 from ..sql.type_api import _BindProcessorType 

102 from ..sql.type_api import _ResultProcessorType 

103 from ..sql.type_api import TypeEngine 

104 

# Used on literal SQL text: matches when the statement starts (after any
# leading whitespace, case-insensitively) with SELECT.
SERVER_SIDE_CURSOR_RE = re.compile(
    r"\s*SELECT",
    flags=re.IGNORECASE | re.UNICODE,
)

107 

108 

# Module-level aliases for the CacheStats members, unpacked positionally.
# NOTE(review): relies on CacheStats iterating in exactly this order and
# having exactly five members -- confirm against interfaces.CacheStats.
(
    CACHE_HIT,
    CACHE_MISS,
    CACHING_DISABLED,
    NO_CACHE_KEY,
    NO_DIALECT_SUPPORT,
) = tuple(CacheStats)

116 

117 

118class DefaultDialect(Dialect): 

119 """Default implementation of Dialect""" 

120 

# Class-level defaults. __init__ overrides several of these per instance
# when the matching constructor argument is supplied (server_side_cursors,
# supports_native_boolean, max_identifier_length, use_insertmanyvalues,
# insertmanyvalues_page_size, and bind_typing for legacy dialects).
statement_compiler = compiler.SQLCompiler
ddl_compiler = compiler.DDLCompiler
# instantiated by __init__ unless a legacy ``type_compiler`` attribute exists
type_compiler_cls = compiler.GenericTypeCompiler

# instantiated by __init__ as ``self.identifier_preparer``
preparer = compiler.IdentifierPreparer
supports_alter = True
supports_comments = False
supports_constraint_comments = False
inline_comments = False
# read from the concrete class __dict__ by _supports_statement_cache
supports_statement_cache = True

div_is_floordiv = True

bind_typing = interfaces.BindTyping.NONE

include_set_input_sizes: Optional[Set[Any]] = None
exclude_set_input_sizes: Optional[Set[Any]] = None

# the first value we'd get for an autoincrement column.
default_sequence_base = 1

# most DBAPIs happy with this for execute().
# not cx_oracle.
execute_sequence_format = tuple

supports_schemas = True
supports_views = True
supports_sequences = False
sequences_optional = False
preexecute_autoincrement_sequences = False
supports_identity_columns = False
postfetch_lastrowid = True
favor_returning_over_lastrowid = False
insert_null_pk_still_autoincrements = False
update_returning = False
delete_returning = False
update_returning_multifrom = False
delete_returning_multifrom = False
insert_returning = False

cte_follows_insert = False

supports_native_enum = False
supports_native_boolean = False
supports_native_uuid = False
returns_native_bytes = False

non_native_boolean_check_constraint = True

supports_simple_order_by_label = True

tuple_in_values = False

# Keys are the execution-option names that
# set_engine_execution_options() / set_connection_execution_options()
# look for; values are the objects that apply and reset each one.
connection_characteristics = util.immutabledict(
    {
        "isolation_level": characteristics.IsolationLevelCharacteristic(),
        "logging_token": characteristics.LoggingTokenCharacteristic(),
    }
)

# presumably maps engine configuration keys to coercion callables for
# string-valued configuration -- TODO confirm against the consumer
engine_config_types: Mapping[str, Any] = util.immutabledict(
    {
        "pool_timeout": util.asint,
        "echo": util.bool_or_str("debug"),
        "echo_pool": util.bool_or_str("debug"),
        "pool_recycle": util.asint,
        "pool_size": util.asint,
        "max_overflow": util.asint,
        "future": util.asbool,
    }
)

# if the NUMERIC type
# returns decimal.Decimal.
# *not* the FLOAT type however.
supports_native_decimal = False

name = "default"

# length at which to truncate
# any identifier.
max_identifier_length = 9999
# set by __init__ from the ``max_identifier_length`` argument; when truthy,
# initialize() skips the per-connection length check
_user_defined_max_identifier_length: Optional[int] = None

isolation_level: Optional[str] = None

# sub-categories of max_identifier_length.
# currently these accommodate for MySQL which allows alias names
# of 255 but DDL names only of 64.
max_index_name_length: Optional[int] = None
max_constraint_name_length: Optional[int] = None

supports_sane_rowcount = True
supports_sane_multi_rowcount = True
# passed to type_api.adapt_type() by type_descriptor()
colspecs: MutableMapping[Type[TypeEngine[Any]], Type[TypeEngine[Any]]] = {}
# used by __init__ when neither ``paramstyle`` nor a DBAPI module is given
default_paramstyle = "named"

supports_default_values = False
"""dialect supports INSERT... DEFAULT VALUES syntax"""

supports_default_metavalue = False
"""dialect supports INSERT... VALUES (DEFAULT) syntax"""

default_metavalue_token = "DEFAULT"
"""for INSERT... VALUES (DEFAULT) syntax, the token to put in the
parenthesis."""

# not sure if this is a real thing but the compiler will deliver it
# if this is the only flag enabled.
supports_empty_insert = True
"""dialect supports INSERT () VALUES ()"""

supports_multivalues_insert = False

use_insertmanyvalues: bool = False

use_insertmanyvalues_wo_returning: bool = False

insertmanyvalues_implicit_sentinel: InsertmanyvaluesSentinelOpts = (
    InsertmanyvaluesSentinelOpts.NOT_SUPPORTED
)

# default batch size for _deliver_insertmanyvalues_batches(); can be
# replaced per execution via the "insertmanyvalues_page_size" option
insertmanyvalues_page_size: int = 1000
insertmanyvalues_max_parameters = 32700

supports_is_distinct_from = True

# __init__ raises ArgumentError if server_side_cursors=True is requested
# while this flag is False
supports_server_side_cursors = False

server_side_cursors = False

# extra record-level locking features (#4860)
supports_for_update_of = False

# populated by initialize()
server_version_info = None

# populated by initialize()
default_schema_name: Optional[str] = None

# indicates symbol names are
# UPPERCASEd if they are case insensitive
# within the database.
# if this is True, the methods normalize_name()
# and denormalize_name() must be provided.
requires_name_normalize = False

# get_pool_class() picks the async-adapted queue pool when this is True
is_async = False

has_terminate = False

# TODO: this is not to be part of 2.0. implement rudimentary binary
# literals for SQLite, PostgreSQL, MySQL only within
# _Binary.literal_processor
_legacy_binary_type_literal_encoding = "utf-8"

274 

@util.deprecated_params(
    empty_in_strategy=(
        "1.4",
        "The :paramref:`_sa.create_engine.empty_in_strategy` keyword is "
        "deprecated, and no longer has any effect. All IN expressions "
        "are now rendered using "
        # each fragment needs its own trailing space; without it the
        # rendered text reads "boundexpressions" / "executiontime."
        'the "expanding parameter" strategy which renders a set of bound '
        'expressions, or an "empty set" SELECT, at statement execution '
        "time.",
    ),
    server_side_cursors=(
        "1.4",
        "The :paramref:`_sa.create_engine.server_side_cursors` parameter "
        "is deprecated and will be removed in a future release. Please "
        "use the "
        ":paramref:`_engine.Connection.execution_options.stream_results` "
        "parameter.",
    ),
)
def __init__(
    self,
    paramstyle: Optional[_ParamStyle] = None,
    isolation_level: Optional[IsolationLevel] = None,
    dbapi: Optional[ModuleType] = None,
    implicit_returning: Literal[True] = True,
    supports_native_boolean: Optional[bool] = None,
    max_identifier_length: Optional[int] = None,
    label_length: Optional[int] = None,
    insertmanyvalues_page_size: Union[_NoArg, int] = _NoArg.NO_ARG,
    use_insertmanyvalues: Optional[bool] = None,
    # util.deprecated_params decorator cannot render the
    # Linting.NO_LINTING constant
    compiler_linting: Linting = int(compiler.NO_LINTING),  # type: ignore
    server_side_cursors: bool = False,
    **kwargs: Any,
):
    # Configure per-instance dialect state from the constructor arguments.
    # Arguments left at their defaults keep the class-level attribute.
    # ``implicit_returning`` and ``**kwargs`` are accepted but not read
    # in this body.

    # the deprecated flag is only honoured by dialects that declare
    # server side cursor support
    if server_side_cursors:
        if not self.supports_server_side_cursors:
            raise exc.ArgumentError(
                "Dialect %s does not support server side cursors" % self
            )
        else:
            self.server_side_cursors = True

    # legacy dialects may still set ``use_setinputsizes``; translate it
    # to the bind_typing setting and warn
    if getattr(self, "use_setinputsizes", False):
        util.warn_deprecated(
            "The dialect-level use_setinputsizes attribute is "
            "deprecated. Please use "
            "bind_typing = BindTyping.SETINPUTSIZES",
            "2.0",
        )
        self.bind_typing = interfaces.BindTyping.SETINPUTSIZES

    self.positional = False
    self._ischema = None

    self.dbapi = dbapi

    # paramstyle precedence: explicit argument, then the DBAPI module's
    # own paramstyle, then the class default
    if paramstyle is not None:
        self.paramstyle = paramstyle
    elif self.dbapi is not None:
        self.paramstyle = self.dbapi.paramstyle
    else:
        self.paramstyle = self.default_paramstyle
    self.positional = self.paramstyle in (
        "qmark",
        "format",
        "numeric",
        "numeric_dollar",
    )
    # built after paramstyle/positional are set, as the preparer receives
    # this dialect instance
    self.identifier_preparer = self.preparer(self)
    # applied to new connections by _builtin_onconnect()
    self._on_connect_isolation_level = isolation_level

    # a pre-existing ``type_compiler`` attribute takes precedence over
    # ``type_compiler_cls``
    legacy_tt_callable = getattr(self, "type_compiler", None)
    if legacy_tt_callable is not None:
        tt_callable = cast(
            Type[compiler.GenericTypeCompiler],
            self.type_compiler,
        )
    else:
        tt_callable = self.type_compiler_cls

    self.type_compiler_instance = self.type_compiler = tt_callable(self)

    if supports_native_boolean is not None:
        self.supports_native_boolean = supports_native_boolean

    # a truthy user-supplied length wins over the class default, and
    # initialize() will not re-detect it
    self._user_defined_max_identifier_length = max_identifier_length
    if self._user_defined_max_identifier_length:
        self.max_identifier_length = (
            self._user_defined_max_identifier_length
        )
    self.label_length = label_length
    self.compiler_linting = compiler_linting

    if use_insertmanyvalues is not None:
        self.use_insertmanyvalues = use_insertmanyvalues

    # _NoArg.NO_ARG is the "not passed" sentinel for this argument
    if insertmanyvalues_page_size is not _NoArg.NO_ARG:
        self.insertmanyvalues_page_size = insertmanyvalues_page_size

375 

@property
@util.deprecated(
    "2.0",
    "full_returning is deprecated, please use insert_returning, "
    "update_returning, delete_returning",
)
def full_returning(self):
    # Deprecated aggregate flag: the ``and`` of the three per-statement
    # RETURNING flags, short-circuiting on the first falsy one.
    insert_flag = self.insert_returning
    if not insert_flag:
        return insert_flag
    update_flag = self.update_returning
    if not update_flag:
        return update_flag
    return self.delete_returning

388 

@util.memoized_property
def insert_executemany_returning(self):
    """Whether executemany INSERT can use RETURNING, by default.

    Unless a dialect overrides this attribute (the Oracle dialect is
    given as an example of one that does), the feature is considered
    available only when the dialect both supports INSERT..RETURNING
    and has opted into "use_insertmanyvalues".

    """
    supports_returning = self.insert_returning
    if not supports_returning:
        return supports_returning
    return self.use_insertmanyvalues

402 

@util.memoized_property
def insert_executemany_returning_sort_by_parameter_order(self):
    """Default implementation for
    insert_executemany_returning_sort_by_parameter_order, if not
    otherwise overridden by the specific dialect.

    By default, executemany INSERT..RETURNING can deliver rows in
    parameter order only when the dialect has opted into the
    "use_insertmanyvalues" feature, which by default implements
    deterministic ordering with client side sentinel columns only.
    "insertmanyvalues" also has alternate forms that use
    server-generated PK values as "sentinels"; those are only used when
    the :attr:`.Dialect.insertmanyvalues_implicit_sentinel` bitflag
    enables them, and they are disabled by default.

    A dialect that has not opted in reports False here, unless it
    overrides this attribute with another implementation (such as the
    Oracle dialect).

    """
    supports_returning = self.insert_returning
    if not supports_returning:
        return supports_returning
    return self.use_insertmanyvalues

425 

# class-level defaults: both executemany RETURNING flags for UPDATE and
# DELETE are False here
update_executemany_returning = False
delete_executemany_returning = False

428 

@util.memoized_property
def loaded_dbapi(self) -> ModuleType:
    """Return ``self.dbapi``, raising if no DBAPI module was given.

    :raises InvalidRequestError: when ``self.dbapi`` is None.
    """
    dbapi_module = self.dbapi
    if dbapi_module is not None:
        return dbapi_module
    raise exc.InvalidRequestError(
        f"Dialect {self} does not have a Python DBAPI established "
        "and cannot be used for actual database interaction"
    )

437 

@util.memoized_property
def _bind_typing_render_casts(self):
    """True when ``bind_typing`` is exactly ``BindTyping.RENDER_CASTS``."""
    render_casts = interfaces.BindTyping.RENDER_CASTS
    return self.bind_typing is render_casts

441 

def _ensure_has_table_connection(self, arg):
    """Raise ArgumentError unless ``arg`` is a :class:`.Connection`.

    The message points callers at the inspector API for public use.
    """
    if isinstance(arg, Connection):
        return
    raise exc.ArgumentError(
        "The argument passed to Dialect.has_table() should be a "
        "%s, got %s. "
        "Additionally, the Dialect.has_table() method is for "
        "internal dialect "
        "use only; please use "
        "``inspect(some_engine).has_table(<tablename>>)`` "
        "for public API use." % (Connection, type(arg))
    )

453 

@util.memoized_property
def _supports_statement_cache(self):
    """Boolean form of the class's own ``supports_statement_cache``.

    Only the concrete class ``__dict__`` is consulted, so a value
    inherited from a base class is not seen. If the class does not set
    the attribute itself, a warning is emitted and the result is False.
    """
    declared = vars(self.__class__).get("supports_statement_cache")
    if declared is None:
        util.warn(
            "Dialect %s:%s will not make use of SQL compilation caching "
            "as it does not set the 'supports_statement_cache' attribute "
            "to ``True``. This can have "
            "significant performance implications including some "
            "performance degradations in comparison to prior SQLAlchemy "
            "versions. Dialect maintainers should seek to set this "
            "attribute to True after appropriate development and testing "
            "for SQLAlchemy 1.4 caching support. Alternatively, this "
            "attribute may be set to False which will disable this "
            "warning." % (self.name, self.driver),
            code="cprf",
        )

    return bool(declared)

473 

@util.memoized_property
def _type_memos(self):
    """A new, empty ``WeakKeyDictionary``, memoized per dialect instance."""
    memo_table = weakref.WeakKeyDictionary()
    return memo_table

477 

@property
def dialect_description(self):
    # "<name>+<driver>", e.g. built from self.name and self.driver
    return "+".join((self.name, self.driver))

481 

@property
def supports_sane_rowcount_returning(self):
    """Whether rowcount is reliable even while RETURNING is in use.

    This default simply mirrors ``supports_sane_rowcount``, which is
    the appropriate answer for dialects that don't support RETURNING.

    """
    sane_rowcount = self.supports_sane_rowcount
    return sane_rowcount

492 

@classmethod
def get_pool_class(cls, url: URL) -> Type[Pool]:
    # A ``poolclass`` attribute on the dialect class wins; otherwise the
    # fallback depends on ``is_async``. ``url`` is not consulted here.
    fallback: Type[pool.Pool] = (
        pool.AsyncAdaptedQueuePool if cls.is_async else pool.QueuePool
    )
    return getattr(cls, "poolclass", fallback)

502 

def get_dialect_pool_class(self, url: URL) -> Type[Pool]:
    # instance-level entry point; defers to the get_pool_class classmethod
    pool_cls = self.get_pool_class(url)
    return pool_cls

505 

@classmethod
def load_provisioning(cls):
    # Best-effort import of the ``provision`` module that sits in the same
    # package as the dialect's module; a missing module is ignored.
    package_name, _, _ = cls.__module__.rpartition(".")
    provision_module = package_name + ".provision"
    try:
        __import__(provision_module)
    except ImportError:
        pass

513 

514 def _builtin_onconnect(self) -> Optional[_ListenerFnType]: 

515 if self._on_connect_isolation_level is not None: 

516 

517 def builtin_connect(dbapi_conn, conn_rec): 

518 self._assert_and_set_isolation_level( 

519 dbapi_conn, self._on_connect_isolation_level 

520 ) 

521 

522 return builtin_connect 

523 else: 

524 return None 

525 

def initialize(self, connection):
    # Populate per-server attributes from a live connection. Each probe
    # that raises NotImplementedError leaves its attribute as None.
    try:
        version_info = self._get_server_version_info(connection)
    except NotImplementedError:
        version_info = None
    self.server_version_info = version_info

    try:
        schema_name = self._get_default_schema_name(connection)
    except NotImplementedError:
        schema_name = None
    self.default_schema_name = schema_name

    try:
        isolation = self.get_default_isolation_level(
            connection.connection.dbapi_connection
        )
    except NotImplementedError:
        isolation = None
    self.default_isolation_level = isolation

    # only probe the server for a length when the user did not supply one
    if not self._user_defined_max_identifier_length:
        detected_length = self._check_max_identifier_length(connection)
        if detected_length:
            self.max_identifier_length = detected_length

    label_length = self.label_length
    if label_length and label_length > self.max_identifier_length:
        raise exc.ArgumentError(
            "Label length of %d is greater than this dialect's"
            " maximum identifier length of %d"
            % (self.label_length, self.max_identifier_length)
        )

561 

def on_connect(self):
    # No docstring on purpose: it is inherited from
    # interfaces.Dialect.on_connect. This default supplies no callable.
    return None

565 

566 def _check_max_identifier_length(self, connection): 

567 """Perform a connection / server version specific check to determine 

568 the max_identifier_length. 

569 

570 If the dialect's class level max_identifier_length should be used, 

571 can return None. 

572 

573 .. versionadded:: 1.3.9 

574 

575 """ 

576 return None 

577 

def get_default_isolation_level(self, dbapi_conn):
    """Return the isolation level of the given DBAPI connection, or a
    default level if the actual one cannot be retrieved.

    Subclasses may override this to supply a "fallback" isolation level
    for databases that cannot reliably report the real one.

    This default delegates to
    :meth:`_engine.Interfaces.get_isolation_level` and lets any
    exception it raises propagate.

    .. versionadded:: 1.3.22

    """
    current_level = self.get_isolation_level(dbapi_conn)
    return current_level

593 

def type_descriptor(self, typeobj):
    """Return the database-specific :class:`.TypeEngine` for a generic
    type object from the types module.

    The dialect's ``colspecs`` dictionary (class or instance level) is
    handed to :func:`_types.adapt_type` together with ``typeobj``.

    """
    colspecs = self.colspecs
    return type_api.adapt_type(typeobj, colspecs)

604 

def has_index(self, connection, table_name, index_name, schema=None, **kw):
    # A missing table means the index cannot exist; otherwise look for an
    # exact name match among the table's reflected indexes.
    if not self.has_table(connection, table_name, schema=schema, **kw):
        return False
    reflected = self.get_indexes(connection, table_name, schema=schema, **kw)
    return any(idx["name"] == index_name for idx in reflected)

615 

def has_schema(
    self, connection: Connection, schema_name: str, **kw: Any
) -> bool:
    # membership test against whatever get_schema_names() reports
    known_schemas = self.get_schema_names(connection, **kw)
    return schema_name in known_schemas

620 

def validate_identifier(self, ident):
    # Raise IdentifierError when ``ident`` is longer than the dialect's
    # max_identifier_length; identifiers at or under the limit pass.
    if len(ident) <= self.max_identifier_length:
        return
    raise exc.IdentifierError(
        "Identifier '%s' exceeds maximum length of %d characters"
        % (ident, self.max_identifier_length)
    )

627 

def connect(self, *cargs, **cparams):
    # inherits the docstring from interfaces.Dialect.connect
    # (loaded_dbapi raises if the dialect has no DBAPI module)
    dbapi_module = self.loaded_dbapi
    return dbapi_module.connect(*cargs, **cparams)

631 

def create_connect_args(self, url):
    # inherits the docstring from interfaces.Dialect.create_connect_args
    # Query-string entries are layered over the translated URL fields.
    connect_kwargs = url.translate_connect_args()
    connect_kwargs.update(url.query)
    positional_args = []
    return (positional_args, connect_kwargs)

637 

def set_engine_execution_options(
    self, engine: Engine, opts: Mapping[str, Any]
) -> None:
    # Of the given options, only those named in
    # ``connection_characteristics`` are handled here. When any are
    # present, an "engine_connect" listener is registered on the engine
    # that applies them to each connection it receives.
    matched_names = set(self.connection_characteristics).intersection(opts)
    if not matched_names:
        return

    selected: Mapping[str, Any] = util.immutabledict(
        (name, opts[name]) for name in matched_names
    )

    @event.listens_for(engine, "engine_connect")
    def set_connection_characteristics(connection):
        self._set_connection_characteristics(connection, selected)

654 

def set_connection_execution_options(
    self, connection: Connection, opts: Mapping[str, Any]
) -> None:
    # Apply, immediately, whichever of the given options are named in
    # ``connection_characteristics``; other options are ignored here.
    matched_names = set(self.connection_characteristics).intersection(opts)
    if not matched_names:
        return

    selected: Mapping[str, Any] = util.immutabledict(
        (name, opts[name]) for name in matched_names
    )
    self._set_connection_characteristics(connection, selected)

666 

667 def _set_connection_characteristics(self, connection, characteristics): 

668 characteristic_values = [ 

669 (name, self.connection_characteristics[name], value) 

670 for name, value in characteristics.items() 

671 ] 

672 

673 if connection.in_transaction(): 

674 trans_objs = [ 

675 (name, obj) 

676 for name, obj, _ in characteristic_values 

677 if obj.transactional 

678 ] 

679 if trans_objs: 

680 raise exc.InvalidRequestError( 

681 "This connection has already initialized a SQLAlchemy " 

682 "Transaction() object via begin() or autobegin; " 

683 "%s may not be altered unless rollback() or commit() " 

684 "is called first." 

685 % (", ".join(name for name, obj in trans_objs)) 

686 ) 

687 

688 dbapi_connection = connection.connection.dbapi_connection 

689 for _, characteristic, value in characteristic_values: 

690 characteristic.set_connection_characteristic( 

691 self, connection, dbapi_connection, value 

692 ) 

693 connection.connection._connection_record.finalize_callback.append( 

694 functools.partial(self._reset_characteristics, characteristics) 

695 ) 

696 

697 def _reset_characteristics(self, characteristics, dbapi_connection): 

698 for characteristic_name in characteristics: 

699 characteristic = self.connection_characteristics[ 

700 characteristic_name 

701 ] 

702 characteristic.reset_characteristic(self, dbapi_connection) 

703 

def do_begin(self, dbapi_connection):
    # nothing is sent to the DBAPI connection by this default
    return None

706 

def do_rollback(self, dbapi_connection):
    # delegate straight to the DBAPI connection's rollback()
    rollback = dbapi_connection.rollback
    rollback()

709 

def do_commit(self, dbapi_connection):
    # delegate straight to the DBAPI connection's commit()
    commit = dbapi_connection.commit
    commit()

712 

def do_terminate(self, dbapi_connection):
    # this default has no separate terminate step; it reuses do_close()
    close = self.do_close
    close(dbapi_connection)

715 

def do_close(self, dbapi_connection):
    # delegate straight to the DBAPI connection's close()
    close = dbapi_connection.close
    close()

718 

@util.memoized_property
def _dialect_specific_select_one(self):
    """String form of ``SELECT 1`` compiled for this dialect.

    do_ping() executes this statement.
    """
    compiled_select = expression.select(1).compile(dialect=self)
    return str(compiled_select)

722 

723 def _do_ping_w_event(self, dbapi_connection: DBAPIConnection) -> bool: 

724 try: 

725 return self.do_ping(dbapi_connection) 

726 except self.loaded_dbapi.Error as err: 

727 is_disconnect = self.is_disconnect(err, dbapi_connection, None) 

728 

729 if self._has_events: 

730 try: 

731 Connection._handle_dbapi_exception_noconnection( 

732 err, 

733 self, 

734 is_disconnect=is_disconnect, 

735 invalidate_pool_on_disconnect=False, 

736 is_pre_ping=True, 

737 ) 

738 except exc.StatementError as new_err: 

739 is_disconnect = new_err.connection_invalidated 

740 

741 if is_disconnect: 

742 return False 

743 else: 

744 raise 

745 

def do_ping(self, dbapi_connection: DBAPIConnection) -> bool:
    # Execute the dialect's "SELECT 1" on a fresh cursor. Returns True
    # when the statement runs; any exception from execute() propagates,
    # and the cursor is closed in either case.
    # The cursor is created outside the ``try`` so that ``finally`` only
    # runs when there is a cursor to close.
    cursor = dbapi_connection.cursor()
    try:
        cursor.execute(self._dialect_specific_select_one)
    finally:
        cursor.close()
    return True

755 

def create_xid(self):
    """Create a random two-phase transaction ID.

    This id will be passed to do_begin_twophase(), do_rollback_twophase(),
    do_commit_twophase(). Its format is unspecified.
    """

    # getrandbits(128) stays within 0 .. 2**128 - 1, so the value always
    # fits the 32 hex digits of the format; randint(0, 2**128) includes
    # 2**128 itself, which needs 33 digits.
    return "_sa_%032x" % random.getrandbits(128)

764 

def do_savepoint(self, connection, name):
    # issue a SavepointClause construct for ``name`` on the connection
    savepoint = expression.SavepointClause(name)
    connection.execute(savepoint)

767 

def do_rollback_to_savepoint(self, connection, name):
    # issue a RollbackToSavepointClause construct for ``name``
    rollback_to = expression.RollbackToSavepointClause(name)
    connection.execute(rollback_to)

770 

def do_release_savepoint(self, connection, name):
    # issue a ReleaseSavepointClause construct for ``name``
    release = expression.ReleaseSavepointClause(name)
    connection.execute(release)

773 

def _deliver_insertmanyvalues_batches(
    self,
    connection,
    cursor,
    statement,
    parameters,
    generic_setinputsizes,
    context,
):
    # Generator that yields the batches produced by the compiled
    # statement's own _deliver_insertmanyvalues_batches(). After each
    # yield, when the statement has RETURNING, the batch's rows are
    # fetched from ``cursor`` and appended to
    # ``context._insertmanyvalues_rows``, reordered to match the
    # parameter order when sentinel columns are in use.
    # NOTE(review): the caller is presumably expected to execute each
    # yielded batch on ``cursor`` before resuming the generator, since
    # rows are fetched immediately after the yield -- confirm.
    context = cast(DefaultExecutionContext, context)
    compiled = cast(SQLCompiler, context.compiled)

    # result processors for the sentinel column(s); built lazily from the
    # first batch's cursor.description and reused for later batches
    _composite_sentinel_proc: Sequence[
        Optional[_ResultProcessorType[Any]]
    ] = ()
    _scalar_sentinel_proc: Optional[_ResultProcessorType[Any]] = None
    _sentinel_proc_initialized: bool = False

    compiled_parameters = context.compiled_parameters

    imv = compiled._insertmanyvalues
    assert imv is not None

    is_returning: Final[bool] = bool(compiled.effective_returning)
    # a per-execution option overrides the dialect-level page size
    batch_size = context.execution_options.get(
        "insertmanyvalues_page_size", self.insertmanyvalues_page_size
    )

    if compiled.schema_translate_map:
        schema_translate_map = context.execution_options.get(
            "schema_translate_map", {}
        )
    else:
        schema_translate_map = None

    if is_returning:
        # the same list object is shared with the context, so rows
        # appended below are visible through it
        result: Optional[List[Any]] = []
        context._insertmanyvalues_rows = result

        sort_by_parameter_order = imv.sort_by_parameter_order

    else:
        sort_by_parameter_order = False
        result = None

    for imv_batch in compiled._deliver_insertmanyvalues_batches(
        statement,
        parameters,
        compiled_parameters,
        generic_setinputsizes,
        batch_size,
        sort_by_parameter_order,
        schema_translate_map,
    ):
        yield imv_batch

        if is_returning:

            try:
                rows = context.fetchall_for_returning(cursor)
            except BaseException as be:
                # NOTE(review): ``rows`` is only bound if this handler
                # raises; presumably _handle_dbapi_exception never
                # returns -- confirm
                connection._handle_dbapi_exception(
                    be,
                    sql_util._long_statement(imv_batch.replaced_statement),
                    imv_batch.replaced_parameters,
                    None,
                    context,
                    is_sub_exec=True,
                )

            # I would have thought "is_returning: Final[bool]"
            # would have assured this but pylance thinks not
            assert result is not None

            if imv.num_sentinel_columns and not imv_batch.is_downgraded:
                composite_sentinel = imv.num_sentinel_columns > 1
                if imv.implicit_sentinel:
                    # for implicit sentinel, which is currently single-col
                    # integer autoincrement, do a simple sort.
                    assert not composite_sentinel
                    result.extend(
                        sorted(rows, key=operator.itemgetter(-1))
                    )
                    continue

                # otherwise, create dictionaries to match up batches
                # with parameters
                assert imv.sentinel_param_keys
                assert imv.sentinel_columns

                # sentinel values are read from the trailing _nsc columns
                # of each row
                _nsc = imv.num_sentinel_columns

                if not _sentinel_proc_initialized:
                    if composite_sentinel:
                        _composite_sentinel_proc = [
                            col.type._cached_result_processor(
                                self, cursor_desc[1]
                            )
                            for col, cursor_desc in zip(
                                imv.sentinel_columns,
                                cursor.description[-_nsc:],
                            )
                        ]
                    else:
                        _scalar_sentinel_proc = (
                            imv.sentinel_columns[0]
                        ).type._cached_result_processor(
                            self, cursor.description[-1][1]
                        )
                    _sentinel_proc_initialized = True

                # map processed sentinel value(s) -> row; rows sharing a
                # key collapse, which the length check below detects
                rows_by_sentinel: Union[
                    Dict[Tuple[Any, ...], Any],
                    Dict[Any, Any],
                ]
                if composite_sentinel:
                    rows_by_sentinel = {
                        tuple(
                            (proc(val) if proc else val)
                            for val, proc in zip(
                                row[-_nsc:], _composite_sentinel_proc
                            )
                        ): row
                        for row in rows
                    }
                elif _scalar_sentinel_proc:
                    rows_by_sentinel = {
                        _scalar_sentinel_proc(row[-1]): row for row in rows
                    }
                else:
                    rows_by_sentinel = {row[-1]: row for row in rows}

                if len(rows_by_sentinel) != len(imv_batch.batch):
                    # see test_insert_exec.py::
                    # IMVSentinelTest::test_sentinel_incorrect_rowcount
                    # for coverage / demonstration
                    raise exc.InvalidRequestError(
                        f"Sentinel-keyed result set did not produce "
                        f"correct number of rows {len(imv_batch.batch)}; "
                        "produced "
                        f"{len(rows_by_sentinel)}. Please ensure the "
                        "sentinel column is fully unique and populated in "
                        "all cases."
                    )

                # emit rows in the order of the batch's sentinel values
                try:
                    ordered_rows = [
                        rows_by_sentinel[sentinel_keys]
                        for sentinel_keys in imv_batch.sentinel_values
                    ]
                except KeyError as ke:
                    # see test_insert_exec.py::
                    # IMVSentinelTest::test_sentinel_cant_match_keys
                    # for coverage / demonstration
                    raise exc.InvalidRequestError(
                        f"Can't match sentinel values in result set to "
                        f"parameter sets; key {ke.args[0]!r} was not "
                        "found. "
                        "There may be a mismatch between the datatype "
                        "passed to the DBAPI driver vs. that which it "
                        "returns in a result row. Ensure the given "
                        "Python value matches the expected result type "
                        "*exactly*, taking care to not rely upon implicit "
                        "conversions which may occur such as when using "
                        "strings in place of UUID or integer values, etc. "
                    ) from ke

                result.extend(ordered_rows)

            else:
                # no sentinel handling for this batch: keep the rows in
                # the order the cursor returned them
                result.extend(rows)

945 

def do_executemany(self, cursor, statement, parameters, context=None):
    # ``context`` is accepted but not used by this default
    executemany = cursor.executemany
    executemany(statement, parameters)

948 

def do_execute(self, cursor, statement, parameters, context=None):
    # ``context`` is accepted but not used by this default
    execute = cursor.execute
    execute(statement, parameters)

951 

def do_execute_no_params(self, cursor, statement, context=None):
    # execute() is called with the statement only, no parameter argument
    execute = cursor.execute
    execute(statement)

954 

def is_disconnect(self, e, connection, cursor):
    # this default never classifies an error as a disconnect
    disconnected = False
    return disconnected

957 

@util.memoized_instancemethod
def _gen_allowed_isolation_levels(self, dbapi_conn):
    """Return the dialect's isolation level names as a tuple, or ``None``.

    ``None`` is returned when ``get_isolation_level_values()`` raises
    ``NotImplementedError``.

    :raises ValueError: if any reported name is not already in
     "UPPERCASE with spaces" form.
    """
    try:
        reported = list(self.get_isolation_level_values(dbapi_conn))
    except NotImplementedError:
        return None

    # what each name looks like once underscores become spaces and the
    # text is upper-cased; the reported names must already match this
    canonical = [name.replace("_", " ").upper() for name in reported]
    if reported == canonical:
        return tuple(canonical)

    raise ValueError(
        f"Dialect {self.name!r} get_isolation_level_values() "
        f"method should return names as UPPERCASE using spaces, "
        f"not underscores; got "
        f"{sorted(set(reported).difference(canonical))}"
    )

976 

977 def _assert_and_set_isolation_level(self, dbapi_conn, level): 

978 level = level.replace("_", " ").upper() 

979 

980 _allowed_isolation_levels = self._gen_allowed_isolation_levels( 

981 dbapi_conn 

982 ) 

983 if ( 

984 _allowed_isolation_levels 

985 and level not in _allowed_isolation_levels 

986 ): 

987 raise exc.ArgumentError( 

988 f"Invalid value {level!r} for isolation_level. " 

989 f"Valid isolation levels for {self.name!r} are " 

990 f"{', '.join(_allowed_isolation_levels)}" 

991 ) 

992 

993 self.set_isolation_level(dbapi_conn, level) 

994 

def reset_isolation_level(self, dbapi_conn):
    """Put ``dbapi_conn`` back on the dialect's baseline isolation level.

    The baseline is ``_on_connect_isolation_level`` when that is set,
    otherwise ``default_isolation_level``.
    """
    on_connect = self._on_connect_isolation_level

    if on_connect is None:
        assert self.default_isolation_level is not None
        self._assert_and_set_isolation_level(
            dbapi_conn, self.default_isolation_level
        )
        return

    # an on-connect level is expected to be either AUTOCOMMIT or the same
    # as the default level
    assert (
        on_connect == "AUTOCOMMIT"
        or on_connect == self.default_isolation_level
    )
    self._assert_and_set_isolation_level(dbapi_conn, on_connect)

1011 

def normalize_name(self, name):
    """Convert an identifier to its normalized form.

    * ``None`` and names with no upper/lower distinction are returned
      unchanged.
    * An all-uppercase name whose lowercase form needs no quoting is
      returned in lower case.
    * An all-lowercase name is returned as a ``quoted_name`` with
      ``quote=True``.
    * Anything else (mixed case, or uppercase that requires quotes) is
      returned unchanged.
    """
    if name is None:
        return None

    lowered = name.lower()
    uppered = name.upper()

    if uppered == lowered:
        # no upper/lower conversion exists for this name, e.g.
        # non-european characters
        return name

    if uppered == name:
        needs_quotes = self.identifier_preparer._requires_quotes
        if not needs_quotes(lowered):
            # all uppercase and no quoting needed: fold to lower case
            return lowered

    if lowered == name:
        # all lower case; once denormalized it would need forced quoting
        return quoted_name(name, quote=True)

    # mixed case (or uppercase needing quotes): left as given
    return name

1037 

def denormalize_name(self, name):
    """Convert a normalized identifier back to upper case where applicable.

    Only an all-lowercase name that does not require quoting is changed
    (to its uppercase form); ``None``, names with no upper/lower
    distinction, and every other name are returned unchanged.
    """
    if name is None:
        return None

    lowered = name.lower()
    uppered = name.upper()

    if uppered == lowered:
        # no upper/lower conversion exists for this name, e.g.
        # non-european characters
        return name

    if lowered != name:
        return name

    if self.identifier_preparer._requires_quotes(lowered):
        return name

    return uppered

1054 

def get_driver_connection(self, connection):
    """Return the driver-level connection for ``connection``.

    This default implementation returns the object it was given.
    """
    driver_connection = connection
    return driver_connection

1057 

def _overrides_default(self, method):
    """Return True if this dialect's class replaces ``method``.

    The comparison is made on the ``__code__`` objects of the attribute
    found on ``type(self)`` and the one found on ``DefaultDialect``.

    :param method: name of the method to look up on both classes.
    """
    own_impl = getattr(type(self), method)
    base_impl = getattr(DefaultDialect, method)
    return own_impl.__code__ is not base_impl.__code__

1063 

def _default_multi_reflect(
    self,
    single_tbl_method,
    connection,
    kind,
    schema,
    filter_names,
    scope,
    **kw,
):
    """Generate ``((schema, name), result)`` pairs using a per-table method.

    :param single_tbl_method: callable invoked as
     ``single_tbl_method(connection, name, schema=schema, **kw)``.
    :param kind: ``ObjectKind`` flags selecting tables / views /
     materialized views.
    :param schema: schema passed to the name-listing and per-table calls.
    :param filter_names: optional collection of names to restrict to.
    :param scope: ``ObjectScope`` flags selecting default and/or temporary
     objects.
    :param kw: forwarded to the underlying calls; an ``unreflectable``
     entry, if present, is removed and used to record
     ``UnreflectableTableError`` instances keyed by ``(schema, name)``.
    """
    default_listers = []
    temp_listers = []
    if ObjectKind.TABLE in kind:
        default_listers.append(self.get_table_names)
        temp_listers.append(self.get_temp_table_names)
    if ObjectKind.VIEW in kind:
        default_listers.append(self.get_view_names)
        temp_listers.append(self.get_temp_view_names)
    if ObjectKind.MATERIALIZED_VIEW in kind:
        # no temp materialized view lister at the moment
        default_listers.append(self.get_materialized_view_names)

    unreflectable = kw.pop("unreflectable", {})

    if filter_names and scope is ObjectScope.ANY and kind is ObjectKind.ANY:
        # names were given with no qualification on the type of object
        # (i.e. the Table(..., autoload) case): take the names as given
        # and don't run names queries.  A table that does not exist
        # raises NoSuchTableError below and is skipped.
        #
        # this also suits the case for mssql where individual temp
        # tables can be reflected but there's no temp_names_fn
        names = filter_names
    else:
        names = []
        name_kw = {"schema": schema, **kw}

        listers = []
        if ObjectScope.DEFAULT in scope:
            listers.extend(default_listers)
        if ObjectScope.TEMPORARY in scope:
            listers.extend(temp_listers)

        for lister in listers:
            try:
                names.extend(lister(connection, **name_kw))
            except NotImplementedError:
                # a listing the dialect does not implement contributes
                # no names
                pass

    if filter_names:
        filter_names = set(filter_names)

    # walk every candidate name and call the single-table method on it
    for table in names:
        if filter_names and table not in filter_names:
            continue

        key = (schema, table)
        try:
            yield key, single_tbl_method(
                connection, table, schema=schema, **kw
            )
        except exc.UnreflectableTableError as err:
            # keep only the first error recorded for a given key
            if key not in unreflectable:
                unreflectable[key] = err
        except exc.NoSuchTableError:
            pass

1136 

def get_multi_table_options(self, connection, **kw):
    """Run ``get_table_options`` across objects via ``_default_multi_reflect``.

    All keyword arguments are forwarded unchanged.
    """
    per_table = self.get_table_options
    return self._default_multi_reflect(per_table, connection, **kw)

1141 

def get_multi_columns(self, connection, **kw):
    """Run ``get_columns`` across objects via ``_default_multi_reflect``.

    All keyword arguments are forwarded unchanged.
    """
    per_table = self.get_columns
    return self._default_multi_reflect(per_table, connection, **kw)

1144 

def get_multi_pk_constraint(self, connection, **kw):
    """Run ``get_pk_constraint`` across objects via ``_default_multi_reflect``.

    All keyword arguments are forwarded unchanged.
    """
    per_table = self.get_pk_constraint
    return self._default_multi_reflect(per_table, connection, **kw)

1149 

def get_multi_foreign_keys(self, connection, **kw):
    """Run ``get_foreign_keys`` across objects via ``_default_multi_reflect``.

    All keyword arguments are forwarded unchanged.
    """
    per_table = self.get_foreign_keys
    return self._default_multi_reflect(per_table, connection, **kw)

1154 

def get_multi_indexes(self, connection, **kw):
    """Run ``get_indexes`` across objects via ``_default_multi_reflect``.

    All keyword arguments are forwarded unchanged.
    """
    per_table = self.get_indexes
    return self._default_multi_reflect(per_table, connection, **kw)

1157 

def get_multi_unique_constraints(self, connection, **kw):
    """Run ``get_unique_constraints`` across objects via
    ``_default_multi_reflect``.

    All keyword arguments are forwarded unchanged.
    """
    per_table = self.get_unique_constraints
    return self._default_multi_reflect(per_table, connection, **kw)

1162 

def get_multi_check_constraints(self, connection, **kw):
    """Run ``get_check_constraints`` across objects via
    ``_default_multi_reflect``.

    All keyword arguments are forwarded unchanged.
    """
    per_table = self.get_check_constraints
    return self._default_multi_reflect(per_table, connection, **kw)

1167 

def get_multi_table_comment(self, connection, **kw):
    """Run ``get_table_comment`` across objects via ``_default_multi_reflect``.

    All keyword arguments are forwarded unchanged.
    """
    per_table = self.get_table_comment
    return self._default_multi_reflect(per_table, connection, **kw)

1172 

1173 

class StrCompileDialect(DefaultDialect):
    """A ``DefaultDialect`` configured with the string-oriented compiler
    classes and a permissive set of feature flags."""

    # compiler / preparer classes used by this dialect
    statement_compiler = compiler.StrSQLCompiler
    ddl_compiler = compiler.DDLCompiler
    type_compiler_cls = compiler.StrSQLTypeCompiler
    preparer = compiler.IdentifierPreparer

    # RETURNING is enabled for INSERT, UPDATE and DELETE alike
    insert_returning = update_returning = delete_returning = True

    supports_statement_cache = True

    supports_identity_columns = True

    # sequences are supported but optional, and are not pre-executed
    supports_sequences = sequences_optional = True
    preexecute_autoincrement_sequences = False

    supports_native_boolean = True

    supports_multivalues_insert = True
    supports_simple_order_by_label = True

1196 

1197 

1198class DefaultExecutionContext(ExecutionContext): 

# Statement-kind flags.  These class-level defaults are all False; the
# _init_* constructors overwrite the relevant ones on the instance.
isinsert = False
isupdate = False
isdelete = False
is_crud = False
is_text = False
isddl = False

# how the statement is sent to the DBAPI; switched to EXECUTEMANY or
# INSERTMANYVALUES by the constructors when more than one parameter set
# is present
execute_style: ExecuteStyle = ExecuteStyle.EXECUTE

# the compiled construct, or None when a plain string / default is executed
compiled: Optional[Compiled] = None
# 5-tuple copied from the compiler in _init_compiled()
result_column_struct: Optional[
    Tuple[List[ResultColumnsEntry], bool, bool, bool, bool]
] = None
# rows collected from implicit RETURNING while setting up the result
returned_default_rows: Optional[Sequence[Row[Unpack[TupleAny]]]] = None

execution_options: _ExecuteOptions = util.EMPTY_DICT

# fetch strategy the result object starts with; replaced by a buffered
# strategy during result setup when a server-side cursor is in use
cursor_fetch_strategy = _cursor._DEFAULT_FETCH

invoked_statement: Optional[Executable] = None

# RETURNING bookkeeping, populated by _init_compiled() for DML statements
_is_implicit_returning = False
_is_explicit_returning = False
_is_supplemental_returning = False
# set by create_cursor() according to the kind of cursor it created
_is_server_side = False

# copied from the result object at the end of _setup_result_proxy()
_soft_closed = False

# when not None, the rowcount accessor returns this value instead of
# asking the cursor
_rowcount: Optional[int] = None

# a hook for SQLite's translation of
# result column names
# NOTE: pyhive is using this hook, can't remove it :(
_translate_colname: Optional[Callable[[str], str]] = None

_expanded_parameters: Mapping[str, List[str]] = util.immutabledict()
"""used by set_input_sizes().

This collection comes from ``ExpandedState.parameter_expansion``.

"""

cache_hit = NO_CACHE_KEY

# instance attributes assigned by the _init_* constructors; declared here
# for typing only, with no class-level value
root_connection: Connection
_dbapi_connection: PoolProxiedConnection
dialect: Dialect
unicode_statement: str
cursor: DBAPICursor
compiled_parameters: List[_MutableCoreSingleExecuteParams]
parameters: _DBAPIMultiExecuteParams
extracted_parameters: Optional[Sequence[BindParameter[Any]]]

# empty mapping used as the single parameter set when a non-positional
# dialect is given no parameters
_empty_dict_params = cast("Mapping[str, Any]", util.EMPTY_DICT)

# rows handed to the fully buffered fetch strategy as its initial buffer
# for INSERTMANYVALUES executions that have RETURNING
_insertmanyvalues_rows: Optional[List[Tuple[Any, ...]]] = None
# number of trailing sentinel columns stripped from the cursor description
_num_sentinel_cols: int = 0

1256 

1257 @classmethod 

1258 def _init_ddl( 

1259 cls, 

1260 dialect: Dialect, 

1261 connection: Connection, 

1262 dbapi_connection: PoolProxiedConnection, 

1263 execution_options: _ExecuteOptions, 

1264 compiled_ddl: DDLCompiler, 

1265 ) -> ExecutionContext: 

1266 """Initialize execution context for an ExecutableDDLElement 

1267 construct.""" 

1268 

1269 self = cls.__new__(cls) 

1270 self.root_connection = connection 

1271 self._dbapi_connection = dbapi_connection 

1272 self.dialect = connection.dialect 

1273 

1274 self.compiled = compiled = compiled_ddl 

1275 self.isddl = True 

1276 

1277 self.execution_options = execution_options 

1278 

1279 self.unicode_statement = str(compiled) 

1280 if compiled.schema_translate_map: 

1281 schema_translate_map = self.execution_options.get( 

1282 "schema_translate_map", {} 

1283 ) 

1284 

1285 rst = compiled.preparer._render_schema_translates 

1286 self.unicode_statement = rst( 

1287 self.unicode_statement, schema_translate_map 

1288 ) 

1289 

1290 self.statement = self.unicode_statement 

1291 

1292 self.cursor = self.create_cursor() 

1293 self.compiled_parameters = [] 

1294 

1295 if dialect.positional: 

1296 self.parameters = [dialect.execute_sequence_format()] 

1297 else: 

1298 self.parameters = [self._empty_dict_params] 

1299 

1300 return self 

1301 

@classmethod
def _init_compiled(
    cls,
    dialect: Dialect,
    connection: Connection,
    dbapi_connection: PoolProxiedConnection,
    execution_options: _ExecuteOptions,
    compiled: SQLCompiler,
    parameters: _CoreMultiExecuteParams,
    invoked_statement: Executable,
    extracted_parameters: Optional[Sequence[BindParameter[Any]]],
    cache_hit: CacheStats = CacheStats.CACHING_DISABLED,
) -> ExecutionContext:
    """Initialize execution context for a Compiled construct.

    The instance is created with ``cls.__new__`` (``__init__`` is not
    run).  The method copies state from ``compiled``, validates
    RETURNING / executemany combinations against the dialect's flags,
    builds ``compiled_parameters`` from ``parameters``, picks the
    ``execute_style``, creates the cursor, and finally converts the
    compiled parameters into the positional or dictionary form stored
    in ``self.parameters``.

    :raises InvalidRequestError: for an unsupported RETURNING +
     executemany combination, or when 'literal_execute' / 'expanding'
     parameters are used with executemany.
    """

    self = cls.__new__(cls)
    self.root_connection = connection
    self._dbapi_connection = dbapi_connection
    self.dialect = connection.dialect
    self.extracted_parameters = extracted_parameters
    self.invoked_statement = invoked_statement
    self.compiled = compiled
    self.cache_hit = cache_hit

    self.execution_options = execution_options

    # snapshot of the compiler's result-column metadata, as a 5-tuple
    self.result_column_struct = (
        compiled._result_columns,
        compiled._ordered_columns,
        compiled._textual_ordered_columns,
        compiled._ad_hoc_textual,
        compiled._loose_column_name_matching,
    )

    # ii / iu / id_ are short local aliases for the three DML flags
    self.isinsert = ii = compiled.isinsert
    self.isupdate = iu = compiled.isupdate
    self.isdelete = id_ = compiled.isdelete
    self.is_text = compiled.isplaintext

    if ii or iu or id_:
        dml_statement = compiled.compile_state.statement  # type: ignore
        if TYPE_CHECKING:
            assert isinstance(dml_statement, UpdateBase)
        self.is_crud = True
        self._is_explicit_returning = ier = bool(dml_statement._returning)
        self._is_implicit_returning = iir = bool(
            compiled.implicit_returning
        )
        if iir and dml_statement._supplemental_returning:
            self._is_supplemental_returning = True

        # don't mix implicit and explicit returning
        assert not (iir and ier)

        # each branch below rejects a RETURNING + executemany combination
        # that the dialect's capability flags say is unsupported
        if (ier or iir) and compiled.for_executemany:
            if ii and not self.dialect.insert_executemany_returning:
                raise exc.InvalidRequestError(
                    f"Dialect {self.dialect.dialect_description} with "
                    f"current server capabilities does not support "
                    "INSERT..RETURNING when executemany is used"
                )
            elif (
                ii
                and dml_statement._sort_by_parameter_order
                and not self.dialect.insert_executemany_returning_sort_by_parameter_order  # noqa: E501
            ):
                raise exc.InvalidRequestError(
                    f"Dialect {self.dialect.dialect_description} with "
                    f"current server capabilities does not support "
                    "INSERT..RETURNING with deterministic row ordering "
                    "when executemany is used"
                )
            elif (
                ii
                and self.dialect.use_insertmanyvalues
                and not compiled._insertmanyvalues
            ):
                raise exc.InvalidRequestError(
                    'Statement does not have "insertmanyvalues" '
                    "enabled, can't use INSERT..RETURNING with "
                    "executemany in this case."
                )
            elif iu and not self.dialect.update_executemany_returning:
                raise exc.InvalidRequestError(
                    f"Dialect {self.dialect.dialect_description} with "
                    f"current server capabilities does not support "
                    "UPDATE..RETURNING when executemany is used"
                )
            elif id_ and not self.dialect.delete_executemany_returning:
                raise exc.InvalidRequestError(
                    f"Dialect {self.dialect.dialect_description} with "
                    f"current server capabilities does not support "
                    "DELETE..RETURNING when executemany is used"
                )

    if not parameters:
        # no parameter sets were passed: construct exactly one, without
        # any caller-supplied values
        self.compiled_parameters = [
            compiled.construct_params(
                extracted_parameters=extracted_parameters,
                escape_names=False,
            )
        ]
    else:
        # one compiled parameter dict per incoming set; the set's index
        # is passed along as _group_number
        self.compiled_parameters = [
            compiled.construct_params(
                m,
                escape_names=False,
                _group_number=grp,
                extracted_parameters=extracted_parameters,
            )
            for grp, m in enumerate(parameters)
        ]

        # more than one parameter set switches away from plain EXECUTE
        if len(parameters) > 1:
            if self.isinsert and compiled._insertmanyvalues:
                self.execute_style = ExecuteStyle.INSERTMANYVALUES

                imv = compiled._insertmanyvalues
                if imv.sentinel_columns is not None:
                    self._num_sentinel_cols = imv.num_sentinel_columns
            else:
                self.execute_style = ExecuteStyle.EXECUTEMANY

    self.unicode_statement = compiled.string

    self.cursor = self.create_cursor()

    if self.compiled.insert_prefetch or self.compiled.update_prefetch:
        self._process_execute_defaults()

    processors = compiled._bind_processors

    flattened_processors: Mapping[
        str, _BindProcessorType[Any]
    ] = processors  # type: ignore[assignment]

    if compiled.literal_execute_params or compiled.post_compile_params:
        if self.executemany:
            raise exc.InvalidRequestError(
                "'literal_execute' or 'expanding' parameters can't be "
                "used with executemany()"
            )

        # only the single parameter set is processed here, since
        # executemany was ruled out just above
        expanded_state = compiled._process_parameters_for_postcompile(
            self.compiled_parameters[0]
        )

        # re-assign self.unicode_statement
        self.unicode_statement = expanded_state.statement

        self._expanded_parameters = expanded_state.parameter_expansion

        # work on a copy so the compiler's own processor mapping is not
        # modified by the update() below
        flattened_processors = dict(processors)  # type: ignore
        flattened_processors.update(expanded_state.processors)
        positiontup = expanded_state.positiontup
    elif compiled.positional:
        positiontup = self.compiled.positiontup
    else:
        positiontup = None

    if compiled.schema_translate_map:
        schema_translate_map = self.execution_options.get(
            "schema_translate_map", {}
        )
        rst = compiled.preparer._render_schema_translates
        self.unicode_statement = rst(
            self.unicode_statement, schema_translate_map
        )

    # final self.unicode_statement is now assigned, encode if needed
    # by dialect
    self.statement = self.unicode_statement

    # Convert the dictionary of bind parameter values
    # into a dict or list to be sent to the DBAPI's
    # execute() or executemany() method.

    if compiled.positional:
        core_positional_parameters: MutableSequence[Sequence[Any]] = []
        assert positiontup is not None
        for compiled_params in self.compiled_parameters:
            # values are emitted in positiontup order, run through a bind
            # processor when one is registered for the key
            l_param: List[Any] = [
                (
                    flattened_processors[key](compiled_params[key])
                    if key in flattened_processors
                    else compiled_params[key]
                )
                for key in positiontup
            ]
            core_positional_parameters.append(
                dialect.execute_sequence_format(l_param)
            )

        self.parameters = core_positional_parameters
    else:
        core_dict_parameters: MutableSequence[Dict[str, Any]] = []
        escaped_names = compiled.escaped_bind_names

        # note that currently, "expanded" parameters will be present
        # in self.compiled_parameters in their quoted form.   This is
        # slightly inconsistent with the approach taken as of
        # #8056 where self.compiled_parameters is meant to contain unquoted
        # param names.
        d_param: Dict[str, Any]
        for compiled_params in self.compiled_parameters:
            if escaped_names:
                # same as the branch below, except each key is replaced
                # by its escaped name when one exists
                d_param = {
                    escaped_names.get(key, key): (
                        flattened_processors[key](compiled_params[key])
                        if key in flattened_processors
                        else compiled_params[key]
                    )
                    for key in compiled_params
                }
            else:
                d_param = {
                    key: (
                        flattened_processors[key](compiled_params[key])
                        if key in flattened_processors
                        else compiled_params[key]
                    )
                    for key in compiled_params
                }

            core_dict_parameters.append(d_param)

        self.parameters = core_dict_parameters

    return self

1531 

1532 @classmethod 

1533 def _init_statement( 

1534 cls, 

1535 dialect: Dialect, 

1536 connection: Connection, 

1537 dbapi_connection: PoolProxiedConnection, 

1538 execution_options: _ExecuteOptions, 

1539 statement: str, 

1540 parameters: _DBAPIMultiExecuteParams, 

1541 ) -> ExecutionContext: 

1542 """Initialize execution context for a string SQL statement.""" 

1543 

1544 self = cls.__new__(cls) 

1545 self.root_connection = connection 

1546 self._dbapi_connection = dbapi_connection 

1547 self.dialect = connection.dialect 

1548 self.is_text = True 

1549 

1550 self.execution_options = execution_options 

1551 

1552 if not parameters: 

1553 if self.dialect.positional: 

1554 self.parameters = [dialect.execute_sequence_format()] 

1555 else: 

1556 self.parameters = [self._empty_dict_params] 

1557 elif isinstance(parameters[0], dialect.execute_sequence_format): 

1558 self.parameters = parameters 

1559 elif isinstance(parameters[0], dict): 

1560 self.parameters = parameters 

1561 else: 

1562 self.parameters = [ 

1563 dialect.execute_sequence_format(p) for p in parameters 

1564 ] 

1565 

1566 if len(parameters) > 1: 

1567 self.execute_style = ExecuteStyle.EXECUTEMANY 

1568 

1569 self.statement = self.unicode_statement = statement 

1570 

1571 self.cursor = self.create_cursor() 

1572 return self 

1573 

1574 @classmethod 

1575 def _init_default( 

1576 cls, 

1577 dialect: Dialect, 

1578 connection: Connection, 

1579 dbapi_connection: PoolProxiedConnection, 

1580 execution_options: _ExecuteOptions, 

1581 ) -> ExecutionContext: 

1582 """Initialize execution context for a ColumnDefault construct.""" 

1583 

1584 self = cls.__new__(cls) 

1585 self.root_connection = connection 

1586 self._dbapi_connection = dbapi_connection 

1587 self.dialect = connection.dialect 

1588 

1589 self.execution_options = execution_options 

1590 

1591 self.cursor = self.create_cursor() 

1592 return self 

1593 

1594 def _get_cache_stats(self) -> str: 

1595 if self.compiled is None: 

1596 return "raw sql" 

1597 

1598 now = perf_counter() 

1599 

1600 ch = self.cache_hit 

1601 

1602 gen_time = self.compiled._gen_time 

1603 assert gen_time is not None 

1604 

1605 if ch is NO_CACHE_KEY: 

1606 return "no key %.5fs" % (now - gen_time,) 

1607 elif ch is CACHE_HIT: 

1608 return "cached since %.4gs ago" % (now - gen_time,) 

1609 elif ch is CACHE_MISS: 

1610 return "generated in %.5fs" % (now - gen_time,) 

1611 elif ch is CACHING_DISABLED: 

1612 if "_cache_disable_reason" in self.execution_options: 

1613 return "caching disabled (%s) %.5fs " % ( 

1614 self.execution_options["_cache_disable_reason"], 

1615 now - gen_time, 

1616 ) 

1617 else: 

1618 return "caching disabled %.5fs" % (now - gen_time,) 

1619 elif ch is NO_DIALECT_SUPPORT: 

1620 return "dialect %s+%s does not support caching %.5fs" % ( 

1621 self.dialect.name, 

1622 self.dialect.driver, 

1623 now - gen_time, 

1624 ) 

1625 else: 

1626 return "unknown" 

1627 

@property
def executemany(self):
    """True when the execute style is EXECUTEMANY or INSERTMANYVALUES."""
    style = self.execute_style
    many_styles = (
        ExecuteStyle.EXECUTEMANY,
        ExecuteStyle.INSERTMANYVALUES,
    )
    return style in many_styles

1634 

@util.memoized_property
def identifier_preparer(self):
    """Return the identifier preparer that applies to this execution.

    The compiled construct's preparer is used when there is one.
    Otherwise the dialect's preparer is used, adapted with
    ``_with_schema_translate()`` when a ``schema_translate_map``
    execution option is present.
    """
    if self.compiled:
        return self.compiled.preparer

    if "schema_translate_map" not in self.execution_options:
        return self.dialect.identifier_preparer

    dialect_preparer = self.dialect.identifier_preparer
    return dialect_preparer._with_schema_translate(
        self.execution_options["schema_translate_map"]
    )

1645 

@util.memoized_property
def engine(self):
    """Return the ``engine`` attribute of the root connection."""
    root = self.root_connection
    return root.engine

1649 

@util.memoized_property
def postfetch_cols(self) -> Optional[Sequence[Column[Any]]]:
    """Return the compiled construct's ``postfetch`` collection."""
    if TYPE_CHECKING:
        assert isinstance(self.compiled, SQLCompiler)
    compiled = self.compiled
    return compiled.postfetch

1655 

@util.memoized_property
def prefetch_cols(self) -> Optional[Sequence[Column[Any]]]:
    """Return the prefetch collection matching the statement kind.

    ``insert_prefetch`` for an INSERT, ``update_prefetch`` for an UPDATE,
    and an empty tuple for anything else.
    """
    if TYPE_CHECKING:
        assert isinstance(self.compiled, SQLCompiler)

    if self.isinsert:
        return self.compiled.insert_prefetch
    if self.isupdate:
        return self.compiled.update_prefetch
    return ()

1666 

@util.memoized_property
def no_parameters(self):
    """Return the ``no_parameters`` execution option (default False)."""
    options = self.execution_options
    return options.get("no_parameters", False)

1670 

1671 def _execute_scalar(self, stmt, type_, parameters=None): 

1672 """Execute a string statement on the current cursor, returning a 

1673 scalar result. 

1674 

1675 Used to fire off sequences, default phrases, and "select lastrowid" 

1676 types of statements individually or in the context of a parent INSERT 

1677 or UPDATE statement. 

1678 

1679 """ 

1680 

1681 conn = self.root_connection 

1682 

1683 if "schema_translate_map" in self.execution_options: 

1684 schema_translate_map = self.execution_options.get( 

1685 "schema_translate_map", {} 

1686 ) 

1687 

1688 rst = self.identifier_preparer._render_schema_translates 

1689 stmt = rst(stmt, schema_translate_map) 

1690 

1691 if not parameters: 

1692 if self.dialect.positional: 

1693 parameters = self.dialect.execute_sequence_format() 

1694 else: 

1695 parameters = {} 

1696 

1697 conn._cursor_execute(self.cursor, stmt, parameters, context=self) 

1698 row = self.cursor.fetchone() 

1699 if row is not None: 

1700 r = row[0] 

1701 else: 

1702 r = None 

1703 if type_ is not None: 

1704 # apply type post processors to the result 

1705 proc = type_._cached_result_processor( 

1706 self.dialect, self.cursor.description[0][1] 

1707 ) 

1708 if proc: 

1709 return proc(r) 

1710 return r 

1711 

@util.memoized_property
def connection(self):
    """Return the root connection this context was created with."""
    root = self.root_connection
    return root

1715 

1716 def _use_server_side_cursor(self): 

1717 if not self.dialect.supports_server_side_cursors: 

1718 return False 

1719 

1720 if self.dialect.server_side_cursors: 

1721 # this is deprecated 

1722 use_server_side = self.execution_options.get( 

1723 "stream_results", True 

1724 ) and ( 

1725 self.compiled 

1726 and isinstance(self.compiled.statement, expression.Selectable) 

1727 or ( 

1728 ( 

1729 not self.compiled 

1730 or isinstance( 

1731 self.compiled.statement, expression.TextClause 

1732 ) 

1733 ) 

1734 and self.unicode_statement 

1735 and SERVER_SIDE_CURSOR_RE.match(self.unicode_statement) 

1736 ) 

1737 ) 

1738 else: 

1739 use_server_side = self.execution_options.get( 

1740 "stream_results", False 

1741 ) 

1742 

1743 return use_server_side 

1744 

def create_cursor(self):
    """Create and return the cursor for this execution.

    A server-side cursor is created when the dialect supports them and
    either the ``stream_results`` execution option is set, or the
    dialect's ``server_side_cursors`` flag is on and
    ``_use_server_side_cursor()`` agrees.  ``_is_server_side`` records
    which kind was created.
    """
    # inlining initial preference checks for SS cursors
    wants_server_side = self.dialect.supports_server_side_cursors and (
        self.execution_options.get("stream_results", False)
        or (
            self.dialect.server_side_cursors
            and self._use_server_side_cursor()
        )
    )

    if not wants_server_side:
        self._is_server_side = False
        return self.create_default_cursor()

    self._is_server_side = True
    return self.create_server_side_cursor()

1762 

def fetchall_for_returning(self, cursor):
    """Return every remaining row from ``cursor`` via ``fetchall()``."""
    rows = cursor.fetchall()
    return rows

1765 

def create_default_cursor(self):
    """Return a new cursor obtained from the DBAPI connection."""
    dbapi_connection = self._dbapi_connection
    return dbapi_connection.cursor()

1768 

def create_server_side_cursor(self):
    """Create a server-side cursor.

    Not implemented at this level.

    :raises NotImplementedError: always.
    """
    raise NotImplementedError()

1771 

def pre_exec(self):
    """Pre-execution hook; this implementation does nothing."""
    return None

1774 

def get_out_parameter_values(self, names):
    """Return values for the OUT parameters named in ``names``.

    Not supported at this level.

    :raises NotImplementedError: always.
    """
    message = "This dialect does not support OUT parameters"
    raise NotImplementedError(message)

1779 

def post_exec(self):
    """Post-execution hook; this implementation does nothing."""
    return None

1782 

def get_result_processor(self, type_, colname, coltype):
    """Return a 'result processor' for a type found in cursor.description.

    The default looks the processor up on ``type_`` for this dialect and
    ``coltype``; ``colname`` is not consulted.  Dialects can override this
    for context-sensitive result type handling.
    """
    lookup = type_._cached_result_processor
    return lookup(self.dialect, coltype)

1792 

def get_lastrowid(self):
    """Return ``self.cursor.lastrowid``, or equivalent, after an INSERT.

    Implementations may call special cursor functions, issue a new SELECT
    on the cursor (or a new one), or return a value that was stored
    during post_exec().

    This is only called for dialects which support "implicit" primary key
    generation, keep preexecute_autoincrement_sequences set to False, and
    when no explicit id value was bound to the statement.

    It is called once for an INSERT statement that needs to return the
    last inserted primary key, for dialects that use the lastrowid
    concept; in those cases the call happens directly after
    :meth:`.ExecutionContext.post_exec`.

    """
    cursor = self.cursor
    return cursor.lastrowid

1811 

def handle_dbapi_exception(self, e):
    """Hook receiving a DBAPI exception; this implementation ignores it."""
    return None

1814 

@util.non_memoized_property
def rowcount(self) -> int:
    """Return the stored ``_rowcount`` if set, else the cursor's rowcount."""
    if self._rowcount is None:
        return self.cursor.rowcount
    return self._rowcount

1821 

1822 @property 

1823 def _has_rowcount(self): 

1824 return self._rowcount is not None 

1825 

def supports_sane_rowcount(self):
    """Return the dialect's ``supports_sane_rowcount`` flag."""
    dialect = self.dialect
    return dialect.supports_sane_rowcount

1828 

def supports_sane_multi_rowcount(self):
    """Return the dialect's ``supports_sane_multi_rowcount`` flag."""
    dialect = self.dialect
    return dialect.supports_sane_multi_rowcount

1831 

1832 def _setup_result_proxy(self): 

1833 exec_opt = self.execution_options 

1834 

1835 if self._rowcount is None and exec_opt.get("preserve_rowcount", False): 

1836 self._rowcount = self.cursor.rowcount 

1837 

1838 if self.is_crud or self.is_text: 

1839 result = self._setup_dml_or_text_result() 

1840 yp = sr = False 

1841 else: 

1842 yp = exec_opt.get("yield_per", None) 

1843 sr = self._is_server_side or exec_opt.get("stream_results", False) 

1844 strategy = self.cursor_fetch_strategy 

1845 if sr and strategy is _cursor._DEFAULT_FETCH: 

1846 strategy = _cursor.BufferedRowCursorFetchStrategy( 

1847 self.cursor, self.execution_options 

1848 ) 

1849 cursor_description: _DBAPICursorDescription = ( 

1850 strategy.alternate_cursor_description 

1851 or self.cursor.description 

1852 ) 

1853 if cursor_description is None: 

1854 strategy = _cursor._NO_CURSOR_DQL 

1855 

1856 result = _cursor.CursorResult(self, strategy, cursor_description) 

1857 

1858 compiled = self.compiled 

1859 

1860 if ( 

1861 compiled 

1862 and not self.isddl 

1863 and cast(SQLCompiler, compiled).has_out_parameters 

1864 ): 

1865 self._setup_out_parameters(result) 

1866 

1867 self._soft_closed = result._soft_closed 

1868 

1869 if yp: 

1870 result = result.yield_per(yp) 

1871 

1872 return result 

1873 

def _setup_out_parameters(self, result):
    """Collect OUT parameter values and store them on ``result``.

    Every bind parameter flagged ``isoutparam`` is paired with the value
    returned for its name by ``get_out_parameter_values()``.  Each value
    is run through the result processor of the parameter's
    dialect-level type when one exists, and the resulting mapping
    (keyed by ``bindparam.key``) is assigned to ``result.out_parameters``.
    """
    compiled = cast(SQLCompiler, self.compiled)

    # split the OUT bind parameters into parallel lists of parameter
    # objects and their names
    out_params = []
    out_names = []
    for param, name in compiled.bind_names.items():
        if param.isoutparam:
            out_params.append(param)
            out_names.append(name)

    raw_values = self.get_out_parameter_values(out_names)

    collected = {}
    for bindparam, raw_value in zip(out_params, raw_values):
        impl_type = bindparam.type.dialect_impl(self.dialect)
        dbapi_type = impl_type.get_dbapi_type(self.dialect.loaded_dbapi)
        processor = impl_type.result_processor(self.dialect, dbapi_type)
        if processor is not None:
            raw_value = processor(raw_value)
        collected[bindparam.key] = raw_value

    result.out_parameters = collected

1901 

def _setup_dml_or_text_result(self):
    """Build the ``CursorResult`` for this execution and finish up
    the per-statement bookkeeping.

    The method picks the fetch strategy and cursor description, constructs
    the result, and then, depending on the statement flags, populates
    ``inserted_primary_key_rows``, ``returned_default_rows`` and the
    memoized ``_rowcount``.  Rows consumed here for implicit RETURNING
    are either rewound onto the result or the result is soft-closed.

    :return: the newly constructed ``_cursor.CursorResult``.

    """
    compiled = cast(SQLCompiler, self.compiled)

    # start from the strategy already present on the context; it may be
    # replaced several times below before the result is constructed.
    strategy: ResultFetchStrategy = self.cursor_fetch_strategy

    if self.isinsert:
        if (
            self.execute_style is ExecuteStyle.INSERTMANYVALUES
            and compiled.effective_returning
        ):
            # serve rows from the ``_insertmanyvalues_rows`` buffer
            # rather than from the cursor
            strategy = _cursor.FullyBufferedCursorFetchStrategy(
                self.cursor,
                initial_buffer=self._insertmanyvalues_rows,
                # maintain alt cursor description if set by the
                # dialect, e.g. mssql preserves it
                alternate_description=(
                    strategy.alternate_cursor_description
                ),
            )

        if compiled.postfetch_lastrowid:
            self.inserted_primary_key_rows = (
                self._setup_ins_pk_from_lastrowid()
            )
        # else if not self._is_implicit_returning,
        # the default inserted_primary_key_rows accessor will
        # return an "empty" primary key collection when accessed.

    # a server side cursor that is still on the default strategy is
    # switched to the buffered-row strategy
    if self._is_server_side and strategy is _cursor._DEFAULT_FETCH:
        strategy = _cursor.BufferedRowCursorFetchStrategy(
            self.cursor, self.execution_options
        )

    # a description supplied by the strategy takes precedence over the
    # one on the DBAPI cursor
    if strategy is _cursor._NO_CURSOR_DML:
        cursor_description = None
    else:
        cursor_description = (
            strategy.alternate_cursor_description
            or self.cursor.description
        )

    if cursor_description is None:
        # no description at all: use the "no cursor" DML strategy
        strategy = _cursor._NO_CURSOR_DML
    elif self._num_sentinel_cols:
        assert self.execute_style is ExecuteStyle.INSERTMANYVALUES
        # strip out the sentinel columns from cursor description
        # a similar logic is done to the rows only in CursorResult
        cursor_description = cursor_description[
            0 : -self._num_sentinel_cols
        ]

    result: _cursor.CursorResult[Any] = _cursor.CursorResult(
        self, strategy, cursor_description
    )

    if self.isinsert:
        if self._is_implicit_returning:
            # consume every row now; they feed both the returned
            # defaults and the inserted primary key collection
            rows = result.all()

            self.returned_default_rows = rows

            self.inserted_primary_key_rows = (
                self._setup_ins_pk_from_implicit_returning(result, rows)
            )

            # test that it has a cursor metadata that is accurate. the
            # first row will have been fetched and current assumptions
            # are that the result has only one row, until executemany()
            # support is added here.
            assert result._metadata.returns_rows

            # Insert statement has both return_defaults() and
            # returning(). rewind the result on the list of rows
            # we just used.
            if self._is_supplemental_returning:
                result._rewind(rows)
            else:
                result._soft_close()
        elif not self._is_explicit_returning:
            result._soft_close()

            # we assume here the result does not return any rows.
            # *usually*, this will be true. However, some dialects
            # such as that of MSSQL/pyodbc need to SELECT a post fetch
            # function so this is not necessarily true.
            # assert not result.returns_rows

    elif self._is_implicit_returning:
        rows = result.all()

        if rows:
            self.returned_default_rows = rows
        # the number of returned rows becomes the memoized rowcount
        self._rowcount = len(rows)

        if self._is_supplemental_returning:
            result._rewind(rows)
        else:
            result._soft_close()

        # test that it has a cursor metadata that is accurate.
        # the rows have all been fetched however.
        assert result._metadata.returns_rows

    elif not result._metadata.returns_rows:
        # no results, get rowcount
        # (which requires open cursor on some drivers)
        if self._rowcount is None:
            self._rowcount = self.cursor.rowcount
        result._soft_close()
    elif self.isupdate or self.isdelete:
        # rows are returned, so the result stays open; only memoize
        # the rowcount if nothing has set it yet
        if self._rowcount is None:
            self._rowcount = self.cursor.rowcount
    return result

2015 

@util.memoized_property
def inserted_primary_key_rows(self):
    """Fallback primary key rows for an INSERT.

    Used when execution did not assign ``inserted_primary_key_rows``
    through one of the specific "get primary key" strategies; the rows
    are then derived from the compiled parameters and nothing else.
    """
    default_pk_rows = self._setup_ins_pk_from_empty()
    return default_pk_rows

2022 

def _setup_ins_pk_from_lastrowid(self):
    """Return a one-element list holding the primary key built from
    ``get_lastrowid()`` and the first set of compiled parameters.
    """
    compiled = cast(SQLCompiler, self.compiled)
    pk_from_lastrowid = compiled._inserted_primary_key_from_lastrowid_getter
    primary_key = pk_from_lastrowid(
        self.get_lastrowid(), self.compiled_parameters[0]
    )
    return [primary_key]

2029 

def _setup_ins_pk_from_empty(self):
    """Return one primary key per compiled parameter set, built by the
    "lastrowid" getter with ``None`` passed as the lastrowid.
    """
    compiled = cast(SQLCompiler, self.compiled)
    pk_from_lastrowid = compiled._inserted_primary_key_from_lastrowid_getter

    primary_keys = []
    for row_params in self.compiled_parameters:
        primary_keys.append(pk_from_lastrowid(None, row_params))
    return primary_keys

2035 

2036 def _setup_ins_pk_from_implicit_returning(self, result, rows): 

2037 if not rows: 

2038 return [] 

2039 

2040 getter = cast( 

2041 SQLCompiler, self.compiled 

2042 )._inserted_primary_key_from_returning_getter 

2043 compiled_params = self.compiled_parameters 

2044 

2045 return [ 

2046 getter(row, param) for row, param in zip(rows, compiled_params) 

2047 ] 

2048 

def lastrow_has_defaults(self):
    """Report whether an INSERT or UPDATE has a non-empty ``postfetch``
    collection on its compiled statement.

    For any other kind of statement the falsy ``isinsert`` / ``isupdate``
    value is returned without consulting the compiled statement.
    """
    is_insert_or_update = self.isinsert or self.isupdate
    if not is_insert_or_update:
        return is_insert_or_update

    return bool(cast(SQLCompiler, self.compiled).postfetch)

2053 

2054 def _prepare_set_input_sizes( 

2055 self, 

2056 ) -> Optional[List[Tuple[str, Any, TypeEngine[Any]]]]: 

2057 """Given a cursor and ClauseParameters, prepare arguments 

2058 in order to call the appropriate 

2059 style of ``setinputsizes()`` on the cursor, using DB-API types 

2060 from the bind parameter's ``TypeEngine`` objects. 

2061 

2062 This method only called by those dialects which set 

2063 the :attr:`.Dialect.bind_typing` attribute to 

2064 :attr:`.BindTyping.SETINPUTSIZES`. cx_Oracle is the only DBAPI 

2065 that requires setinputsizes(), pyodbc offers it as an option. 

2066 

2067 Prior to SQLAlchemy 2.0, the setinputsizes() approach was also used 

2068 for pg8000 and asyncpg, which has been changed to inline rendering 

2069 of casts. 

2070 

2071 """ 

2072 if self.isddl or self.is_text: 

2073 return None 

2074 

2075 compiled = cast(SQLCompiler, self.compiled) 

2076 

2077 inputsizes = compiled._get_set_input_sizes_lookup() 

2078 

2079 if inputsizes is None: 

2080 return None 

2081 

2082 dialect = self.dialect 

2083 

2084 # all of the rest of this... cython? 

2085 

2086 if dialect._has_events: 

2087 inputsizes = dict(inputsizes) 

2088 dialect.dispatch.do_setinputsizes( 

2089 inputsizes, self.cursor, self.statement, self.parameters, self 

2090 ) 

2091 

2092 if compiled.escaped_bind_names: 

2093 escaped_bind_names = compiled.escaped_bind_names 

2094 else: 

2095 escaped_bind_names = None 

2096 

2097 if dialect.positional: 

2098 items = [ 

2099 (key, compiled.binds[key]) 

2100 for key in compiled.positiontup or () 

2101 ] 

2102 else: 

2103 items = [ 

2104 (key, bindparam) 

2105 for bindparam, key in compiled.bind_names.items() 

2106 ] 

2107 

2108 generic_inputsizes: List[Tuple[str, Any, TypeEngine[Any]]] = [] 

2109 for key, bindparam in items: 

2110 if bindparam in compiled.literal_execute_params: 

2111 continue 

2112 

2113 if key in self._expanded_parameters: 

2114 if is_tuple_type(bindparam.type): 

2115 num = len(bindparam.type.types) 

2116 dbtypes = inputsizes[bindparam] 

2117 generic_inputsizes.extend( 

2118 ( 

2119 ( 

2120 escaped_bind_names.get(paramname, paramname) 

2121 if escaped_bind_names is not None 

2122 else paramname 

2123 ), 

2124 dbtypes[idx % num], 

2125 bindparam.type.types[idx % num], 

2126 ) 

2127 for idx, paramname in enumerate( 

2128 self._expanded_parameters[key] 

2129 ) 

2130 ) 

2131 else: 

2132 dbtype = inputsizes.get(bindparam, None) 

2133 generic_inputsizes.extend( 

2134 ( 

2135 ( 

2136 escaped_bind_names.get(paramname, paramname) 

2137 if escaped_bind_names is not None 

2138 else paramname 

2139 ), 

2140 dbtype, 

2141 bindparam.type, 

2142 ) 

2143 for paramname in self._expanded_parameters[key] 

2144 ) 

2145 else: 

2146 dbtype = inputsizes.get(bindparam, None) 

2147 

2148 escaped_name = ( 

2149 escaped_bind_names.get(key, key) 

2150 if escaped_bind_names is not None 

2151 else key 

2152 ) 

2153 

2154 generic_inputsizes.append( 

2155 (escaped_name, dbtype, bindparam.type) 

2156 ) 

2157 

2158 return generic_inputsizes 

2159 

2160 def _exec_default(self, column, default, type_): 

2161 if default.is_sequence: 

2162 return self.fire_sequence(default, type_) 

2163 elif default.is_callable: 

2164 # this codepath is not normally used as it's inlined 

2165 # into _process_execute_defaults 

2166 self.current_column = column 

2167 return default.arg(self) 

2168 elif default.is_clause_element: 

2169 return self._exec_default_clause_element(column, default, type_) 

2170 else: 

2171 # this codepath is not normally used as it's inlined 

2172 # into _process_execute_defaults 

2173 return default.arg 

2174 

def _exec_default_clause_element(self, column, default, type_):
    """Execute a default that is a complete clause element and return
    its scalar value.

    ``column`` is not used by this implementation.
    """
    # Here, we have to re-implement a miniature version of the
    # compile->parameters->cursor.execute() sequence, since we don't want
    # to modify the state of the connection / result in progress or create
    # new connection/result objects etc.
    # .. versionchanged:: 1.4

    if default._arg_is_typed:
        default_arg = default.arg
    else:
        # untyped expression: coerce it to the type handed in
        default_arg = expression.type_coerce(default.arg, type_)

    compiled = expression.select(default_arg).compile(dialect=self.dialect)
    compiled_params = compiled.construct_params()
    processors = compiled._bind_processors

    def _bound_value(key):
        # run the value through its bind processor when one exists
        if key in processors:
            return processors[key](compiled_params[key])  # type: ignore
        return compiled_params[key]

    if compiled.positional:
        positional_values = [
            _bound_value(key) for key in compiled.positiontup or ()
        ]
        parameters = self.dialect.execute_sequence_format(positional_values)
    else:
        parameters = {key: _bound_value(key) for key in compiled_params}

    return self._execute_scalar(str(compiled), type_, parameters=parameters)

2213 

# Class-level default of ``None``.  ``_process_execute_defaults()`` assigns
# the per-row parameter dictionary on the instance while defaults are being
# generated and deletes that instance attribute when it is done, which
# makes this default visible again.
current_parameters: Optional[_CoreSingleExecuteParams] = None
"""A dictionary of parameters applied to the current row.

This attribute is only available in the context of a user-defined default
generation function, e.g. as described at :ref:`context_default_functions`.
It consists of a dictionary which includes entries for each column/value
pair that is to be part of the INSERT or UPDATE statement. The keys of the
dictionary will be the key value of each :class:`_schema.Column`,
which is usually
synonymous with the name.

Note that the :attr:`.DefaultExecutionContext.current_parameters` attribute
does not accommodate for the "multi-values" feature of the
:meth:`_expression.Insert.values` method. The
:meth:`.DefaultExecutionContext.get_current_parameters` method should be
preferred.

.. seealso::

    :meth:`.DefaultExecutionContext.get_current_parameters`

    :ref:`context_default_functions`

"""

2238 

def get_current_parameters(self, isolate_multiinsert_groups=True):
    """Return a dictionary of parameters applied to the current row.

    This method can only be used in the context of a user-defined default
    generation function, e.g. as described at
    :ref:`context_default_functions`. When invoked, a dictionary is
    returned which includes entries for each column/value pair that is part
    of the INSERT or UPDATE statement. The keys of the dictionary will be
    the key value of each :class:`_schema.Column`, which is usually
    synonymous with the name.

    When invoked outside of that context, ``InvalidRequestError`` is
    raised.

    :param isolate_multiinsert_groups=True: indicates that multi-valued
     INSERT constructs created using :meth:`_expression.Insert.values`
     should be handled by returning only the subset of parameters that
     are local to the current column default invocation. When ``False``,
     the raw parameters of the statement are returned including the
     naming convention used in the case of multi-valued INSERT.

    .. versionadded:: 1.2 added
       :meth:`.DefaultExecutionContext.get_current_parameters`
       which provides more functionality over the existing
       :attr:`.DefaultExecutionContext.current_parameters`
       attribute.

    .. seealso::

        :attr:`.DefaultExecutionContext.current_parameters`

        :ref:`context_default_functions`

    """
    wrong_context_message = (
        "get_current_parameters() can only be invoked in the "
        "context of a Python side column default function"
    )

    try:
        parameters = self.current_parameters
        column = self.current_column
    except AttributeError:
        raise exc.InvalidRequestError(wrong_context_message)

    # ``current_parameters`` is ``None`` at the class level and the
    # instance value is deleted once default processing is finished, so a
    # call made outside of a default function can arrive here without an
    # AttributeError.  Report that the same way as above instead of using
    # an assertion, which would also be skipped under ``python -O``.
    if parameters is None or column is None:
        raise exc.InvalidRequestError(wrong_context_message)

    compile_state = cast(
        "DMLState", cast(SQLCompiler, self.compiled).compile_state
    )
    assert compile_state is not None

    if not (
        isolate_multiinsert_groups
        and dml.isinsert(compile_state)
        and compile_state._has_multi_parameters
    ):
        return parameters

    # multi-valued INSERT: pick out only the parameter group that belongs
    # to the VALUES entry the current column is part of.
    if column._is_multiparam_column:
        index = column.index + 1
        d = {column.original.key: parameters[column.key]}
    else:
        d = {column.key: parameters[column.key]}
        index = 0

    assert compile_state._dict_parameters is not None
    keys = compile_state._dict_parameters.keys()
    # parameters of a group are looked up as "<key>_m<index>"
    d.update((key, parameters["%s_m%d" % (key, index)]) for key in keys)
    return d

2306 

def get_insert_default(self, column):
    """Return the value of ``column.default`` as produced by
    ``_exec_default()``, or ``None`` when the column has no default.
    """
    insert_default = column.default
    if insert_default is None:
        return None
    return self._exec_default(column, insert_default, column.type)

2312 

def get_update_default(self, column):
    """Return the value of ``column.onupdate`` as produced by
    ``_exec_default()``, or ``None`` when the column has no onupdate.
    """
    update_default = column.onupdate
    if update_default is None:
        return None
    return self._exec_default(column, update_default, column.type)

2318 

def _process_execute_defaults(self):
    """Fill in prefetched column defaults on every compiled parameter set.

    The compiled statement's ``insert_prefetch`` columns are used when
    present, otherwise its ``update_prefetch`` columns.  While a parameter
    set is being processed it is exposed as ``current_parameters``; that
    instance attribute is deleted again once all sets are done.
    """
    compiled = cast(SQLCompiler, self.compiled)

    param_key_for = compiled._within_exec_param_key_getter

    # each record: (column, parameter key, default description tuple)
    if compiled.insert_prefetch:
        fallback = self.get_insert_default
        prefetch_recs = [
            (col, param_key_for(col), col._default_description_tuple)
            for col in compiled.insert_prefetch
        ]
    elif compiled.update_prefetch:
        fallback = self.get_update_default
        prefetch_recs = [
            (col, param_key_for(col), col._onupdate_description_tuple)
            for col in compiled.update_prefetch
        ]
    else:
        fallback = None
        prefetch_recs = []

    # sentinel values keep counting up across all parameter sets
    next_sentinel = 0

    for row_params in self.compiled_parameters:
        self.current_parameters = row_params

        for col, param_key, description in prefetch_recs:
            arg, is_scalar, is_callable, is_sentinel = description

            if is_sentinel:
                row_params[param_key] = next_sentinel
                next_sentinel += 1
            elif is_scalar:
                row_params[param_key] = arg
            elif is_callable:
                # the callable receives this context and can see which
                # column it is generating a value for
                self.current_column = col
                row_params[param_key] = arg(self)
            else:
                # everything else goes through the generic getter; a
                # None result leaves the parameter set untouched
                value = fallback(col)
                if value is not None:
                    row_params[param_key] = value

    del self.current_parameters

2372 

2373 

# install DefaultExecutionContext as the ``execution_ctx_cls`` of
# DefaultDialect; done at module level, after the context class is defined.
DefaultDialect.execution_ctx_cls = DefaultExecutionContext