Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/default.py: 46%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1054 statements  

1# engine/default.py 

2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7# mypy: allow-untyped-defs, allow-untyped-calls 

8 

9"""Default implementations of per-dialect sqlalchemy.engine classes. 

10 

11These are semi-private implementation classes which are only of importance 

12to database dialect authors; dialects will usually use the classes here 

13as the base class for their own corresponding classes. 

14 

15""" 

16 

17from __future__ import annotations 

18 

19import functools 

20import operator 

21import random 

22import re 

23from time import perf_counter 

24import typing 

25from typing import Any 

26from typing import Callable 

27from typing import cast 

28from typing import Dict 

29from typing import Final 

30from typing import List 

31from typing import Literal 

32from typing import Mapping 

33from typing import MutableMapping 

34from typing import MutableSequence 

35from typing import Optional 

36from typing import Sequence 

37from typing import Set 

38from typing import Tuple 

39from typing import Type 

40from typing import TYPE_CHECKING 

41from typing import Union 

42import weakref 

43 

44from . import characteristics 

45from . import cursor as _cursor 

46from . import interfaces 

47from .base import Connection 

48from .interfaces import CacheStats 

49from .interfaces import DBAPICursor 

50from .interfaces import Dialect 

51from .interfaces import ExecuteStyle 

52from .interfaces import ExecutionContext 

53from .reflection import ObjectKind 

54from .reflection import ObjectScope 

55from .. import event 

56from .. import exc 

57from .. import pool 

58from .. import util 

59from ..sql import compiler 

60from ..sql import dml 

61from ..sql import expression 

62from ..sql import type_api 

63from ..sql import util as sql_util 

64from ..sql._typing import is_tuple_type 

65from ..sql.base import _NoArg 

66from ..sql.compiler import AggregateOrderByStyle 

67from ..sql.compiler import DDLCompiler 

68from ..sql.compiler import InsertmanyvaluesSentinelOpts 

69from ..sql.compiler import SQLCompiler 

70from ..sql.elements import quoted_name 

71from ..util.typing import TupleAny 

72from ..util.typing import Unpack 

73 

74if typing.TYPE_CHECKING: 

75 from types import ModuleType 

76 

77 from .base import Engine 

78 from .cursor import ResultFetchStrategy 

79 from .interfaces import _CoreMultiExecuteParams 

80 from .interfaces import _CoreSingleExecuteParams 

81 from .interfaces import _DBAPICursorDescription 

82 from .interfaces import _DBAPIMultiExecuteParams 

83 from .interfaces import _DBAPISingleExecuteParams 

84 from .interfaces import _ExecuteOptions 

85 from .interfaces import _MutableCoreSingleExecuteParams 

86 from .interfaces import _ParamStyle 

87 from .interfaces import ConnectArgsType 

88 from .interfaces import DBAPIConnection 

89 from .interfaces import DBAPIModule 

90 from .interfaces import DBAPIType 

91 from .interfaces import IsolationLevel 

92 from .row import Row 

93 from .url import URL 

94 from ..event import _ListenerFnType 

95 from ..pool import Pool 

96 from ..pool import PoolProxiedConnection 

97 from ..sql import Executable 

98 from ..sql.compiler import Compiled 

99 from ..sql.compiler import Linting 

100 from ..sql.compiler import ResultColumnsEntry 

101 from ..sql.dml import DMLState 

102 from ..sql.dml import UpdateBase 

103 from ..sql.elements import BindParameter 

104 from ..sql.schema import Column 

105 from ..sql.type_api import _BindProcessorType 

106 from ..sql.type_api import _ResultProcessorType 

107 from ..sql.type_api import TypeEngine 

108 

109 

# When we're handed literal SQL, ensure it's a SELECT query.
# The pattern tolerates leading whitespace and is case-insensitive.
SERVER_SIDE_CURSOR_RE = re.compile(r"\s*SELECT", re.I | re.UNICODE)


# Bind the items produced by iterating CacheStats to module-level names.
# The unpacking requires exactly five items, taken in iteration order.
(
    CACHE_HIT,
    CACHE_MISS,
    CACHING_DISABLED,
    NO_CACHE_KEY,
    NO_DIALECT_SUPPORT,
) = list(CacheStats)

121 

122 

123class DefaultDialect(Dialect): 

124 """Default implementation of Dialect""" 

125 

126 statement_compiler = compiler.SQLCompiler 

127 ddl_compiler = compiler.DDLCompiler 

128 type_compiler_cls = compiler.GenericTypeCompiler 

129 

130 preparer = compiler.IdentifierPreparer 

131 supports_alter = True 

132 supports_comments = False 

133 supports_constraint_comments = False 

134 inline_comments = False 

135 supports_statement_cache = True 

136 

137 div_is_floordiv = True 

138 

139 bind_typing = interfaces.BindTyping.NONE 

140 

141 include_set_input_sizes: Optional[Set[Any]] = None 

142 exclude_set_input_sizes: Optional[Set[Any]] = None 

143 

144 # the first value we'd get for an autoincrement column. 

145 default_sequence_base = 1 

146 

147 # most DBAPIs happy with this for execute(). 

148 # not cx_oracle. 

149 execute_sequence_format = tuple 

150 

151 supports_schemas = True 

152 supports_views = True 

153 supports_sequences = False 

154 sequences_optional = False 

155 preexecute_autoincrement_sequences = False 

156 supports_identity_columns = False 

157 postfetch_lastrowid = True 

158 favor_returning_over_lastrowid = False 

159 insert_null_pk_still_autoincrements = False 

160 update_returning = False 

161 delete_returning = False 

162 update_returning_multifrom = False 

163 delete_returning_multifrom = False 

164 insert_returning = False 

165 

166 aggregate_order_by_style = AggregateOrderByStyle.INLINE 

167 

168 cte_follows_insert = False 

169 

170 supports_native_enum = False 

171 supports_native_boolean = False 

172 supports_native_uuid = False 

173 returns_native_bytes = False 

174 

175 non_native_boolean_check_constraint = True 

176 

177 supports_simple_order_by_label = True 

178 

179 tuple_in_values = False 

180 

181 connection_characteristics = util.immutabledict( 

182 { 

183 "isolation_level": characteristics.IsolationLevelCharacteristic(), 

184 "logging_token": characteristics.LoggingTokenCharacteristic(), 

185 } 

186 ) 

187 

188 engine_config_types: Mapping[str, Any] = util.immutabledict( 

189 { 

190 "pool_timeout": util.asint, 

191 "echo": util.bool_or_str("debug"), 

192 "echo_pool": util.bool_or_str("debug"), 

193 "pool_recycle": util.asint, 

194 "pool_size": util.asint, 

195 "max_overflow": util.asint, 

196 "future": util.asbool, 

197 } 

198 ) 

199 

200 # if the NUMERIC type 

201 # returns decimal.Decimal. 

202 # *not* the FLOAT type however. 

203 supports_native_decimal = False 

204 

205 name = "default" 

206 

207 # length at which to truncate 

208 # any identifier. 

209 max_identifier_length = 9999 

210 _user_defined_max_identifier_length: Optional[int] = None 

211 

212 isolation_level: Optional[str] = None 

213 

214 # sub-categories of max_identifier_length. 

215 # currently these accommodate for MySQL which allows alias names 

216 # of 255 but DDL names only of 64. 

217 max_index_name_length: Optional[int] = None 

218 max_constraint_name_length: Optional[int] = None 

219 

220 supports_sane_rowcount = True 

221 supports_sane_multi_rowcount = True 

222 colspecs: MutableMapping[Type[TypeEngine[Any]], Type[TypeEngine[Any]]] = {} 

223 default_paramstyle = "named" 

224 

225 supports_default_values = False 

226 """dialect supports INSERT... DEFAULT VALUES syntax""" 

227 

228 supports_default_metavalue = False 

229 """dialect supports INSERT... VALUES (DEFAULT) syntax""" 

230 

231 default_metavalue_token = "DEFAULT" 

232 """for INSERT... VALUES (DEFAULT) syntax, the token to put in the 

233 parenthesis.""" 

234 

235 # not sure if this is a real thing but the compiler will deliver it 

236 # if this is the only flag enabled. 

237 supports_empty_insert = True 

238 """dialect supports INSERT () VALUES ()""" 

239 

240 supports_multivalues_insert = False 

241 

242 use_insertmanyvalues: bool = False 

243 

244 use_insertmanyvalues_wo_returning: bool = False 

245 

246 insertmanyvalues_implicit_sentinel: InsertmanyvaluesSentinelOpts = ( 

247 InsertmanyvaluesSentinelOpts.NOT_SUPPORTED 

248 ) 

249 

250 insertmanyvalues_page_size: int = 1000 

251 insertmanyvalues_max_parameters = 32700 

252 

253 supports_is_distinct_from = True 

254 

255 supports_server_side_cursors = False 

256 

257 server_side_cursors = False 

258 

259 # extra record-level locking features (#4860) 

260 supports_for_update_of = False 

261 

262 server_version_info = None 

263 

264 default_schema_name: Optional[str] = None 

265 

266 # indicates symbol names are 

267 # UPPERCASED if they are case insensitive 

268 # within the database. 

269 # if this is True, the methods normalize_name() 

270 # and denormalize_name() must be provided. 

271 requires_name_normalize = False 

272 

273 is_async = False 

274 

275 has_terminate = False 

276 

277 # TODO: this is not to be part of 2.0. implement rudimentary binary 

278 # literals for SQLite, PostgreSQL, MySQL only within 

279 # _Binary.literal_processor 

280 _legacy_binary_type_literal_encoding = "utf-8" 

281 

@util.deprecated_params(
    empty_in_strategy=(
        "1.4",
        "The :paramref:`_sa.create_engine.empty_in_strategy` keyword is "
        "deprecated, and no longer has any effect. All IN expressions "
        "are now rendered using "
        'the "expanding parameter" strategy which renders a set of bound '
        'expressions, or an "empty set" SELECT, at statement execution '
        "time.",
    ),
    server_side_cursors=(
        "1.4",
        "The :paramref:`_sa.create_engine.server_side_cursors` parameter "
        "is deprecated and will be removed in a future release. Please "
        "use the "
        ":paramref:`_engine.Connection.execution_options.stream_results` "
        "parameter.",
    ),
)
def __init__(
    self,
    paramstyle: Optional[_ParamStyle] = None,
    isolation_level: Optional[IsolationLevel] = None,
    dbapi: Optional[ModuleType] = None,
    implicit_returning: Literal[True] = True,
    supports_native_boolean: Optional[bool] = None,
    max_identifier_length: Optional[int] = None,
    label_length: Optional[int] = None,
    insertmanyvalues_page_size: Union[_NoArg, int] = _NoArg.NO_ARG,
    use_insertmanyvalues: Optional[bool] = None,
    # util.deprecated_params decorator cannot render the
    # Linting.NO_LINTING constant
    compiler_linting: Linting = int(compiler.NO_LINTING),  # type: ignore
    server_side_cursors: bool = False,
    skip_autocommit_rollback: bool = False,
    **kwargs: Any,
):
    """Configure the per-instance state of the dialect.

    :param paramstyle: explicit paramstyle; when ``None`` the DBAPI's
     ``paramstyle`` is used if a DBAPI was given, otherwise
     ``default_paramstyle``.
    :param isolation_level: stored as ``_on_connect_isolation_level``
     for later use.
    :param dbapi: the DBAPI module, or ``None``.
    :param implicit_returning: accepted, but not read by this constructor.
    :param supports_native_boolean: overrides the class-level flag when
     not ``None``.
    :param max_identifier_length: user-defined identifier length; when
     truthy it replaces the class-level ``max_identifier_length``.
    :param label_length: stored as-is on the instance.
    :param insertmanyvalues_page_size: overrides the class-level page
     size when passed.
    :param use_insertmanyvalues: overrides the class-level flag when not
     ``None``.
    :param compiler_linting: stored as-is on the instance.
    :param server_side_cursors: deprecated; enables the dialect-level
     ``server_side_cursors`` flag.
    :param skip_autocommit_rollback: stored as-is on the instance.
    :param kwargs: accepted, but not read by this constructor.

    :raises ArgumentError: if ``server_side_cursors`` is requested and
     the dialect does not set ``supports_server_side_cursors``.

    """
    if server_side_cursors:
        if not self.supports_server_side_cursors:
            raise exc.ArgumentError(
                "Dialect %s does not support server side cursors" % self
            )
        else:
            self.server_side_cursors = True

    # legacy dialects flagged setinputsizes with a boolean attribute;
    # translate that into the BindTyping enumeration.
    if getattr(self, "use_setinputsizes", False):
        util.warn_deprecated(
            "The dialect-level use_setinputsizes attribute is "
            "deprecated. Please use "
            "bind_typing = BindTyping.SETINPUTSIZES",
            "2.0",
        )
        self.bind_typing = interfaces.BindTyping.SETINPUTSIZES

    self.positional = False
    self._ischema = None

    self.dbapi = dbapi

    self.skip_autocommit_rollback = skip_autocommit_rollback

    # paramstyle precedence: explicit argument, then DBAPI, then the
    # class-level default.
    if paramstyle is not None:
        self.paramstyle = paramstyle
    elif self.dbapi is not None:
        self.paramstyle = self.dbapi.paramstyle
    else:
        self.paramstyle = self.default_paramstyle
    self.positional = self.paramstyle in (
        "qmark",
        "format",
        "numeric",
        "numeric_dollar",
    )
    self.identifier_preparer = self.preparer(self)
    self._on_connect_isolation_level = isolation_level

    # a "type_compiler" attribute already present on the dialect takes
    # precedence over "type_compiler_cls".
    legacy_tt_callable = getattr(self, "type_compiler", None)
    if legacy_tt_callable is not None:
        tt_callable = cast(
            Type[compiler.GenericTypeCompiler],
            self.type_compiler,
        )
    else:
        tt_callable = self.type_compiler_cls

    self.type_compiler_instance = self.type_compiler = tt_callable(self)

    if supports_native_boolean is not None:
        self.supports_native_boolean = supports_native_boolean

    self._user_defined_max_identifier_length = max_identifier_length
    if self._user_defined_max_identifier_length:
        self.max_identifier_length = (
            self._user_defined_max_identifier_length
        )
    self.label_length = label_length
    self.compiler_linting = compiler_linting

    if use_insertmanyvalues is not None:
        self.use_insertmanyvalues = use_insertmanyvalues

    if insertmanyvalues_page_size is not _NoArg.NO_ARG:
        self.insertmanyvalues_page_size = insertmanyvalues_page_size

385 

@property
@util.deprecated(
    "2.0",
    "full_returning is deprecated, please use insert_returning, "
    "update_returning, delete_returning",
)
def full_returning(self):
    # Truthy only when INSERT, UPDATE and DELETE all support RETURNING;
    # otherwise the first falsy flag is handed back, as with ``and``.
    outcome = self.insert_returning
    if outcome:
        outcome = self.update_returning
    if outcome:
        outcome = self.delete_returning
    return outcome

398 

@util.memoized_property
def insert_executemany_returning(self):
    """Report whether executemany INSERT statements may use RETURNING.

    Unless a dialect overrides this attribute, the answer is derived from
    two other flags: the dialect must support ``insert_returning`` and
    must have opted into ``use_insertmanyvalues``.

    """
    returning_ok = self.insert_returning
    return returning_ok and self.use_insertmanyvalues

413 

@util.memoized_property
def insert_executemany_returning_sort_by_parameter_order(self):
    """Default value for
    ``insert_executemany_returning_sort_by_parameter_order``.

    Unless a dialect overrides this attribute, it is derived from the
    same two flags as ``insert_executemany_returning``: the dialect must
    support ``insert_returning`` and must have opted into
    ``use_insertmanyvalues``.

    """
    returning_ok = self.insert_returning
    return returning_ok and self.use_insertmanyvalues

436 

437 update_executemany_returning = False 

438 delete_executemany_returning = False 

439 

@util.memoized_property
def loaded_dbapi(self) -> DBAPIModule:
    # Hand back the DBAPI module, refusing to proceed when the dialect
    # was constructed without one.
    dbapi_module = self.dbapi
    if dbapi_module is not None:
        return dbapi_module

    raise exc.InvalidRequestError(
        f"Dialect {self} does not have a Python DBAPI established "
        "and cannot be used for actual database interaction"
    )

448 

@util.memoized_property
def _bind_typing_render_casts(self):
    """True when ``bind_typing`` is ``BindTyping.RENDER_CASTS``."""
    current_mode = self.bind_typing
    return current_mode is interfaces.BindTyping.RENDER_CASTS

452 

def _ensure_has_table_connection(self, arg: Connection) -> None:
    """Validate the first argument given to ``Dialect.has_table()``.

    :param arg: the object to check; must be a ``Connection``.
    :raises ArgumentError: if ``arg`` is not a ``Connection``; the
     message points callers at the public ``inspect()`` API.

    """
    if not isinstance(arg, Connection):
        raise exc.ArgumentError(
            "The argument passed to Dialect.has_table() should be a "
            "%s, got %s. "
            "Additionally, the Dialect.has_table() method is for "
            "internal dialect "
            "use only; please use "
            "``inspect(some_engine).has_table(<tablename>)`` "
            "for public API use." % (Connection, type(arg))
        )

464 

@util.memoized_property
def _supports_statement_cache(self):
    """Resolve whether this dialect class opted into statement caching.

    Only the class's own namespace is consulted, so a value inherited
    from a parent class does not count. When the attribute is absent (or
    None) a warning is emitted and the result is False.

    """
    declared = vars(self.__class__).get("supports_statement_cache", None)
    if declared is None:
        util.warn(
            "Dialect %s:%s will not make use of SQL compilation caching "
            "as it does not set the 'supports_statement_cache' attribute "
            "to ``True``. This can have "
            "significant performance implications including some "
            "performance degradations in comparison to prior SQLAlchemy "
            "versions. Dialect maintainers should seek to set this "
            "attribute to True after appropriate development and testing "
            "for SQLAlchemy 1.4 caching support. Alternatively, this "
            "attribute may be set to False which will disable this "
            "warning." % (self.name, self.driver),
            code="cprf",
        )

    return bool(declared)

484 

@util.memoized_property
def _type_memos(self):
    """A weak-keyed dictionary, created on first access."""
    memo_store = weakref.WeakKeyDictionary()
    return memo_store

488 

@property
def dialect_description(self):  # type: ignore[override]
    # "<name>+<driver>"
    prefix = self.name + "+"
    return prefix + self.driver

492 

@property
def supports_sane_rowcount_returning(self):
    """Whether rowcount remains reliable when RETURNING is in use.

    This default simply mirrors ``supports_sane_rowcount``.

    """
    sane_rowcount = self.supports_sane_rowcount
    return sane_rowcount

503 

@classmethod
def get_pool_class(cls, url: URL) -> Type[Pool]:
    # A ``poolclass`` attribute on the dialect class wins; otherwise the
    # choice depends on whether the dialect is async.
    fallback: Type[pool.Pool] = (
        pool.AsyncAdaptedQueuePool if cls.is_async else pool.QueuePool
    )
    return getattr(cls, "poolclass", fallback)

513 

def get_dialect_pool_class(self, url: URL) -> Type[Pool]:
    # Instance-level entry point; defers to get_pool_class().
    pool_cls = self.get_pool_class(url)
    return pool_cls

516 

@classmethod
def load_provisioning(cls):
    # Best-effort import of the "provision" module that sits in the same
    # package as the dialect's module; its absence is not an error.
    package, _, _ = cls.__module__.rpartition(".")
    try:
        __import__(f"{package}.provision")
    except ImportError:
        pass

524 

525 def _builtin_onconnect(self) -> Optional[_ListenerFnType]: 

526 if self._on_connect_isolation_level is not None: 

527 

528 def builtin_connect(dbapi_conn, conn_rec): 

529 self._assert_and_set_isolation_level( 

530 dbapi_conn, self._on_connect_isolation_level 

531 ) 

532 

533 return builtin_connect 

534 else: 

535 return None 

536 

def initialize(self, connection: Connection) -> None:
    # Probe the connection for server version, default schema and
    # default isolation level.  Each probe is optional: a dialect that
    # raises NotImplementedError simply gets None for that value.
    try:
        version_info = self._get_server_version_info(connection)
    except NotImplementedError:
        version_info = None
    self.server_version_info = version_info

    try:
        schema_name = self._get_default_schema_name(connection)
    except NotImplementedError:
        schema_name = None
    self.default_schema_name = schema_name

    try:
        iso_level = self.get_default_isolation_level(
            connection.connection.dbapi_connection
        )
    except NotImplementedError:
        iso_level = None
    self.default_isolation_level = iso_level

    # a user-supplied identifier length always wins over a detected one
    if not self._user_defined_max_identifier_length:
        detected_length = self._check_max_identifier_length(connection)
        if detected_length:
            self.max_identifier_length = detected_length

    label_limit = self.label_length
    if label_limit and label_limit > self.max_identifier_length:
        raise exc.ArgumentError(
            "Label length of %d is greater than this dialect's"
            " maximum identifier length of %d"
            % (self.label_length, self.max_identifier_length)
        )

572 

def on_connect(self) -> Optional[Callable[[Any], None]]:
    # inherits the docstring from interfaces.Dialect.on_connect
    # The default dialect installs no connect-time callable.
    connect_hook = None
    return connect_hook

576 

577 def _check_max_identifier_length(self, connection): 

578 """Perform a connection / server version specific check to determine 

579 the max_identifier_length. 

580 

581 If the dialect's class level max_identifier_length should be used, 

582 can return None. 

583 

584 """ 

585 return None 

586 

def get_default_isolation_level(self, dbapi_conn):
    """Return the isolation level to treat as the default for the given
    DBAPI connection.

    Subclasses may override this to supply a "fallback" level for
    databases that cannot reliably report the actual one.

    This default delegates to the dialect's ``get_isolation_level()``
    method and lets any exception it raises propagate.

    """
    current_level = self.get_isolation_level(dbapi_conn)
    return current_level

600 

def type_descriptor(self, typeobj):
    """Return a database-specific :class:`.TypeEngine` for the generic
    type object given.

    The dialect's ``colspecs`` dictionary (class- or instance-level) is
    handed to :func:`_types.adapt_type` together with the type.

    """
    colspecs = self.colspecs
    return type_api.adapt_type(typeobj, colspecs)

611 

def has_index(self, connection, table_name, index_name, schema=None, **kw):
    # A missing table has no indexes; otherwise look for an index entry
    # whose "name" equals index_name.
    if not self.has_table(connection, table_name, schema=schema, **kw):
        return False

    index_entries = self.get_indexes(
        connection, table_name, schema=schema, **kw
    )
    return any(entry["name"] == index_name for entry in index_entries)

622 

def has_schema(
    self, connection: Connection, schema_name: str, **kw: Any
) -> bool:
    # Membership test against whatever get_schema_names() reports.
    known_schemas = self.get_schema_names(connection, **kw)
    return schema_name in known_schemas

627 

def validate_identifier(self, ident: str) -> None:
    # Identifiers within the dialect's length limit pass silently;
    # anything longer is rejected with IdentifierError.
    if len(ident) <= self.max_identifier_length:
        return

    raise exc.IdentifierError(
        "Identifier '%s' exceeds maximum length of %d characters"
        % (ident, self.max_identifier_length)
    )

634 

def connect(self, *cargs: Any, **cparams: Any) -> DBAPIConnection:
    # inherits the docstring from interfaces.Dialect.connect
    dbapi_module = self.loaded_dbapi
    return dbapi_module.connect(*cargs, **cparams)  # type: ignore[no-any-return]  # NOQA: E501

638 

def create_connect_args(self, url: URL) -> ConnectArgsType:
    # inherits the docstring from interfaces.Dialect.create_connect_args
    # Query-string entries are layered over the translated URL fields;
    # no positional arguments are produced.
    connect_kwargs = url.translate_connect_args()
    connect_kwargs.update(url.query)
    positional_args: List[Any] = []
    return (positional_args, connect_kwargs)

644 

def set_engine_execution_options(
    self, engine: Engine, opts: Mapping[str, Any]
) -> None:
    # Only options that name a registered connection characteristic are
    # of interest; when there are none, nothing is registered.
    matching_names = set(self.connection_characteristics).intersection(
        opts
    )
    if not matching_names:
        return

    characteristics: Mapping[str, Any] = util.immutabledict(
        (name, opts[name]) for name in matching_names
    )

    # apply the selected characteristics each time the engine emits
    # its "engine_connect" event
    @event.listens_for(engine, "engine_connect")
    def set_connection_characteristics(connection):
        self._set_connection_characteristics(connection, characteristics)

661 

def set_connection_execution_options(
    self, connection: Connection, opts: Mapping[str, Any]
) -> None:
    # Only options that name a registered connection characteristic are
    # applied, and they are applied to this connection immediately.
    matching_names = set(self.connection_characteristics).intersection(
        opts
    )
    if not matching_names:
        return

    characteristics: Mapping[str, Any] = util.immutabledict(
        (name, opts[name]) for name in matching_names
    )
    self._set_connection_characteristics(connection, characteristics)

673 

674 def _set_connection_characteristics(self, connection, characteristics): 

675 characteristic_values = [ 

676 (name, self.connection_characteristics[name], value) 

677 for name, value in characteristics.items() 

678 ] 

679 

680 if connection.in_transaction(): 

681 trans_objs = [ 

682 (name, obj) 

683 for name, obj, _ in characteristic_values 

684 if obj.transactional 

685 ] 

686 if trans_objs: 

687 raise exc.InvalidRequestError( 

688 "This connection has already initialized a SQLAlchemy " 

689 "Transaction() object via begin() or autobegin; " 

690 "%s may not be altered unless rollback() or commit() " 

691 "is called first." 

692 % (", ".join(name for name, obj in trans_objs)) 

693 ) 

694 

695 dbapi_connection = connection.connection.dbapi_connection 

696 for _, characteristic, value in characteristic_values: 

697 characteristic.set_connection_characteristic( 

698 self, connection, dbapi_connection, value 

699 ) 

700 connection.connection._connection_record.finalize_callback.append( 

701 functools.partial(self._reset_characteristics, characteristics) 

702 ) 

703 

704 def _reset_characteristics(self, characteristics, dbapi_connection): 

705 for characteristic_name in characteristics: 

706 characteristic = self.connection_characteristics[ 

707 characteristic_name 

708 ] 

709 characteristic.reset_characteristic(self, dbapi_connection) 

710 

def do_begin(self, dbapi_connection):
    # Nothing is emitted on the DBAPI connection to begin a transaction
    # in this default implementation.
    return None

713 

def do_rollback(self, dbapi_connection):
    # The rollback is skipped only when the dialect was configured with
    # skip_autocommit_rollback AND the connection is detected to be in
    # autocommit; detection is not attempted unless the flag is set.
    in_autocommit = (
        self.skip_autocommit_rollback
        and self.detect_autocommit_setting(dbapi_connection)
    )
    if not in_autocommit:
        dbapi_connection.rollback()

720 

def do_commit(self, dbapi_connection):
    # Commit directly on the DBAPI connection.
    commit = dbapi_connection.commit
    commit()

723 

def do_terminate(self, dbapi_connection):
    # Termination is the same as a regular close in this default.
    close_connection = self.do_close
    close_connection(dbapi_connection)

726 

def do_close(self, dbapi_connection):
    # Close directly on the DBAPI connection.
    close = dbapi_connection.close
    close()

729 

@util.memoized_property
def _dialect_specific_select_one(self):
    """The string form of ``select(1)`` compiled for this dialect."""
    compiled_stmt = expression.select(1).compile(dialect=self)
    return str(compiled_stmt)

733 

734 def _do_ping_w_event(self, dbapi_connection: DBAPIConnection) -> bool: 

735 try: 

736 return self.do_ping(dbapi_connection) 

737 except self.loaded_dbapi.Error as err: 

738 is_disconnect = self.is_disconnect(err, dbapi_connection, None) 

739 

740 if self._has_events: 

741 try: 

742 Connection._handle_dbapi_exception_noconnection( 

743 err, 

744 self, 

745 is_disconnect=is_disconnect, 

746 invalidate_pool_on_disconnect=False, 

747 is_pre_ping=True, 

748 ) 

749 except exc.StatementError as new_err: 

750 is_disconnect = new_err.connection_invalidated 

751 

752 if is_disconnect: 

753 return False 

754 else: 

755 raise 

756 

def do_ping(self, dbapi_connection: DBAPIConnection) -> bool:
    # Execute the dialect's "SELECT 1" on a throwaway cursor; the cursor
    # is closed whether or not the statement succeeds.
    ping_cursor = dbapi_connection.cursor()
    try:
        ping_cursor.execute(self._dialect_specific_select_one)
        return True
    finally:
        ping_cursor.close()

764 

def create_xid(self):
    """Create a random two-phase transaction ID.

    This id will be passed to do_begin_twophase(), do_rollback_twophase(),
    do_commit_twophase(). Its format is unspecified.

    The value is ``"_sa_"`` followed by exactly 32 lowercase hex digits.
    """

    # getrandbits(128) yields 0 <= n < 2**128, which always fits the
    # 32-digit field; randint(0, 2**128) includes 2**128 itself, which
    # would render as 33 digits.
    return "_sa_%032x" % random.getrandbits(128)

773 

def do_savepoint(self, connection, name):
    # Emit a savepoint clause for the given name on the connection.
    savepoint_clause = expression.SavepointClause(name)
    connection.execute(savepoint_clause)

776 

def do_rollback_to_savepoint(self, connection, name):
    # Emit a rollback-to-savepoint clause for the given name.
    rollback_clause = expression.RollbackToSavepointClause(name)
    connection.execute(rollback_clause)

779 

def do_release_savepoint(self, connection, name):
    # Emit a release-savepoint clause for the given name.
    release_clause = expression.ReleaseSavepointClause(name)
    connection.execute(release_clause)

782 

def _deliver_insertmanyvalues_batches(
    self,
    connection,
    cursor,
    statement,
    parameters,
    generic_setinputsizes,
    context,
):
    """Generator yielding "insertmanyvalues" batches from the compiler.

    Each batch produced by the compiled statement's own
    ``_deliver_insertmanyvalues_batches()`` is yielded unchanged. When
    the statement has RETURNING columns, then after each batch is yielded
    the rows for that batch are fetched from ``cursor`` and accumulated
    into ``context._insertmanyvalues_rows``; if sentinel columns are in
    use the rows are first re-ordered to line up with the batch's
    parameter sets.

    NOTE(review): presumably the consumer executes each yielded batch on
    ``cursor`` before resuming the generator, since rows are fetched
    immediately after the ``yield`` -- confirm against the caller.

    :raises InvalidRequestError: if the sentinel-keyed rows do not match
     the batch in count, or a parameter set's sentinel value is not found
     among the returned rows.

    """
    context = cast(DefaultExecutionContext, context)
    compiled = cast(SQLCompiler, context.compiled)

    # result processors for the sentinel column(s); set up lazily on the
    # first batch that needs them and reused for later batches.
    _composite_sentinel_proc: Sequence[
        Optional[_ResultProcessorType[Any]]
    ] = ()
    _scalar_sentinel_proc: Optional[_ResultProcessorType[Any]] = None
    _sentinel_proc_initialized: bool = False

    compiled_parameters = context.compiled_parameters

    imv = compiled._insertmanyvalues
    assert imv is not None

    is_returning: Final[bool] = bool(compiled.effective_returning)
    # a per-execution option overrides the dialect-level page size
    batch_size = context.execution_options.get(
        "insertmanyvalues_page_size", self.insertmanyvalues_page_size
    )

    if compiled.schema_translate_map:
        schema_translate_map = context.execution_options.get(
            "schema_translate_map", {}
        )
    else:
        schema_translate_map = None

    if is_returning:
        # the same list object is shared with the context, so rows
        # appended below are visible through the context attribute
        result: Optional[List[Any]] = []
        context._insertmanyvalues_rows = result

        sort_by_parameter_order = imv.sort_by_parameter_order

    else:
        sort_by_parameter_order = False
        result = None

    for imv_batch in compiled._deliver_insertmanyvalues_batches(
        statement,
        parameters,
        compiled_parameters,
        generic_setinputsizes,
        batch_size,
        sort_by_parameter_order,
        schema_translate_map,
    ):
        yield imv_batch

        if is_returning:

            try:
                rows = context.fetchall_for_returning(cursor)
            except BaseException as be:
                # NOTE(review): ``rows`` is only bound on success, so
                # this handler is presumably expected to always raise --
                # confirm.
                connection._handle_dbapi_exception(
                    be,
                    sql_util._long_statement(imv_batch.replaced_statement),
                    imv_batch.replaced_parameters,
                    None,
                    context,
                    is_sub_exec=True,
                )

            # I would have thought "is_returning: Final[bool]"
            # would have assured this but pylance thinks not
            assert result is not None

            if imv.num_sentinel_columns and not imv_batch.is_downgraded:
                composite_sentinel = imv.num_sentinel_columns > 1
                if imv.implicit_sentinel:
                    # for implicit sentinel, which is currently single-col
                    # integer autoincrement, do a simple sort.
                    assert not composite_sentinel
                    result.extend(
                        sorted(rows, key=operator.itemgetter(-1))
                    )
                    continue

                # otherwise, create dictionaries to match up batches
                # with parameters
                assert imv.sentinel_param_keys
                assert imv.sentinel_columns

                _nsc = imv.num_sentinel_columns

                if not _sentinel_proc_initialized:
                    # sentinel values are read from the trailing
                    # column(s) of each row; look up a result processor
                    # for each using element [1] of the matching
                    # cursor.description entry.
                    if composite_sentinel:
                        _composite_sentinel_proc = [
                            col.type._cached_result_processor(
                                self, cursor_desc[1]
                            )
                            for col, cursor_desc in zip(
                                imv.sentinel_columns,
                                cursor.description[-_nsc:],
                            )
                        ]
                    else:
                        _scalar_sentinel_proc = (
                            imv.sentinel_columns[0]
                        ).type._cached_result_processor(
                            self, cursor.description[-1][1]
                        )
                    _sentinel_proc_initialized = True

                # map (processed) sentinel value -> row
                rows_by_sentinel: Union[
                    Dict[Tuple[Any, ...], Any],
                    Dict[Any, Any],
                ]
                if composite_sentinel:
                    rows_by_sentinel = {
                        tuple(
                            (proc(val) if proc else val)
                            for val, proc in zip(
                                row[-_nsc:], _composite_sentinel_proc
                            )
                        ): row
                        for row in rows
                    }
                elif _scalar_sentinel_proc:
                    rows_by_sentinel = {
                        _scalar_sentinel_proc(row[-1]): row for row in rows
                    }
                else:
                    rows_by_sentinel = {row[-1]: row for row in rows}

                # duplicate sentinel values collapse in the dict, so a
                # size mismatch means rows are missing or not unique
                if len(rows_by_sentinel) != len(imv_batch.batch):
                    # see test_insert_exec.py::
                    # IMVSentinelTest::test_sentinel_incorrect_rowcount
                    # for coverage / demonstration
                    raise exc.InvalidRequestError(
                        f"Sentinel-keyed result set did not produce "
                        f"correct number of rows {len(imv_batch.batch)}; "
                        "produced "
                        f"{len(rows_by_sentinel)}. Please ensure the "
                        "sentinel column is fully unique and populated in "
                        "all cases."
                    )

                try:
                    # emit rows in the order of the batch's sentinel
                    # values
                    ordered_rows = [
                        rows_by_sentinel[sentinel_keys]
                        for sentinel_keys in imv_batch.sentinel_values
                    ]
                except KeyError as ke:
                    # see test_insert_exec.py::
                    # IMVSentinelTest::test_sentinel_cant_match_keys
                    # for coverage / demonstration
                    raise exc.InvalidRequestError(
                        f"Can't match sentinel values in result set to "
                        f"parameter sets; key {ke.args[0]!r} was not "
                        "found. "
                        "There may be a mismatch between the datatype "
                        "passed to the DBAPI driver vs. that which it "
                        "returns in a result row. Ensure the given "
                        "Python value matches the expected result type "
                        "*exactly*, taking care to not rely upon implicit "
                        "conversions which may occur such as when using "
                        "strings in place of UUID or integer values, etc. "
                    ) from ke

                result.extend(ordered_rows)

            else:
                # no sentinel handling for this batch; keep rows in the
                # order they were fetched
                result.extend(rows)

954 

def do_executemany(self, cursor, statement, parameters, context=None):
    """Run ``statement`` once per parameter set via ``cursor.executemany()``.

    ``context`` is accepted for interface compatibility and is not used by
    this default implementation.
    """
    run_many = cursor.executemany
    run_many(statement, parameters)

957 

def do_execute(self, cursor, statement, parameters, context=None):
    """Run ``statement`` with ``parameters`` via ``cursor.execute()``.

    ``context`` is accepted for interface compatibility and is not used by
    this default implementation.
    """
    run = cursor.execute
    run(statement, parameters)

960 

def do_execute_no_params(self, cursor, statement, context=None):
    """Run ``statement`` via ``cursor.execute()`` passing no parameters.

    ``context`` is accepted for interface compatibility and is not used by
    this default implementation.
    """
    run = cursor.execute
    run(statement)

963 

def is_disconnect(
    self,
    e: DBAPIModule.Error,
    connection: Union[
        pool.PoolProxiedConnection, interfaces.DBAPIConnection, None
    ],
    cursor: Optional[interfaces.DBAPICursor],
) -> bool:
    """Report whether ``e`` indicates a dropped database connection.

    The default implementation never classifies an error as a disconnect;
    it ignores all arguments and always returns ``False``.
    """
    return False

973 

@util.memoized_instancemethod
def _gen_allowed_isolation_levels(self, dbapi_conn):
    """Return the dialect's isolation level names as a tuple, or ``None``.

    ``None`` is returned when ``get_isolation_level_values()`` raises
    ``NotImplementedError``.  A ``ValueError`` is raised when the dialect
    reports names that are not already upper case with spaces in place of
    underscores.
    """
    try:
        reported = list(self.get_isolation_level_values(dbapi_conn))
    except NotImplementedError:
        return None

    canonical = [name.replace("_", " ").upper() for name in reported]
    if reported == canonical:
        return tuple(canonical)

    # the dialect itself handed back non-canonical names; that is a
    # dialect implementation error rather than a user error
    raise ValueError(
        f"Dialect {self.name!r} get_isolation_level_values() "
        f"method should return names as UPPERCASE using spaces, "
        f"not underscores; got "
        f"{sorted(set(reported).difference(canonical))}"
    )

992 

993 def _assert_and_set_isolation_level(self, dbapi_conn, level): 

994 level = level.replace("_", " ").upper() 

995 

996 _allowed_isolation_levels = self._gen_allowed_isolation_levels( 

997 dbapi_conn 

998 ) 

999 if ( 

1000 _allowed_isolation_levels 

1001 and level not in _allowed_isolation_levels 

1002 ): 

1003 raise exc.ArgumentError( 

1004 f"Invalid value {level!r} for isolation_level. " 

1005 f"Valid isolation levels for {self.name!r} are " 

1006 f"{', '.join(_allowed_isolation_levels)}" 

1007 ) 

1008 

1009 self.set_isolation_level(dbapi_conn, level) 

1010 

def reset_isolation_level(self, dbapi_conn):
    """Put ``dbapi_conn`` back on the dialect's baseline isolation level.

    The baseline is ``_on_connect_isolation_level`` when that is set,
    otherwise ``default_isolation_level``.
    """
    on_connect = self._on_connect_isolation_level

    if on_connect is None:
        assert self.default_isolation_level is not None
        self._assert_and_set_isolation_level(
            dbapi_conn,
            self.default_isolation_level,
        )
        return

    # an on-connect level is expected to be either AUTOCOMMIT or the
    # same value as the detected default
    assert (
        on_connect == "AUTOCOMMIT"
        or on_connect == self.default_isolation_level
    )
    self._assert_and_set_isolation_level(dbapi_conn, on_connect)

1027 

def normalize_name(self, name):
    """Convert an identifier name into its "normalized" form.

    * ``None`` is passed through.
    * A name with no upper/lower distinction is returned unchanged.
    * An all-upper-case name whose lower-case form needs no quoting is
      returned lower-cased.
    * An all-lower-case name is returned as a ``quoted_name`` with
      ``quote=True``.
    * Anything else (mixed case, or upper case that requires quotes) is
      returned unchanged.
    """
    if name is None:
        return None

    lowered = name.lower()
    uppered = name.upper()

    if uppered == lowered:
        # no case conversion applies, e.g. non-european characters
        return name

    if uppered == name:
        needs_quotes = self.identifier_preparer._requires_quotes
        if not needs_quotes(lowered):
            # all upper case and safe to leave unquoted: fold to lower
            return lowered

    if lowered == name:
        # all lower case; once denormalized this must be force-quoted
        return quoted_name(name, quote=True)

    # mixed case (or upper case needing quotes): left as-is, it will be
    # quoted in SQL when used later
    return name

1053 

def denormalize_name(self, name):
    """Convert a "normalized" identifier name back to upper case if needed.

    Only an all-lower-case name that does not require quoting is
    upper-cased; ``None``, case-less names, mixed-case names and names that
    require quotes are returned unchanged.
    """
    if name is None:
        return None

    lowered = name.lower()
    uppered = name.upper()

    if uppered == lowered:
        # no case conversion applies, e.g. non-european characters
        return name
    if lowered != name:
        return name
    if self.identifier_preparer._requires_quotes(lowered):
        return name
    return uppered

1070 

def get_driver_connection(self, connection: DBAPIConnection) -> Any:
    """Return the driver-level connection for ``connection``.

    The default implementation returns the object it was given.
    """
    driver_connection = connection
    return driver_connection

1073 

def _overrides_default(self, method):
    """Return True if this dialect's class replaces ``DefaultDialect.method``.

    The comparison is made on the ``__code__`` objects of the attribute
    named ``method`` on ``type(self)`` and on ``DefaultDialect``.
    """
    own_impl = getattr(type(self), method)
    base_impl = getattr(DefaultDialect, method)
    return own_impl.__code__ is not base_impl.__code__

1079 

def _default_multi_reflect(
    self,
    single_tbl_method,
    connection,
    kind,
    schema,
    filter_names,
    scope,
    **kw,
):
    """Emulate a "multi" reflection method using a single-table method.

    This is a generator.  For each selected name it yields
    ``((schema, name), single_tbl_method(connection, name, schema=schema,
    **kw))``.

    :param single_tbl_method: callable invoked once per table/view name.
    :param kind: ``ObjectKind`` selection; decides which name-listing
     methods (tables, views, materialized views) are consulted.
    :param schema: schema passed to the name-listing methods and to
     ``single_tbl_method``; also the first element of each yielded key.
    :param filter_names: optional collection of names to restrict to.
    :param scope: ``ObjectScope`` selection; decides whether the default
     and/or temporary name-listing methods are consulted.
    :param kw: passed to the name-listing methods and to
     ``single_tbl_method``.  An ``unreflectable`` entry, if present, is
     removed and used as a dict that collects
     ``exc.UnreflectableTableError`` instances keyed by ``(schema, name)``.
    """
    names_fns = []
    temp_names_fns = []
    if ObjectKind.TABLE in kind:
        names_fns.append(self.get_table_names)
        temp_names_fns.append(self.get_temp_table_names)
    if ObjectKind.VIEW in kind:
        names_fns.append(self.get_view_names)
        temp_names_fns.append(self.get_temp_view_names)
    if ObjectKind.MATERIALIZED_VIEW in kind:
        names_fns.append(self.get_materialized_view_names)
        # no temp materialized view at the moment
        # temp_names_fns.append(self.get_temp_materialized_view_names)

    # removed from kw so that it is not forwarded to the reflection calls
    unreflectable = kw.pop("unreflectable", {})

    if (
        filter_names
        and scope is ObjectScope.ANY
        and kind is ObjectKind.ANY
    ):
        # if names are given and no qualification on type of table
        # (i.e. the Table(..., autoload) case), take the names as given,
        # don't run names queries. If a table does not exist
        # NoSuchTableError is raised and it's skipped

        # this also suits the case for mssql where we can reflect
        # individual temp tables but there's no temp_names_fn
        names = filter_names
    else:
        names = []
        name_kw = {"schema": schema, **kw}
        fns = []
        if ObjectScope.DEFAULT in scope:
            fns.extend(names_fns)
        if ObjectScope.TEMPORARY in scope:
            fns.extend(temp_names_fns)

        for fn in fns:
            try:
                names.extend(fn(connection, **name_kw))
            except NotImplementedError:
                # a name-listing method the dialect doesn't implement
                # simply contributes no names
                pass

    if filter_names:
        # set for the membership test below; ``names`` may still refer
        # to the original ``filter_names`` object and keeps its order
        filter_names = set(filter_names)

    # iterate over all the tables/views and call the single table method
    for table in names:
        if not filter_names or table in filter_names:
            key = (schema, table)
            try:
                yield (
                    key,
                    single_tbl_method(
                        connection, table, schema=schema, **kw
                    ),
                )
            except exc.UnreflectableTableError as err:
                # keep only the first error recorded for a given key
                if key not in unreflectable:
                    unreflectable[key] = err
            except exc.NoSuchTableError:
                pass

1152 

def get_multi_table_options(self, connection, **kw):
    """Reflect table options for many tables.

    Delegates to ``_default_multi_reflect()`` with ``get_table_options``
    as the per-table method.
    """
    per_table = self.get_table_options
    return self._default_multi_reflect(per_table, connection, **kw)

1157 

def get_multi_columns(self, connection, **kw):
    """Reflect columns for many tables.

    Delegates to ``_default_multi_reflect()`` with ``get_columns`` as the
    per-table method.
    """
    per_table = self.get_columns
    return self._default_multi_reflect(per_table, connection, **kw)

1160 

def get_multi_pk_constraint(self, connection, **kw):
    """Reflect primary key constraints for many tables.

    Delegates to ``_default_multi_reflect()`` with ``get_pk_constraint``
    as the per-table method.
    """
    per_table = self.get_pk_constraint
    return self._default_multi_reflect(per_table, connection, **kw)

1165 

def get_multi_foreign_keys(self, connection, **kw):
    """Reflect foreign keys for many tables.

    Delegates to ``_default_multi_reflect()`` with ``get_foreign_keys`` as
    the per-table method.
    """
    per_table = self.get_foreign_keys
    return self._default_multi_reflect(per_table, connection, **kw)

1170 

def get_multi_indexes(self, connection, **kw):
    """Reflect indexes for many tables.

    Delegates to ``_default_multi_reflect()`` with ``get_indexes`` as the
    per-table method.
    """
    per_table = self.get_indexes
    return self._default_multi_reflect(per_table, connection, **kw)

1173 

def get_multi_unique_constraints(self, connection, **kw):
    """Reflect unique constraints for many tables.

    Delegates to ``_default_multi_reflect()`` with
    ``get_unique_constraints`` as the per-table method.
    """
    per_table = self.get_unique_constraints
    return self._default_multi_reflect(per_table, connection, **kw)

1178 

def get_multi_check_constraints(self, connection, **kw):
    """Reflect check constraints for many tables.

    Delegates to ``_default_multi_reflect()`` with
    ``get_check_constraints`` as the per-table method.
    """
    per_table = self.get_check_constraints
    return self._default_multi_reflect(per_table, connection, **kw)

1183 

def get_multi_table_comment(self, connection, **kw):
    """Reflect table comments for many tables.

    Delegates to ``_default_multi_reflect()`` with ``get_table_comment``
    as the per-table method.
    """
    per_table = self.get_table_comment
    return self._default_multi_reflect(per_table, connection, **kw)

1188 

1189 

class StrCompileDialect(DefaultDialect):
    # A DefaultDialect subclass that only overrides class-level settings:
    # it selects the "Str" SQL/type compilers and switches on a broad set
    # of feature flags.
    # NOTE(review): presumably the dialect used when a statement is
    # stringified without a real database dialect - confirm against callers.

    # compiler / preparer classes used by this dialect
    statement_compiler = compiler.StrSQLCompiler
    ddl_compiler = compiler.DDLCompiler
    type_compiler_cls = compiler.StrSQLTypeCompiler
    preparer = compiler.IdentifierPreparer

    # RETURNING is enabled for all three DML statement types
    insert_returning = True
    update_returning = True
    delete_returning = True

    supports_statement_cache = True

    supports_identity_columns = True

    # sequences are supported but flagged optional, and are not
    # pre-executed for autoincrement
    supports_sequences = True
    sequences_optional = True
    preexecute_autoincrement_sequences = False

    supports_native_boolean = True

    supports_multivalues_insert = True
    supports_simple_order_by_label = True

1212 

1213 

class DefaultExecutionContext(ExecutionContext):
    # Class-level defaults.  The ``_init_*`` classmethods build instances
    # with ``cls.__new__(cls)`` and assign only the attributes relevant to
    # the kind of execution, so everything else falls back to the values
    # declared here.

    # statement-kind flags
    isinsert = False
    isupdate = False
    isdelete = False
    is_crud = False
    is_text = False
    isddl = False

    execute_style: ExecuteStyle = ExecuteStyle.EXECUTE

    compiled: Optional[Compiled] = None
    result_column_struct: Optional[
        Tuple[List[ResultColumnsEntry], bool, bool, bool, bool]
    ] = None
    returned_default_rows: Optional[Sequence[Row[Unpack[TupleAny]]]] = None

    execution_options: _ExecuteOptions = util.EMPTY_DICT

    cursor_fetch_strategy = _cursor._DEFAULT_FETCH

    invoked_statement: Optional[Executable] = None

    # RETURNING / cursor-kind flags
    _is_implicit_returning = False
    _is_explicit_returning = False
    _is_supplemental_returning = False
    _is_server_side = False

    _soft_closed = False

    # when not None, takes precedence over ``cursor.rowcount`` in the
    # ``rowcount`` accessor
    _rowcount: Optional[int] = None

    # a hook for SQLite's translation of
    # result column names
    # NOTE: pyhive is using this hook, can't remove it :(
    _translate_colname: Optional[
        Callable[[str], Tuple[str, Optional[str]]]
    ] = None

    _expanded_parameters: Mapping[str, List[str]] = util.immutabledict()
    """used by set_input_sizes().

    This collection comes from ``ExpandedState.parameter_expansion``.

    """

    cache_hit = NO_CACHE_KEY

    # annotation-only declarations; the values are assigned per instance
    # by the ``_init_*`` constructors
    root_connection: Connection
    _dbapi_connection: PoolProxiedConnection
    dialect: Dialect
    unicode_statement: str
    cursor: DBAPICursor
    compiled_parameters: List[_MutableCoreSingleExecuteParams]
    parameters: _DBAPIMultiExecuteParams
    extracted_parameters: Optional[Sequence[BindParameter[Any]]]

    # shared empty mapping used as the single parameter set when a
    # non-positional statement has no parameters
    _empty_dict_params = cast("Mapping[str, Any]", util.EMPTY_DICT)

    # "insertmanyvalues" bookkeeping
    _insertmanyvalues_rows: Optional[List[Tuple[Any, ...]]] = None
    _num_sentinel_cols: int = 0

1274 

1275 @classmethod 

1276 def _init_ddl( 

1277 cls, 

1278 dialect: Dialect, 

1279 connection: Connection, 

1280 dbapi_connection: PoolProxiedConnection, 

1281 execution_options: _ExecuteOptions, 

1282 compiled_ddl: DDLCompiler, 

1283 ) -> ExecutionContext: 

1284 """Initialize execution context for an ExecutableDDLElement 

1285 construct.""" 

1286 

1287 self = cls.__new__(cls) 

1288 self.root_connection = connection 

1289 self._dbapi_connection = dbapi_connection 

1290 self.dialect = connection.dialect 

1291 

1292 self.compiled = compiled = compiled_ddl 

1293 self.isddl = True 

1294 

1295 self.execution_options = execution_options 

1296 

1297 self.unicode_statement = str(compiled) 

1298 if compiled.schema_translate_map: 

1299 schema_translate_map = self.execution_options.get( 

1300 "schema_translate_map", {} 

1301 ) 

1302 

1303 rst = compiled.preparer._render_schema_translates 

1304 self.unicode_statement = rst( 

1305 self.unicode_statement, schema_translate_map 

1306 ) 

1307 

1308 self.statement = self.unicode_statement 

1309 

1310 self.cursor = self.create_cursor() 

1311 self.compiled_parameters = [] 

1312 

1313 if dialect.positional: 

1314 self.parameters = [dialect.execute_sequence_format()] 

1315 else: 

1316 self.parameters = [self._empty_dict_params] 

1317 

1318 return self 

1319 

@classmethod
def _init_compiled(
    cls,
    dialect: Dialect,
    connection: Connection,
    dbapi_connection: PoolProxiedConnection,
    execution_options: _ExecuteOptions,
    compiled: SQLCompiler,
    parameters: _CoreMultiExecuteParams,
    invoked_statement: Executable,
    extracted_parameters: Optional[Sequence[BindParameter[Any]]],
    cache_hit: CacheStats = CacheStats.CACHING_DISABLED,
    param_dict: _CoreSingleExecuteParams | None = None,
) -> ExecutionContext:
    """Initialize execution context for a Compiled construct.

    The instance is created with ``cls.__new__`` (``__init__`` is not
    called).  In order, this method:

    1. copies statement-kind and result-column information from
       ``compiled``;
    2. for INSERT/UPDATE/DELETE, records the RETURNING flags and raises
       ``exc.InvalidRequestError`` for RETURNING + executemany
       combinations the dialect does not support;
    3. builds ``compiled_parameters`` (one dict per parameter set) and
       picks the execute style;
    4. creates the cursor and runs pre-execute defaults if any;
    5. applies post-compile parameter expansion and schema translation to
       the statement text;
    6. runs bind processors and stores the final ``parameters`` as a list
       of sequences (positional) or dicts (named).
    """

    self = cls.__new__(cls)
    self.root_connection = connection
    self._dbapi_connection = dbapi_connection
    self.dialect = connection.dialect
    self.extracted_parameters = extracted_parameters
    self.invoked_statement = invoked_statement
    self.compiled = compiled
    self.cache_hit = cache_hit

    self.execution_options = execution_options

    self.result_column_struct = (
        compiled._result_columns,
        compiled._ordered_columns,
        compiled._textual_ordered_columns,
        compiled._ad_hoc_textual,
        compiled._loose_column_name_matching,
    )

    self.isinsert = ii = compiled.isinsert
    self.isupdate = iu = compiled.isupdate
    self.isdelete = id_ = compiled.isdelete
    self.is_text = compiled.isplaintext

    if ii or iu or id_:
        dml_statement = compiled.compile_state.statement  # type: ignore
        if TYPE_CHECKING:
            assert isinstance(dml_statement, UpdateBase)
        self.is_crud = True
        self._is_explicit_returning = ier = bool(dml_statement._returning)
        self._is_implicit_returning = iir = bool(
            compiled.implicit_returning
        )
        if iir and dml_statement._supplemental_returning:
            self._is_supplemental_returning = True

        # dont mix implicit and explicit returning
        assert not (iir and ier)

        # RETURNING combined with executemany needs explicit dialect
        # support; each branch below rejects one unsupported combination
        if (ier or iir) and compiled.for_executemany:
            if ii and not self.dialect.insert_executemany_returning:
                raise exc.InvalidRequestError(
                    f"Dialect {self.dialect.dialect_description} with "
                    f"current server capabilities does not support "
                    "INSERT..RETURNING when executemany is used"
                )
            elif (
                ii
                and dml_statement._sort_by_parameter_order
                and not self.dialect.insert_executemany_returning_sort_by_parameter_order  # noqa: E501
            ):
                raise exc.InvalidRequestError(
                    f"Dialect {self.dialect.dialect_description} with "
                    f"current server capabilities does not support "
                    "INSERT..RETURNING with deterministic row ordering "
                    "when executemany is used"
                )
            elif (
                ii
                and self.dialect.use_insertmanyvalues
                and not compiled._insertmanyvalues
            ):
                raise exc.InvalidRequestError(
                    'Statement does not have "insertmanyvalues" '
                    "enabled, can't use INSERT..RETURNING with "
                    "executemany in this case."
                )
            elif iu and not self.dialect.update_executemany_returning:
                raise exc.InvalidRequestError(
                    f"Dialect {self.dialect.dialect_description} with "
                    f"current server capabilities does not support "
                    "UPDATE..RETURNING when executemany is used"
                )
            elif id_ and not self.dialect.delete_executemany_returning:
                raise exc.InvalidRequestError(
                    f"Dialect {self.dialect.dialect_description} with "
                    f"current server capabilities does not support "
                    "DELETE..RETURNING when executemany is used"
                )

    if not parameters:
        # no parameter sets given: construct a single set from the
        # statement itself
        self.compiled_parameters = [
            compiled.construct_params(
                extracted_parameters=extracted_parameters,
                escape_names=False,
                _collected_params=param_dict,
            )
        ]
    else:
        self.compiled_parameters = [
            compiled.construct_params(
                m,
                escape_names=False,
                _group_number=grp,
                extracted_parameters=extracted_parameters,
                _collected_params=param_dict,
            )
            for grp, m in enumerate(parameters)
        ]

        # more than one parameter set means an executemany-style
        # execution; INSERT statements compiled with "insertmanyvalues"
        # get their own style
        if len(parameters) > 1:
            if self.isinsert and compiled._insertmanyvalues:
                self.execute_style = ExecuteStyle.INSERTMANYVALUES

                imv = compiled._insertmanyvalues
                if imv.sentinel_columns is not None:
                    self._num_sentinel_cols = imv.num_sentinel_columns
            else:
                self.execute_style = ExecuteStyle.EXECUTEMANY

    self.unicode_statement = compiled.string

    self.cursor = self.create_cursor()

    if self.compiled.insert_prefetch or self.compiled.update_prefetch:
        self._process_execute_defaults()

    processors = compiled._bind_processors

    flattened_processors: Mapping[
        str, _BindProcessorType[Any]
    ] = processors  # type: ignore[assignment]

    if compiled.literal_execute_params or compiled.post_compile_params:
        if self.executemany:
            raise exc.InvalidRequestError(
                "'literal_execute' or 'expanding' parameters can't be "
                "used with executemany()"
            )

        expanded_state = compiled._process_parameters_for_postcompile(
            self.compiled_parameters[0]
        )

        # re-assign self.unicode_statement
        self.unicode_statement = expanded_state.statement

        self._expanded_parameters = expanded_state.parameter_expansion

        # copy before updating so the compiled object's own processor
        # mapping is left untouched
        flattened_processors = dict(processors)  # type: ignore
        flattened_processors.update(expanded_state.processors)
        positiontup = expanded_state.positiontup
    elif compiled.positional:
        positiontup = self.compiled.positiontup
    else:
        positiontup = None

    if compiled.schema_translate_map:
        schema_translate_map = self.execution_options.get(
            "schema_translate_map", {}
        )
        rst = compiled.preparer._render_schema_translates
        self.unicode_statement = rst(
            self.unicode_statement, schema_translate_map
        )

    # final self.unicode_statement is now assigned, encode if needed
    # by dialect
    self.statement = self.unicode_statement

    # Convert the dictionary of bind parameter values
    # into a dict or list to be sent to the DBAPI's
    # execute() or executemany() method.

    if compiled.positional:
        core_positional_parameters: MutableSequence[Sequence[Any]] = []
        assert positiontup is not None
        for compiled_params in self.compiled_parameters:
            # values are emitted in ``positiontup`` order, each passed
            # through its bind processor when one exists
            l_param: List[Any] = [
                (
                    flattened_processors[key](compiled_params[key])
                    if key in flattened_processors
                    else compiled_params[key]
                )
                for key in positiontup
            ]
            core_positional_parameters.append(
                dialect.execute_sequence_format(l_param)
            )

        self.parameters = core_positional_parameters
    else:
        core_dict_parameters: MutableSequence[Dict[str, Any]] = []
        escaped_names = compiled.escaped_bind_names

        # note that currently, "expanded" parameters will be present
        # in self.compiled_parameters in their quoted form. This is
        # slightly inconsistent with the approach taken as of
        # #8056 where self.compiled_parameters is meant to contain unquoted
        # param names.
        d_param: Dict[str, Any]
        for compiled_params in self.compiled_parameters:
            if escaped_names:
                # keys are swapped for their escaped form where one
                # exists; processors are still looked up by original key
                d_param = {
                    escaped_names.get(key, key): (
                        flattened_processors[key](compiled_params[key])
                        if key in flattened_processors
                        else compiled_params[key]
                    )
                    for key in compiled_params
                }
            else:
                d_param = {
                    key: (
                        flattened_processors[key](compiled_params[key])
                        if key in flattened_processors
                        else compiled_params[key]
                    )
                    for key in compiled_params
                }

            core_dict_parameters.append(d_param)

        self.parameters = core_dict_parameters

    return self

1552 

1553 @classmethod 

1554 def _init_statement( 

1555 cls, 

1556 dialect: Dialect, 

1557 connection: Connection, 

1558 dbapi_connection: PoolProxiedConnection, 

1559 execution_options: _ExecuteOptions, 

1560 statement: str, 

1561 parameters: _DBAPIMultiExecuteParams, 

1562 ) -> ExecutionContext: 

1563 """Initialize execution context for a string SQL statement.""" 

1564 

1565 self = cls.__new__(cls) 

1566 self.root_connection = connection 

1567 self._dbapi_connection = dbapi_connection 

1568 self.dialect = connection.dialect 

1569 self.is_text = True 

1570 

1571 self.execution_options = execution_options 

1572 

1573 if not parameters: 

1574 if self.dialect.positional: 

1575 self.parameters = [dialect.execute_sequence_format()] 

1576 else: 

1577 self.parameters = [self._empty_dict_params] 

1578 elif isinstance(parameters[0], dialect.execute_sequence_format): 

1579 self.parameters = parameters 

1580 elif isinstance(parameters[0], dict): 

1581 self.parameters = parameters 

1582 else: 

1583 self.parameters = [ 

1584 dialect.execute_sequence_format(p) for p in parameters 

1585 ] 

1586 

1587 if len(parameters) > 1: 

1588 self.execute_style = ExecuteStyle.EXECUTEMANY 

1589 

1590 self.statement = self.unicode_statement = statement 

1591 

1592 self.cursor = self.create_cursor() 

1593 return self 

1594 

1595 @classmethod 

1596 def _init_default( 

1597 cls, 

1598 dialect: Dialect, 

1599 connection: Connection, 

1600 dbapi_connection: PoolProxiedConnection, 

1601 execution_options: _ExecuteOptions, 

1602 ) -> ExecutionContext: 

1603 """Initialize execution context for a ColumnDefault construct.""" 

1604 

1605 self = cls.__new__(cls) 

1606 self.root_connection = connection 

1607 self._dbapi_connection = dbapi_connection 

1608 self.dialect = connection.dialect 

1609 

1610 self.execution_options = execution_options 

1611 

1612 self.cursor = self.create_cursor() 

1613 return self 

1614 

1615 def _get_cache_stats(self) -> str: 

1616 if self.compiled is None: 

1617 return "raw sql" 

1618 

1619 now = perf_counter() 

1620 

1621 ch = self.cache_hit 

1622 

1623 gen_time = self.compiled._gen_time 

1624 assert gen_time is not None 

1625 

1626 if ch is NO_CACHE_KEY: 

1627 return "no key %.5fs" % (now - gen_time,) 

1628 elif ch is CACHE_HIT: 

1629 return "cached since %.4gs ago" % (now - gen_time,) 

1630 elif ch is CACHE_MISS: 

1631 return "generated in %.5fs" % (now - gen_time,) 

1632 elif ch is CACHING_DISABLED: 

1633 if "_cache_disable_reason" in self.execution_options: 

1634 return "caching disabled (%s) %.5fs " % ( 

1635 self.execution_options["_cache_disable_reason"], 

1636 now - gen_time, 

1637 ) 

1638 else: 

1639 return "caching disabled %.5fs" % (now - gen_time,) 

1640 elif ch is NO_DIALECT_SUPPORT: 

1641 return "dialect %s+%s does not support caching %.5fs" % ( 

1642 self.dialect.name, 

1643 self.dialect.driver, 

1644 now - gen_time, 

1645 ) 

1646 else: 

1647 return "unknown" 

1648 

@property
def executemany(self):  # type: ignore[override]
    """True when the execute style is EXECUTEMANY or INSERTMANYVALUES."""
    multi_styles = (
        ExecuteStyle.EXECUTEMANY,
        ExecuteStyle.INSERTMANYVALUES,
    )
    return self.execute_style in multi_styles

1655 

@util.memoized_property
def identifier_preparer(self):
    """Return the identifier preparer to use for this execution.

    The compiled object's preparer wins when a compiled object is present;
    otherwise the dialect's preparer is used, wrapped with the
    ``schema_translate_map`` execution option when that option is set.
    """
    compiled = self.compiled
    if compiled:
        return compiled.preparer

    options = self.execution_options
    if "schema_translate_map" not in options:
        return self.dialect.identifier_preparer

    return self.dialect.identifier_preparer._with_schema_translate(
        options["schema_translate_map"]
    )

1666 

@util.memoized_property
def engine(self):
    """Return the ``engine`` of the root connection."""
    root = self.root_connection
    return root.engine

1670 

@util.memoized_property
def postfetch_cols(self) -> Optional[Sequence[Column[Any]]]:
    """Return the compiled statement's ``postfetch`` collection."""
    if TYPE_CHECKING:
        assert isinstance(self.compiled, SQLCompiler)
    compiled = self.compiled
    return compiled.postfetch

1676 

@util.memoized_property
def prefetch_cols(self) -> Optional[Sequence[Column[Any]]]:
    """Return the compiled statement's prefetch columns.

    ``insert_prefetch`` for an INSERT, ``update_prefetch`` for an UPDATE,
    and an empty tuple for anything else.
    """
    if TYPE_CHECKING:
        assert isinstance(self.compiled, SQLCompiler)
    if self.isinsert:
        return self.compiled.insert_prefetch
    if self.isupdate:
        return self.compiled.update_prefetch
    return ()

1687 

@util.memoized_property
def no_parameters(self):
    """Return the ``no_parameters`` execution option (default ``False``)."""
    options = self.execution_options
    return options.get("no_parameters", False)

1691 

1692 def _execute_scalar( 

1693 self, 

1694 stmt: str, 

1695 type_: Optional[TypeEngine[Any]], 

1696 parameters: Optional[_DBAPISingleExecuteParams] = None, 

1697 ) -> Any: 

1698 """Execute a string statement on the current cursor, returning a 

1699 scalar result. 

1700 

1701 Used to fire off sequences, default phrases, and "select lastrowid" 

1702 types of statements individually or in the context of a parent INSERT 

1703 or UPDATE statement. 

1704 

1705 """ 

1706 

1707 conn = self.root_connection 

1708 

1709 if "schema_translate_map" in self.execution_options: 

1710 schema_translate_map = self.execution_options.get( 

1711 "schema_translate_map", {} 

1712 ) 

1713 

1714 rst = self.identifier_preparer._render_schema_translates 

1715 stmt = rst(stmt, schema_translate_map) 

1716 

1717 if not parameters: 

1718 if self.dialect.positional: 

1719 parameters = self.dialect.execute_sequence_format() 

1720 else: 

1721 parameters = {} 

1722 

1723 conn._cursor_execute(self.cursor, stmt, parameters, context=self) 

1724 row = self.cursor.fetchone() 

1725 if row is not None: 

1726 r = row[0] 

1727 else: 

1728 r = None 

1729 if type_ is not None: 

1730 # apply type post processors to the result 

1731 proc = type_._cached_result_processor( 

1732 self.dialect, self.cursor.description[0][1] 

1733 ) 

1734 if proc: 

1735 return proc(r) 

1736 return r 

1737 

@util.memoized_property
def connection(self):
    """Return the root connection of this execution context."""
    root = self.root_connection
    return root

1741 

def _use_server_side_cursor(self):
    """Decide whether a server side cursor should be used.

    Returns ``False`` when the dialect does not support server side
    cursors.  Otherwise the result is derived from the ``stream_results``
    execution option; note that the value returned is the raw result of
    the boolean expression below and is not coerced to ``bool``.
    """
    if not self.dialect.supports_server_side_cursors:
        return False

    if self.dialect.server_side_cursors:
        # this is deprecated
        # with the dialect-level flag on, ``stream_results`` defaults to
        # True and the statement must additionally either be a compiled
        # Selectable, or be raw/TextClause SQL whose text matches
        # SERVER_SIDE_CURSOR_RE
        use_server_side = self.execution_options.get(
            "stream_results", True
        ) and (
            self.compiled
            and isinstance(self.compiled.statement, expression.Selectable)
            or (
                (
                    not self.compiled
                    or isinstance(
                        self.compiled.statement, expression.TextClause
                    )
                )
                and self.unicode_statement
                and SERVER_SIDE_CURSOR_RE.match(self.unicode_statement)
            )
        )
    else:
        # otherwise only an explicit ``stream_results`` option enables it
        use_server_side = self.execution_options.get(
            "stream_results", False
        )

    return use_server_side

1770 

def create_cursor(self) -> DBAPICursor:
    """Create the DBAPI cursor for this execution.

    A server side cursor is created when the dialect supports them and
    either the ``stream_results`` execution option is set or the dialect's
    ``server_side_cursors`` flag is on and ``_use_server_side_cursor()``
    agrees; otherwise a default cursor is created.  ``_is_server_side``
    records which kind was chosen.
    """
    dialect = self.dialect

    # inlining initial preference checks for SS cursors
    wants_server_side = dialect.supports_server_side_cursors and (
        self.execution_options.get("stream_results", False)
        or (dialect.server_side_cursors and self._use_server_side_cursor())
    )

    if not wants_server_side:
        self._is_server_side = False
        return self.create_default_cursor()

    self._is_server_side = True
    return self.create_server_side_cursor()

1788 

def fetchall_for_returning(self, cursor):
    """Return all remaining rows from ``cursor`` via ``fetchall()``."""
    fetched = cursor.fetchall()
    return fetched

1791 

def create_default_cursor(self) -> DBAPICursor:
    """Return a new cursor from the context's DBAPI connection."""
    dbapi_connection = self._dbapi_connection
    return dbapi_connection.cursor()

1794 

def create_server_side_cursor(self) -> DBAPICursor:
    """Create a server side cursor.

    Not provided by the default context; always raises
    ``NotImplementedError``.
    """
    raise NotImplementedError()

1797 

def pre_exec(self):
    """Hook with an empty default implementation; returns ``None``."""
    return None

1800 

def get_out_parameter_values(self, names):
    """Return OUT parameter values for ``names``.

    Not provided by the default context; always raises
    ``NotImplementedError``.
    """
    message = "This dialect does not support OUT parameters"
    raise NotImplementedError(message)

1805 

def post_exec(self):
    """Hook with an empty default implementation; returns ``None``."""
    return None

1808 

def get_result_processor(
    self, type_: TypeEngine[Any], colname: str, coltype: DBAPIType
) -> Optional[_ResultProcessorType[Any]]:
    """Look up the result processor for a column in cursor.description.

    The default simply asks ``type_`` for its cached result processor for
    this dialect and ``coltype``; ``colname`` is not used here.  Dialects
    may override this for context-sensitive result type handling.

    """
    dialect = self.dialect
    return type_._cached_result_processor(dialect, coltype)

1820 

def get_lastrowid(self) -> int:
    """Return ``self.cursor.lastrowid``, or an equivalent, after an INSERT.

    Overriding implementations may call special cursor functions, issue a
    new SELECT on this cursor (or a new one), or return a value that was
    stored during post_exec().

    It is only called for dialects that support "implicit" primary key
    generation, leave preexecute_autoincrement_sequences set to False, and
    when the statement had no explicit id value bound.

    For dialects that use the lastrowid concept it is called once per
    INSERT statement that needs the last inserted primary key, directly
    after :meth:`.ExecutionContext.post_exec`.

    """
    cursor = self.cursor
    return cursor.lastrowid

1839 

def handle_dbapi_exception(self, e):
    """Hook with an empty default implementation; ``e`` is ignored."""
    return None

1842 

@util.non_memoized_property
def rowcount(self) -> int:
    """Return the stored ``_rowcount`` if set, else ``cursor.rowcount``."""
    stored = self._rowcount
    return self.cursor.rowcount if stored is None else stored

1849 

1850 @property 

1851 def _has_rowcount(self): 

1852 return self._rowcount is not None 

1853 

def supports_sane_rowcount(self):
    """Return the dialect's ``supports_sane_rowcount`` flag."""
    dialect = self.dialect
    return dialect.supports_sane_rowcount

1856 

def supports_sane_multi_rowcount(self):
    """Return the dialect's ``supports_sane_multi_rowcount`` flag."""
    dialect = self.dialect
    return dialect.supports_sane_multi_rowcount

1859 

def _setup_result_proxy(self):
    """Build and return the result object for this execution.

    CRUD and text statements are delegated to
    ``_setup_dml_or_text_result()``; everything else gets a
    ``CursorResult`` built here.  OUT parameters are collected when the
    compiled statement declares them, and the ``yield_per`` execution
    option is applied for the non-CRUD / non-text case.
    """
    exec_opt = self.execution_options

    # capture cursor.rowcount now if requested and not already stored
    if self._rowcount is None and exec_opt.get("preserve_rowcount", False):
        self._rowcount = self.cursor.rowcount

    yp: Optional[Union[int, bool]]
    if self.is_crud or self.is_text:
        result = self._setup_dml_or_text_result()
        # yield_per is not applied to CRUD / text results
        yp = False
    else:
        yp = exec_opt.get("yield_per", None)
        sr = self._is_server_side or exec_opt.get("stream_results", False)
        strategy = self.cursor_fetch_strategy
        # streaming with the default strategy switches to the buffered
        # row strategy
        if sr and strategy is _cursor._DEFAULT_FETCH:
            strategy = _cursor.BufferedRowCursorFetchStrategy(
                self.cursor, self.execution_options
            )
        cursor_description: _DBAPICursorDescription = (
            strategy.alternate_cursor_description
            or self.cursor.description
        )
        if cursor_description is None:
            # no description available: use the "no cursor" strategy
            strategy = _cursor._NO_CURSOR_DQL

        result = _cursor.CursorResult(self, strategy, cursor_description)

    compiled = self.compiled

    if (
        compiled
        and not self.isddl
        and cast(SQLCompiler, compiled).has_out_parameters
    ):
        self._setup_out_parameters(result)

    self._soft_closed = result._soft_closed

    if yp:
        result = result.yield_per(yp)

    return result

1902 

def _setup_out_parameters(self, result):
    """Collect OUT parameter values and attach them to ``result``.

    Bind parameters flagged ``isoutparam`` are looked up in the compiled
    statement's ``bind_names``; their raw values come from
    ``get_out_parameter_values()`` and are run through each parameter
    type's result processor when it has one.  The resulting dict, keyed by
    bind parameter key, is stored as ``result.out_parameters``.
    """
    compiled = cast(SQLCompiler, self.compiled)

    out_params = []
    out_names = []
    for param, name in compiled.bind_names.items():
        if param.isoutparam:
            out_params.append(param)
            out_names.append(name)

    raw_values = self.get_out_parameter_values(out_names)

    collected = {}
    for param, value in zip(out_params, raw_values):
        impl_type = param.type.dialect_impl(self.dialect)
        dbapi_type = impl_type.get_dbapi_type(self.dialect.loaded_dbapi)
        processor = impl_type.result_processor(self.dialect, dbapi_type)
        collected[param.key] = (
            value if processor is None else processor(value)
        )

    result.out_parameters = collected

1930 

def _setup_dml_or_text_result(self):
    """Build the ``CursorResult`` for this execution.

    Selects the fetch strategy and cursor description for the result
    and then, depending on the statement flags on this context
    (``isinsert``, ``isupdate``, ``isdelete`` and the implicit /
    explicit / supplemental RETURNING flags), records
    ``inserted_primary_key_rows``, ``returned_default_rows`` and
    ``_rowcount`` and either rewinds or soft-closes the result.

    :return: the newly constructed ``_cursor.CursorResult``.
    """
    compiled = cast(SQLCompiler, self.compiled)

    strategy: ResultFetchStrategy = self.cursor_fetch_strategy

    if self.isinsert:
        if (
            self.execute_style is ExecuteStyle.INSERTMANYVALUES
            and compiled.effective_returning
        ):
            # the rows for "insertmanyvalues" are already collected on
            # the context; serve them from a fully buffered strategy
            strategy = _cursor.FullyBufferedCursorFetchStrategy(
                self.cursor,
                initial_buffer=self._insertmanyvalues_rows,
                # maintain alt cursor description if set by the
                # dialect, e.g. mssql preserves it
                alternate_description=(
                    strategy.alternate_cursor_description
                ),
            )

        if compiled.postfetch_lastrowid:
            self.inserted_primary_key_rows = (
                self._setup_ins_pk_from_lastrowid()
            )
        # else if not self._is_implicit_returning,
        # the default inserted_primary_key_rows accessor will
        # return an "empty" primary key collection when accessed.

    # only replace the strategy for a server side cursor when nothing
    # above (or the dialect) has already installed a specific one
    if self._is_server_side and strategy is _cursor._DEFAULT_FETCH:
        strategy = _cursor.BufferedRowCursorFetchStrategy(
            self.cursor, self.execution_options
        )

    if strategy is _cursor._NO_CURSOR_DML:
        cursor_description = None
    else:
        # a description supplied by the strategy takes precedence over
        # the one reported by the DBAPI cursor
        cursor_description = (
            strategy.alternate_cursor_description
            or self.cursor.description
        )

    if cursor_description is None:
        # no description available: use the "no cursor" DML strategy
        strategy = _cursor._NO_CURSOR_DML
    elif self._num_sentinel_cols:
        assert self.execute_style is ExecuteStyle.INSERTMANYVALUES
        # the sentinel columns are handled in CursorResult._init_metadata
        # using essentially _reduce

    result: _cursor.CursorResult[Any] = _cursor.CursorResult(
        self, strategy, cursor_description
    )

    if self.isinsert:
        if self._is_implicit_returning:
            rows = result.all()

            self.returned_default_rows = rows

            self.inserted_primary_key_rows = (
                self._setup_ins_pk_from_implicit_returning(result, rows)
            )

            # test that it has a cursor metadata that is accurate. the
            # first row will have been fetched and current assumptions
            # are that the result has only one row, until executemany()
            # support is added here.
            assert result._metadata.returns_rows

            # Insert statement has both return_defaults() and
            # returning().  rewind the result on the list of rows
            # we just used.
            if self._is_supplemental_returning:
                result._rewind(rows)
            else:
                result._soft_close()
        elif not self._is_explicit_returning:
            result._soft_close()

            # we assume here the result does not return any rows.
            # *usually*, this will be true.  However, some dialects
            # such as that of MSSQL/pyodbc need to SELECT a post fetch
            # function so this is not necessarily true.
            # assert not result.returns_rows

    elif self._is_implicit_returning:
        rows = result.all()

        if rows:
            self.returned_default_rows = rows
        # the rowcount is taken from the number of rows fetched, not
        # from cursor.rowcount
        self._rowcount = len(rows)

        if self._is_supplemental_returning:
            result._rewind(rows)
        else:
            result._soft_close()

        # test that it has a cursor metadata that is accurate.
        # the rows have all been fetched however.
        assert result._metadata.returns_rows

    elif not result._metadata.returns_rows:
        # no results, get rowcount
        # (which requires open cursor on some drivers)
        if self._rowcount is None:
            self._rowcount = self.cursor.rowcount
        result._soft_close()
    elif self.isupdate or self.isdelete:
        # rows are being returned; the result is left open and only
        # the rowcount is captured if not already set
        if self._rowcount is None:
            self._rowcount = self.cursor.rowcount
    return result

2041 

@util.memoized_property
def inserted_primary_key_rows(self):
    """Fallback value for the inserted primary key rows.

    Used when execution did not assign a more specific value to this
    attribute; the rows are derived via
    :meth:`_setup_ins_pk_from_empty`, i.e. from the compiled parameters
    alone.
    """
    default_rows = self._setup_ins_pk_from_empty()
    return default_rows

2048 

def _setup_ins_pk_from_lastrowid(self):
    """Return a one-element list holding the primary key built from
    the value of :meth:`get_lastrowid` and the first set of compiled
    parameters.
    """
    sql_compiler = cast(SQLCompiler, self.compiled)
    pk_from_lastrowid = (
        sql_compiler._inserted_primary_key_from_lastrowid_getter
    )
    row_id = self.get_lastrowid()
    first_params = self.compiled_parameters[0]
    return [pk_from_lastrowid(row_id, first_params)]

2055 

def _setup_ins_pk_from_empty(self):
    """Return one primary key per set of compiled parameters, built
    with ``None`` passed in place of a lastrowid value.
    """
    sql_compiler = cast(SQLCompiler, self.compiled)
    pk_from_lastrowid = (
        sql_compiler._inserted_primary_key_from_lastrowid_getter
    )

    primary_keys = []
    for params in self.compiled_parameters:
        primary_keys.append(pk_from_lastrowid(None, params))
    return primary_keys

2061 

2062 def _setup_ins_pk_from_implicit_returning(self, result, rows): 

2063 if not rows: 

2064 return [] 

2065 

2066 getter = cast( 

2067 SQLCompiler, self.compiled 

2068 )._inserted_primary_key_from_returning_getter 

2069 compiled_params = self.compiled_parameters 

2070 

2071 return [ 

2072 getter(row, param) for row, param in zip(rows, compiled_params) 

2073 ] 

2074 

def lastrow_has_defaults(self) -> bool:
    """Report whether this INSERT or UPDATE has a non-empty
    ``postfetch`` collection on its compiled statement.
    """
    is_insert_or_update = self.isinsert or self.isupdate
    # the compiled object is only consulted for INSERT / UPDATE
    return is_insert_or_update and bool(
        cast(SQLCompiler, self.compiled).postfetch
    )

2079 

2080 def _prepare_set_input_sizes( 

2081 self, 

2082 ) -> Optional[List[Tuple[str, Any, TypeEngine[Any]]]]: 

2083 """Given a cursor and ClauseParameters, prepare arguments 

2084 in order to call the appropriate 

2085 style of ``setinputsizes()`` on the cursor, using DB-API types 

2086 from the bind parameter's ``TypeEngine`` objects. 

2087 

2088 This method only called by those dialects which set the 

2089 :attr:`.Dialect.bind_typing` attribute to 

2090 :attr:`.BindTyping.SETINPUTSIZES`. Python-oracledb and cx_Oracle are 

2091 the only DBAPIs that requires setinputsizes(); pyodbc offers it as an 

2092 option. 

2093 

2094 Prior to SQLAlchemy 2.0, the setinputsizes() approach was also used 

2095 for pg8000 and asyncpg, which has been changed to inline rendering 

2096 of casts. 

2097 

2098 """ 

2099 if self.isddl or self.is_text: 

2100 return None 

2101 

2102 compiled = cast(SQLCompiler, self.compiled) 

2103 

2104 inputsizes = compiled._get_set_input_sizes_lookup() 

2105 

2106 if inputsizes is None: 

2107 return None 

2108 

2109 dialect = self.dialect 

2110 

2111 # all of the rest of this... cython? 

2112 

2113 if dialect._has_events: 

2114 inputsizes = dict(inputsizes) 

2115 dialect.dispatch.do_setinputsizes( 

2116 inputsizes, self.cursor, self.statement, self.parameters, self 

2117 ) 

2118 

2119 if compiled.escaped_bind_names: 

2120 escaped_bind_names = compiled.escaped_bind_names 

2121 else: 

2122 escaped_bind_names = None 

2123 

2124 if dialect.positional: 

2125 items = [ 

2126 (key, compiled.binds[key]) 

2127 for key in compiled.positiontup or () 

2128 ] 

2129 else: 

2130 items = [ 

2131 (key, bindparam) 

2132 for bindparam, key in compiled.bind_names.items() 

2133 ] 

2134 

2135 generic_inputsizes: List[Tuple[str, Any, TypeEngine[Any]]] = [] 

2136 for key, bindparam in items: 

2137 if bindparam in compiled.literal_execute_params: 

2138 continue 

2139 

2140 if key in self._expanded_parameters: 

2141 if is_tuple_type(bindparam.type): 

2142 num = len(bindparam.type.types) 

2143 dbtypes = inputsizes[bindparam] 

2144 generic_inputsizes.extend( 

2145 ( 

2146 ( 

2147 escaped_bind_names.get(paramname, paramname) 

2148 if escaped_bind_names is not None 

2149 else paramname 

2150 ), 

2151 dbtypes[idx % num], 

2152 bindparam.type.types[idx % num], 

2153 ) 

2154 for idx, paramname in enumerate( 

2155 self._expanded_parameters[key] 

2156 ) 

2157 ) 

2158 else: 

2159 dbtype = inputsizes.get(bindparam, None) 

2160 generic_inputsizes.extend( 

2161 ( 

2162 ( 

2163 escaped_bind_names.get(paramname, paramname) 

2164 if escaped_bind_names is not None 

2165 else paramname 

2166 ), 

2167 dbtype, 

2168 bindparam.type, 

2169 ) 

2170 for paramname in self._expanded_parameters[key] 

2171 ) 

2172 else: 

2173 dbtype = inputsizes.get(bindparam, None) 

2174 

2175 escaped_name = ( 

2176 escaped_bind_names.get(key, key) 

2177 if escaped_bind_names is not None 

2178 else key 

2179 ) 

2180 

2181 generic_inputsizes.append( 

2182 (escaped_name, dbtype, bindparam.type) 

2183 ) 

2184 

2185 return generic_inputsizes 

2186 

2187 def _exec_default(self, column, default, type_): 

2188 if default.is_sequence: 

2189 return self.fire_sequence(default, type_) 

2190 elif default.is_callable: 

2191 # this codepath is not normally used as it's inlined 

2192 # into _process_execute_defaults 

2193 self.current_column = column 

2194 return default.arg(self) 

2195 elif default.is_clause_element: 

2196 return self._exec_default_clause_element(column, default, type_) 

2197 else: 

2198 # this codepath is not normally used as it's inlined 

2199 # into _process_execute_defaults 

2200 return default.arg 

2201 

def _exec_default_clause_element(self, column, default, type_):
    """Execute a default that is a complete clause element.

    This re-implements a miniature version of the
    compile -> parameters -> cursor.execute() sequence, because the
    state of the connection / result in progress must not be modified
    and no new connection or result objects should be created.

    :param column: accepted but not used by this method.
    :param default: the default whose ``arg`` is the clause element.
    :param type_: type the argument is coerced to when it is not
     already typed; also passed on to ``_execute_scalar``.
    :return: the value returned by ``_execute_scalar``.
    """
    # .. versionchanged:: 1.4

    if default._arg_is_typed:
        select_arg = default.arg
    else:
        select_arg = expression.type_coerce(default.arg, type_)

    stmt = expression.select(select_arg).compile(dialect=self.dialect)
    raw_params = stmt.construct_params()
    bind_processors = stmt._bind_processors

    def bound_value(key):
        # apply the bind processor registered for this key, if any
        if key in bind_processors:
            return bind_processors[key](raw_params[key])  # type: ignore
        return raw_params[key]

    if stmt.positional:
        to_sequence = self.dialect.execute_sequence_format
        parameters = to_sequence(
            [bound_value(key) for key in stmt.positiontup or ()]
        )
    else:
        parameters = {key: bound_value(key) for key in raw_params}

    return self._execute_scalar(str(stmt), type_, parameters=parameters)

2240 

2241 current_parameters: Optional[_CoreSingleExecuteParams] = None 

2242 """A dictionary of parameters applied to the current row. 

2243 

2244 This attribute is only available in the context of a user-defined default 

2245 generation function, e.g. as described at :ref:`context_default_functions`. 

2246 It consists of a dictionary which includes entries for each column/value 

2247 pair that is to be part of the INSERT or UPDATE statement. The keys of the 

2248 dictionary will be the key value of each :class:`_schema.Column`, 

2249 which is usually 

2250 synonymous with the name. 

2251 

2252 Note that the :attr:`.DefaultExecutionContext.current_parameters` attribute 

2253 does not accommodate the "multi-values" feature of the 

2254 :meth:`_expression.Insert.values` method. The 

2255 :meth:`.DefaultExecutionContext.get_current_parameters` method should be 

2256 preferred. 

2257 

2258 .. seealso:: 

2259 

2260 :meth:`.DefaultExecutionContext.get_current_parameters` 

2261 

2262 :ref:`context_default_functions` 

2263 

2264 """ 

2265 

def get_current_parameters(self, isolate_multiinsert_groups=True):
    """Return a dictionary of parameters applied to the current row.

    This method can only be used in the context of a user-defined default
    generation function, e.g. as described at
    :ref:`context_default_functions`. When invoked, a dictionary is
    returned which includes entries for each column/value pair that is part
    of the INSERT or UPDATE statement. The keys of the dictionary will be
    the key value of each :class:`_schema.Column`,
    which is usually synonymous
    with the name.

    :param isolate_multiinsert_groups=True: indicates that multi-valued
     INSERT constructs created using :meth:`_expression.Insert.values`
     should be
     handled by returning only the subset of parameters that are local
     to the current column default invocation.   When ``False``, the
     raw parameters of the statement are returned including the
     naming convention used in the case of multi-valued INSERT.

    :raises InvalidRequestError: if the current parameters / current
     column attributes are not available on this context.

    .. seealso::

        :attr:`.DefaultExecutionContext.current_parameters`

        :ref:`context_default_functions`

    """
    try:
        parameters = self.current_parameters
        column = self.current_column
    except AttributeError as err:
        # chain explicitly so the missing attribute is recorded as the
        # direct cause of the user-facing error
        raise exc.InvalidRequestError(
            "get_current_parameters() can only be invoked in the "
            "context of a Python side column default function"
        ) from err
    else:
        assert column is not None
        assert parameters is not None
    compile_state = cast(
        "DMLState", cast(SQLCompiler, self.compiled).compile_state
    )
    assert compile_state is not None
    if (
        isolate_multiinsert_groups
        and dml.isinsert(compile_state)
        and compile_state._has_multi_parameters
    ):
        if column._is_multiparam_column:
            # column belongs to one of the additional VALUES groups;
            # report it under the key of its original column
            index = column.index + 1
            d = {column.original.key: parameters[column.key]}
        else:
            d = {column.key: parameters[column.key]}
            index = 0
        assert compile_state._dict_parameters is not None
        keys = compile_state._dict_parameters.keys()
        # pull this group's values out of the raw parameters, which
        # are named "<key>_m<index>"
        d.update(
            (key, parameters["%s_m%d" % (key, index)]) for key in keys
        )
        return d
    else:
        return parameters

2327 

def get_insert_default(self, column):
    """Return the value of ``column.default`` as evaluated by
    :meth:`_exec_default`, or ``None`` when the column has no default.
    """
    default = column.default
    if default is not None:
        return self._exec_default(column, default, column.type)
    return None

2333 

def get_update_default(self, column):
    """Return the value of ``column.onupdate`` as evaluated by
    :meth:`_exec_default`, or ``None`` when the column has no
    ``onupdate``.
    """
    onupdate = column.onupdate
    if onupdate is not None:
        return self._exec_default(column, onupdate, column.type)
    return None

2339 

def _process_execute_defaults(self):
    """Fill Python-side default values into every set of compiled
    parameters.

    The columns to process come from the compiler's ``insert_prefetch``
    collection or, when that is empty, its ``update_prefetch``
    collection.  For each column a description tuple
    ``(arg, is_scalar, is_callable, is_sentinel)`` decides how the value
    is obtained: a running counter for sentinels, ``arg`` itself for
    scalars, ``arg(self)`` for callables, and otherwise
    :meth:`get_insert_default` / :meth:`get_update_default`, whose
    result is stored only when it is not ``None``.
    """
    sql_compiler = cast(SQLCompiler, self.compiled)

    param_key_for = sql_compiler._within_exec_param_key_getter

    # each record: (column, parameter key, description tuple, fallback)
    records = []
    if sql_compiler.insert_prefetch:
        for col in sql_compiler.insert_prefetch:
            records.append(
                (
                    col,
                    param_key_for(col),
                    col._default_description_tuple,
                    self.get_insert_default,
                )
            )
    elif sql_compiler.update_prefetch:
        for col in sql_compiler.update_prefetch:
            records.append(
                (
                    col,
                    param_key_for(col),
                    col._onupdate_description_tuple,
                    self.get_update_default,
                )
            )

    # sentinel values keep counting up across all parameter sets
    next_sentinel = 0

    for params in self.compiled_parameters:
        # exposed so that callable defaults can inspect the current row
        self.current_parameters = params

        for col, param_key, description, fallback in records:
            arg, is_scalar, is_callable, is_sentinel = description

            if is_sentinel:
                params[param_key] = next_sentinel
                next_sentinel += 1
            elif is_scalar:
                params[param_key] = arg
            elif is_callable:
                self.current_column = col
                params[param_key] = arg(self)
            else:
                value = fallback(col)
                if value is not None:
                    params[param_key] = value

    del self.current_parameters

2393 

2394 

# Make DefaultExecutionContext the execution context class of
# DefaultDialect.
DefaultDialect.execution_ctx_cls = DefaultExecutionContext