Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/default.py: 48%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1047 statements  

1# engine/default.py 

2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7# mypy: allow-untyped-defs, allow-untyped-calls 

8 

9"""Default implementations of per-dialect sqlalchemy.engine classes. 

10 

11These are semi-private implementation classes which are only of importance 

12to database dialect authors; dialects will usually use the classes here 

13as the base class for their own corresponding classes. 

14 

15""" 

16 

17from __future__ import annotations 

18 

19import functools 

20import operator 

21import random 

22import re 

23from time import perf_counter 

24import typing 

25from typing import Any 

26from typing import Callable 

27from typing import cast 

28from typing import Dict 

29from typing import List 

30from typing import Mapping 

31from typing import MutableMapping 

32from typing import MutableSequence 

33from typing import Optional 

34from typing import Sequence 

35from typing import Set 

36from typing import Tuple 

37from typing import Type 

38from typing import TYPE_CHECKING 

39from typing import Union 

40import weakref 

41 

42from . import characteristics 

43from . import cursor as _cursor 

44from . import interfaces 

45from .base import Connection 

46from .interfaces import CacheStats 

47from .interfaces import DBAPICursor 

48from .interfaces import Dialect 

49from .interfaces import ExecuteStyle 

50from .interfaces import ExecutionContext 

51from .reflection import ObjectKind 

52from .reflection import ObjectScope 

53from .. import event 

54from .. import exc 

55from .. import pool 

56from .. import util 

57from ..sql import compiler 

58from ..sql import dml 

59from ..sql import expression 

60from ..sql import type_api 

61from ..sql import util as sql_util 

62from ..sql._typing import is_tuple_type 

63from ..sql.base import _NoArg 

64from ..sql.compiler import DDLCompiler 

65from ..sql.compiler import InsertmanyvaluesSentinelOpts 

66from ..sql.compiler import SQLCompiler 

67from ..sql.elements import quoted_name 

68from ..util.typing import Final 

69from ..util.typing import Literal 

70 

71if typing.TYPE_CHECKING: 

72 from types import ModuleType 

73 

74 from .base import Engine 

75 from .cursor import ResultFetchStrategy 

76 from .interfaces import _CoreMultiExecuteParams 

77 from .interfaces import _CoreSingleExecuteParams 

78 from .interfaces import _DBAPICursorDescription 

79 from .interfaces import _DBAPIMultiExecuteParams 

80 from .interfaces import _DBAPISingleExecuteParams 

81 from .interfaces import _ExecuteOptions 

82 from .interfaces import _MutableCoreSingleExecuteParams 

83 from .interfaces import _ParamStyle 

84 from .interfaces import ConnectArgsType 

85 from .interfaces import DBAPIConnection 

86 from .interfaces import DBAPIModule 

87 from .interfaces import IsolationLevel 

88 from .row import Row 

89 from .url import URL 

90 from ..event import _ListenerFnType 

91 from ..pool import Pool 

92 from ..pool import PoolProxiedConnection 

93 from ..sql import Executable 

94 from ..sql.compiler import Compiled 

95 from ..sql.compiler import Linting 

96 from ..sql.compiler import ResultColumnsEntry 

97 from ..sql.dml import DMLState 

98 from ..sql.dml import UpdateBase 

99 from ..sql.elements import BindParameter 

100 from ..sql.schema import Column 

101 from ..sql.type_api import _BindProcessorType 

102 from ..sql.type_api import _ResultProcessorType 

103 from ..sql.type_api import TypeEngine 

104 

105 

# When we're handed literal SQL, ensure it's a SELECT query.
# The pattern accepts optional leading whitespace followed by "SELECT",
# compared case-insensitively.
SERVER_SIDE_CURSOR_RE = re.compile(r"\s*SELECT", re.I | re.UNICODE)


# Unpack the members of the CacheStats enumeration, in iteration order,
# into module-level names.  The unpacking requires CacheStats to have
# exactly five members.
(
    CACHE_HIT,
    CACHE_MISS,
    CACHING_DISABLED,
    NO_CACHE_KEY,
    NO_DIALECT_SUPPORT,
) = list(CacheStats)

117 

118 

119class DefaultDialect(Dialect): 

120 """Default implementation of Dialect""" 

121 

    # Compiler-related classes for this dialect.  ``type_compiler_cls`` and
    # ``preparer`` are instantiated with the dialect in ``__init__``.
    statement_compiler = compiler.SQLCompiler
    ddl_compiler = compiler.DDLCompiler
    type_compiler_cls = compiler.GenericTypeCompiler

    preparer = compiler.IdentifierPreparer
    supports_alter = True
    supports_comments = False
    supports_constraint_comments = False
    inline_comments = False
    # ``_supports_statement_cache`` looks this flag up in the concrete
    # class's own ``__dict__`` and warns when it is absent there.
    supports_statement_cache = True

    div_is_floordiv = True

    # replaced with BindTyping.SETINPUTSIZES in ``__init__`` when the legacy
    # ``use_setinputsizes`` attribute is truthy.
    bind_typing = interfaces.BindTyping.NONE

    include_set_input_sizes: Optional[Set[Any]] = None
    exclude_set_input_sizes: Optional[Set[Any]] = None

    # the first value we'd get for an autoincrement column.
    default_sequence_base = 1

    # most DBAPIs happy with this for execute().
    # not cx_oracle.
    execute_sequence_format = tuple

    supports_schemas = True
    supports_views = True
    supports_sequences = False
    sequences_optional = False
    preexecute_autoincrement_sequences = False
    supports_identity_columns = False
    postfetch_lastrowid = True
    favor_returning_over_lastrowid = False
    insert_null_pk_still_autoincrements = False
    update_returning = False
    delete_returning = False
    update_returning_multifrom = False
    delete_returning_multifrom = False
    insert_returning = False

    cte_follows_insert = False

    supports_native_enum = False
    # may be overridden per-instance via the ``supports_native_boolean``
    # argument of ``__init__``.
    supports_native_boolean = False
    supports_native_uuid = False
    returns_native_bytes = False

    non_native_boolean_check_constraint = True

    supports_simple_order_by_label = True

    tuple_in_values = False

    # maps execution option names to the characteristic objects that apply
    # and reset them; consulted by set_engine_execution_options() and
    # set_connection_execution_options().
    connection_characteristics = util.immutabledict(
        {
            "isolation_level": characteristics.IsolationLevelCharacteristic(),
            "logging_token": characteristics.LoggingTokenCharacteristic(),
        }
    )

    # maps engine configuration keys to a callable for each value.
    # NOTE(review): presumably used to coerce string configuration values;
    # the consumer is not visible in this part of the file.
    engine_config_types: Mapping[str, Any] = util.immutabledict(
        {
            "pool_timeout": util.asint,
            "echo": util.bool_or_str("debug"),
            "echo_pool": util.bool_or_str("debug"),
            "pool_recycle": util.asint,
            "pool_size": util.asint,
            "max_overflow": util.asint,
            "future": util.asbool,
        }
    )

    # if the NUMERIC type
    # returns decimal.Decimal.
    # *not* the FLOAT type however.
    supports_native_decimal = False

    name = "default"

    # length at which to truncate
    # any identifier.  May be replaced per-instance by ``__init__`` (user
    # supplied value) or by ``initialize()`` (value detected from the
    # connection).
    max_identifier_length = 9999
    _user_defined_max_identifier_length: Optional[int] = None

    isolation_level: Optional[str] = None

    # sub-categories of max_identifier_length.
    # currently these accommodate for MySQL which allows alias names
    # of 255 but DDL names only of 64.
    max_index_name_length: Optional[int] = None
    max_constraint_name_length: Optional[int] = None

    supports_sane_rowcount = True
    supports_sane_multi_rowcount = True
    colspecs: MutableMapping[Type[TypeEngine[Any]], Type[TypeEngine[Any]]] = {}
    # paramstyle used by ``__init__`` when neither a ``paramstyle`` argument
    # nor a DBAPI module is given.
    default_paramstyle = "named"

    supports_default_values = False
    """dialect supports INSERT... DEFAULT VALUES syntax"""

    supports_default_metavalue = False
    """dialect supports INSERT... VALUES (DEFAULT) syntax"""

    default_metavalue_token = "DEFAULT"
    """for INSERT... VALUES (DEFAULT) syntax, the token to put in the
    parenthesis."""

    # not sure if this is a real thing but the compiler will deliver it
    # if this is the only flag enabled.
    supports_empty_insert = True
    """dialect supports INSERT () VALUES ()"""

    supports_multivalues_insert = False

    # may be overridden per-instance via the ``use_insertmanyvalues``
    # argument of ``__init__``.
    use_insertmanyvalues: bool = False

    use_insertmanyvalues_wo_returning: bool = False

    insertmanyvalues_implicit_sentinel: InsertmanyvaluesSentinelOpts = (
        InsertmanyvaluesSentinelOpts.NOT_SUPPORTED
    )

    # default batch size for _deliver_insertmanyvalues_batches(), used when
    # the "insertmanyvalues_page_size" execution option is not present.
    insertmanyvalues_page_size: int = 1000
    insertmanyvalues_max_parameters = 32700

    supports_is_distinct_from = True

    # must be True for ``__init__`` to accept ``server_side_cursors=True``.
    supports_server_side_cursors = False

    # set to True per-instance by ``__init__`` when the deprecated
    # ``server_side_cursors`` argument is passed as true.
    server_side_cursors = False

    # extra record-level locking features (#4860)
    supports_for_update_of = False

    # populated by ``initialize()``.
    server_version_info = None

    # populated by ``initialize()``.
    default_schema_name: Optional[str] = None

    # indicates symbol names are
    # UPPERCASED if they are case insensitive
    # within the database.
    # if this is True, the methods normalize_name()
    # and denormalize_name() must be provided.
    requires_name_normalize = False

    is_async = False

    has_terminate = False

    # TODO: this is not to be part of 2.0.  implement rudimentary binary
    # literals for SQLite, PostgreSQL, MySQL only within
    # _Binary.literal_processor
    _legacy_binary_type_literal_encoding = "utf-8"

275 

276 @util.deprecated_params( 

277 empty_in_strategy=( 

278 "1.4", 

279 "The :paramref:`_sa.create_engine.empty_in_strategy` keyword is " 

280 "deprecated, and no longer has any effect. All IN expressions " 

281 "are now rendered using " 

282 'the "expanding parameter" strategy which renders a set of bound' 

283 'expressions, or an "empty set" SELECT, at statement execution' 

284 "time.", 

285 ), 

286 server_side_cursors=( 

287 "1.4", 

288 "The :paramref:`_sa.create_engine.server_side_cursors` parameter " 

289 "is deprecated and will be removed in a future release. Please " 

290 "use the " 

291 ":paramref:`_engine.Connection.execution_options.stream_results` " 

292 "parameter.", 

293 ), 

294 ) 

295 def __init__( 

296 self, 

297 paramstyle: Optional[_ParamStyle] = None, 

298 isolation_level: Optional[IsolationLevel] = None, 

299 dbapi: Optional[ModuleType] = None, 

300 implicit_returning: Literal[True] = True, 

301 supports_native_boolean: Optional[bool] = None, 

302 max_identifier_length: Optional[int] = None, 

303 label_length: Optional[int] = None, 

304 insertmanyvalues_page_size: Union[_NoArg, int] = _NoArg.NO_ARG, 

305 use_insertmanyvalues: Optional[bool] = None, 

306 # util.deprecated_params decorator cannot render the 

307 # Linting.NO_LINTING constant 

308 compiler_linting: Linting = int(compiler.NO_LINTING), # type: ignore 

309 server_side_cursors: bool = False, 

310 skip_autocommit_rollback: bool = False, 

311 **kwargs: Any, 

312 ): 

313 if server_side_cursors: 

314 if not self.supports_server_side_cursors: 

315 raise exc.ArgumentError( 

316 "Dialect %s does not support server side cursors" % self 

317 ) 

318 else: 

319 self.server_side_cursors = True 

320 

321 if getattr(self, "use_setinputsizes", False): 

322 util.warn_deprecated( 

323 "The dialect-level use_setinputsizes attribute is " 

324 "deprecated. Please use " 

325 "bind_typing = BindTyping.SETINPUTSIZES", 

326 "2.0", 

327 ) 

328 self.bind_typing = interfaces.BindTyping.SETINPUTSIZES 

329 

330 self.positional = False 

331 self._ischema = None 

332 

333 self.dbapi = dbapi 

334 

335 self.skip_autocommit_rollback = skip_autocommit_rollback 

336 

337 if paramstyle is not None: 

338 self.paramstyle = paramstyle 

339 elif self.dbapi is not None: 

340 self.paramstyle = self.dbapi.paramstyle 

341 else: 

342 self.paramstyle = self.default_paramstyle 

343 self.positional = self.paramstyle in ( 

344 "qmark", 

345 "format", 

346 "numeric", 

347 "numeric_dollar", 

348 ) 

349 self.identifier_preparer = self.preparer(self) 

350 self._on_connect_isolation_level = isolation_level 

351 

352 legacy_tt_callable = getattr(self, "type_compiler", None) 

353 if legacy_tt_callable is not None: 

354 tt_callable = cast( 

355 Type[compiler.GenericTypeCompiler], 

356 self.type_compiler, 

357 ) 

358 else: 

359 tt_callable = self.type_compiler_cls 

360 

361 self.type_compiler_instance = self.type_compiler = tt_callable(self) 

362 

363 if supports_native_boolean is not None: 

364 self.supports_native_boolean = supports_native_boolean 

365 

366 self._user_defined_max_identifier_length = max_identifier_length 

367 if self._user_defined_max_identifier_length: 

368 self.max_identifier_length = ( 

369 self._user_defined_max_identifier_length 

370 ) 

371 self.label_length = label_length 

372 self.compiler_linting = compiler_linting 

373 

374 if use_insertmanyvalues is not None: 

375 self.use_insertmanyvalues = use_insertmanyvalues 

376 

377 if insertmanyvalues_page_size is not _NoArg.NO_ARG: 

378 self.insertmanyvalues_page_size = insertmanyvalues_page_size 

379 

380 @property 

381 @util.deprecated( 

382 "2.0", 

383 "full_returning is deprecated, please use insert_returning, " 

384 "update_returning, delete_returning", 

385 ) 

386 def full_returning(self): 

387 return ( 

388 self.insert_returning 

389 and self.update_returning 

390 and self.delete_returning 

391 ) 

392 

393 @util.memoized_property 

394 def insert_executemany_returning(self): 

395 """Default implementation for insert_executemany_returning, if not 

396 otherwise overridden by the specific dialect. 

397 

398 The default dialect determines "insert_executemany_returning" is 

399 available if the dialect in use has opted into using the 

400 "use_insertmanyvalues" feature. If they haven't opted into that, then 

401 this attribute is False, unless the dialect in question overrides this 

402 and provides some other implementation (such as the Oracle Database 

403 dialects). 

404 

405 """ 

406 return self.insert_returning and self.use_insertmanyvalues 

407 

408 @util.memoized_property 

409 def insert_executemany_returning_sort_by_parameter_order(self): 

410 """Default implementation for 

411 insert_executemany_returning_deterministic_order, if not otherwise 

412 overridden by the specific dialect. 

413 

414 The default dialect determines "insert_executemany_returning" can have 

415 deterministic order only if the dialect in use has opted into using the 

416 "use_insertmanyvalues" feature, which implements deterministic ordering 

417 using client side sentinel columns only by default. The 

418 "insertmanyvalues" feature also features alternate forms that can 

419 use server-generated PK values as "sentinels", but those are only 

420 used if the :attr:`.Dialect.insertmanyvalues_implicit_sentinel` 

421 bitflag enables those alternate SQL forms, which are disabled 

422 by default. 

423 

424 If the dialect in use hasn't opted into that, then this attribute is 

425 False, unless the dialect in question overrides this and provides some 

426 other implementation (such as the Oracle Database dialects). 

427 

428 """ 

429 return self.insert_returning and self.use_insertmanyvalues 

430 

    # Class-level defaults for the UPDATE and DELETE counterparts of
    # ``insert_executemany_returning``; both are off in this base class.
    update_executemany_returning = False
    delete_executemany_returning = False

433 

434 @util.memoized_property 

435 def loaded_dbapi(self) -> DBAPIModule: 

436 if self.dbapi is None: 

437 raise exc.InvalidRequestError( 

438 f"Dialect {self} does not have a Python DBAPI established " 

439 "and cannot be used for actual database interaction" 

440 ) 

441 return self.dbapi 

442 

443 @util.memoized_property 

444 def _bind_typing_render_casts(self): 

445 return self.bind_typing is interfaces.BindTyping.RENDER_CASTS 

446 

447 def _ensure_has_table_connection(self, arg: Connection) -> None: 

448 if not isinstance(arg, Connection): 

449 raise exc.ArgumentError( 

450 "The argument passed to Dialect.has_table() should be a " 

451 "%s, got %s. " 

452 "Additionally, the Dialect.has_table() method is for " 

453 "internal dialect " 

454 "use only; please use " 

455 "``inspect(some_engine).has_table(<tablename>>)`` " 

456 "for public API use." % (Connection, type(arg)) 

457 ) 

458 

459 @util.memoized_property 

460 def _supports_statement_cache(self): 

461 ssc = self.__class__.__dict__.get("supports_statement_cache", None) 

462 if ssc is None: 

463 util.warn( 

464 "Dialect %s:%s will not make use of SQL compilation caching " 

465 "as it does not set the 'supports_statement_cache' attribute " 

466 "to ``True``. This can have " 

467 "significant performance implications including some " 

468 "performance degradations in comparison to prior SQLAlchemy " 

469 "versions. Dialect maintainers should seek to set this " 

470 "attribute to True after appropriate development and testing " 

471 "for SQLAlchemy 1.4 caching support. Alternatively, this " 

472 "attribute may be set to False which will disable this " 

473 "warning." % (self.name, self.driver), 

474 code="cprf", 

475 ) 

476 

477 return bool(ssc) 

478 

479 @util.memoized_property 

480 def _type_memos(self): 

481 return weakref.WeakKeyDictionary() 

482 

483 @property 

484 def dialect_description(self): # type: ignore[override] 

485 return self.name + "+" + self.driver 

486 

487 @property 

488 def supports_sane_rowcount_returning(self): 

489 """True if this dialect supports sane rowcount even if RETURNING is 

490 in use. 

491 

492 For dialects that don't support RETURNING, this is synonymous with 

493 ``supports_sane_rowcount``. 

494 

495 """ 

496 return self.supports_sane_rowcount 

497 

498 @classmethod 

499 def get_pool_class(cls, url: URL) -> Type[Pool]: 

500 return getattr(cls, "poolclass", pool.QueuePool) 

501 

502 def get_dialect_pool_class(self, url: URL) -> Type[Pool]: 

503 return self.get_pool_class(url) 

504 

505 @classmethod 

506 def load_provisioning(cls): 

507 package = ".".join(cls.__module__.split(".")[0:-1]) 

508 try: 

509 __import__(package + ".provision") 

510 except ImportError: 

511 pass 

512 

513 def _builtin_onconnect(self) -> Optional[_ListenerFnType]: 

514 if self._on_connect_isolation_level is not None: 

515 

516 def builtin_connect(dbapi_conn, conn_rec): 

517 self._assert_and_set_isolation_level( 

518 dbapi_conn, self._on_connect_isolation_level 

519 ) 

520 

521 return builtin_connect 

522 else: 

523 return None 

524 

525 def initialize(self, connection: Connection) -> None: 

526 try: 

527 self.server_version_info = self._get_server_version_info( 

528 connection 

529 ) 

530 except NotImplementedError: 

531 self.server_version_info = None 

532 try: 

533 self.default_schema_name = self._get_default_schema_name( 

534 connection 

535 ) 

536 except NotImplementedError: 

537 self.default_schema_name = None 

538 

539 try: 

540 self.default_isolation_level = self.get_default_isolation_level( 

541 connection.connection.dbapi_connection 

542 ) 

543 except NotImplementedError: 

544 self.default_isolation_level = None 

545 

546 if not self._user_defined_max_identifier_length: 

547 max_ident_length = self._check_max_identifier_length(connection) 

548 if max_ident_length: 

549 self.max_identifier_length = max_ident_length 

550 

551 if ( 

552 self.label_length 

553 and self.label_length > self.max_identifier_length 

554 ): 

555 raise exc.ArgumentError( 

556 "Label length of %d is greater than this dialect's" 

557 " maximum identifier length of %d" 

558 % (self.label_length, self.max_identifier_length) 

559 ) 

560 

561 def on_connect(self) -> Optional[Callable[[Any], None]]: 

562 # inherits the docstring from interfaces.Dialect.on_connect 

563 return None 

564 

565 def _check_max_identifier_length(self, connection): 

566 """Perform a connection / server version specific check to determine 

567 the max_identifier_length. 

568 

569 If the dialect's class level max_identifier_length should be used, 

570 can return None. 

571 

572 .. versionadded:: 1.3.9 

573 

574 """ 

575 return None 

576 

577 def get_default_isolation_level(self, dbapi_conn): 

578 """Given a DBAPI connection, return its isolation level, or 

579 a default isolation level if one cannot be retrieved. 

580 

581 May be overridden by subclasses in order to provide a 

582 "fallback" isolation level for databases that cannot reliably 

583 retrieve the actual isolation level. 

584 

585 By default, calls the :meth:`_engine.Interfaces.get_isolation_level` 

586 method, propagating any exceptions raised. 

587 

588 .. versionadded:: 1.3.22 

589 

590 """ 

591 return self.get_isolation_level(dbapi_conn) 

592 

593 def type_descriptor(self, typeobj): 

594 """Provide a database-specific :class:`.TypeEngine` object, given 

595 the generic object which comes from the types module. 

596 

597 This method looks for a dictionary called 

598 ``colspecs`` as a class or instance-level variable, 

599 and passes on to :func:`_types.adapt_type`. 

600 

601 """ 

602 return type_api.adapt_type(typeobj, self.colspecs) 

603 

604 def has_index(self, connection, table_name, index_name, schema=None, **kw): 

605 if not self.has_table(connection, table_name, schema=schema, **kw): 

606 return False 

607 for idx in self.get_indexes( 

608 connection, table_name, schema=schema, **kw 

609 ): 

610 if idx["name"] == index_name: 

611 return True 

612 else: 

613 return False 

614 

615 def has_schema( 

616 self, connection: Connection, schema_name: str, **kw: Any 

617 ) -> bool: 

618 return schema_name in self.get_schema_names(connection, **kw) 

619 

620 def validate_identifier(self, ident: str) -> None: 

621 if len(ident) > self.max_identifier_length: 

622 raise exc.IdentifierError( 

623 "Identifier '%s' exceeds maximum length of %d characters" 

624 % (ident, self.max_identifier_length) 

625 ) 

626 

627 def connect(self, *cargs: Any, **cparams: Any) -> DBAPIConnection: 

628 # inherits the docstring from interfaces.Dialect.connect 

629 return self.loaded_dbapi.connect(*cargs, **cparams) # type: ignore[no-any-return] # NOQA: E501 

630 

631 def create_connect_args(self, url: URL) -> ConnectArgsType: 

632 # inherits the docstring from interfaces.Dialect.create_connect_args 

633 opts = url.translate_connect_args() 

634 opts.update(url.query) 

635 return ([], opts) 

636 

637 def set_engine_execution_options( 

638 self, engine: Engine, opts: Mapping[str, Any] 

639 ) -> None: 

640 supported_names = set(self.connection_characteristics).intersection( 

641 opts 

642 ) 

643 if supported_names: 

644 characteristics: Mapping[str, Any] = util.immutabledict( 

645 (name, opts[name]) for name in supported_names 

646 ) 

647 

648 @event.listens_for(engine, "engine_connect") 

649 def set_connection_characteristics(connection): 

650 self._set_connection_characteristics( 

651 connection, characteristics 

652 ) 

653 

654 def set_connection_execution_options( 

655 self, connection: Connection, opts: Mapping[str, Any] 

656 ) -> None: 

657 supported_names = set(self.connection_characteristics).intersection( 

658 opts 

659 ) 

660 if supported_names: 

661 characteristics: Mapping[str, Any] = util.immutabledict( 

662 (name, opts[name]) for name in supported_names 

663 ) 

664 self._set_connection_characteristics(connection, characteristics) 

665 

666 def _set_connection_characteristics(self, connection, characteristics): 

667 characteristic_values = [ 

668 (name, self.connection_characteristics[name], value) 

669 for name, value in characteristics.items() 

670 ] 

671 

672 if connection.in_transaction(): 

673 trans_objs = [ 

674 (name, obj) 

675 for name, obj, _ in characteristic_values 

676 if obj.transactional 

677 ] 

678 if trans_objs: 

679 raise exc.InvalidRequestError( 

680 "This connection has already initialized a SQLAlchemy " 

681 "Transaction() object via begin() or autobegin; " 

682 "%s may not be altered unless rollback() or commit() " 

683 "is called first." 

684 % (", ".join(name for name, obj in trans_objs)) 

685 ) 

686 

687 dbapi_connection = connection.connection.dbapi_connection 

688 for _, characteristic, value in characteristic_values: 

689 characteristic.set_connection_characteristic( 

690 self, connection, dbapi_connection, value 

691 ) 

692 connection.connection._connection_record.finalize_callback.append( 

693 functools.partial(self._reset_characteristics, characteristics) 

694 ) 

695 

696 def _reset_characteristics(self, characteristics, dbapi_connection): 

697 for characteristic_name in characteristics: 

698 characteristic = self.connection_characteristics[ 

699 characteristic_name 

700 ] 

701 characteristic.reset_characteristic(self, dbapi_connection) 

702 

703 def do_begin(self, dbapi_connection): 

704 pass 

705 

706 def do_rollback(self, dbapi_connection): 

707 if self.skip_autocommit_rollback and self.detect_autocommit_setting( 

708 dbapi_connection 

709 ): 

710 return 

711 dbapi_connection.rollback() 

712 

713 def do_commit(self, dbapi_connection): 

714 dbapi_connection.commit() 

715 

716 def do_terminate(self, dbapi_connection): 

717 self.do_close(dbapi_connection) 

718 

719 def do_close(self, dbapi_connection): 

720 dbapi_connection.close() 

721 

722 @util.memoized_property 

723 def _dialect_specific_select_one(self): 

724 return str(expression.select(1).compile(dialect=self)) 

725 

726 def _do_ping_w_event(self, dbapi_connection: DBAPIConnection) -> bool: 

727 try: 

728 return self.do_ping(dbapi_connection) 

729 except self.loaded_dbapi.Error as err: 

730 is_disconnect = self.is_disconnect(err, dbapi_connection, None) 

731 

732 if self._has_events: 

733 try: 

734 Connection._handle_dbapi_exception_noconnection( 

735 err, 

736 self, 

737 is_disconnect=is_disconnect, 

738 invalidate_pool_on_disconnect=False, 

739 is_pre_ping=True, 

740 ) 

741 except exc.StatementError as new_err: 

742 is_disconnect = new_err.connection_invalidated 

743 

744 if is_disconnect: 

745 return False 

746 else: 

747 raise 

748 

749 def do_ping(self, dbapi_connection: DBAPIConnection) -> bool: 

750 cursor = dbapi_connection.cursor() 

751 try: 

752 cursor.execute(self._dialect_specific_select_one) 

753 finally: 

754 cursor.close() 

755 return True 

756 

757 def create_xid(self): 

758 """Create a random two-phase transaction ID. 

759 

760 This id will be passed to do_begin_twophase(), do_rollback_twophase(), 

761 do_commit_twophase(). Its format is unspecified. 

762 """ 

763 

764 return "_sa_%032x" % random.randint(0, 2**128) 

765 

766 def do_savepoint(self, connection, name): 

767 connection.execute(expression.SavepointClause(name)) 

768 

769 def do_rollback_to_savepoint(self, connection, name): 

770 connection.execute(expression.RollbackToSavepointClause(name)) 

771 

772 def do_release_savepoint(self, connection, name): 

773 connection.execute(expression.ReleaseSavepointClause(name)) 

774 

    def _deliver_insertmanyvalues_batches(
        self,
        connection,
        cursor,
        statement,
        parameters,
        generic_setinputsizes,
        context,
    ):
        """Generator that yields "insertmanyvalues" batches one at a time
        and, for RETURNING statements, collects the rows of each batch.

        Each batch produced by the compiled statement's own
        ``_deliver_insertmanyvalues_batches()`` is yielded to the caller.
        When the generator is resumed and the statement has RETURNING, the
        batch's rows are fetched from ``cursor`` and appended to
        ``context._insertmanyvalues_rows``.  If sentinel columns are in
        use and the batch was not downgraded, the rows are first put in
        the order of the batch's sentinel values.

        NOTE(review): the caller presumably executes each yielded batch on
        ``cursor`` before resuming the generator - confirm at the call
        site.

        :raises InvalidRequestError: if the sentinel-keyed rows differ in
         number from the batch, or a sentinel value from the batch is not
         found among the returned rows.
        """
        context = cast(DefaultExecutionContext, context)
        compiled = cast(SQLCompiler, context.compiled)

        # result processors for the sentinel column(s); these are set up
        # at most once, by the first batch that needs them (see below).
        _composite_sentinel_proc: Sequence[
            Optional[_ResultProcessorType[Any]]
        ] = ()
        _scalar_sentinel_proc: Optional[_ResultProcessorType[Any]] = None
        _sentinel_proc_initialized: bool = False

        compiled_parameters = context.compiled_parameters

        imv = compiled._insertmanyvalues
        assert imv is not None

        is_returning: Final[bool] = bool(compiled.effective_returning)
        # the execution option takes precedence over the dialect-level
        # page size.
        batch_size = context.execution_options.get(
            "insertmanyvalues_page_size", self.insertmanyvalues_page_size
        )

        if compiled.schema_translate_map:
            schema_translate_map = context.execution_options.get(
                "schema_translate_map", {}
            )
        else:
            schema_translate_map = None

        if is_returning:
            # the same list object is shared with the context, so rows
            # appended below are visible through it.
            result: Optional[List[Any]] = []
            context._insertmanyvalues_rows = result

            sort_by_parameter_order = imv.sort_by_parameter_order

        else:
            sort_by_parameter_order = False
            result = None

        for imv_batch in compiled._deliver_insertmanyvalues_batches(
            statement,
            parameters,
            compiled_parameters,
            generic_setinputsizes,
            batch_size,
            sort_by_parameter_order,
            schema_translate_map,
        ):
            yield imv_batch

            # everything below runs only once the consumer resumes the
            # generator after handling the batch just yielded.
            if is_returning:

                try:
                    rows = context.fetchall_for_returning(cursor)
                except BaseException as be:
                    # NOTE(review): presumably this call always raises;
                    # otherwise ``rows`` would be unbound below - confirm.
                    connection._handle_dbapi_exception(
                        be,
                        sql_util._long_statement(imv_batch.replaced_statement),
                        imv_batch.replaced_parameters,
                        None,
                        context,
                        is_sub_exec=True,
                    )

                # I would have thought "is_returning: Final[bool]"
                # would have assured this but pylance thinks not
                assert result is not None

                if imv.num_sentinel_columns and not imv_batch.is_downgraded:
                    composite_sentinel = imv.num_sentinel_columns > 1
                    if imv.implicit_sentinel:
                        # for implicit sentinel, which is currently single-col
                        # integer autoincrement, do a simple sort.
                        assert not composite_sentinel
                        result.extend(
                            sorted(rows, key=operator.itemgetter(-1))
                        )
                        continue

                    # otherwise, create dictionaries to match up batches
                    # with parameters
                    assert imv.sentinel_param_keys
                    assert imv.sentinel_columns

                    _nsc = imv.num_sentinel_columns

                    if not _sentinel_proc_initialized:
                        # the sentinel columns are the trailing columns of
                        # the cursor description; element [1] of each
                        # description entry is passed as the second
                        # argument to the result processor lookup.
                        if composite_sentinel:
                            _composite_sentinel_proc = [
                                col.type._cached_result_processor(
                                    self, cursor_desc[1]
                                )
                                for col, cursor_desc in zip(
                                    imv.sentinel_columns,
                                    cursor.description[-_nsc:],
                                )
                            ]
                        else:
                            _scalar_sentinel_proc = (
                                imv.sentinel_columns[0]
                            ).type._cached_result_processor(
                                self, cursor.description[-1][1]
                            )
                        _sentinel_proc_initialized = True

                    # key each returned row by its sentinel value(s),
                    # run through the result processor where one exists.
                    rows_by_sentinel: Union[
                        Dict[Tuple[Any, ...], Any],
                        Dict[Any, Any],
                    ]
                    if composite_sentinel:
                        rows_by_sentinel = {
                            tuple(
                                (proc(val) if proc else val)
                                for val, proc in zip(
                                    row[-_nsc:], _composite_sentinel_proc
                                )
                            ): row
                            for row in rows
                        }
                    elif _scalar_sentinel_proc:
                        rows_by_sentinel = {
                            _scalar_sentinel_proc(row[-1]): row for row in rows
                        }
                    else:
                        rows_by_sentinel = {row[-1]: row for row in rows}

                    # duplicate sentinel values collapse into one dict
                    # entry, so a count mismatch is detected here.
                    if len(rows_by_sentinel) != len(imv_batch.batch):
                        # see test_insert_exec.py::
                        # IMVSentinelTest::test_sentinel_incorrect_rowcount
                        # for coverage / demonstration
                        raise exc.InvalidRequestError(
                            f"Sentinel-keyed result set did not produce "
                            f"correct number of rows {len(imv_batch.batch)}; "
                            "produced "
                            f"{len(rows_by_sentinel)}. Please ensure the "
                            "sentinel column is fully unique and populated in "
                            "all cases."
                        )

                    try:
                        # re-order the rows to follow the sentinel values
                        # recorded on the batch.
                        ordered_rows = [
                            rows_by_sentinel[sentinel_keys]
                            for sentinel_keys in imv_batch.sentinel_values
                        ]
                    except KeyError as ke:
                        # see test_insert_exec.py::
                        # IMVSentinelTest::test_sentinel_cant_match_keys
                        # for coverage / demonstration
                        raise exc.InvalidRequestError(
                            f"Can't match sentinel values in result set to "
                            f"parameter sets; key {ke.args[0]!r} was not "
                            "found. "
                            "There may be a mismatch between the datatype "
                            "passed to the DBAPI driver vs. that which it "
                            "returns in a result row. Ensure the given "
                            "Python value matches the expected result type "
                            "*exactly*, taking care to not rely upon implicit "
                            "conversions which may occur such as when using "
                            "strings in place of UUID or integer values, etc. "
                        ) from ke

                    result.extend(ordered_rows)

                else:
                    # no sentinel handling applies; keep the rows in the
                    # order they were fetched.
                    result.extend(rows)

946 

def do_executemany(self, cursor, statement, parameters, context=None):
    """Hand ``statement`` and the full ``parameters`` collection to
    ``cursor.executemany()``.

    ``context`` is accepted but not consulted by this implementation.

    """
    cursor.executemany(statement, parameters)

949 

def do_execute(self, cursor, statement, parameters, context=None):
    """Hand ``statement`` and a single ``parameters`` set to
    ``cursor.execute()``.

    ``context`` is accepted but not consulted by this implementation.

    """
    cursor.execute(statement, parameters)

952 

def do_execute_no_params(self, cursor, statement, context=None):
    """Call ``cursor.execute()`` with the statement only, passing no
    parameter collection at all.

    ``context`` is accepted but not consulted by this implementation.

    """
    cursor.execute(statement)

955 

def is_disconnect(
    self,
    e: DBAPIModule.Error,
    connection: Union[
        pool.PoolProxiedConnection, interfaces.DBAPIConnection, None
    ],
    cursor: Optional[interfaces.DBAPICursor],
) -> bool:
    """Report whether exception ``e`` indicates a dropped connection.

    This default implementation inspects none of its arguments and
    always answers ``False``.

    """
    return False

965 

@util.memoized_instancemethod
def _gen_allowed_isolation_levels(self, dbapi_conn):
    """Return the isolation level names this dialect reports, as a tuple.

    Returns ``None`` when ``get_isolation_level_values()`` raises
    ``NotImplementedError``.

    :raises ValueError: if any reported name is not already in the
     "UPPERCASE with spaces" form.

    """
    try:
        reported = list(self.get_isolation_level_values(dbapi_conn))
    except NotImplementedError:
        return None

    # the canonical spelling: underscores become spaces, all upper case
    canonical = [name.replace("_", " ").upper() for name in reported]
    if reported != canonical:
        raise ValueError(
            f"Dialect {self.name!r} get_isolation_level_values() "
            f"method should return names as UPPERCASE using spaces, "
            f"not underscores; got "
            f"{sorted(set(reported).difference(canonical))}"
        )
    return tuple(canonical)

984 

985 def _assert_and_set_isolation_level(self, dbapi_conn, level): 

986 level = level.replace("_", " ").upper() 

987 

988 _allowed_isolation_levels = self._gen_allowed_isolation_levels( 

989 dbapi_conn 

990 ) 

991 if ( 

992 _allowed_isolation_levels 

993 and level not in _allowed_isolation_levels 

994 ): 

995 raise exc.ArgumentError( 

996 f"Invalid value {level!r} for isolation_level. " 

997 f"Valid isolation levels for {self.name!r} are " 

998 f"{', '.join(_allowed_isolation_levels)}" 

999 ) 

1000 

1001 self.set_isolation_level(dbapi_conn, level) 

1002 

def reset_isolation_level(self, dbapi_conn):
    """Put ``dbapi_conn`` back on its baseline isolation level.

    The baseline is ``_on_connect_isolation_level`` when that is set,
    otherwise ``default_isolation_level``.

    """
    on_connect_level = self._on_connect_isolation_level

    if on_connect_level is None:
        assert self.default_isolation_level is not None
        self._assert_and_set_isolation_level(
            dbapi_conn,
            self.default_isolation_level,
        )
        return

    # an on-connect level is expected to be either AUTOCOMMIT or the
    # same thing as the default level
    assert (
        on_connect_level == "AUTOCOMMIT"
        or on_connect_level == self.default_isolation_level
    )
    self._assert_and_set_isolation_level(dbapi_conn, on_connect_level)

1019 

def normalize_name(self, name):
    """Map ``name`` to its normalized form.

    * ``None`` is returned as ``None``.
    * A name whose upper- and lower-cased forms are equal is returned
      unchanged.
    * An all-uppercase name whose lowercase form needs no quoting is
      returned lower-cased.
    * An all-lowercase name is returned as a ``quoted_name`` with
      ``quote=True``.
    * Any other name is returned unchanged.

    """
    if name is None:
        return None

    lowered = name.lower()
    uppered = name.upper()

    if uppered == lowered:
        # no upper/lower distinction at all (e.g. caseless characters)
        return name

    needs_quotes = self.identifier_preparer._requires_quotes
    if uppered == name and not needs_quotes(lowered):
        # all upper case and safe to render unquoted: fold to lower case
        return lowered

    if lowered == name:
        # all lower case; quoting must be forced on it
        return quoted_name(name, quote=True)

    # mixed case (or upper case that requires quotes): leave it alone
    return name

1045 

def denormalize_name(self, name):
    """Map ``name`` to its denormalized form.

    ``None`` comes back as ``None``.  An all-lowercase name whose
    lowercase form needs no quoting is returned upper-cased; every
    other name is returned unchanged.

    """
    if name is None:
        return None

    lowered = name.lower()
    uppered = name.upper()

    if uppered == lowered:
        # no upper/lower distinction at all (e.g. caseless characters)
        return name

    needs_quotes = self.identifier_preparer._requires_quotes
    if lowered == name and not needs_quotes(lowered):
        return uppered

    return name

1062 

def get_driver_connection(self, connection: DBAPIConnection) -> Any:
    """Return the driver-level connection for ``connection``.

    This implementation returns the very object it was given.

    """
    return connection

1065 

def _overrides_default(self, method):
    """Return True if this object's class supplies its own code for the
    method named ``method``.

    The check compares the ``__code__`` object found on ``type(self)``
    with the one found on ``DefaultDialect``.

    """
    own_impl = getattr(type(self), method)
    base_impl = getattr(DefaultDialect, method)
    return own_impl.__code__ is not base_impl.__code__

1071 

def _default_multi_reflect(
    self,
    single_tbl_method,
    connection,
    kind,
    schema,
    filter_names,
    scope,
    **kw,
):
    """Generator that runs a single-table reflection method over many
    objects.

    Yields ``((schema, name), single_tbl_method(...))`` pairs.  An
    ``"unreflectable"`` entry is popped from ``kw`` (a new dict is used
    if absent) and receives the first ``UnreflectableTableError`` seen
    per key; ``NoSuchTableError`` is swallowed.

    """
    # name-listing callables, chosen by the requested object kinds
    regular_listers = []
    temporary_listers = []
    if ObjectKind.TABLE in kind:
        regular_listers.append(self.get_table_names)
        temporary_listers.append(self.get_temp_table_names)
    if ObjectKind.VIEW in kind:
        regular_listers.append(self.get_view_names)
        temporary_listers.append(self.get_temp_view_names)
    if ObjectKind.MATERIALIZED_VIEW in kind:
        # there is no temporary counterpart for materialized views
        regular_listers.append(self.get_materialized_view_names)

    unreflectable = kw.pop("unreflectable", {})

    names_taken_as_given = (
        filter_names
        and scope is ObjectScope.ANY
        and kind is ObjectKind.ANY
    )
    if names_taken_as_given:
        # explicit names with no restriction on kind or scope: use them
        # directly and run no name queries.  A name that does not exist
        # raises NoSuchTableError below and is skipped.  This also lets
        # an individually named object be reflected when no lister
        # would have reported it.
        candidates = filter_names
    else:
        candidates = []
        lister_kw = {"schema": schema, **kw}

        listers = []
        if ObjectScope.DEFAULT in scope:
            listers.extend(regular_listers)
        if ObjectScope.TEMPORARY in scope:
            listers.extend(temporary_listers)

        for lister in listers:
            try:
                candidates.extend(lister(connection, **lister_kw))
            except NotImplementedError:
                # this kind of listing isn't available; move on
                pass

    wanted = set(filter_names) if filter_names else filter_names

    # invoke the single-table method for each surviving name
    for table in candidates:
        if wanted and table not in wanted:
            continue

        key = (schema, table)
        try:
            yield (
                key,
                single_tbl_method(connection, table, schema=schema, **kw),
            )
        except exc.UnreflectableTableError as err:
            # only the first error per key is recorded
            if key not in unreflectable:
                unreflectable[key] = err
        except exc.NoSuchTableError:
            pass

1144 

def get_multi_table_options(self, connection, **kw):
    """Run ``get_table_options`` through ``_default_multi_reflect()``,
    forwarding ``connection`` and all keyword arguments."""
    per_table = self.get_table_options
    return self._default_multi_reflect(per_table, connection, **kw)

1149 

def get_multi_columns(self, connection, **kw):
    """Run ``get_columns`` through ``_default_multi_reflect()``,
    forwarding ``connection`` and all keyword arguments."""
    per_table = self.get_columns
    return self._default_multi_reflect(per_table, connection, **kw)

1152 

def get_multi_pk_constraint(self, connection, **kw):
    """Run ``get_pk_constraint`` through ``_default_multi_reflect()``,
    forwarding ``connection`` and all keyword arguments."""
    per_table = self.get_pk_constraint
    return self._default_multi_reflect(per_table, connection, **kw)

1157 

def get_multi_foreign_keys(self, connection, **kw):
    """Run ``get_foreign_keys`` through ``_default_multi_reflect()``,
    forwarding ``connection`` and all keyword arguments."""
    per_table = self.get_foreign_keys
    return self._default_multi_reflect(per_table, connection, **kw)

1162 

def get_multi_indexes(self, connection, **kw):
    """Run ``get_indexes`` through ``_default_multi_reflect()``,
    forwarding ``connection`` and all keyword arguments."""
    per_table = self.get_indexes
    return self._default_multi_reflect(per_table, connection, **kw)

1165 

def get_multi_unique_constraints(self, connection, **kw):
    """Run ``get_unique_constraints`` through
    ``_default_multi_reflect()``, forwarding ``connection`` and all
    keyword arguments."""
    per_table = self.get_unique_constraints
    return self._default_multi_reflect(per_table, connection, **kw)

1170 

def get_multi_check_constraints(self, connection, **kw):
    """Run ``get_check_constraints`` through
    ``_default_multi_reflect()``, forwarding ``connection`` and all
    keyword arguments."""
    per_table = self.get_check_constraints
    return self._default_multi_reflect(per_table, connection, **kw)

1175 

def get_multi_table_comment(self, connection, **kw):
    """Run ``get_table_comment`` through ``_default_multi_reflect()``,
    forwarding ``connection`` and all keyword arguments."""
    per_table = self.get_table_comment
    return self._default_multi_reflect(per_table, connection, **kw)

1180 

1181 

class StrCompileDialect(DefaultDialect):
    """A :class:`.DefaultDialect` subclass wired to the ``Str*`` compiler
    classes, with the capability flags below set at class level."""

    # compiler / preparer classes
    statement_compiler = compiler.StrSQLCompiler
    ddl_compiler = compiler.DDLCompiler
    type_compiler_cls = compiler.StrSQLTypeCompiler
    preparer = compiler.IdentifierPreparer

    # RETURNING flags for INSERT / UPDATE / DELETE
    insert_returning = True
    update_returning = True
    delete_returning = True

    supports_statement_cache = True

    supports_identity_columns = True

    # sequence-related flags
    supports_sequences = True
    sequences_optional = True
    preexecute_autoincrement_sequences = False

    supports_native_boolean = True

    supports_multivalues_insert = True
    supports_simple_order_by_label = True

1204 

1205 

1206class DefaultExecutionContext(ExecutionContext): 

1207 isinsert = False 

1208 isupdate = False 

1209 isdelete = False 

1210 is_crud = False 

1211 is_text = False 

1212 isddl = False 

1213 

1214 execute_style: ExecuteStyle = ExecuteStyle.EXECUTE 

1215 

1216 compiled: Optional[Compiled] = None 

1217 result_column_struct: Optional[ 

1218 Tuple[List[ResultColumnsEntry], bool, bool, bool, bool] 

1219 ] = None 

1220 returned_default_rows: Optional[Sequence[Row[Any]]] = None 

1221 

1222 execution_options: _ExecuteOptions = util.EMPTY_DICT 

1223 

1224 cursor_fetch_strategy = _cursor._DEFAULT_FETCH 

1225 

1226 invoked_statement: Optional[Executable] = None 

1227 

1228 _is_implicit_returning = False 

1229 _is_explicit_returning = False 

1230 _is_supplemental_returning = False 

1231 _is_server_side = False 

1232 

1233 _soft_closed = False 

1234 

1235 _rowcount: Optional[int] = None 

1236 

1237 # a hook for SQLite's translation of 

1238 # result column names 

1239 # NOTE: pyhive is using this hook, can't remove it :( 

1240 _translate_colname: Optional[Callable[[str], str]] = None 

1241 

1242 _expanded_parameters: Mapping[str, List[str]] = util.immutabledict() 

1243 """used by set_input_sizes(). 

1244 

1245 This collection comes from ``ExpandedState.parameter_expansion``. 

1246 

1247 """ 

1248 

1249 cache_hit = NO_CACHE_KEY 

1250 

1251 root_connection: Connection 

1252 _dbapi_connection: PoolProxiedConnection 

1253 dialect: Dialect 

1254 unicode_statement: str 

1255 cursor: DBAPICursor 

1256 compiled_parameters: List[_MutableCoreSingleExecuteParams] 

1257 parameters: _DBAPIMultiExecuteParams 

1258 extracted_parameters: Optional[Sequence[BindParameter[Any]]] 

1259 

1260 _empty_dict_params = cast("Mapping[str, Any]", util.EMPTY_DICT) 

1261 

1262 _insertmanyvalues_rows: Optional[List[Tuple[Any, ...]]] = None 

1263 _num_sentinel_cols: int = 0 

1264 

1265 @classmethod 

1266 def _init_ddl( 

1267 cls, 

1268 dialect: Dialect, 

1269 connection: Connection, 

1270 dbapi_connection: PoolProxiedConnection, 

1271 execution_options: _ExecuteOptions, 

1272 compiled_ddl: DDLCompiler, 

1273 ) -> ExecutionContext: 

1274 """Initialize execution context for an ExecutableDDLElement 

1275 construct.""" 

1276 

1277 self = cls.__new__(cls) 

1278 self.root_connection = connection 

1279 self._dbapi_connection = dbapi_connection 

1280 self.dialect = connection.dialect 

1281 

1282 self.compiled = compiled = compiled_ddl 

1283 self.isddl = True 

1284 

1285 self.execution_options = execution_options 

1286 

1287 self.unicode_statement = str(compiled) 

1288 if compiled.schema_translate_map: 

1289 schema_translate_map = self.execution_options.get( 

1290 "schema_translate_map", {} 

1291 ) 

1292 

1293 rst = compiled.preparer._render_schema_translates 

1294 self.unicode_statement = rst( 

1295 self.unicode_statement, schema_translate_map 

1296 ) 

1297 

1298 self.statement = self.unicode_statement 

1299 

1300 self.cursor = self.create_cursor() 

1301 self.compiled_parameters = [] 

1302 

1303 if dialect.positional: 

1304 self.parameters = [dialect.execute_sequence_format()] 

1305 else: 

1306 self.parameters = [self._empty_dict_params] 

1307 

1308 return self 

1309 

@classmethod
def _init_compiled(
    cls,
    dialect: Dialect,
    connection: Connection,
    dbapi_connection: PoolProxiedConnection,
    execution_options: _ExecuteOptions,
    compiled: SQLCompiler,
    parameters: _CoreMultiExecuteParams,
    invoked_statement: Executable,
    extracted_parameters: Optional[Sequence[BindParameter[Any]]],
    cache_hit: CacheStats = CacheStats.CACHING_DISABLED,
) -> ExecutionContext:
    """Build an execution context for a Compiled construct.

    Steps, in order: copy state from ``compiled`` onto a new instance;
    for DML, work out the RETURNING flags and reject executemany
    combinations the dialect flags disallow; construct the compiled
    parameter sets and pick the execute style; create the cursor; run
    prefetch defaults if any; apply post-compile parameter expansion and
    schema translation to the statement text; finally run bind
    processors to produce ``parameters`` as sequences (positional) or
    dicts.

    :raises InvalidRequestError: for an unsupported RETURNING +
     executemany combination, or when literal-execute / expanding
     parameters are combined with executemany.

    """

    ctx = cls.__new__(cls)
    ctx.root_connection = connection
    ctx._dbapi_connection = dbapi_connection
    ctx.dialect = connection.dialect
    ctx.extracted_parameters = extracted_parameters
    ctx.invoked_statement = invoked_statement
    ctx.compiled = compiled
    ctx.cache_hit = cache_hit

    ctx.execution_options = execution_options

    ctx.result_column_struct = (
        compiled._result_columns,
        compiled._ordered_columns,
        compiled._textual_ordered_columns,
        compiled._ad_hoc_textual,
        compiled._loose_column_name_matching,
    )

    ctx.isinsert = is_insert = compiled.isinsert
    ctx.isupdate = is_update = compiled.isupdate
    ctx.isdelete = is_delete = compiled.isdelete
    ctx.is_text = compiled.isplaintext

    if is_insert or is_update or is_delete:
        dml_statement = compiled.compile_state.statement  # type: ignore
        if TYPE_CHECKING:
            assert isinstance(dml_statement, UpdateBase)
        ctx.is_crud = True
        ctx._is_explicit_returning = explicit_returning = bool(
            dml_statement._returning
        )
        ctx._is_implicit_returning = implicit_returning = bool(
            compiled.implicit_returning
        )
        if implicit_returning and dml_statement._supplemental_returning:
            ctx._is_supplemental_returning = True

        # implicit and explicit returning are never combined
        assert not (implicit_returning and explicit_returning)

        if (
            explicit_returning or implicit_returning
        ) and compiled.for_executemany:
            # each check raises, so they are evaluated as independent
            # guards in priority order
            if is_insert and not ctx.dialect.insert_executemany_returning:
                raise exc.InvalidRequestError(
                    f"Dialect {ctx.dialect.dialect_description} with "
                    f"current server capabilities does not support "
                    "INSERT..RETURNING when executemany is used"
                )
            if (
                is_insert
                and dml_statement._sort_by_parameter_order
                and not ctx.dialect.insert_executemany_returning_sort_by_parameter_order  # noqa: E501
            ):
                raise exc.InvalidRequestError(
                    f"Dialect {ctx.dialect.dialect_description} with "
                    f"current server capabilities does not support "
                    "INSERT..RETURNING with deterministic row ordering "
                    "when executemany is used"
                )
            if (
                is_insert
                and ctx.dialect.use_insertmanyvalues
                and not compiled._insertmanyvalues
            ):
                raise exc.InvalidRequestError(
                    'Statement does not have "insertmanyvalues" '
                    "enabled, can't use INSERT..RETURNING with "
                    "executemany in this case."
                )
            if is_update and not ctx.dialect.update_executemany_returning:
                raise exc.InvalidRequestError(
                    f"Dialect {ctx.dialect.dialect_description} with "
                    f"current server capabilities does not support "
                    "UPDATE..RETURNING when executemany is used"
                )
            if is_delete and not ctx.dialect.delete_executemany_returning:
                raise exc.InvalidRequestError(
                    f"Dialect {ctx.dialect.dialect_description} with "
                    f"current server capabilities does not support "
                    "DELETE..RETURNING when executemany is used"
                )

    if not parameters:
        # no parameter sets given: build exactly one from the statement
        ctx.compiled_parameters = [
            compiled.construct_params(
                extracted_parameters=extracted_parameters,
                escape_names=False,
            )
        ]
    else:
        ctx.compiled_parameters = [
            compiled.construct_params(
                param_set,
                escape_names=False,
                _group_number=group_number,
                extracted_parameters=extracted_parameters,
            )
            for group_number, param_set in enumerate(parameters)
        ]

        if len(parameters) > 1:
            if ctx.isinsert and compiled._insertmanyvalues:
                ctx.execute_style = ExecuteStyle.INSERTMANYVALUES

                imv = compiled._insertmanyvalues
                if imv.sentinel_columns is not None:
                    ctx._num_sentinel_cols = imv.num_sentinel_columns
            else:
                ctx.execute_style = ExecuteStyle.EXECUTEMANY

    ctx.unicode_statement = compiled.string

    ctx.cursor = ctx.create_cursor()

    if ctx.compiled.insert_prefetch or ctx.compiled.update_prefetch:
        ctx._process_execute_defaults()

    bind_processors = compiled._bind_processors

    active_processors: Mapping[
        str, _BindProcessorType[Any]
    ] = bind_processors  # type: ignore[assignment]

    if compiled.literal_execute_params or compiled.post_compile_params:
        if ctx.executemany:
            raise exc.InvalidRequestError(
                "'literal_execute' or 'expanding' parameters can't be "
                "used with executemany()"
            )

        expanded_state = compiled._process_parameters_for_postcompile(
            ctx.compiled_parameters[0]
        )

        # the expanded form replaces the statement text
        ctx.unicode_statement = expanded_state.statement

        ctx._expanded_parameters = expanded_state.parameter_expansion

        # work on a copy so the compiled object's processors stay intact
        active_processors = dict(bind_processors)  # type: ignore
        active_processors.update(expanded_state.processors)
        positiontup = expanded_state.positiontup
    elif compiled.positional:
        positiontup = ctx.compiled.positiontup
    else:
        positiontup = None

    if compiled.schema_translate_map:
        translate_map = ctx.execution_options.get(
            "schema_translate_map", {}
        )
        render_translates = compiled.preparer._render_schema_translates
        ctx.unicode_statement = render_translates(
            ctx.unicode_statement, translate_map
        )

    # unicode_statement is final at this point
    ctx.statement = ctx.unicode_statement

    # Turn each compiled parameter mapping into the sequence or dict
    # that is handed to the DBAPI's execute() / executemany().

    if compiled.positional:
        positional_sets: MutableSequence[Sequence[Any]] = []
        assert positiontup is not None
        for compiled_params in ctx.compiled_parameters:
            ordered_values: List[Any] = [
                (
                    active_processors[key](compiled_params[key])
                    if key in active_processors
                    else compiled_params[key]
                )
                for key in positiontup
            ]
            positional_sets.append(
                dialect.execute_sequence_format(ordered_values)
            )

        ctx.parameters = positional_sets
    else:
        dict_sets: MutableSequence[Dict[str, Any]] = []
        escaped_names = compiled.escaped_bind_names

        # note that currently, "expanded" parameters will be present
        # in ctx.compiled_parameters in their quoted form.  This is
        # slightly inconsistent with the approach taken as of #8056,
        # where compiled_parameters is meant to contain unquoted
        # param names.
        named_values: Dict[str, Any]
        for compiled_params in ctx.compiled_parameters:
            if escaped_names:
                named_values = {
                    escaped_names.get(key, key): (
                        active_processors[key](compiled_params[key])
                        if key in active_processors
                        else compiled_params[key]
                    )
                    for key in compiled_params
                }
            else:
                named_values = {
                    key: (
                        active_processors[key](compiled_params[key])
                        if key in active_processors
                        else compiled_params[key]
                    )
                    for key in compiled_params
                }

            dict_sets.append(named_values)

        ctx.parameters = dict_sets

    return ctx

1539 

1540 @classmethod 

1541 def _init_statement( 

1542 cls, 

1543 dialect: Dialect, 

1544 connection: Connection, 

1545 dbapi_connection: PoolProxiedConnection, 

1546 execution_options: _ExecuteOptions, 

1547 statement: str, 

1548 parameters: _DBAPIMultiExecuteParams, 

1549 ) -> ExecutionContext: 

1550 """Initialize execution context for a string SQL statement.""" 

1551 

1552 self = cls.__new__(cls) 

1553 self.root_connection = connection 

1554 self._dbapi_connection = dbapi_connection 

1555 self.dialect = connection.dialect 

1556 self.is_text = True 

1557 

1558 self.execution_options = execution_options 

1559 

1560 if not parameters: 

1561 if self.dialect.positional: 

1562 self.parameters = [dialect.execute_sequence_format()] 

1563 else: 

1564 self.parameters = [self._empty_dict_params] 

1565 elif isinstance(parameters[0], dialect.execute_sequence_format): 

1566 self.parameters = parameters 

1567 elif isinstance(parameters[0], dict): 

1568 self.parameters = parameters 

1569 else: 

1570 self.parameters = [ 

1571 dialect.execute_sequence_format(p) for p in parameters 

1572 ] 

1573 

1574 if len(parameters) > 1: 

1575 self.execute_style = ExecuteStyle.EXECUTEMANY 

1576 

1577 self.statement = self.unicode_statement = statement 

1578 

1579 self.cursor = self.create_cursor() 

1580 return self 

1581 

1582 @classmethod 

1583 def _init_default( 

1584 cls, 

1585 dialect: Dialect, 

1586 connection: Connection, 

1587 dbapi_connection: PoolProxiedConnection, 

1588 execution_options: _ExecuteOptions, 

1589 ) -> ExecutionContext: 

1590 """Initialize execution context for a ColumnDefault construct.""" 

1591 

1592 self = cls.__new__(cls) 

1593 self.root_connection = connection 

1594 self._dbapi_connection = dbapi_connection 

1595 self.dialect = connection.dialect 

1596 

1597 self.execution_options = execution_options 

1598 

1599 self.cursor = self.create_cursor() 

1600 return self 

1601 

1602 def _get_cache_stats(self) -> str: 

1603 if self.compiled is None: 

1604 return "raw sql" 

1605 

1606 now = perf_counter() 

1607 

1608 ch = self.cache_hit 

1609 

1610 gen_time = self.compiled._gen_time 

1611 assert gen_time is not None 

1612 

1613 if ch is NO_CACHE_KEY: 

1614 return "no key %.5fs" % (now - gen_time,) 

1615 elif ch is CACHE_HIT: 

1616 return "cached since %.4gs ago" % (now - gen_time,) 

1617 elif ch is CACHE_MISS: 

1618 return "generated in %.5fs" % (now - gen_time,) 

1619 elif ch is CACHING_DISABLED: 

1620 if "_cache_disable_reason" in self.execution_options: 

1621 return "caching disabled (%s) %.5fs " % ( 

1622 self.execution_options["_cache_disable_reason"], 

1623 now - gen_time, 

1624 ) 

1625 else: 

1626 return "caching disabled %.5fs" % (now - gen_time,) 

1627 elif ch is NO_DIALECT_SUPPORT: 

1628 return "dialect %s+%s does not support caching %.5fs" % ( 

1629 self.dialect.name, 

1630 self.dialect.driver, 

1631 now - gen_time, 

1632 ) 

1633 else: 

1634 return "unknown" 

1635 

@property
def executemany(self):  # type: ignore[override]
    """True when ``execute_style`` is ``EXECUTEMANY`` or
    ``INSERTMANYVALUES``."""
    multi_row_styles = (
        ExecuteStyle.EXECUTEMANY,
        ExecuteStyle.INSERTMANYVALUES,
    )
    return self.execute_style in multi_row_styles

1642 

@util.memoized_property
def identifier_preparer(self):
    """The identifier preparer to use for this context.

    Taken from the compiled object when there is one; otherwise the
    dialect's preparer, adapted with ``_with_schema_translate()`` when
    the execution options carry a ``"schema_translate_map"``.

    """
    if self.compiled:
        return self.compiled.preparer

    if "schema_translate_map" not in self.execution_options:
        return self.dialect.identifier_preparer

    translate_map = self.execution_options["schema_translate_map"]
    return self.dialect.identifier_preparer._with_schema_translate(
        translate_map
    )

1653 

@util.memoized_property
def engine(self):
    """The ``engine`` attribute of ``root_connection``."""
    root = self.root_connection
    return root.engine

1657 

@util.memoized_property
def postfetch_cols(self) -> Optional[Sequence[Column[Any]]]:
    """The compiled statement's ``postfetch`` collection."""
    compiled = self.compiled
    if TYPE_CHECKING:
        assert isinstance(compiled, SQLCompiler)
    return compiled.postfetch

1663 

@util.memoized_property
def prefetch_cols(self) -> Optional[Sequence[Column[Any]]]:
    """The compiled statement's prefetch collection for this kind of
    statement: ``insert_prefetch`` for INSERT, ``update_prefetch`` for
    UPDATE, and an empty tuple for anything else."""
    if TYPE_CHECKING:
        assert isinstance(self.compiled, SQLCompiler)

    if self.isinsert:
        return self.compiled.insert_prefetch
    if self.isupdate:
        return self.compiled.update_prefetch
    return ()

1674 

@util.memoized_property
def no_parameters(self):
    """Value of the ``"no_parameters"`` execution option, ``False``
    when the option is absent."""
    options = self.execution_options
    return options.get("no_parameters", False)

1678 

1679 def _execute_scalar( 

1680 self, 

1681 stmt: str, 

1682 type_: Optional[TypeEngine[Any]], 

1683 parameters: Optional[_DBAPISingleExecuteParams] = None, 

1684 ) -> Any: 

1685 """Execute a string statement on the current cursor, returning a 

1686 scalar result. 

1687 

1688 Used to fire off sequences, default phrases, and "select lastrowid" 

1689 types of statements individually or in the context of a parent INSERT 

1690 or UPDATE statement. 

1691 

1692 """ 

1693 

1694 conn = self.root_connection 

1695 

1696 if "schema_translate_map" in self.execution_options: 

1697 schema_translate_map = self.execution_options.get( 

1698 "schema_translate_map", {} 

1699 ) 

1700 

1701 rst = self.identifier_preparer._render_schema_translates 

1702 stmt = rst(stmt, schema_translate_map) 

1703 

1704 if not parameters: 

1705 if self.dialect.positional: 

1706 parameters = self.dialect.execute_sequence_format() 

1707 else: 

1708 parameters = {} 

1709 

1710 conn._cursor_execute(self.cursor, stmt, parameters, context=self) 

1711 row = self.cursor.fetchone() 

1712 if row is not None: 

1713 r = row[0] 

1714 else: 

1715 r = None 

1716 if type_ is not None: 

1717 # apply type post processors to the result 

1718 proc = type_._cached_result_processor( 

1719 self.dialect, self.cursor.description[0][1] 

1720 ) 

1721 if proc: 

1722 return proc(r) 

1723 return r 

1724 

@util.memoized_property
def connection(self):
    """The same object as ``root_connection``."""
    root = self.root_connection
    return root

1728 

1729 def _use_server_side_cursor(self): 

1730 if not self.dialect.supports_server_side_cursors: 

1731 return False 

1732 

1733 if self.dialect.server_side_cursors: 

1734 # this is deprecated 

1735 use_server_side = self.execution_options.get( 

1736 "stream_results", True 

1737 ) and ( 

1738 self.compiled 

1739 and isinstance(self.compiled.statement, expression.Selectable) 

1740 or ( 

1741 ( 

1742 not self.compiled 

1743 or isinstance( 

1744 self.compiled.statement, expression.TextClause 

1745 ) 

1746 ) 

1747 and self.unicode_statement 

1748 and SERVER_SIDE_CURSOR_RE.match(self.unicode_statement) 

1749 ) 

1750 ) 

1751 else: 

1752 use_server_side = self.execution_options.get( 

1753 "stream_results", False 

1754 ) 

1755 

1756 return use_server_side 

1757 

def create_cursor(self) -> DBAPICursor:
    """Create the cursor for this context and record its kind in
    ``_is_server_side``.

    A server-side cursor is created when the dialect supports them and
    either the ``"stream_results"`` execution option is truthy, or the
    dialect-level ``server_side_cursors`` flag is on and
    ``_use_server_side_cursor()`` agrees.  Otherwise a default cursor
    is created.

    """
    # the initial preference checks for server-side cursors are inlined
    wants_server_side = self.dialect.supports_server_side_cursors and (
        self.execution_options.get("stream_results", False)
        or (
            self.dialect.server_side_cursors
            and self._use_server_side_cursor()
        )
    )

    if wants_server_side:
        self._is_server_side = True
        return self.create_server_side_cursor()

    self._is_server_side = False
    return self.create_default_cursor()

1775 

def fetchall_for_returning(self, cursor):
    """Return every remaining row from ``cursor`` via ``fetchall()``."""
    rows = cursor.fetchall()
    return rows

1778 

def create_default_cursor(self) -> DBAPICursor:
    """Return a new cursor from ``_dbapi_connection.cursor()``."""
    dbapi_connection = self._dbapi_connection
    return dbapi_connection.cursor()

1781 

def create_server_side_cursor(self) -> DBAPICursor:
    """Return a new server-side cursor.

    Not provided by this class; always raises ``NotImplementedError``.

    """
    raise NotImplementedError()

1784 

def pre_exec(self):
    """Hook with an empty body; this implementation does nothing."""
    return None

1787 

def get_out_parameter_values(self, names):
    """Return values for the OUT parameters named in ``names``.

    Not provided by this class; always raises ``NotImplementedError``.

    """
    raise NotImplementedError(
        "This dialect does not support OUT parameters"
    )

1792 

def post_exec(self):
    """Hook with an empty body; this implementation does nothing."""
    return None

1795 

def get_result_processor(self, type_, colname, coltype):
    """Return the 'result processor' for a type as it appears in
    cursor.description.

    The default simply asks ``type_`` for its cached processor for this
    dialect and ``coltype``; ``colname`` is not used here.  Dialects
    can override this for context-sensitive result type handling.

    """
    processor = type_._cached_result_processor(self.dialect, coltype)
    return processor

1805 

def get_lastrowid(self):
    """Return ``self.cursor.lastrowid``, or an equivalent, after an
    INSERT.

    An implementation may call special cursor functions, issue a new
    SELECT on this cursor (or another one), or hand back a value that
    was stored during post_exec().

    It is only called for dialects which support "implicit" primary key
    generation, leave preexecute_autoincrement_sequences set to False,
    and only when no explicit id value was bound to the statement.

    For dialects that rely on the lastrowid concept, it is called once
    per INSERT statement needing the last inserted primary key,
    directly after :meth:`.ExecutionContext.post_exec`.

    """
    cursor = self.cursor
    return cursor.lastrowid

1824 

def handle_dbapi_exception(self, e):
    """Hook receiving an exception ``e``; this implementation ignores
    it and does nothing."""
    return None

1827 

@util.non_memoized_property
def rowcount(self) -> int:
    """The stored ``_rowcount`` when one has been set, otherwise the
    cursor's current ``rowcount``."""
    stored = self._rowcount
    return stored if stored is not None else self.cursor.rowcount

1834 

1835 @property 

1836 def _has_rowcount(self): 

1837 return self._rowcount is not None 

1838 

def supports_sane_rowcount(self):
    """Return the dialect's ``supports_sane_rowcount`` flag."""
    dialect = self.dialect
    return dialect.supports_sane_rowcount

1841 

def supports_sane_multi_rowcount(self):
    """Return the dialect's ``supports_sane_multi_rowcount`` flag."""
    dialect = self.dialect
    return dialect.supports_sane_multi_rowcount

1844 

1845 def _setup_result_proxy(self): 

1846 exec_opt = self.execution_options 

1847 

1848 if self._rowcount is None and exec_opt.get("preserve_rowcount", False): 

1849 self._rowcount = self.cursor.rowcount 

1850 

1851 yp: Optional[Union[int, bool]] 

1852 if self.is_crud or self.is_text: 

1853 result = self._setup_dml_or_text_result() 

1854 yp = False 

1855 else: 

1856 yp = exec_opt.get("yield_per", None) 

1857 sr = self._is_server_side or exec_opt.get("stream_results", False) 

1858 strategy = self.cursor_fetch_strategy 

1859 if sr and strategy is _cursor._DEFAULT_FETCH: 

1860 strategy = _cursor.BufferedRowCursorFetchStrategy( 

1861 self.cursor, self.execution_options 

1862 ) 

1863 cursor_description: _DBAPICursorDescription = ( 

1864 strategy.alternate_cursor_description 

1865 or self.cursor.description 

1866 ) 

1867 if cursor_description is None: 

1868 strategy = _cursor._NO_CURSOR_DQL 

1869 

1870 result = _cursor.CursorResult(self, strategy, cursor_description) 

1871 

1872 compiled = self.compiled 

1873 

1874 if ( 

1875 compiled 

1876 and not self.isddl 

1877 and cast(SQLCompiler, compiled).has_out_parameters 

1878 ): 

1879 self._setup_out_parameters(result) 

1880 

1881 self._soft_closed = result._soft_closed 

1882 

1883 if yp: 

1884 result = result.yield_per(yp) 

1885 

1886 return result 

1887 

def _setup_out_parameters(self, result):
    """Collect OUT parameter values and store them on ``result``.

    The bind parameters flagged ``isoutparam`` are looked up in the
    compiled statement's ``bind_names``; their raw values come from
    ``get_out_parameter_values()`` and each is run through its type's
    result processor when there is one.  The resulting dict, keyed by
    bind parameter key, is assigned to ``result.out_parameters``.

    """
    compiled = cast(SQLCompiler, self.compiled)

    out_pairs = [
        (param, name)
        for param, name in compiled.bind_names.items()
        if param.isoutparam
    ]
    out_params = [param for param, _ in out_pairs]
    out_names = [name for _, name in out_pairs]
    raw_values = self.get_out_parameter_values(out_names)

    collected = {}
    for bindparam, raw_value in zip(out_params, raw_values):
        impl_type = bindparam.type.dialect_impl(self.dialect)
        dbapi_type = impl_type.get_dbapi_type(self.dialect.loaded_dbapi)
        convert = impl_type.result_processor(self.dialect, dbapi_type)
        collected[bindparam.key] = (
            raw_value if convert is None else convert(raw_value)
        )

    result.out_parameters = collected

1915 

def _setup_dml_or_text_result(self):
    """Build the ``CursorResult`` for a DML or textual statement.

    Selects the fetch strategy, works out the cursor description
    (stripping trailing sentinel columns when present), constructs the
    result, and then performs the post-execution bookkeeping for the
    statement: inserted primary keys, returned default rows, rowcount,
    and soft-closing or rewinding the result.

    :return: the constructed ``CursorResult``.
    """
    compiled = cast(SQLCompiler, self.compiled)

    strategy: ResultFetchStrategy = self.cursor_fetch_strategy

    if self.isinsert:
        if (
            self.execute_style is ExecuteStyle.INSERTMANYVALUES
            and compiled.effective_returning
        ):
            strategy = _cursor.FullyBufferedCursorFetchStrategy(
                self.cursor,
                initial_buffer=self._insertmanyvalues_rows,
                # maintain alt cursor description if set by the
                # dialect, e.g. mssql preserves it
                alternate_description=(
                    strategy.alternate_cursor_description
                ),
            )

        if compiled.postfetch_lastrowid:
            self.inserted_primary_key_rows = (
                self._setup_ins_pk_from_lastrowid()
            )
        # else if not self._is_implicit_returning,
        # the default inserted_primary_key_rows accessor will
        # return an "empty" primary key collection when accessed.

    if self._is_server_side and strategy is _cursor._DEFAULT_FETCH:
        strategy = _cursor.BufferedRowCursorFetchStrategy(
            self.cursor, self.execution_options
        )

    if strategy is _cursor._NO_CURSOR_DML:
        cursor_description = None
    else:
        cursor_description = (
            strategy.alternate_cursor_description
            or self.cursor.description
        )

    if cursor_description is None:
        strategy = _cursor._NO_CURSOR_DML
    elif self._num_sentinel_cols:
        assert self.execute_style is ExecuteStyle.INSERTMANYVALUES
        # strip out the sentinel columns from cursor description
        # a similar logic is done to the rows only in CursorResult
        cursor_description = cursor_description[
            0 : -self._num_sentinel_cols
        ]

    result: _cursor.CursorResult[Any] = _cursor.CursorResult(
        self, strategy, cursor_description
    )

    if self.isinsert:
        if self._is_implicit_returning:
            rows = result.all()

            self.returned_default_rows = rows

            self.inserted_primary_key_rows = (
                self._setup_ins_pk_from_implicit_returning(result, rows)
            )

            # test that it has a cursor metadata that is accurate. the
            # first row will have been fetched and current assumptions
            # are that the result has only one row, until executemany()
            # support is added here.
            assert result._metadata.returns_rows

            # Insert statement has both return_defaults() and
            # returning().  rewind the result on the list of rows
            # we just used.
            if self._is_supplemental_returning:
                result._rewind(rows)
            else:
                result._soft_close()
        elif not self._is_explicit_returning:
            result._soft_close()

            # we assume here the result does not return any rows.
            # *usually*, this will be true.  However, some dialects
            # such as that of MSSQL/pyodbc need to SELECT a post fetch
            # function so this is not necessarily true.
            # assert not result.returns_rows

    elif self._is_implicit_returning:
        rows = result.all()

        if rows:
            self.returned_default_rows = rows
        # NOTE(review): nesting was reconstructed from an unindented
        # listing; the rowcount is assumed to be recorded even when no
        # rows came back -- confirm against the canonical source.
        self._rowcount = len(rows)

        if self._is_supplemental_returning:
            result._rewind(rows)
        else:
            result._soft_close()

        # test that it has a cursor metadata that is accurate.
        # the rows have all been fetched however.
        assert result._metadata.returns_rows

    elif not result._metadata.returns_rows:
        # no results, get rowcount
        # (which requires open cursor on some drivers)
        if self._rowcount is None:
            self._rowcount = self.cursor.rowcount
        result._soft_close()
    elif self.isupdate or self.isdelete:
        if self._rowcount is None:
            self._rowcount = self.cursor.rowcount
    return result

2029 

@util.memoized_property
def inserted_primary_key_rows(self):
    """Fallback value for the inserted primary key rows.

    Used when no specific "get primary key" strategy assigned this
    attribute during execution: the result is a "default" primary key
    collection based on what's in the compiled parameters and nothing
    else.
    """
    return self._setup_ins_pk_from_empty()

2036 

def _setup_ins_pk_from_lastrowid(self):
    """Return a one-element list holding the primary key built from
    ``get_lastrowid()`` and the first set of compiled parameters.
    """
    getter = cast(
        SQLCompiler, self.compiled
    )._inserted_primary_key_from_lastrowid_getter
    lastrowid = self.get_lastrowid()
    return [getter(lastrowid, self.compiled_parameters[0])]

2043 

def _setup_ins_pk_from_empty(self):
    """Return one primary key per compiled parameter set, built by the
    "lastrowid" getter with ``None`` passed as the lastrowid.
    """
    getter = cast(
        SQLCompiler, self.compiled
    )._inserted_primary_key_from_lastrowid_getter
    return [getter(None, param) for param in self.compiled_parameters]

2049 

def _setup_ins_pk_from_implicit_returning(self, result, rows):
    """Build primary keys from rows fetched via implicit RETURNING.

    :param result: the result the rows were fetched from (not consulted
     by this implementation).
    :param rows: the fetched rows; each one is paired positionally with
     the corresponding entry of ``compiled_parameters``.
    :return: a list with one primary key per row, or an empty list when
     ``rows`` is empty.
    """
    if not rows:
        return []

    getter = cast(
        SQLCompiler, self.compiled
    )._inserted_primary_key_from_returning_getter
    compiled_params = self.compiled_parameters

    return [
        getter(row, param) for row, param in zip(rows, compiled_params)
    ]

2062 

def lastrow_has_defaults(self):
    """Return True for an INSERT or UPDATE whose compiled form has a
    non-empty ``postfetch`` collection.
    """
    return (self.isinsert or self.isupdate) and bool(
        cast(SQLCompiler, self.compiled).postfetch
    )

2067 

def _prepare_set_input_sizes(
    self,
) -> Optional[List[Tuple[str, Any, TypeEngine[Any]]]]:
    """Given a cursor and ClauseParameters, prepare arguments
    in order to call the appropriate
    style of ``setinputsizes()`` on the cursor, using DB-API types
    from the bind parameter's ``TypeEngine`` objects.

    This method is only called by those dialects which set the
    :attr:`.Dialect.bind_typing` attribute to
    :attr:`.BindTyping.SETINPUTSIZES`.  Python-oracledb and cx_Oracle are
    the only DBAPIs that require setinputsizes(); pyodbc offers it as an
    option.

    Prior to SQLAlchemy 2.0, the setinputsizes() approach was also used
    for pg8000 and asyncpg, which has been changed to inline rendering
    of casts.

    :return: ``None`` for DDL and textual statements, or when the
     compiler provides no input-sizes lookup; otherwise a list of
     ``(parameter name, DBAPI type, TypeEngine)`` tuples.

    """
    if self.isddl or self.is_text:
        return None

    compiled = cast(SQLCompiler, self.compiled)

    inputsizes = compiled._get_set_input_sizes_lookup()

    if inputsizes is None:
        return None

    dialect = self.dialect

    # all of the rest of this... cython?

    if dialect._has_events:
        # hand event listeners a private copy to work with
        inputsizes = dict(inputsizes)
        dialect.dispatch.do_setinputsizes(
            inputsizes, self.cursor, self.statement, self.parameters, self
        )

    if compiled.escaped_bind_names:
        escaped_bind_names = compiled.escaped_bind_names
    else:
        escaped_bind_names = None

    if dialect.positional:
        items = [
            (key, compiled.binds[key])
            for key in compiled.positiontup or ()
        ]
    else:
        items = [
            (key, bindparam)
            for bindparam, key in compiled.bind_names.items()
        ]

    generic_inputsizes: List[Tuple[str, Any, TypeEngine[Any]]] = []
    for key, bindparam in items:
        if bindparam in compiled.literal_execute_params:
            continue

        if key in self._expanded_parameters:
            if is_tuple_type(bindparam.type):
                # expanded names are matched to the tuple's member
                # types by cycling through them with ``idx % num``
                num = len(bindparam.type.types)
                dbtypes = inputsizes[bindparam]
                generic_inputsizes.extend(
                    (
                        (
                            escaped_bind_names.get(paramname, paramname)
                            if escaped_bind_names is not None
                            else paramname
                        ),
                        dbtypes[idx % num],
                        bindparam.type.types[idx % num],
                    )
                    for idx, paramname in enumerate(
                        self._expanded_parameters[key]
                    )
                )
            else:
                dbtype = inputsizes.get(bindparam, None)
                generic_inputsizes.extend(
                    (
                        (
                            escaped_bind_names.get(paramname, paramname)
                            if escaped_bind_names is not None
                            else paramname
                        ),
                        dbtype,
                        bindparam.type,
                    )
                    for paramname in self._expanded_parameters[key]
                )
        else:
            dbtype = inputsizes.get(bindparam, None)

            escaped_name = (
                escaped_bind_names.get(key, key)
                if escaped_bind_names is not None
                else key
            )

            generic_inputsizes.append(
                (escaped_name, dbtype, bindparam.type)
            )

    return generic_inputsizes

2174 

def _exec_default(self, column, default, type_):
    """Evaluate ``default`` for ``column`` and return the value.

    Dispatches on the kind of default: a sequence is fired via
    ``fire_sequence()``, a callable is invoked with this context, a
    clause element is executed via ``_exec_default_clause_element()``,
    and anything else yields ``default.arg`` as is.
    """
    if default.is_sequence:
        return self.fire_sequence(default, type_)
    elif default.is_callable:
        # this codepath is not normally used as it's inlined
        # into _process_execute_defaults
        self.current_column = column
        return default.arg(self)
    elif default.is_clause_element:
        return self._exec_default_clause_element(column, default, type_)
    else:
        # this codepath is not normally used as it's inlined
        # into _process_execute_defaults
        return default.arg

2189 

def _exec_default_clause_element(self, column, default, type_):
    """Execute a default that is a complete clause element and return
    the scalar result.
    """
    # execute a default that's a complete clause element.  Here, we have
    # to re-implement a miniature version of the compile->parameters->
    # cursor.execute() sequence, since we don't want to modify the state
    # of the connection / result in progress or create new connection/
    # result objects etc.
    # .. versionchanged:: 1.4

    if not default._arg_is_typed:
        default_arg = expression.type_coerce(default.arg, type_)
    else:
        default_arg = default.arg
    compiled = expression.select(default_arg).compile(dialect=self.dialect)
    compiled_params = compiled.construct_params()
    processors = compiled._bind_processors

    def _bound_value(key):
        # apply the bind processor for this key when one is registered
        value = compiled_params[key]
        if key in processors:
            return processors[key](value)  # type: ignore
        return value

    if compiled.positional:
        parameters = self.dialect.execute_sequence_format(
            [_bound_value(key) for key in compiled.positiontup or ()]
        )
    else:
        parameters = {key: _bound_value(key) for key in compiled_params}
    return self._execute_scalar(
        str(compiled), type_, parameters=parameters
    )

2228 

current_parameters: Optional[_CoreSingleExecuteParams] = None
"""A dictionary of parameters applied to the current row.

This attribute is only available in the context of a user-defined default
generation function, e.g. as described at :ref:`context_default_functions`.
It consists of a dictionary which includes entries for each column/value
pair that is to be part of the INSERT or UPDATE statement. The keys of the
dictionary will be the key value of each :class:`_schema.Column`,
which is usually synonymous with the name.

Note that the :attr:`.DefaultExecutionContext.current_parameters` attribute
does not accommodate the "multi-values" feature of the
:meth:`_expression.Insert.values` method.  The
:meth:`.DefaultExecutionContext.get_current_parameters` method should be
preferred.

.. seealso::

    :meth:`.DefaultExecutionContext.get_current_parameters`

    :ref:`context_default_functions`

"""

2253 

def get_current_parameters(self, isolate_multiinsert_groups=True):
    """Return a dictionary of parameters applied to the current row.

    This method can only be used in the context of a user-defined default
    generation function, e.g. as described at
    :ref:`context_default_functions`. When invoked, a dictionary is
    returned which includes entries for each column/value pair that is part
    of the INSERT or UPDATE statement. The keys of the dictionary will be
    the key value of each :class:`_schema.Column`,
    which is usually synonymous with the name.

    :param isolate_multiinsert_groups=True: indicates that multi-valued
     INSERT constructs created using :meth:`_expression.Insert.values`
     should be handled by returning only the subset of parameters that
     are local to the current column default invocation.   When
     ``False``, the raw parameters of the statement are returned
     including the naming convention used in the case of multi-valued
     INSERT.

    :raises InvalidRequestError: if ``current_parameters`` or
     ``current_column`` is not available as an attribute.

    .. versionadded:: 1.2 added
       :meth:`.DefaultExecutionContext.get_current_parameters`
       which provides more functionality over the existing
       :attr:`.DefaultExecutionContext.current_parameters`
       attribute.

    .. seealso::

        :attr:`.DefaultExecutionContext.current_parameters`

        :ref:`context_default_functions`

    """
    try:
        parameters = self.current_parameters
        column = self.current_column
    except AttributeError:
        raise exc.InvalidRequestError(
            "get_current_parameters() can only be invoked in the "
            "context of a Python side column default function"
        )
    else:
        assert column is not None
        assert parameters is not None
    compile_state = cast(
        "DMLState", cast(SQLCompiler, self.compiled).compile_state
    )
    assert compile_state is not None
    if (
        isolate_multiinsert_groups
        and dml.isinsert(compile_state)
        and compile_state._has_multi_parameters
    ):
        if column._is_multiparam_column:
            index = column.index + 1
            d = {column.original.key: parameters[column.key]}
        else:
            d = {column.key: parameters[column.key]}
            index = 0
        assert compile_state._dict_parameters is not None
        keys = compile_state._dict_parameters.keys()
        # pull this group's values out of the "<key>_m<index>" entries
        d.update(
            (key, parameters["%s_m%d" % (key, index)]) for key in keys
        )
        return d
    else:
        return parameters

2321 

def get_insert_default(self, column):
    """Return the value of ``column.default`` evaluated through
    ``_exec_default()``, or ``None`` when the column has no default.
    """
    default = column.default
    if default is None:
        return None
    return self._exec_default(column, default, column.type)

2327 

def get_update_default(self, column):
    """Return the value of ``column.onupdate`` evaluated through
    ``_exec_default()``, or ``None`` when the column has no onupdate.
    """
    onupdate = column.onupdate
    if onupdate is None:
        return None
    return self._exec_default(column, onupdate, column.type)

2333 

def _process_execute_defaults(self):
    """Fill prefetched column defaults into each compiled parameter set.

    Uses the compiler's ``insert_prefetch`` columns when present,
    otherwise its ``update_prefetch`` columns.  For every parameter set
    and every such column the value is, in order of precedence: a
    running sentinel counter, the scalar default itself, the result of
    calling the default with this context, or the result of the
    fallback getter (stored only when it is not ``None``).
    """
    compiled = cast(SQLCompiler, self.compiled)

    key_getter = compiled._within_exec_param_key_getter

    # shared across all parameter sets, so every sentinel value differs
    sentinel_counter = 0

    if compiled.insert_prefetch:
        prefetch_recs = [
            (
                c,
                key_getter(c),
                c._default_description_tuple,
                self.get_insert_default,
            )
            for c in compiled.insert_prefetch
        ]
    elif compiled.update_prefetch:
        prefetch_recs = [
            (
                c,
                key_getter(c),
                c._onupdate_description_tuple,
                self.get_update_default,
            )
            for c in compiled.update_prefetch
        ]
    else:
        prefetch_recs = []

    for param in self.compiled_parameters:
        # expose the row being processed to default callables
        self.current_parameters = param

        for (
            c,
            param_key,
            (arg, is_scalar, is_callable, is_sentinel),
            fallback,
        ) in prefetch_recs:
            if is_sentinel:
                param[param_key] = sentinel_counter
                sentinel_counter += 1
            elif is_scalar:
                param[param_key] = arg
            elif is_callable:
                self.current_column = c
                param[param_key] = arg(self)
            else:
                val = fallback(c)
                if val is not None:
                    param[param_key] = val

    del self.current_parameters

2387 

2388 

# register DefaultExecutionContext as the execution context class
# of DefaultDialect
DefaultDialect.execution_ctx_cls = DefaultExecutionContext