Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/default.py: 46%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1053 statements  

1# engine/default.py 

2# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7# mypy: allow-untyped-defs, allow-untyped-calls 

8 

9"""Default implementations of per-dialect sqlalchemy.engine classes. 

10 

11These are semi-private implementation classes which are only of importance 

12to database dialect authors; dialects will usually use the classes here 

13as the base class for their own corresponding classes. 

14 

15""" 

16 

17from __future__ import annotations 

18 

19import functools 

20import operator 

21import random 

22import re 

23from time import perf_counter 

24import typing 

25from typing import Any 

26from typing import Callable 

27from typing import cast 

28from typing import Dict 

29from typing import Final 

30from typing import List 

31from typing import Literal 

32from typing import Mapping 

33from typing import MutableMapping 

34from typing import MutableSequence 

35from typing import Optional 

36from typing import Sequence 

37from typing import Set 

38from typing import Tuple 

39from typing import Type 

40from typing import TYPE_CHECKING 

41from typing import Union 

42import weakref 

43 

44from . import characteristics 

45from . import cursor as _cursor 

46from . import interfaces 

47from .base import Connection 

48from .interfaces import CacheStats 

49from .interfaces import DBAPICursor 

50from .interfaces import Dialect 

51from .interfaces import ExecuteStyle 

52from .interfaces import ExecutionContext 

53from .reflection import ObjectKind 

54from .reflection import ObjectScope 

55from .. import event 

56from .. import exc 

57from .. import pool 

58from .. import util 

59from ..sql import compiler 

60from ..sql import dml 

61from ..sql import expression 

62from ..sql import type_api 

63from ..sql import util as sql_util 

64from ..sql._typing import is_tuple_type 

65from ..sql.base import _NoArg 

66from ..sql.compiler import AggregateOrderByStyle 

67from ..sql.compiler import DDLCompiler 

68from ..sql.compiler import InsertmanyvaluesSentinelOpts 

69from ..sql.compiler import SQLCompiler 

70from ..sql.elements import quoted_name 

71from ..util.typing import TupleAny 

72from ..util.typing import Unpack 

73 

74if typing.TYPE_CHECKING: 

75 from .base import Engine 

76 from .cursor import ResultFetchStrategy 

77 from .interfaces import _CoreMultiExecuteParams 

78 from .interfaces import _CoreSingleExecuteParams 

79 from .interfaces import _DBAPICursorDescription 

80 from .interfaces import _DBAPIMultiExecuteParams 

81 from .interfaces import _DBAPISingleExecuteParams 

82 from .interfaces import _ExecuteOptions 

83 from .interfaces import _MutableCoreSingleExecuteParams 

84 from .interfaces import _ParamStyle 

85 from .interfaces import ConnectArgsType 

86 from .interfaces import DBAPIConnection 

87 from .interfaces import DBAPIModule 

88 from .interfaces import DBAPIType 

89 from .interfaces import IsolationLevel 

90 from .row import Row 

91 from .url import URL 

92 from ..event import _ListenerFnType 

93 from ..pool import Pool 

94 from ..pool import PoolProxiedConnection 

95 from ..sql import Executable 

96 from ..sql.compiler import Compiled 

97 from ..sql.compiler import Linting 

98 from ..sql.compiler import ResultColumnsEntry 

99 from ..sql.dml import DMLState 

100 from ..sql.dml import UpdateBase 

101 from ..sql.elements import BindParameter 

102 from ..sql.schema import Column 

103 from ..sql.type_api import _BindProcessorType 

104 from ..sql.type_api import _ResultProcessorType 

105 from ..sql.type_api import TypeEngine 

106 

107 

# When we're handed literal SQL, ensure it's a SELECT query
# (leading whitespace is tolerated and the match is case-insensitive).
SERVER_SIDE_CURSOR_RE = re.compile(r"\s*SELECT", re.I | re.UNICODE)


# Bind each CacheStats member to a module-level name.  The unpacking only
# succeeds if CacheStats iterates exactly five members, in this order.
(
    CACHE_HIT,
    CACHE_MISS,
    CACHING_DISABLED,
    NO_CACHE_KEY,
    NO_DIALECT_SUPPORT,
) = list(CacheStats)

119 

120 

121class DefaultDialect(Dialect): 

122 """Default implementation of Dialect""" 

123 

124 statement_compiler = compiler.SQLCompiler 

125 ddl_compiler = compiler.DDLCompiler 

126 type_compiler_cls = compiler.GenericTypeCompiler 

127 

128 preparer = compiler.IdentifierPreparer 

129 supports_alter = True 

130 supports_comments = False 

131 supports_constraint_comments = False 

132 inline_comments = False 

133 supports_statement_cache = True 

134 

135 div_is_floordiv = True 

136 

137 bind_typing = interfaces.BindTyping.NONE 

138 

139 include_set_input_sizes: Optional[Set[Any]] = None 

140 exclude_set_input_sizes: Optional[Set[Any]] = None 

141 

142 # the first value we'd get for an autoincrement column. 

143 default_sequence_base = 1 

144 

145 # most DBAPIs happy with this for execute(). 

146 # not cx_oracle. 

147 execute_sequence_format = tuple 

148 

149 supports_schemas = True 

150 supports_views = True 

151 supports_sequences = False 

152 sequences_optional = False 

153 preexecute_autoincrement_sequences = False 

154 supports_identity_columns = False 

155 postfetch_lastrowid = True 

156 favor_returning_over_lastrowid = False 

157 insert_null_pk_still_autoincrements = False 

158 update_returning = False 

159 delete_returning = False 

160 update_returning_multifrom = False 

161 delete_returning_multifrom = False 

162 insert_returning = False 

163 

164 aggregate_order_by_style = AggregateOrderByStyle.INLINE 

165 

166 cte_follows_insert = False 

167 

168 supports_native_enum = False 

169 supports_native_boolean = False 

170 supports_native_uuid = False 

171 returns_native_bytes = False 

172 

173 non_native_boolean_check_constraint = True 

174 

175 supports_simple_order_by_label = True 

176 

177 tuple_in_values = False 

178 

179 connection_characteristics = util.immutabledict( 

180 { 

181 "isolation_level": characteristics.IsolationLevelCharacteristic(), 

182 "logging_token": characteristics.LoggingTokenCharacteristic(), 

183 } 

184 ) 

185 

186 engine_config_types: Mapping[str, Any] = util.immutabledict( 

187 { 

188 "pool_timeout": util.asint, 

189 "echo": util.bool_or_str("debug"), 

190 "echo_pool": util.bool_or_str("debug"), 

191 "pool_recycle": util.asint, 

192 "pool_size": util.asint, 

193 "max_overflow": util.asint, 

194 "future": util.asbool, 

195 } 

196 ) 

197 

198 # if the NUMERIC type 

199 # returns decimal.Decimal. 

200 # *not* the FLOAT type however. 

201 supports_native_decimal = False 

202 

203 name = "default" 

204 

205 # length at which to truncate 

206 # any identifier. 

207 max_identifier_length = 9999 

208 _user_defined_max_identifier_length: Optional[int] = None 

209 

210 isolation_level: Optional[str] = None 

211 

212 # sub-categories of max_identifier_length. 

213 # currently these accommodate for MySQL which allows alias names 

214 # of 255 but DDL names only of 64. 

215 max_index_name_length: Optional[int] = None 

216 max_constraint_name_length: Optional[int] = None 

217 

218 supports_sane_rowcount = True 

219 supports_sane_multi_rowcount = True 

220 colspecs: MutableMapping[Type[TypeEngine[Any]], Type[TypeEngine[Any]]] = {} 

221 default_paramstyle = "named" 

222 

223 supports_default_values = False 

224 """dialect supports INSERT... DEFAULT VALUES syntax""" 

225 

226 supports_default_metavalue = False 

227 """dialect supports INSERT... VALUES (DEFAULT) syntax""" 

228 

229 default_metavalue_token = "DEFAULT" 

230 """for INSERT... VALUES (DEFAULT) syntax, the token to put in the 

231 parenthesis.""" 

232 

233 # not sure if this is a real thing but the compiler will deliver it 

234 # if this is the only flag enabled. 

235 supports_empty_insert = True 

236 """dialect supports INSERT () VALUES ()""" 

237 

238 supports_multivalues_insert = False 

239 

240 use_insertmanyvalues: bool = False 

241 

242 use_insertmanyvalues_wo_returning: bool = False 

243 

244 insertmanyvalues_implicit_sentinel: InsertmanyvaluesSentinelOpts = ( 

245 InsertmanyvaluesSentinelOpts.NOT_SUPPORTED 

246 ) 

247 

248 insertmanyvalues_page_size: int = 1000 

249 insertmanyvalues_max_parameters = 32700 

250 

251 supports_is_distinct_from = True 

252 

253 supports_server_side_cursors = False 

254 

255 server_side_cursors = False 

256 

257 # extra record-level locking features (#4860) 

258 supports_for_update_of = False 

259 

260 server_version_info = None 

261 

262 default_schema_name: Optional[str] = None 

263 

264 # indicates symbol names are 

265 # UPPERCASED if they are case insensitive 

266 # within the database. 

267 # if this is True, the methods normalize_name() 

268 # and denormalize_name() must be provided. 

269 requires_name_normalize = False 

270 

271 is_async = False 

272 

273 has_terminate = False 

274 

275 # TODO: this is not to be part of 2.0. implement rudimentary binary 

276 # literals for SQLite, PostgreSQL, MySQL only within 

277 # _Binary.literal_processor 

278 _legacy_binary_type_literal_encoding = "utf-8" 

279 

@util.deprecated_params(
    empty_in_strategy=(
        "1.4",
        "The :paramref:`_sa.create_engine.empty_in_strategy` keyword is "
        "deprecated, and no longer has any effect. All IN expressions "
        "are now rendered using "
        'the "expanding parameter" strategy which renders a set of bound '
        'expressions, or an "empty set" SELECT, at statement execution '
        "time.",
    ),
    server_side_cursors=(
        "1.4",
        "The :paramref:`_sa.create_engine.server_side_cursors` parameter "
        "is deprecated and will be removed in a future release. Please "
        "use the "
        ":paramref:`_engine.Connection.execution_options.stream_results` "
        "parameter.",
    ),
)
def __init__(
    self,
    paramstyle: Optional[_ParamStyle] = None,
    isolation_level: Optional[IsolationLevel] = None,
    dbapi: Optional[DBAPIModule] = None,
    implicit_returning: Literal[True] = True,
    supports_native_boolean: Optional[bool] = None,
    max_identifier_length: Optional[int] = None,
    label_length: Optional[int] = None,
    insertmanyvalues_page_size: Union[_NoArg, int] = _NoArg.NO_ARG,
    use_insertmanyvalues: Optional[bool] = None,
    # util.deprecated_params decorator cannot render the
    # Linting.NO_LINTING constant
    compiler_linting: Linting = int(compiler.NO_LINTING),  # type: ignore
    server_side_cursors: bool = False,
    skip_autocommit_rollback: bool = False,
    **kwargs: Any,
):
    """Set up the per-instance state of the dialect.

    :param paramstyle: explicit DBAPI paramstyle.  When omitted, the
     ``paramstyle`` of ``dbapi`` is used, falling back to
     ``default_paramstyle`` when no DBAPI module is given.
    :param isolation_level: stored as ``_on_connect_isolation_level``.
    :param dbapi: DBAPI module, stored as ``self.dbapi``; may be None.
    :param implicit_returning: accepted but not read by this method.
    :param supports_native_boolean: overrides the class-level flag when
     not None.
    :param max_identifier_length: when truthy, overrides the class-level
     ``max_identifier_length``.
    :param label_length: stored as ``self.label_length``.
    :param insertmanyvalues_page_size: overrides the class-level page
     size when passed.
    :param use_insertmanyvalues: overrides the class-level flag when not
     None.
    :param compiler_linting: stored as ``self.compiler_linting``.
    :param server_side_cursors: deprecated; enables
     ``self.server_side_cursors`` when the dialect supports it.
    :param skip_autocommit_rollback: stored on the instance; consulted
     by ``do_rollback()``.
    :param kwargs: accepted and ignored by this method.

    :raises ArgumentError: if ``server_side_cursors`` is requested but
     ``supports_server_side_cursors`` is false.

    """
    if server_side_cursors:
        if not self.supports_server_side_cursors:
            raise exc.ArgumentError(
                "Dialect %s does not support server side cursors" % self
            )
        else:
            self.server_side_cursors = True

    # legacy dialects may still set a ``use_setinputsizes`` attribute;
    # translate it to the bind_typing setting and warn.
    if getattr(self, "use_setinputsizes", False):
        util.warn_deprecated(
            "The dialect-level use_setinputsizes attribute is "
            "deprecated. Please use "
            "bind_typing = BindTyping.SETINPUTSIZES",
            "2.0",
        )
        self.bind_typing = interfaces.BindTyping.SETINPUTSIZES

    self.positional = False
    self._ischema = None

    self.dbapi = dbapi

    self.skip_autocommit_rollback = skip_autocommit_rollback

    if paramstyle is not None:
        self.paramstyle = paramstyle
    elif self.dbapi is not None:
        self.paramstyle = self.dbapi.paramstyle
    else:
        self.paramstyle = self.default_paramstyle
    self.positional = self.paramstyle in (
        "qmark",
        "format",
        "numeric",
        "numeric_dollar",
    )
    self.identifier_preparer = self.preparer(self)
    self._on_connect_isolation_level = isolation_level

    # a ``type_compiler`` attribute already present on the dialect takes
    # precedence over ``type_compiler_cls``.
    legacy_tt_callable = getattr(self, "type_compiler", None)
    if legacy_tt_callable is not None:
        tt_callable = cast(
            Type[compiler.GenericTypeCompiler],
            legacy_tt_callable,
        )
    else:
        tt_callable = self.type_compiler_cls

    self.type_compiler_instance = self.type_compiler = tt_callable(self)

    if supports_native_boolean is not None:
        self.supports_native_boolean = supports_native_boolean

    self._user_defined_max_identifier_length = max_identifier_length
    if self._user_defined_max_identifier_length:
        self.max_identifier_length = (
            self._user_defined_max_identifier_length
        )
    self.label_length = label_length
    self.compiler_linting = compiler_linting

    if use_insertmanyvalues is not None:
        self.use_insertmanyvalues = use_insertmanyvalues

    if insertmanyvalues_page_size is not _NoArg.NO_ARG:
        self.insertmanyvalues_page_size = insertmanyvalues_page_size

383 

@property
@util.deprecated(
    "2.0",
    "full_returning is deprecated, please use insert_returning, "
    "update_returning, delete_returning",
)
def full_returning(self):
    """Combine the three per-statement RETURNING flags.

    Evaluates to the first falsy value among ``insert_returning``,
    ``update_returning`` and ``delete_returning``, or to
    ``delete_returning`` when all three are truthy.

    """
    all_returning = (
        self.insert_returning
        and self.update_returning
        and self.delete_returning
    )
    return all_returning

396 

@util.memoized_property
def insert_executemany_returning(self):
    """Default value of ``insert_executemany_returning``.

    Truthy only when the dialect both supports INSERT..RETURNING
    (``insert_returning``) and has opted into the
    ``use_insertmanyvalues`` feature.  Dialects may override this
    attribute with their own implementation.

    """
    returning_supported = self.insert_returning
    return returning_supported and self.use_insertmanyvalues

411 

@util.memoized_property
def insert_executemany_returning_sort_by_parameter_order(self):
    """Default value of
    ``insert_executemany_returning_sort_by_parameter_order``.

    Computed exactly like ``insert_executemany_returning``: truthy only
    when ``insert_returning`` and ``use_insertmanyvalues`` are both
    truthy.  Dialects may override this attribute with their own
    implementation.

    """
    returning_supported = self.insert_returning
    return returning_supported and self.use_insertmanyvalues

434 

435 update_executemany_returning = False 

436 delete_executemany_returning = False 

437 

@util.memoized_property
def loaded_dbapi(self) -> DBAPIModule:
    """Return ``self.dbapi``, insisting that one has been established.

    :raises InvalidRequestError: if ``self.dbapi`` is None.

    """
    if self.dbapi is not None:
        return self.dbapi

    raise exc.InvalidRequestError(
        f"Dialect {self} does not have a Python DBAPI established "
        "and cannot be used for actual database interaction"
    )

446 

@util.memoized_property
def _bind_typing_render_casts(self):
    """True when ``bind_typing`` is ``BindTyping.RENDER_CASTS``."""
    current_mode = self.bind_typing
    return current_mode is interfaces.BindTyping.RENDER_CASTS

450 

def _ensure_has_table_connection(self, arg: Connection) -> None:
    """Verify that ``arg`` is a :class:`.Connection`.

    :param arg: the object passed to ``Dialect.has_table()``.
    :raises ArgumentError: if ``arg`` is not a ``Connection`` instance.

    """
    if not isinstance(arg, Connection):
        raise exc.ArgumentError(
            "The argument passed to Dialect.has_table() should be a "
            "%s, got %s. "
            "Additionally, the Dialect.has_table() method is for "
            "internal dialect "
            "use only; please use "
            "``inspect(some_engine).has_table(<tablename>)`` "
            "for public API use." % (Connection, type(arg))
        )

462 

@util.memoized_property
def _supports_statement_cache(self):
    """Whether this dialect class itself opts into statement caching.

    Only the ``supports_statement_cache`` entry in the concrete class's
    own ``__dict__`` is consulted, so an inherited value is not seen.
    When the class does not declare the attribute a warning is emitted
    and the result is False.

    """
    declared = vars(self.__class__).get("supports_statement_cache", None)
    if declared is None:
        util.warn(
            "Dialect %s:%s will not make use of SQL compilation caching "
            "as it does not set the 'supports_statement_cache' attribute "
            "to ``True``. This can have "
            "significant performance implications including some "
            "performance degradations in comparison to prior SQLAlchemy "
            "versions. Dialect maintainers should seek to set this "
            "attribute to True after appropriate development and testing "
            "for SQLAlchemy 1.4 caching support. Alternatively, this "
            "attribute may be set to False which will disable this "
            "warning." % (self.name, self.driver),
            code="cprf",
        )

    return bool(declared)

482 

@util.memoized_property
def _type_memos(self):
    """A new, empty ``WeakKeyDictionary`` created on first access."""
    memo_map = weakref.WeakKeyDictionary()
    return memo_map

486 

@property
def dialect_description(self):  # type: ignore[override]
    """The dialect ``name`` and ``driver`` joined by ``"+"``."""
    return "+".join((self.name, self.driver))

490 

@property
def supports_sane_rowcount_returning(self):
    """True if this dialect supports sane rowcount even if RETURNING is
    in use.

    This default implementation simply reports
    ``supports_sane_rowcount``.

    """
    sane_rowcount = self.supports_sane_rowcount
    return sane_rowcount

501 

@classmethod
def get_pool_class(cls, url: URL) -> Type[Pool]:
    """Return the pool class to use for this dialect.

    A ``poolclass`` attribute on the dialect class wins; otherwise the
    result is ``AsyncAdaptedQueuePool`` for async dialects and
    ``QueuePool`` for all others.  ``url`` is not consulted here.

    """
    fallback: Type[pool.Pool] = (
        pool.AsyncAdaptedQueuePool if cls.is_async else pool.QueuePool
    )
    return getattr(cls, "poolclass", fallback)

511 

def get_dialect_pool_class(self, url: URL) -> Type[Pool]:
    """Instance-level entry point; delegates to ``get_pool_class()``."""
    pool_cls = self.get_pool_class(url)
    return pool_cls

514 

@classmethod
def load_provisioning(cls):
    """Import the ``provision`` module next to this dialect's module.

    The package name is the dialect module's dotted path with its final
    component removed.  A failed import is deliberately ignored.

    """
    parent_package, _, _ = cls.__module__.rpartition(".")
    try:
        __import__(parent_package + ".provision")
    except ImportError:
        # best effort: the provisioning module is optional
        pass

522 

523 def _builtin_onconnect(self) -> Optional[_ListenerFnType]: 

524 if self._on_connect_isolation_level is not None: 

525 

526 def builtin_connect(dbapi_conn, conn_rec): 

527 self._assert_and_set_isolation_level( 

528 dbapi_conn, self._on_connect_isolation_level 

529 ) 

530 

531 return builtin_connect 

532 else: 

533 return None 

534 

def initialize(self, connection: Connection) -> None:
    """Populate connection-derived dialect attributes.

    Sets ``server_version_info``, ``default_schema_name`` and
    ``default_isolation_level``; each becomes None when the underlying
    lookup raises ``NotImplementedError``.  Unless the user supplied a
    max identifier length, ``_check_max_identifier_length()`` may then
    override ``max_identifier_length``.

    :raises ArgumentError: if ``label_length`` exceeds the resulting
     ``max_identifier_length``.

    """

    def _value_or_none(lookup):
        # the whole lookup, including argument evaluation, is guarded
        try:
            return lookup()
        except NotImplementedError:
            return None

    self.server_version_info = _value_or_none(
        lambda: self._get_server_version_info(connection)
    )
    self.default_schema_name = _value_or_none(
        lambda: self._get_default_schema_name(connection)
    )
    self.default_isolation_level = _value_or_none(
        lambda: self.get_default_isolation_level(
            connection.connection.dbapi_connection
        )
    )

    if not self._user_defined_max_identifier_length:
        detected_length = self._check_max_identifier_length(connection)
        if detected_length:
            self.max_identifier_length = detected_length

    label_too_long = (
        self.label_length
        and self.label_length > self.max_identifier_length
    )
    if label_too_long:
        raise exc.ArgumentError(
            "Label length of %d is greater than this dialect's"
            " maximum identifier length of %d"
            % (self.label_length, self.max_identifier_length)
        )

570 

def on_connect(self) -> Optional[Callable[[Any], None]]:
    # inherits the docstring from interfaces.Dialect.on_connect
    # the default dialect installs no per-connection hook
    return None

574 

575 def _check_max_identifier_length(self, connection): 

576 """Perform a connection / server version specific check to determine 

577 the max_identifier_length. 

578 

579 If the dialect's class level max_identifier_length should be used, 

580 can return None. 

581 

582 """ 

583 return None 

584 

def get_default_isolation_level(self, dbapi_conn):
    """Return the isolation level to treat as the default for the given
    DBAPI connection.

    Subclasses may override this to supply a "fallback" level for
    databases that cannot reliably report the actual one.

    This implementation returns ``self.get_isolation_level(dbapi_conn)``
    and lets any exception it raises propagate.

    """
    current_level = self.get_isolation_level(dbapi_conn)
    return current_level

598 

def type_descriptor(self, typeobj):
    """Adapt a generic type object to this dialect.

    Hands ``typeobj`` and this dialect's ``colspecs`` mapping (looked
    up as a class or instance-level attribute) to
    ``type_api.adapt_type()`` and returns its result.

    """
    colspecs = self.colspecs
    return type_api.adapt_type(typeobj, colspecs)

609 

def has_index(self, connection, table_name, index_name, schema=None, **kw):
    """Report whether ``table_name`` has an index named ``index_name``.

    Returns False when the table itself is not found by
    ``has_table()``; otherwise scans the records from ``get_indexes()``
    for a matching ``"name"`` entry.

    """
    if not self.has_table(connection, table_name, schema=schema, **kw):
        return False

    index_records = self.get_indexes(
        connection, table_name, schema=schema, **kw
    )
    return any(record["name"] == index_name for record in index_records)

620 

def has_schema(
    self, connection: Connection, schema_name: str, **kw: Any
) -> bool:
    """Report whether ``schema_name`` is among ``get_schema_names()``."""
    known_schemas = self.get_schema_names(connection, **kw)
    return schema_name in known_schemas

625 

def validate_identifier(self, ident: str) -> None:
    """Check ``ident`` against ``max_identifier_length``.

    :raises IdentifierError: if ``ident`` is longer than the limit.

    """
    limit = self.max_identifier_length
    if len(ident) <= limit:
        return

    raise exc.IdentifierError(
        "Identifier '%s' exceeds maximum length of %d characters"
        % (ident, self.max_identifier_length)
    )

632 

def connect(self, *cargs: Any, **cparams: Any) -> DBAPIConnection:
    # inherits the docstring from interfaces.Dialect.connect
    dbapi_module = self.loaded_dbapi
    return dbapi_module.connect(*cargs, **cparams)  # type: ignore[no-any-return]  # NOQA: E501

636 

def create_connect_args(self, url: URL) -> ConnectArgsType:
    # inherits the docstring from interfaces.Dialect.create_connect_args
    # keyword arguments come from the translated URL fields, with the
    # URL query entries applied on top; no positional arguments are used
    connect_kwargs = url.translate_connect_args()
    connect_kwargs.update(url.query)
    return [], connect_kwargs

642 

def set_engine_execution_options(
    self, engine: Engine, opts: Mapping[str, Any]
) -> None:
    """Arrange for connection characteristics in ``opts`` to be applied
    to every connection of ``engine``.

    Only option names that are keys of ``connection_characteristics``
    are considered.  When any are present, an ``engine_connect``
    listener is registered that applies them via
    ``_set_connection_characteristics()``.

    """
    recognized = set(self.connection_characteristics).intersection(opts)
    if not recognized:
        return

    characteristics: Mapping[str, Any] = util.immutabledict(
        (name, opts[name]) for name in recognized
    )

    @event.listens_for(engine, "engine_connect")
    def set_connection_characteristics(connection):
        self._set_connection_characteristics(connection, characteristics)

659 

def set_connection_execution_options(
    self, connection: Connection, opts: Mapping[str, Any]
) -> None:
    """Apply connection characteristics found in ``opts`` to
    ``connection``.

    Only option names that are keys of ``connection_characteristics``
    are considered; when none are present nothing happens.

    """
    recognized = set(self.connection_characteristics).intersection(opts)
    if not recognized:
        return

    characteristics: Mapping[str, Any] = util.immutabledict(
        (name, opts[name]) for name in recognized
    )
    self._set_connection_characteristics(connection, characteristics)

671 

672 def _set_connection_characteristics(self, connection, characteristics): 

673 characteristic_values = [ 

674 (name, self.connection_characteristics[name], value) 

675 for name, value in characteristics.items() 

676 ] 

677 

678 if connection.in_transaction(): 

679 trans_objs = [ 

680 (name, obj) 

681 for name, obj, _ in characteristic_values 

682 if obj.transactional 

683 ] 

684 if trans_objs: 

685 raise exc.InvalidRequestError( 

686 "This connection has already initialized a SQLAlchemy " 

687 "Transaction() object via begin() or autobegin; " 

688 "%s may not be altered unless rollback() or commit() " 

689 "is called first." 

690 % (", ".join(name for name, obj in trans_objs)) 

691 ) 

692 

693 dbapi_connection = connection.connection.dbapi_connection 

694 for _, characteristic, value in characteristic_values: 

695 characteristic.set_connection_characteristic( 

696 self, connection, dbapi_connection, value 

697 ) 

698 connection.connection._connection_record.finalize_callback.append( 

699 functools.partial(self._reset_characteristics, characteristics) 

700 ) 

701 

702 def _reset_characteristics(self, characteristics, dbapi_connection): 

703 for characteristic_name in characteristics: 

704 characteristic = self.connection_characteristics[ 

705 characteristic_name 

706 ] 

707 characteristic.reset_characteristic(self, dbapi_connection) 

708 

def do_begin(self, dbapi_connection):
    """Begin hook; the default implementation does nothing."""
    return None

711 

def do_rollback(self, dbapi_connection):
    """Roll back the DBAPI connection.

    The rollback is skipped when ``skip_autocommit_rollback`` is set
    and ``detect_autocommit_setting()`` reports a truthy value for the
    connection.

    """
    skip_rollback = (
        self.skip_autocommit_rollback
        and self.detect_autocommit_setting(dbapi_connection)
    )
    if not skip_rollback:
        dbapi_connection.rollback()

718 

def do_commit(self, dbapi_connection):
    """Commit by calling ``commit()`` on the DBAPI connection."""
    dbapi_connection.commit()

721 

def do_terminate(self, dbapi_connection):
    """Terminate the connection; by default the same as ``do_close()``."""
    self.do_close(dbapi_connection)

724 

def do_close(self, dbapi_connection):
    """Close by calling ``close()`` on the DBAPI connection."""
    dbapi_connection.close()

727 

@util.memoized_property
def _dialect_specific_select_one(self):
    """String form of ``SELECT 1`` as compiled for this dialect."""
    compiled_select = expression.select(1).compile(dialect=self)
    return str(compiled_select)

731 

732 def _do_ping_w_event(self, dbapi_connection: DBAPIConnection) -> bool: 

733 try: 

734 return self.do_ping(dbapi_connection) 

735 except self.loaded_dbapi.Error as err: 

736 is_disconnect = self.is_disconnect(err, dbapi_connection, None) 

737 

738 if self._has_events: 

739 try: 

740 Connection._handle_dbapi_exception_noconnection( 

741 err, 

742 self, 

743 is_disconnect=is_disconnect, 

744 invalidate_pool_on_disconnect=False, 

745 is_pre_ping=True, 

746 ) 

747 except exc.StatementError as new_err: 

748 is_disconnect = new_err.connection_invalidated 

749 

750 if is_disconnect: 

751 return False 

752 else: 

753 raise 

754 

def do_ping(self, dbapi_connection: DBAPIConnection) -> bool:
    """Execute the dialect's "select one" statement on a new cursor.

    The cursor is always closed, also when execution raises.  Returns
    True when the statement executed without raising.

    """
    ping_cursor = dbapi_connection.cursor()
    try:
        ping_cursor.execute(self._dialect_specific_select_one)
    finally:
        ping_cursor.close()
    return True

762 

def create_xid(self):
    """Create a random two-phase transaction ID.

    This id will be passed to do_begin_twophase(), do_rollback_twophase(),
    do_commit_twophase().  Its format is unspecified.

    The current implementation returns ``"_sa_"`` followed by exactly
    32 lowercase hexadecimal digits.
    """

    # getrandbits(128) is uniform over [0, 2**128 - 1], so the value
    # always fits in 32 hex digits.  randint(0, 2**128) includes its
    # upper bound, which would render as 33 digits.
    return "_sa_%032x" % random.getrandbits(128)

771 

def do_savepoint(self, connection, name):
    """Execute a ``SavepointClause`` for ``name`` on the connection."""
    savepoint = expression.SavepointClause(name)
    connection.execute(savepoint)

774 

def do_rollback_to_savepoint(self, connection, name):
    """Execute a ``RollbackToSavepointClause`` for ``name``."""
    rollback_clause = expression.RollbackToSavepointClause(name)
    connection.execute(rollback_clause)

777 

def do_release_savepoint(self, connection, name):
    """Execute a ``ReleaseSavepointClause`` for ``name``."""
    release_clause = expression.ReleaseSavepointClause(name)
    connection.execute(release_clause)

780 

def _deliver_insertmanyvalues_batches(
    self,
    connection,
    cursor,
    statement,
    parameters,
    generic_setinputsizes,
    context,
):
    """Generator yielding the "insertmanyvalues" batches produced by the
    compiled statement, collecting RETURNING rows between batches.

    Each batch comes from ``compiled._deliver_insertmanyvalues_batches()``
    and is yielded as is.  When the statement has RETURNING columns, the
    code after the ``yield`` (which runs once the consumer resumes the
    generator) fetches the rows for that batch from ``cursor`` and
    extends ``context._insertmanyvalues_rows`` with them, reordered by
    sentinel values where sentinel columns are in use.

    :raises InvalidRequestError: if the number of distinct sentinel keys
     differs from the batch size, or a batch sentinel value has no
     matching row.

    """
    context = cast(DefaultExecutionContext, context)
    compiled = cast(SQLCompiler, context.compiled)

    # sentinel result processors are looked up lazily, on the first batch
    # that needs them, and reused for subsequent batches
    _composite_sentinel_proc: Sequence[
        Optional[_ResultProcessorType[Any]]
    ] = ()
    _scalar_sentinel_proc: Optional[_ResultProcessorType[Any]] = None
    _sentinel_proc_initialized: bool = False

    compiled_parameters = context.compiled_parameters

    imv = compiled._insertmanyvalues
    assert imv is not None

    is_returning: Final[bool] = bool(compiled.effective_returning)
    # a per-execution option overrides the dialect-level page size
    batch_size = context.execution_options.get(
        "insertmanyvalues_page_size", self.insertmanyvalues_page_size
    )

    if compiled.schema_translate_map:
        schema_translate_map = context.execution_options.get(
            "schema_translate_map", {}
        )
    else:
        schema_translate_map = None

    if is_returning:
        # the same list object is shared with the context, so rows
        # appended below are visible through it
        result: Optional[List[Any]] = []
        context._insertmanyvalues_rows = result

        sort_by_parameter_order = imv.sort_by_parameter_order

    else:
        sort_by_parameter_order = False
        result = None

    for imv_batch in compiled._deliver_insertmanyvalues_batches(
        statement,
        parameters,
        compiled_parameters,
        generic_setinputsizes,
        batch_size,
        sort_by_parameter_order,
        schema_translate_map,
    ):
        yield imv_batch

        if is_returning:

            try:
                rows = context.fetchall_for_returning(cursor)
            except BaseException as be:
                # NOTE(review): presumably _handle_dbapi_exception always
                # raises; otherwise ``rows`` would be unbound below --
                # confirm against Connection
                connection._handle_dbapi_exception(
                    be,
                    sql_util._long_statement(imv_batch.replaced_statement),
                    imv_batch.replaced_parameters,
                    None,
                    context,
                    is_sub_exec=True,
                )

            # I would have thought "is_returning: Final[bool]"
            # would have assured this but pylance thinks not
            assert result is not None

            if imv.num_sentinel_columns and not imv_batch.is_downgraded:
                composite_sentinel = imv.num_sentinel_columns > 1
                if imv.implicit_sentinel:
                    # for implicit sentinel, which is currently single-col
                    # integer autoincrement, do a simple sort.
                    assert not composite_sentinel
                    result.extend(
                        sorted(rows, key=operator.itemgetter(-1))
                    )
                    continue

                # otherwise, create dictionaries to match up batches
                # with parameters
                assert imv.sentinel_param_keys
                assert imv.sentinel_columns

                _nsc = imv.num_sentinel_columns

                if not _sentinel_proc_initialized:
                    # the sentinel columns are the trailing ``_nsc``
                    # entries of cursor.description; index 1 of each
                    # entry is passed on to the type's result processor
                    # lookup
                    if composite_sentinel:
                        _composite_sentinel_proc = [
                            col.type._cached_result_processor(
                                self, cursor_desc[1]
                            )
                            for col, cursor_desc in zip(
                                imv.sentinel_columns,
                                cursor.description[-_nsc:],
                            )
                        ]
                    else:
                        _scalar_sentinel_proc = (
                            imv.sentinel_columns[0]
                        ).type._cached_result_processor(
                            self, cursor.description[-1][1]
                        )
                    _sentinel_proc_initialized = True

                # key each row by its (processed) sentinel value(s)
                rows_by_sentinel: Union[
                    Dict[Tuple[Any, ...], Any],
                    Dict[Any, Any],
                ]
                if composite_sentinel:
                    rows_by_sentinel = {
                        tuple(
                            (proc(val) if proc else val)
                            for val, proc in zip(
                                row[-_nsc:], _composite_sentinel_proc
                            )
                        ): row
                        for row in rows
                    }
                elif _scalar_sentinel_proc:
                    rows_by_sentinel = {
                        _scalar_sentinel_proc(row[-1]): row for row in rows
                    }
                else:
                    rows_by_sentinel = {row[-1]: row for row in rows}

                # duplicate sentinel values collapse in the dict, so a
                # size mismatch also catches non-unique sentinels
                if len(rows_by_sentinel) != len(imv_batch.batch):
                    # see test_insert_exec.py::
                    # IMVSentinelTest::test_sentinel_incorrect_rowcount
                    # for coverage / demonstration
                    raise exc.InvalidRequestError(
                        f"Sentinel-keyed result set did not produce "
                        f"correct number of rows {len(imv_batch.batch)}; "
                        "produced "
                        f"{len(rows_by_sentinel)}. Please ensure the "
                        "sentinel column is fully unique and populated in "
                        "all cases."
                    )

                try:
                    # emit rows in the order of the batch's sentinel
                    # values
                    ordered_rows = [
                        rows_by_sentinel[sentinel_keys]
                        for sentinel_keys in imv_batch.sentinel_values
                    ]
                except KeyError as ke:
                    # see test_insert_exec.py::
                    # IMVSentinelTest::test_sentinel_cant_match_keys
                    # for coverage / demonstration
                    raise exc.InvalidRequestError(
                        f"Can't match sentinel values in result set to "
                        f"parameter sets; key {ke.args[0]!r} was not "
                        "found. "
                        "There may be a mismatch between the datatype "
                        "passed to the DBAPI driver vs. that which it "
                        "returns in a result row. Ensure the given "
                        "Python value matches the expected result type "
                        "*exactly*, taking care to not rely upon implicit "
                        "conversions which may occur such as when using "
                        "strings in place of UUID or integer values, etc. "
                    ) from ke

                result.extend(ordered_rows)

            else:
                result.extend(rows)

952 

def do_executemany(self, cursor, statement, parameters, context=None):
    """Run ``statement`` against every parameter set in ``parameters``.

    Delegates directly to ``cursor.executemany()``.  ``context`` is
    accepted but not used by this default implementation.

    """
    run_many = cursor.executemany
    run_many(statement, parameters)

955 

def do_execute(self, cursor, statement, parameters, context=None):
    """Run ``statement`` once with a single set of ``parameters``.

    Delegates directly to ``cursor.execute()``.  ``context`` is accepted
    but not used by this default implementation.

    """
    run = cursor.execute
    run(statement, parameters)

958 

def do_execute_no_params(self, cursor, statement, context=None):
    """Run ``statement`` without passing any parameter collection.

    Calls ``cursor.execute()`` with the statement as its only argument.
    ``context`` is accepted but not used by this default implementation.

    """
    run = cursor.execute
    run(statement)

961 

def is_disconnect(
    self,
    e: DBAPIModule.Error,
    connection: Union[
        pool.PoolProxiedConnection, interfaces.DBAPIConnection, None
    ],
    cursor: Optional[interfaces.DBAPICursor],
) -> bool:
    """Report whether ``e`` indicates a dropped database connection.

    This default implementation never inspects its arguments and always
    answers ``False``.

    """
    disconnected = False
    return disconnected

971 

@util.memoized_instancemethod
def _gen_allowed_isolation_levels(self, dbapi_conn):
    """Return the tuple of isolation level names the dialect accepts.

    Returns ``None`` when ``get_isolation_level_values()`` raises
    ``NotImplementedError``.  Raises ``ValueError`` when the dialect
    reports any name that is not already upper case with spaces in place
    of underscores.

    """
    try:
        reported = list(self.get_isolation_level_values(dbapi_conn))
    except NotImplementedError:
        return None

    canonical = [name.replace("_", " ").upper() for name in reported]
    if reported == canonical:
        return tuple(canonical)

    # at least one reported name is not in canonical form
    raise ValueError(
        f"Dialect {self.name!r} get_isolation_level_values() "
        f"method should return names as UPPERCASE using spaces, "
        f"not underscores; got "
        f"{sorted(set(reported).difference(canonical))}"
    )

990 

991 def _assert_and_set_isolation_level(self, dbapi_conn, level): 

992 level = level.replace("_", " ").upper() 

993 

994 _allowed_isolation_levels = self._gen_allowed_isolation_levels( 

995 dbapi_conn 

996 ) 

997 if ( 

998 _allowed_isolation_levels 

999 and level not in _allowed_isolation_levels 

1000 ): 

1001 raise exc.ArgumentError( 

1002 f"Invalid value {level!r} for isolation_level. " 

1003 f"Valid isolation levels for {self.name!r} are " 

1004 f"{', '.join(_allowed_isolation_levels)}" 

1005 ) 

1006 

1007 self.set_isolation_level(dbapi_conn, level) 

1008 

def reset_isolation_level(self, dbapi_conn):
    """Restore ``dbapi_conn`` to its baseline isolation level.

    The on-connect isolation level is used when one is set; otherwise the
    dialect's ``default_isolation_level`` is used.

    """
    on_connect = self._on_connect_isolation_level
    if on_connect is None:
        assert self.default_isolation_level is not None
        target = self.default_isolation_level
    else:
        # the on-connect level must be AUTOCOMMIT or the detected default
        assert (
            on_connect == "AUTOCOMMIT"
            or on_connect == self.default_isolation_level
        )
        target = on_connect

    self._assert_and_set_isolation_level(dbapi_conn, target)

1025 

def normalize_name(self, name):
    """Convert an identifier name into its normalized form.

    * ``None`` and names with no upper/lower distinction are returned
      unchanged.
    * An all-uppercase name whose lowercase form does not require quoting
      is returned lower-cased.
    * An all-lowercase name is returned as a ``quoted_name`` with quoting
      forced.
    * Any other name (mixed case, or uppercase that needs quotes) is
      returned unchanged.

    """
    if name is None:
        return None

    lowered = name.lower()
    uppered = name.upper()

    if uppered == lowered:
        # no case conversion applies, e.g. non-european characters
        return name

    needs_quotes = self.identifier_preparer._requires_quotes
    if uppered == name and not needs_quotes(lowered):
        # all uppercase and safe to leave unquoted: fold to lower case
        return lowered

    if lowered == name:
        # all lower case; once denormalized this must stay quoted
        return quoted_name(name, quote=True)

    # mixed case; it will be quoted when rendered, so leave it as is
    return name

1051 

def denormalize_name(self, name):
    """Convert a normalized identifier name into its denormalized form.

    ``None`` and names with no upper/lower distinction are returned
    unchanged.  An all-lowercase name that does not require quoting is
    returned upper-cased; every other name is returned unchanged.

    """
    if name is None:
        return None

    lowered = name.lower()
    uppered = name.upper()

    if uppered == lowered:
        # no case conversion applies, e.g. non-european characters
        return name

    needs_quotes = self.identifier_preparer._requires_quotes
    if lowered == name and not needs_quotes(lowered):
        return uppered

    return name

1068 

def get_driver_connection(self, connection: DBAPIConnection) -> Any:
    """Return the driver-level connection for ``connection``.

    This default implementation hands back the given object unchanged.

    """
    driver_connection = connection
    return driver_connection

1071 

def _overrides_default(self, method):
    """Tell whether this dialect's class replaces ``DefaultDialect.<method>``.

    The comparison is made on the ``__code__`` objects of the attribute
    found on ``type(self)`` and the one found on ``DefaultDialect``.

    """
    own_code = getattr(type(self), method).__code__
    base_code = getattr(DefaultDialect, method).__code__
    return own_code is not base_code

1077 

def _default_multi_reflect(
    self,
    single_tbl_method,
    connection,
    kind,
    schema,
    filter_names,
    scope,
    **kw,
):
    """Generic "multi" reflection built on top of a single-table method.

    This is a generator: for each selected table name it yields a
    ``((schema, table), result)`` pair, where ``result`` is the return
    value of ``single_tbl_method(connection, table, schema=schema, **kw)``.

    :param single_tbl_method: callable reflecting one table at a time.
    :param connection: passed through to the names functions and to
     ``single_tbl_method``.
    :param kind: ``ObjectKind`` flags selecting tables, views and/or
     materialized views.
    :param schema: schema name passed to every call and used in the
     yielded key.
    :param filter_names: optional collection of names to restrict to.
    :param scope: ``ObjectScope`` flags selecting default and/or temporary
     objects.
    :param kw: extra keyword arguments forwarded to the names functions
     and to ``single_tbl_method``; an ``unreflectable`` dictionary, when
     present, is removed from ``kw`` and receives
     ``UnreflectableTableError`` instances keyed by ``(schema, table)``.

    """
    # collect the "get names" functions that match the requested kinds;
    # regular and temporary objects are tracked separately
    names_fns = []
    temp_names_fns = []
    if ObjectKind.TABLE in kind:
        names_fns.append(self.get_table_names)
        temp_names_fns.append(self.get_temp_table_names)
    if ObjectKind.VIEW in kind:
        names_fns.append(self.get_view_names)
        temp_names_fns.append(self.get_temp_view_names)
    if ObjectKind.MATERIALIZED_VIEW in kind:
        names_fns.append(self.get_materialized_view_names)
        # no temp materialized view at the moment
        # temp_names_fns.append(self.get_temp_materialized_view_names)

    # removed from kw so it is not forwarded to the reflection methods
    unreflectable = kw.pop("unreflectable", {})

    if (
        filter_names
        and scope is ObjectScope.ANY
        and kind is ObjectKind.ANY
    ):
        # if names are given and no qualification on type of table
        # (i.e. the Table(..., autoload) case), take the names as given,
        # don't run names queries. If a table does not exist
        # NoSuchTableError is raised and it's skipped

        # this also suits the case for mssql where we can reflect
        # individual temp tables but there's no temp_names_fn
        names = filter_names
    else:
        names = []
        name_kw = {"schema": schema, **kw}
        fns = []
        if ObjectScope.DEFAULT in scope:
            fns.extend(names_fns)
        if ObjectScope.TEMPORARY in scope:
            fns.extend(temp_names_fns)

        for fn in fns:
            try:
                names.extend(fn(connection, **name_kw))
            except NotImplementedError:
                # names function not implemented; skip it
                pass

    if filter_names:
        # a set makes the membership test below cheap
        filter_names = set(filter_names)

    # iterate over all the tables/views and call the single table method
    for table in names:
        if not filter_names or table in filter_names:
            key = (schema, table)
            try:
                yield (
                    key,
                    single_tbl_method(
                        connection, table, schema=schema, **kw
                    ),
                )
            except exc.UnreflectableTableError as err:
                # record only the first error seen for a given table
                if key not in unreflectable:
                    unreflectable[key] = err
            except exc.NoSuchTableError:
                # missing tables are silently skipped
                pass

1150 

def get_multi_table_options(self, connection, **kw):
    """Reflect table options for many tables.

    Returns the result of ``_default_multi_reflect()`` driven by
    ``get_table_options``.

    """
    reflect = self._default_multi_reflect
    return reflect(self.get_table_options, connection, **kw)

1155 

def get_multi_columns(self, connection, **kw):
    """Reflect columns for many tables.

    Returns the result of ``_default_multi_reflect()`` driven by
    ``get_columns``.

    """
    reflect = self._default_multi_reflect
    return reflect(self.get_columns, connection, **kw)

1158 

def get_multi_pk_constraint(self, connection, **kw):
    """Reflect primary key constraints for many tables.

    Returns the result of ``_default_multi_reflect()`` driven by
    ``get_pk_constraint``.

    """
    reflect = self._default_multi_reflect
    return reflect(self.get_pk_constraint, connection, **kw)

1163 

def get_multi_foreign_keys(self, connection, **kw):
    """Reflect foreign keys for many tables.

    Returns the result of ``_default_multi_reflect()`` driven by
    ``get_foreign_keys``.

    """
    reflect = self._default_multi_reflect
    return reflect(self.get_foreign_keys, connection, **kw)

1168 

def get_multi_indexes(self, connection, **kw):
    """Reflect indexes for many tables.

    Returns the result of ``_default_multi_reflect()`` driven by
    ``get_indexes``.

    """
    reflect = self._default_multi_reflect
    return reflect(self.get_indexes, connection, **kw)

1171 

def get_multi_unique_constraints(self, connection, **kw):
    """Reflect unique constraints for many tables.

    Returns the result of ``_default_multi_reflect()`` driven by
    ``get_unique_constraints``.

    """
    reflect = self._default_multi_reflect
    return reflect(self.get_unique_constraints, connection, **kw)

1176 

def get_multi_check_constraints(self, connection, **kw):
    """Reflect check constraints for many tables.

    Returns the result of ``_default_multi_reflect()`` driven by
    ``get_check_constraints``.

    """
    reflect = self._default_multi_reflect
    return reflect(self.get_check_constraints, connection, **kw)

1181 

def get_multi_table_comment(self, connection, **kw):
    """Reflect table comments for many tables.

    Returns the result of ``_default_multi_reflect()`` driven by
    ``get_table_comment``.

    """
    reflect = self._default_multi_reflect
    return reflect(self.get_table_comment, connection, **kw)

1186 

1187 

class StrCompileDialect(DefaultDialect):
    """A :class:`.DefaultDialect` wired to the "Str" compiler classes.

    NOTE(review): presumably this is the dialect used when a statement is
    stringified without a database-specific dialect - confirm against
    callers.

    """

    # compiler / preparer classes used by this dialect
    statement_compiler = compiler.StrSQLCompiler
    ddl_compiler = compiler.DDLCompiler
    type_compiler_cls = compiler.StrSQLTypeCompiler
    preparer = compiler.IdentifierPreparer

    # RETURNING is enabled for all three DML statement types
    insert_returning = True
    update_returning = True
    delete_returning = True

    supports_statement_cache = True

    supports_identity_columns = True

    # sequences are supported, marked optional, and not pre-executed
    supports_sequences = True
    sequences_optional = True
    preexecute_autoincrement_sequences = False

    supports_native_boolean = True

    supports_multivalues_insert = True
    supports_simple_order_by_label = True

1210 

1211 

1212class DefaultExecutionContext(ExecutionContext): 

# class-level defaults for the statement-kind flags; the ``_init_*``
# constructors overwrite the ones that apply on each instance
isinsert = False
isupdate = False
isdelete = False
is_crud = False
is_text = False
isddl = False

execute_style: ExecuteStyle = ExecuteStyle.EXECUTE

compiled: Optional[Compiled] = None
# filled in by _init_compiled() from the compiled statement's
# result-column attributes
result_column_struct: Optional[
    Tuple[List[ResultColumnsEntry], bool, bool, bool, bool]
] = None
returned_default_rows: Optional[Sequence[Row[Unpack[TupleAny]]]] = None

execution_options: _ExecuteOptions = util.EMPTY_DICT

cursor_fetch_strategy = _cursor._DEFAULT_FETCH

invoked_statement: Optional[Executable] = None

# RETURNING / cursor-mode flags, all off by default
_is_implicit_returning = False
_is_explicit_returning = False
_is_supplemental_returning = False
_is_server_side = False

_soft_closed = False

# when not None, the ``rowcount`` accessor returns this value instead of
# reading ``cursor.rowcount``
_rowcount: Optional[int] = None

# a hook for SQLite's translation of
# result column names
# NOTE: pyhive is using this hook, can't remove it :(
_translate_colname: Optional[
    Callable[[str], Tuple[str, Optional[str]]]
] = None

_expanded_parameters: Mapping[str, List[str]] = util.immutabledict()
"""used by set_input_sizes().

This collection comes from ``ExpandedState.parameter_expansion``.

"""

cache_hit = NO_CACHE_KEY

# annotation-only attributes; values are assigned by the ``_init_*``
# constructors
root_connection: Connection
_dbapi_connection: PoolProxiedConnection
dialect: Dialect
unicode_statement: str
cursor: DBAPICursor
compiled_parameters: List[_MutableCoreSingleExecuteParams]
parameters: _DBAPIMultiExecuteParams
extracted_parameters: Optional[Sequence[BindParameter[Any]]]

# shared empty mapping used as the single parameter set when a
# non-positional statement has no parameters
_empty_dict_params = cast("Mapping[str, Any]", util.EMPTY_DICT)

_insertmanyvalues_rows: Optional[List[Tuple[Any, ...]]] = None
# set by _init_compiled() from the insertmanyvalues sentinel columns
_num_sentinel_cols: int = 0

1272 

1273 @classmethod 

1274 def _init_ddl( 

1275 cls, 

1276 dialect: Dialect, 

1277 connection: Connection, 

1278 dbapi_connection: PoolProxiedConnection, 

1279 execution_options: _ExecuteOptions, 

1280 compiled_ddl: DDLCompiler, 

1281 ) -> ExecutionContext: 

1282 """Initialize execution context for an ExecutableDDLElement 

1283 construct.""" 

1284 

1285 self = cls.__new__(cls) 

1286 self.root_connection = connection 

1287 self._dbapi_connection = dbapi_connection 

1288 self.dialect = connection.dialect 

1289 

1290 self.compiled = compiled = compiled_ddl 

1291 self.isddl = True 

1292 

1293 self.execution_options = execution_options 

1294 

1295 self.unicode_statement = str(compiled) 

1296 if compiled.schema_translate_map: 

1297 schema_translate_map = self.execution_options.get( 

1298 "schema_translate_map", {} 

1299 ) 

1300 

1301 rst = compiled.preparer._render_schema_translates 

1302 self.unicode_statement = rst( 

1303 self.unicode_statement, schema_translate_map 

1304 ) 

1305 

1306 self.statement = self.unicode_statement 

1307 

1308 self.cursor = self.create_cursor() 

1309 self.compiled_parameters = [] 

1310 

1311 if dialect.positional: 

1312 self.parameters = [dialect.execute_sequence_format()] 

1313 else: 

1314 self.parameters = [self._empty_dict_params] 

1315 

1316 return self 

1317 

@classmethod
def _init_compiled(
    cls,
    dialect: Dialect,
    connection: Connection,
    dbapi_connection: PoolProxiedConnection,
    execution_options: _ExecuteOptions,
    compiled: SQLCompiler,
    parameters: _CoreMultiExecuteParams,
    invoked_statement: Executable,
    extracted_parameters: Optional[Sequence[BindParameter[Any]]],
    cache_hit: CacheStats = CacheStats.CACHING_DISABLED,
    param_dict: _CoreSingleExecuteParams | None = None,
) -> ExecutionContext:
    """Initialize execution context for a Compiled construct.

    The steps, in order: copy state from the arguments and ``compiled``;
    validate RETURNING + executemany combinations against the dialect's
    capability flags; build ``compiled_parameters``; select the execute
    style; create the cursor; run prefetch defaults; apply post-compile
    parameter expansion and schema translation to the statement text; and
    finally convert the compiled parameters into the positional or named
    form stored in ``self.parameters``.

    :raises sqlalchemy.exc.InvalidRequestError: for an unsupported
     RETURNING + executemany combination, or when 'literal_execute' /
     'expanding' parameters are used with executemany.

    """

    # create the instance without running __init__
    self = cls.__new__(cls)
    self.root_connection = connection
    self._dbapi_connection = dbapi_connection
    self.dialect = connection.dialect
    self.extracted_parameters = extracted_parameters
    self.invoked_statement = invoked_statement
    self.compiled = compiled
    self.cache_hit = cache_hit

    self.execution_options = execution_options

    self.result_column_struct = (
        compiled._result_columns,
        compiled._ordered_columns,
        compiled._textual_ordered_columns,
        compiled._ad_hoc_textual,
        compiled._loose_column_name_matching,
    )

    # short local aliases are used by the capability checks below
    self.isinsert = ii = compiled.isinsert
    self.isupdate = iu = compiled.isupdate
    self.isdelete = id_ = compiled.isdelete
    self.is_text = compiled.isplaintext

    if ii or iu or id_:
        dml_statement = compiled.compile_state.statement  # type: ignore
        if TYPE_CHECKING:
            assert isinstance(dml_statement, UpdateBase)
        self.is_crud = True
        self._is_explicit_returning = ier = bool(dml_statement._returning)
        self._is_implicit_returning = iir = bool(
            compiled.implicit_returning
        )
        if iir and dml_statement._supplemental_returning:
            self._is_supplemental_returning = True

        # dont mix implicit and explicit returning
        assert not (iir and ier)

        # RETURNING combined with executemany needs explicit dialect
        # support; the first failing check in this chain raises
        if (ier or iir) and compiled.for_executemany:
            if ii and not self.dialect.insert_executemany_returning:
                raise exc.InvalidRequestError(
                    f"Dialect {self.dialect.dialect_description} with "
                    f"current server capabilities does not support "
                    "INSERT..RETURNING when executemany is used"
                )
            elif (
                ii
                and dml_statement._sort_by_parameter_order
                and not self.dialect.insert_executemany_returning_sort_by_parameter_order  # noqa: E501
            ):
                raise exc.InvalidRequestError(
                    f"Dialect {self.dialect.dialect_description} with "
                    f"current server capabilities does not support "
                    "INSERT..RETURNING with deterministic row ordering "
                    "when executemany is used"
                )
            elif (
                ii
                and self.dialect.use_insertmanyvalues
                and not compiled._insertmanyvalues
            ):
                raise exc.InvalidRequestError(
                    'Statement does not have "insertmanyvalues" '
                    "enabled, can't use INSERT..RETURNING with "
                    "executemany in this case."
                )
            elif iu and not self.dialect.update_executemany_returning:
                raise exc.InvalidRequestError(
                    f"Dialect {self.dialect.dialect_description} with "
                    f"current server capabilities does not support "
                    "UPDATE..RETURNING when executemany is used"
                )
            elif id_ and not self.dialect.delete_executemany_returning:
                raise exc.InvalidRequestError(
                    f"Dialect {self.dialect.dialect_description} with "
                    f"current server capabilities does not support "
                    "DELETE..RETURNING when executemany is used"
                )

    if not parameters:
        # no parameter sets given: construct exactly one set
        self.compiled_parameters = [
            compiled.construct_params(
                extracted_parameters=extracted_parameters,
                escape_names=False,
                _collected_params=param_dict,
            )
        ]
    else:
        # one compiled parameter set per given set; the index is passed
        # along as the group number
        self.compiled_parameters = [
            compiled.construct_params(
                m,
                escape_names=False,
                _group_number=grp,
                extracted_parameters=extracted_parameters,
                _collected_params=param_dict,
            )
            for grp, m in enumerate(parameters)
        ]

        # more than one parameter set switches away from plain EXECUTE
        if len(parameters) > 1:
            if self.isinsert and compiled._insertmanyvalues:
                self.execute_style = ExecuteStyle.INSERTMANYVALUES

                imv = compiled._insertmanyvalues
                if imv.sentinel_columns is not None:
                    self._num_sentinel_cols = imv.num_sentinel_columns
            else:
                self.execute_style = ExecuteStyle.EXECUTEMANY

    self.unicode_statement = compiled.string

    self.cursor = self.create_cursor()

    if self.compiled.insert_prefetch or self.compiled.update_prefetch:
        self._process_execute_defaults()

    processors = compiled._bind_processors

    flattened_processors: Mapping[
        str, _BindProcessorType[Any]
    ] = processors  # type: ignore[assignment]

    if compiled.literal_execute_params or compiled.post_compile_params:
        if self.executemany:
            raise exc.InvalidRequestError(
                "'literal_execute' or 'expanding' parameters can't be "
                "used with executemany()"
            )

        # expansion is computed from the first (only) parameter set
        expanded_state = compiled._process_parameters_for_postcompile(
            self.compiled_parameters[0]
        )

        # re-assign self.unicode_statement
        self.unicode_statement = expanded_state.statement

        self._expanded_parameters = expanded_state.parameter_expansion

        # copy before updating so compiled._bind_processors is not mutated
        flattened_processors = dict(processors)  # type: ignore
        flattened_processors.update(expanded_state.processors)
        positiontup = expanded_state.positiontup
    elif compiled.positional:
        positiontup = self.compiled.positiontup
    else:
        positiontup = None

    if compiled.schema_translate_map:
        schema_translate_map = self.execution_options.get(
            "schema_translate_map", {}
        )
        rst = compiled.preparer._render_schema_translates
        self.unicode_statement = rst(
            self.unicode_statement, schema_translate_map
        )

    # final self.unicode_statement is now assigned, encode if needed
    # by dialect
    self.statement = self.unicode_statement

    # Convert the dictionary of bind parameter values
    # into a dict or list to be sent to the DBAPI's
    # execute() or executemany() method.

    if compiled.positional:
        core_positional_parameters: MutableSequence[Sequence[Any]] = []
        assert positiontup is not None
        for compiled_params in self.compiled_parameters:
            # values in positiontup order, run through a bind processor
            # when one exists for the key
            l_param: List[Any] = [
                (
                    flattened_processors[key](compiled_params[key])
                    if key in flattened_processors
                    else compiled_params[key]
                )
                for key in positiontup
            ]
            core_positional_parameters.append(
                dialect.execute_sequence_format(l_param)
            )

        self.parameters = core_positional_parameters
    else:
        core_dict_parameters: MutableSequence[Dict[str, Any]] = []
        escaped_names = compiled.escaped_bind_names

        # note that currently, "expanded" parameters will be present
        # in self.compiled_parameters in their quoted form. This is
        # slightly inconsistent with the approach taken as of
        # #8056 where self.compiled_parameters is meant to contain unquoted
        # param names.
        d_param: Dict[str, Any]
        for compiled_params in self.compiled_parameters:
            if escaped_names:
                # output keys use the escaped name where one is defined
                d_param = {
                    escaped_names.get(key, key): (
                        flattened_processors[key](compiled_params[key])
                        if key in flattened_processors
                        else compiled_params[key]
                    )
                    for key in compiled_params
                }
            else:
                d_param = {
                    key: (
                        flattened_processors[key](compiled_params[key])
                        if key in flattened_processors
                        else compiled_params[key]
                    )
                    for key in compiled_params
                }

            core_dict_parameters.append(d_param)

        self.parameters = core_dict_parameters

    return self

1550 

1551 @classmethod 

1552 def _init_statement( 

1553 cls, 

1554 dialect: Dialect, 

1555 connection: Connection, 

1556 dbapi_connection: PoolProxiedConnection, 

1557 execution_options: _ExecuteOptions, 

1558 statement: str, 

1559 parameters: _DBAPIMultiExecuteParams, 

1560 ) -> ExecutionContext: 

1561 """Initialize execution context for a string SQL statement.""" 

1562 

1563 self = cls.__new__(cls) 

1564 self.root_connection = connection 

1565 self._dbapi_connection = dbapi_connection 

1566 self.dialect = connection.dialect 

1567 self.is_text = True 

1568 

1569 self.execution_options = execution_options 

1570 

1571 if not parameters: 

1572 if self.dialect.positional: 

1573 self.parameters = [dialect.execute_sequence_format()] 

1574 else: 

1575 self.parameters = [self._empty_dict_params] 

1576 elif isinstance(parameters[0], dialect.execute_sequence_format): 

1577 self.parameters = parameters 

1578 elif isinstance(parameters[0], dict): 

1579 self.parameters = parameters 

1580 else: 

1581 self.parameters = [ 

1582 dialect.execute_sequence_format(p) for p in parameters 

1583 ] 

1584 

1585 if len(parameters) > 1: 

1586 self.execute_style = ExecuteStyle.EXECUTEMANY 

1587 

1588 self.statement = self.unicode_statement = statement 

1589 

1590 self.cursor = self.create_cursor() 

1591 return self 

1592 

1593 @classmethod 

1594 def _init_default( 

1595 cls, 

1596 dialect: Dialect, 

1597 connection: Connection, 

1598 dbapi_connection: PoolProxiedConnection, 

1599 execution_options: _ExecuteOptions, 

1600 ) -> ExecutionContext: 

1601 """Initialize execution context for a ColumnDefault construct.""" 

1602 

1603 self = cls.__new__(cls) 

1604 self.root_connection = connection 

1605 self._dbapi_connection = dbapi_connection 

1606 self.dialect = connection.dialect 

1607 

1608 self.execution_options = execution_options 

1609 

1610 self.cursor = self.create_cursor() 

1611 return self 

1612 

1613 def _get_cache_stats(self) -> str: 

1614 if self.compiled is None: 

1615 return "raw sql" 

1616 

1617 now = perf_counter() 

1618 

1619 ch = self.cache_hit 

1620 

1621 gen_time = self.compiled._gen_time 

1622 assert gen_time is not None 

1623 

1624 if ch is NO_CACHE_KEY: 

1625 return "no key %.5fs" % (now - gen_time,) 

1626 elif ch is CACHE_HIT: 

1627 return "cached since %.4gs ago" % (now - gen_time,) 

1628 elif ch is CACHE_MISS: 

1629 return "generated in %.5fs" % (now - gen_time,) 

1630 elif ch is CACHING_DISABLED: 

1631 if "_cache_disable_reason" in self.execution_options: 

1632 return "caching disabled (%s) %.5fs " % ( 

1633 self.execution_options["_cache_disable_reason"], 

1634 now - gen_time, 

1635 ) 

1636 else: 

1637 return "caching disabled %.5fs" % (now - gen_time,) 

1638 elif ch is NO_DIALECT_SUPPORT: 

1639 return "dialect %s+%s does not support caching %.5fs" % ( 

1640 self.dialect.name, 

1641 self.dialect.driver, 

1642 now - gen_time, 

1643 ) 

1644 else: 

1645 return "unknown" 

1646 

@property
def executemany(self):  # type: ignore[override]
    """True when the execute style is EXECUTEMANY or INSERTMANYVALUES."""
    multi_styles = (
        ExecuteStyle.EXECUTEMANY,
        ExecuteStyle.INSERTMANYVALUES,
    )
    return self.execute_style in multi_styles

1653 

@util.memoized_property
def identifier_preparer(self):
    """Return the identifier preparer that applies to this execution.

    The compiled object's preparer is used when a compiled object is
    present.  Otherwise the dialect's preparer is used, adapted for the
    ``schema_translate_map`` execution option when that option is set.

    """
    if self.compiled:
        return self.compiled.preparer

    dialect_preparer = self.dialect.identifier_preparer
    if "schema_translate_map" not in self.execution_options:
        return dialect_preparer

    return dialect_preparer._with_schema_translate(
        self.execution_options["schema_translate_map"]
    )

1664 

@util.memoized_property
def engine(self):
    """Return the ``engine`` attribute of the root connection."""
    root = self.root_connection
    return root.engine

1668 

@util.memoized_property
def postfetch_cols(self) -> Optional[Sequence[Column[Any]]]:
    """Return the ``postfetch`` collection of the compiled statement."""
    if TYPE_CHECKING:
        assert isinstance(self.compiled, SQLCompiler)
    compiled = self.compiled
    return compiled.postfetch

1674 

@util.memoized_property
def prefetch_cols(self) -> Optional[Sequence[Column[Any]]]:
    """Return the prefetch columns matching the statement type.

    INSERT uses ``compiled.insert_prefetch``, UPDATE uses
    ``compiled.update_prefetch``; anything else gives an empty tuple.

    """
    if TYPE_CHECKING:
        assert isinstance(self.compiled, SQLCompiler)

    if self.isinsert:
        return self.compiled.insert_prefetch
    if self.isupdate:
        return self.compiled.update_prefetch
    return ()

1685 

@util.memoized_property
def no_parameters(self):
    """Return the ``no_parameters`` execution option, default ``False``."""
    options = self.execution_options
    return options.get("no_parameters", False)

1689 

1690 def _execute_scalar( 

1691 self, 

1692 stmt: str, 

1693 type_: Optional[TypeEngine[Any]], 

1694 parameters: Optional[_DBAPISingleExecuteParams] = None, 

1695 ) -> Any: 

1696 """Execute a string statement on the current cursor, returning a 

1697 scalar result. 

1698 

1699 Used to fire off sequences, default phrases, and "select lastrowid" 

1700 types of statements individually or in the context of a parent INSERT 

1701 or UPDATE statement. 

1702 

1703 """ 

1704 

1705 conn = self.root_connection 

1706 

1707 if "schema_translate_map" in self.execution_options: 

1708 schema_translate_map = self.execution_options.get( 

1709 "schema_translate_map", {} 

1710 ) 

1711 

1712 rst = self.identifier_preparer._render_schema_translates 

1713 stmt = rst(stmt, schema_translate_map) 

1714 

1715 if not parameters: 

1716 if self.dialect.positional: 

1717 parameters = self.dialect.execute_sequence_format() 

1718 else: 

1719 parameters = {} 

1720 

1721 conn._cursor_execute(self.cursor, stmt, parameters, context=self) 

1722 row = self.cursor.fetchone() 

1723 if row is not None: 

1724 r = row[0] 

1725 else: 

1726 r = None 

1727 if type_ is not None: 

1728 # apply type post processors to the result 

1729 proc = type_._cached_result_processor( 

1730 self.dialect, self.cursor.description[0][1] 

1731 ) 

1732 if proc: 

1733 return proc(r) 

1734 return r 

1735 

@util.memoized_property
def connection(self):
    """Return the root connection of this execution context."""
    root = self.root_connection
    return root

1739 

def _use_server_side_cursor(self):
    """Decide whether a server side cursor should be used.

    Returns ``False`` outright when the dialect does not support server
    side cursors.  Otherwise the result is the value of a short-circuit
    expression, so it may be a truthy / falsy object rather than a strict
    ``bool``.

    """
    if not self.dialect.supports_server_side_cursors:
        return False

    if self.dialect.server_side_cursors:
        # this is deprecated
        # "stream_results" defaults to True here; it is combined with a
        # check that the statement is a compiled Selectable, or is a
        # non-compiled / TextClause statement whose text matches
        # SERVER_SIDE_CURSOR_RE
        use_server_side = self.execution_options.get(
            "stream_results", True
        ) and (
            self.compiled
            and isinstance(self.compiled.statement, expression.Selectable)
            or (
                (
                    not self.compiled
                    or isinstance(
                        self.compiled.statement, expression.TextClause
                    )
                )
                and self.unicode_statement
                and SERVER_SIDE_CURSOR_RE.match(self.unicode_statement)
            )
        )
    else:
        # only the explicit "stream_results" option turns it on
        use_server_side = self.execution_options.get(
            "stream_results", False
        )

    return use_server_side

1768 

def create_cursor(self) -> DBAPICursor:
    """Create the cursor for this execution and record its kind.

    A server side cursor is created when the dialect supports them and
    either the ``stream_results`` execution option is set, or the dialect
    level ``server_side_cursors`` flag is set and
    ``_use_server_side_cursor()`` agrees.  ``_is_server_side`` is updated
    to match the cursor that is returned.

    """
    # inlining initial preference checks for SS cursors
    wants_server_side = self.dialect.supports_server_side_cursors and (
        self.execution_options.get("stream_results", False)
        or (
            self.dialect.server_side_cursors
            and self._use_server_side_cursor()
        )
    )

    if not wants_server_side:
        self._is_server_side = False
        return self.create_default_cursor()

    self._is_server_side = True
    return self.create_server_side_cursor()

1786 

def fetchall_for_returning(self, cursor):
    """Return every remaining row of ``cursor`` via ``cursor.fetchall()``."""
    fetched = cursor.fetchall()
    return fetched

1789 

def create_default_cursor(self) -> DBAPICursor:
    """Return a new cursor obtained from ``self._dbapi_connection``."""
    dbapi_connection = self._dbapi_connection
    return dbapi_connection.cursor()

1792 

def create_server_side_cursor(self) -> DBAPICursor:
    """Create a server side cursor.

    This default implementation always raises ``NotImplementedError``.

    """
    raise NotImplementedError()

1795 

def pre_exec(self):
    """Pre-execution hook; this default implementation does nothing."""
    return None

1798 

def get_out_parameter_values(self, names):
    """Return OUT parameter values for the given parameter names.

    This default implementation always raises ``NotImplementedError``.

    """
    message = "This dialect does not support OUT parameters"
    raise NotImplementedError(message)

1803 

def post_exec(self):
    """Post-execution hook; this default implementation does nothing."""
    return None

1806 

def get_result_processor(
    self, type_: TypeEngine[Any], colname: str, coltype: DBAPIType
) -> Optional[_ResultProcessorType[Any]]:
    """Look up the result processor for a column in cursor.description.

    The default behavior asks ``type_`` for its cached result processor
    for this dialect and ``coltype``; ``colname`` is not consulted.
    Dialects may override this for context-sensitive result type
    handling.

    """
    dialect = self.dialect
    return type_._cached_result_processor(dialect, coltype)

1818 

def get_lastrowid(self) -> int:
    """Return ``self.cursor.lastrowid``, or an equivalent, after an INSERT.

    Overriding implementations may call special cursor functions, issue a
    new SELECT on the cursor (or a new one), or return a stored value that
    was calculated within post_exec().

    This function is only called for dialects which support "implicit"
    primary key generation, keep preexecute_autoincrement_sequences set
    to False, and when no explicit id value was bound to the statement.

    It is called once for an INSERT statement that would need to return
    the last inserted primary key, for those dialects that make use of
    the lastrowid concept.  In these cases it is called directly after
    :meth:`.ExecutionContext.post_exec`.

    """
    cursor = self.cursor
    return cursor.lastrowid

1837 

def handle_dbapi_exception(self, e):
    """Hook receiving a DBAPI exception; this default does nothing."""
    return None

1840 

@util.non_memoized_property
def rowcount(self) -> int:
    """Return the stored row count, else the cursor's ``rowcount``."""
    if self._rowcount is None:
        return self.cursor.rowcount
    return self._rowcount

1847 

1848 @property 

1849 def _has_rowcount(self): 

1850 return self._rowcount is not None 

1851 

def supports_sane_rowcount(self):
    """Return the dialect's ``supports_sane_rowcount`` flag."""
    dialect = self.dialect
    return dialect.supports_sane_rowcount

1854 

def supports_sane_multi_rowcount(self):
    """Return the dialect's ``supports_sane_multi_rowcount`` flag."""
    dialect = self.dialect
    return dialect.supports_sane_multi_rowcount

1857 

def _setup_result_proxy(self):
    """Build and return the result object for the executed statement.

    CRUD and textual statements are delegated to
    ``_setup_dml_or_text_result()``; everything else gets a
    ``CursorResult`` built directly here.  OUT parameters are attached
    when the compiled statement has them, and the ``yield_per`` execution
    option is applied for the non-CRUD, non-text case.

    """
    exec_opt = self.execution_options

    # capture cursor.rowcount up front when the "preserve_rowcount"
    # option is set and no row count has been stored yet
    if self._rowcount is None and exec_opt.get("preserve_rowcount", False):
        self._rowcount = self.cursor.rowcount

    yp: Optional[Union[int, bool]]
    if self.is_crud or self.is_text:
        result = self._setup_dml_or_text_result()
        # yield_per is not applied on this path
        yp = False
    else:
        yp = exec_opt.get("yield_per", None)
        sr = self._is_server_side or exec_opt.get("stream_results", False)
        strategy = self.cursor_fetch_strategy
        # streaming with the default strategy switches to a buffered
        # row fetch strategy
        if sr and strategy is _cursor._DEFAULT_FETCH:
            strategy = _cursor.BufferedRowCursorFetchStrategy(
                self.cursor, self.execution_options
            )
        cursor_description: _DBAPICursorDescription = (
            strategy.alternate_cursor_description
            or self.cursor.description
        )
        # no description available: use the "no cursor" DQL strategy
        if cursor_description is None:
            strategy = _cursor._NO_CURSOR_DQL

        result = _cursor.CursorResult(self, strategy, cursor_description)

    compiled = self.compiled

    if (
        compiled
        and not self.isddl
        and cast(SQLCompiler, compiled).has_out_parameters
    ):
        self._setup_out_parameters(result)

    # mirror the result's soft-closed state on the context
    self._soft_closed = result._soft_closed

    if yp:
        result = result.yield_per(yp)

    return result

1900 

def _setup_out_parameters(self, result):
    """Collect OUT parameter values and attach them to ``result``.

    Every bind parameter of the compiled statement flagged ``isoutparam``
    is looked up through ``get_out_parameter_values()``; each raw value is
    passed through the result processor of the parameter's dialect-level
    type when one exists.  The values, keyed by bind parameter key, are
    assigned to ``result.out_parameters``.

    """
    compiled = cast(SQLCompiler, self.compiled)

    out_params = []
    out_names = []
    for param, name in compiled.bind_names.items():
        if param.isoutparam:
            out_params.append(param)
            out_names.append(name)

    raw_values = self.get_out_parameter_values(out_names)

    collected = {}
    for bindparam, raw_value in zip(out_params, raw_values):
        impl_type = bindparam.type.dialect_impl(self.dialect)
        dbapi_type = impl_type.get_dbapi_type(self.dialect.loaded_dbapi)
        processor = impl_type.result_processor(self.dialect, dbapi_type)
        collected[bindparam.key] = (
            raw_value if processor is None else processor(raw_value)
        )

    result.out_parameters = collected

1928 

def _setup_dml_or_text_result(self):
    """Build the :class:`.CursorResult` for a DML or textual statement.

    Selects the fetch strategy and cursor description, constructs the
    result, then records inserted primary keys, implicitly returned rows
    and the rowcount on this context as applicable.  The result is soft
    closed in the branches that do not hand rows on to the caller.

    """
    sql_compiler = cast(SQLCompiler, self.compiled)

    fetch_strategy: ResultFetchStrategy = self.cursor_fetch_strategy

    if self.isinsert:
        buffers_imv_returning = (
            self.execute_style is ExecuteStyle.INSERTMANYVALUES
            and sql_compiler.effective_returning
        )
        if buffers_imv_returning:
            # the buffered strategy is seeded with
            # self._insertmanyvalues_rows; maintain the alt cursor
            # description if set by the dialect, e.g. mssql preserves it
            fetch_strategy = _cursor.FullyBufferedCursorFetchStrategy(
                self.cursor,
                initial_buffer=self._insertmanyvalues_rows,
                alternate_description=(
                    fetch_strategy.alternate_cursor_description
                ),
            )

        if sql_compiler.postfetch_lastrowid:
            self.inserted_primary_key_rows = (
                self._setup_ins_pk_from_lastrowid()
            )
        # otherwise, if not self._is_implicit_returning, the default
        # inserted_primary_key_rows accessor will return an "empty"
        # primary key collection when accessed.

    if self._is_server_side and fetch_strategy is _cursor._DEFAULT_FETCH:
        fetch_strategy = _cursor.BufferedRowCursorFetchStrategy(
            self.cursor, self.execution_options
        )

    cursor_description = None
    if fetch_strategy is not _cursor._NO_CURSOR_DML:
        cursor_description = (
            fetch_strategy.alternate_cursor_description
            or self.cursor.description
        )

    if cursor_description is None:
        fetch_strategy = _cursor._NO_CURSOR_DML
    elif self._num_sentinel_cols:
        # the sentinel columns are handled in CursorResult._init_metadata
        # using essentially _reduce
        assert self.execute_style is ExecuteStyle.INSERTMANYVALUES

    result: _cursor.CursorResult[Any] = _cursor.CursorResult(
        self, fetch_strategy, cursor_description
    )

    if self.isinsert:
        if self._is_implicit_returning:
            returned_rows = result.all()

            self.returned_default_rows = returned_rows
            self.inserted_primary_key_rows = (
                self._setup_ins_pk_from_implicit_returning(
                    result, returned_rows
                )
            )

            # test that it has a cursor metadata that is accurate. the
            # first row will have been fetched and current assumptions
            # are that the result has only one row, until executemany()
            # support is added here.
            assert result._metadata.returns_rows

            # Insert statement has both return_defaults() and
            # returning().  rewind the result on the list of rows
            # we just used.
            if self._is_supplemental_returning:
                result._rewind(returned_rows)
            else:
                result._soft_close()
        elif not self._is_explicit_returning:
            # we assume here the result does not return any rows.
            # *usually*, this will be true.  However, some dialects
            # such as that of MSSQL/pyodbc need to SELECT a post fetch
            # function so this is not necessarily true.
            result._soft_close()
        return result

    if self._is_implicit_returning:
        returned_rows = result.all()

        if returned_rows:
            self.returned_default_rows = returned_rows
        self._rowcount = len(returned_rows)

        if self._is_supplemental_returning:
            result._rewind(returned_rows)
        else:
            result._soft_close()

        # test that it has a cursor metadata that is accurate.
        # the rows have all been fetched however.
        assert result._metadata.returns_rows
        return result

    returns_rows = result._metadata.returns_rows
    if not returns_rows or self.isupdate or self.isdelete:
        # get rowcount before any soft close
        # (which requires open cursor on some drivers)
        if self._rowcount is None:
            self._rowcount = self.cursor.rowcount
    if not returns_rows:
        result._soft_close()
    return result

2039 

@util.memoized_property
def inserted_primary_key_rows(self):
    """Default primary key rows for an INSERT.

    Used when no specific "get primary key" strategy was set up during
    execution; the rows are based on what's in the compiled_parameters
    and nothing else.

    """
    default_pk_rows = self._setup_ins_pk_from_empty()
    return default_pk_rows

2046 

def _setup_ins_pk_from_lastrowid(self):
    """Return a one-element list holding the primary key built from
    ``get_lastrowid()`` and the first set of compiled parameters.

    """
    sql_compiler = cast(SQLCompiler, self.compiled)
    pk_from_lastrowid = (
        sql_compiler._inserted_primary_key_from_lastrowid_getter
    )
    row_id = self.get_lastrowid()
    primary_key = pk_from_lastrowid(row_id, self.compiled_parameters[0])
    return [primary_key]

2053 

def _setup_ins_pk_from_empty(self):
    """Return one primary key row per set of compiled parameters.

    Each row comes from the compiler's lastrowid-based getter, invoked
    with ``None`` in place of a lastrowid.

    """
    sql_compiler = cast(SQLCompiler, self.compiled)
    pk_getter = sql_compiler._inserted_primary_key_from_lastrowid_getter
    pk_rows = [
        pk_getter(None, param_set)
        for param_set in self.compiled_parameters
    ]
    return pk_rows

2059 

def _setup_ins_pk_from_implicit_returning(self, result, rows):
    """Return primary key rows derived from implicitly returned rows.

    Each returned row is paired positionally with a set of compiled
    parameters and handed to the compiler's returning-based getter.
    An empty list is returned when ``rows`` is empty.

    """
    if rows:
        sql_compiler = cast(SQLCompiler, self.compiled)
        pk_from_returning = (
            sql_compiler._inserted_primary_key_from_returning_getter
        )
        # map() over two iterables stops at the shorter one, like zip()
        return list(map(pk_from_returning, rows, self.compiled_parameters))

    return []

2072 

def lastrow_has_defaults(self) -> bool:
    """Report whether an INSERT or UPDATE has post-fetch entries.

    The result is true only when this context is an insert or an update
    and the compiled statement's ``postfetch`` collection is non-empty.

    """
    writes_row = self.isinsert or self.isupdate
    if not writes_row:
        return writes_row
    return bool(cast(SQLCompiler, self.compiled).postfetch)

2077 

def _prepare_set_input_sizes(
    self,
) -> Optional[List[Tuple[str, Any, TypeEngine[Any]]]]:
    """Given a cursor and ClauseParameters, prepare arguments
    in order to call the appropriate
    style of ``setinputsizes()`` on the cursor, using DB-API types
    from the bind parameter's ``TypeEngine`` objects.

    This method is only called by those dialects which set the
    :attr:`.Dialect.bind_typing` attribute to
    :attr:`.BindTyping.SETINPUTSIZES`.   Python-oracledb and cx_Oracle are
    the only DBAPIs that require setinputsizes(); pyodbc offers it as an
    option.

    Prior to SQLAlchemy 2.0, the setinputsizes() approach was also used
    for pg8000 and asyncpg, which has been changed to inline rendering
    of casts.

    :return: ``None`` for DDL and textual statements, or when the
     compiler has no input sizes lookup; otherwise a list of
     ``(parameter name, DBAPI type, TypeEngine)`` tuples.

    """
    if self.isddl or self.is_text:
        return None

    sql_compiler = cast(SQLCompiler, self.compiled)

    inputsizes = sql_compiler._get_set_input_sizes_lookup()
    if inputsizes is None:
        return None

    dialect = self.dialect

    if dialect._has_events:
        # event handlers receive their own dict copy of the lookup
        inputsizes = dict(inputsizes)
        dialect.dispatch.do_setinputsizes(
            inputsizes, self.cursor, self.statement, self.parameters, self
        )

    escapes = sql_compiler.escaped_bind_names or None

    def escaped(name):
        # translate a bind name through the escape map when there is one
        if escapes is None:
            return name
        return escapes.get(name, name)

    if dialect.positional:
        keyed_binds = []
        for key in sql_compiler.positiontup or ():
            keyed_binds.append((key, sql_compiler.binds[key]))
    else:
        keyed_binds = [
            (key, bindparam)
            for bindparam, key in sql_compiler.bind_names.items()
        ]

    expanded = self._expanded_parameters

    generic_inputsizes: List[Tuple[str, Any, TypeEngine[Any]]] = []
    for key, bindparam in keyed_binds:
        if bindparam in sql_compiler.literal_execute_params:
            continue

        if key not in expanded:
            dbtype = inputsizes.get(bindparam, None)
            generic_inputsizes.append(
                (escaped(key), dbtype, bindparam.type)
            )
            continue

        if is_tuple_type(bindparam.type):
            # expanded names cycle through the member types of the tuple
            width = len(bindparam.type.types)
            member_dbtypes = inputsizes[bindparam]
            for position, paramname in enumerate(expanded[key]):
                slot = position % width
                generic_inputsizes.append(
                    (
                        escaped(paramname),
                        member_dbtypes[slot],
                        bindparam.type.types[slot],
                    )
                )
        else:
            dbtype = inputsizes.get(bindparam, None)
            for paramname in expanded[key]:
                generic_inputsizes.append(
                    (escaped(paramname), dbtype, bindparam.type)
                )

    return generic_inputsizes

2184 

def _exec_default(self, column, default, type_):
    """Produce the value of ``default`` for ``column``.

    Sequences go to ``fire_sequence()``, callables are invoked with this
    context after ``current_column`` is set, clause elements go to
    ``_exec_default_clause_element()``, and anything else yields
    ``default.arg`` unchanged.

    """
    if default.is_sequence:
        return self.fire_sequence(default, type_)

    if default.is_callable:
        # this codepath is not normally used as it's inlined
        # into _process_execute_defaults
        self.current_column = column
        return default.arg(self)

    if default.is_clause_element:
        return self._exec_default_clause_element(column, default, type_)

    # plain scalar; this codepath is not normally used as it's inlined
    # into _process_execute_defaults
    return default.arg

2199 

def _exec_default_clause_element(self, column, default, type_):
    """Execute a default that's a complete clause element.

    This re-implements a miniature version of the
    compile->parameters->cursor.execute() sequence, since we don't want
    to modify the state of the connection / result in progress or create
    new connection / result objects etc.

    The default's argument is wrapped in a SELECT (coerced to ``type_``
    first when it is not already typed), compiled against the dialect,
    and run through ``_execute_scalar()``.

    """
    if default._arg_is_typed:
        default_arg = default.arg
    else:
        default_arg = expression.type_coerce(default.arg, type_)

    stmt = expression.select(default_arg).compile(dialect=self.dialect)
    bound_values = stmt.construct_params()
    bind_processors = stmt._bind_processors

    def processed(key):
        # apply the bind processor for this key when there is one
        if key in bind_processors:
            return bind_processors[key](bound_values[key])
        return bound_values[key]

    if stmt.positional:
        parameters = self.dialect.execute_sequence_format(
            [processed(key) for key in stmt.positiontup or ()]
        )
    else:
        parameters = {key: processed(key) for key in bound_values}

    return self._execute_scalar(str(stmt), type_, parameters=parameters)

2238 

# Class-level default of ``None``.  ``_process_execute_defaults`` assigns
# the parameter dictionary of the row being processed to this attribute
# on the instance, and deletes that instance attribute once all rows are
# done.
current_parameters: Optional[_CoreSingleExecuteParams] = None
"""A dictionary of parameters applied to the current row.

This attribute is only available in the context of a user-defined default
generation function, e.g. as described at :ref:`context_default_functions`.
It consists of a dictionary which includes entries for each column/value
pair that is to be part of the INSERT or UPDATE statement. The keys of the
dictionary will be the key value of each :class:`_schema.Column`,
which is usually synonymous with the name.

Note that the :attr:`.DefaultExecutionContext.current_parameters` attribute
does not accommodate the "multi-values" feature of the
:meth:`_expression.Insert.values` method.  The
:meth:`.DefaultExecutionContext.get_current_parameters` method should be
preferred.

.. seealso::

    :meth:`.DefaultExecutionContext.get_current_parameters`

    :ref:`context_default_functions`

"""

2263 

def get_current_parameters(self, isolate_multiinsert_groups=True):
    """Return a dictionary of parameters applied to the current row.

    This method can only be used in the context of a user-defined default
    generation function, e.g. as described at
    :ref:`context_default_functions`. When invoked, a dictionary is
    returned which includes entries for each column/value pair that is part
    of the INSERT or UPDATE statement. The keys of the dictionary will be
    the key value of each :class:`_schema.Column`,
    which is usually synonymous
    with the name.

    :param isolate_multiinsert_groups=True: indicates that multi-valued
     INSERT constructs created using :meth:`_expression.Insert.values`
     should be
     handled by returning only the subset of parameters that are local
     to the current column default invocation.   When ``False``, the
     raw parameters of the statement are returned including the
     naming convention used in the case of multi-valued INSERT.

    :raises sqlalchemy.exc.InvalidRequestError: if no current parameters
     or no current column are present on this context.

    .. seealso::

        :attr:`.DefaultExecutionContext.current_parameters`

        :ref:`context_default_functions`

    """
    # ``current_parameters`` falls back to the class-level ``None`` when
    # no row is being processed, and ``current_column`` may never have
    # been assigned.  Both cases get the same InvalidRequestError; an
    # ``assert`` here would raise AssertionError instead and is skipped
    # entirely under ``python -O``.
    parameters = getattr(self, "current_parameters", None)
    column = getattr(self, "current_column", None)
    if parameters is None or column is None:
        raise exc.InvalidRequestError(
            "get_current_parameters() can only be invoked in the "
            "context of a Python side column default function"
        )

    compile_state = cast(
        "DMLState", cast(SQLCompiler, self.compiled).compile_state
    )
    assert compile_state is not None

    if not (
        isolate_multiinsert_groups
        and dml.isinsert(compile_state)
        and compile_state._has_multi_parameters
    ):
        return parameters

    # multi-valued INSERT: build a dictionary holding only the
    # parameters of the VALUES group the current column belongs to
    if column._is_multiparam_column:
        index = column.index + 1
        d = {column.original.key: parameters[column.key]}
    else:
        d = {column.key: parameters[column.key]}
        index = 0
    assert compile_state._dict_parameters is not None
    keys = compile_state._dict_parameters.keys()
    d.update((key, parameters["%s_m%d" % (key, index)]) for key in keys)
    return d

2325 

def get_insert_default(self, column):
    """Return the executed ``column.default``, or ``None`` if the column
    has no default.

    """
    insert_default = column.default
    if insert_default is not None:
        return self._exec_default(column, insert_default, column.type)
    return None

2331 

def get_update_default(self, column):
    """Return the executed ``column.onupdate``, or ``None`` if the column
    has no onupdate default.

    """
    update_default = column.onupdate
    if update_default is not None:
        return self._exec_default(column, update_default, column.type)
    return None

2337 

def _process_execute_defaults(self):
    """Fill in prefetched column defaults for every parameter set.

    The compiler's ``insert_prefetch`` columns are used when present,
    otherwise its ``update_prefetch`` columns.  For each set of compiled
    parameters, each prefetch column receives a value according to its
    description tuple: a running counter for sentinels, the argument
    itself for scalars, the result of calling the argument with this
    context for callables, or the non-``None`` result of
    ``get_insert_default()`` / ``get_update_default()`` otherwise.

    """
    sql_compiler = cast(SQLCompiler, self.compiled)
    param_key_for = sql_compiler._within_exec_param_key_getter

    if sql_compiler.insert_prefetch:
        insert_fallback = self.get_insert_default
        prefetch_recs = [
            (
                col,
                param_key_for(col),
                col._default_description_tuple,
                insert_fallback,
            )
            for col in sql_compiler.insert_prefetch
        ]
    elif sql_compiler.update_prefetch:
        update_fallback = self.get_update_default
        prefetch_recs = [
            (
                col,
                param_key_for(col),
                col._onupdate_description_tuple,
                update_fallback,
            )
            for col in sql_compiler.update_prefetch
        ]
    else:
        prefetch_recs = []

    next_sentinel = 0

    for param_set in self.compiled_parameters:
        # expose the row being processed to default functions
        self.current_parameters = param_set

        for col, param_key, description, fallback in prefetch_recs:
            arg, is_scalar, is_callable, is_sentinel = description

            if is_sentinel:
                param_set[param_key] = next_sentinel
                next_sentinel += 1
                continue

            if is_scalar:
                param_set[param_key] = arg
                continue

            if is_callable:
                self.current_column = col
                param_set[param_key] = arg(self)
                continue

            value = fallback(col)
            if value is not None:
                param_set[param_key] = value

    # drop the instance attribute set in the loop above
    del self.current_parameters

2391 

2392 

# Set DefaultExecutionContext as the ``execution_ctx_cls`` attribute of
# DefaultDialect.
DefaultDialect.execution_ctx_cls = DefaultExecutionContext