Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/default.py: 46%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1053 statements  

1# engine/default.py 

2# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7# mypy: allow-untyped-defs, allow-untyped-calls 

8 

9"""Default implementations of per-dialect sqlalchemy.engine classes. 

10 

11These are semi-private implementation classes which are only of importance 

12to database dialect authors; dialects will usually use the classes here 

13as the base class for their own corresponding classes. 

14 

15""" 

16 

17from __future__ import annotations 

18 

19import functools 

20import operator 

21import random 

22import re 

23from time import perf_counter 

24import typing 

25from typing import Any 

26from typing import Callable 

27from typing import cast 

28from typing import Dict 

29from typing import Final 

30from typing import List 

31from typing import Literal 

32from typing import Mapping 

33from typing import MutableMapping 

34from typing import MutableSequence 

35from typing import Optional 

36from typing import Sequence 

37from typing import Set 

38from typing import Tuple 

39from typing import Type 

40from typing import TYPE_CHECKING 

41from typing import Union 

42import weakref 

43 

44from . import characteristics 

45from . import cursor as _cursor 

46from . import interfaces 

47from .base import Connection 

48from .interfaces import CacheStats 

49from .interfaces import DBAPICursor 

50from .interfaces import Dialect 

51from .interfaces import ExecuteStyle 

52from .interfaces import ExecutionContext 

53from .reflection import ObjectKind 

54from .reflection import ObjectScope 

55from .. import event 

56from .. import exc 

57from .. import pool 

58from .. import util 

59from ..sql import compiler 

60from ..sql import dml 

61from ..sql import expression 

62from ..sql import type_api 

63from ..sql import util as sql_util 

64from ..sql._typing import is_tuple_type 

65from ..sql.base import _NoArg 

66from ..sql.compiler import AggregateOrderByStyle 

67from ..sql.compiler import DDLCompiler 

68from ..sql.compiler import InsertmanyvaluesSentinelOpts 

69from ..sql.compiler import SQLCompiler 

70from ..sql.elements import quoted_name 

71from ..util.typing import TupleAny 

72from ..util.typing import Unpack 

73 

74if typing.TYPE_CHECKING: 

75 from .base import Engine 

76 from .cursor import ResultFetchStrategy 

77 from .interfaces import _CoreMultiExecuteParams 

78 from .interfaces import _CoreSingleExecuteParams 

79 from .interfaces import _DBAPICursorDescription 

80 from .interfaces import _DBAPIMultiExecuteParams 

81 from .interfaces import _DBAPISingleExecuteParams 

82 from .interfaces import _ExecuteOptions 

83 from .interfaces import _MutableCoreSingleExecuteParams 

84 from .interfaces import _ParamStyle 

85 from .interfaces import ConnectArgsType 

86 from .interfaces import DBAPIConnection 

87 from .interfaces import DBAPIModule 

88 from .interfaces import DBAPIType 

89 from .interfaces import IsolationLevel 

90 from .row import Row 

91 from .url import URL 

92 from ..event import _ListenerFnType 

93 from ..pool import Pool 

94 from ..pool import PoolProxiedConnection 

95 from ..sql import Executable 

96 from ..sql.compiler import Compiled 

97 from ..sql.compiler import Linting 

98 from ..sql.compiler import ResultColumnsEntry 

99 from ..sql.dml import DMLState 

100 from ..sql.dml import UpdateBase 

101 from ..sql.elements import BindParameter 

102 from ..sql.schema import Column 

103 from ..sql.type_api import _BindProcessorType 

104 from ..sql.type_api import _ResultProcessorType 

105 from ..sql.type_api import TypeEngine 

106 

107 

# When we're handed literal SQL, ensure it's a SELECT query
SERVER_SIDE_CURSOR_RE = re.compile(r"\s*SELECT", re.I | re.UNICODE)


# Unpack the CacheStats enum members into module-level names so the
# execution context can reference them directly.
(
    CACHE_HIT,
    CACHE_MISS,
    CACHING_DISABLED,
    NO_CACHE_KEY,
    NO_DIALECT_SUPPORT,
) = list(CacheStats)

119 

120 

class DefaultDialect(Dialect):
    """Default implementation of Dialect"""

    # compiler classes used to render SQL / DDL / type expressions
    statement_compiler = compiler.SQLCompiler
    ddl_compiler = compiler.DDLCompiler
    type_compiler_cls = compiler.GenericTypeCompiler

    preparer = compiler.IdentifierPreparer
    supports_alter = True
    supports_comments = False
    supports_constraint_comments = False
    inline_comments = False
    supports_statement_cache = True

    div_is_floordiv = True

    bind_typing = interfaces.BindTyping.NONE

    include_set_input_sizes: Optional[Set[Any]] = None
    exclude_set_input_sizes: Optional[Set[Any]] = None

    # the first value we'd get for an autoincrement column.
    default_sequence_base = 1

    # most DBAPIs happy with this for execute().
    # not cx_oracle.
    execute_sequence_format = tuple

    supports_schemas = True
    supports_views = True
    supports_sequences = False
    sequences_optional = False
    preexecute_autoincrement_sequences = False
    supports_identity_columns = False
    postfetch_lastrowid = True
    favor_returning_over_lastrowid = False
    insert_null_pk_still_autoincrements = False
    update_returning = False
    delete_returning = False
    update_returning_multifrom = False
    delete_returning_multifrom = False
    insert_returning = False

    aggregate_order_by_style = AggregateOrderByStyle.INLINE

    cte_follows_insert = False

    # native type support flags; conservative defaults, dialects opt in
    supports_native_enum = False
    supports_native_boolean = False
    supports_native_uuid = False
    returns_native_bytes = False

    non_native_boolean_check_constraint = True

    supports_simple_order_by_label = True

    tuple_in_values = False

    # execution-option names that map to per-connection characteristics
    connection_characteristics = util.immutabledict(
        {
            "isolation_level": characteristics.IsolationLevelCharacteristic(),
            "logging_token": characteristics.LoggingTokenCharacteristic(),
        }
    )

    # coercion functions applied to string engine-config values
    engine_config_types: Mapping[str, Any] = util.immutabledict(
        {
            "pool_timeout": util.asint,
            "echo": util.bool_or_str("debug"),
            "echo_pool": util.bool_or_str("debug"),
            "pool_recycle": util.asint,
            "pool_size": util.asint,
            "max_overflow": util.asint,
            "future": util.asbool,
        }
    )

    # if the NUMERIC type
    # returns decimal.Decimal.
    # *not* the FLOAT type however.
    supports_native_decimal = False

    name = "default"

    # length at which to truncate
    # any identifier.
    max_identifier_length = 9999
    _user_defined_max_identifier_length: Optional[int] = None

    isolation_level: Optional[str] = None

    # sub-categories of max_identifier_length.
    # currently these accommodate for MySQL which allows alias names
    # of 255 but DDL names only of 64.
    max_index_name_length: Optional[int] = None
    max_constraint_name_length: Optional[int] = None

    supports_sane_rowcount = True
    supports_sane_multi_rowcount = True
    colspecs: MutableMapping[Type[TypeEngine[Any]], Type[TypeEngine[Any]]] = {}
    default_paramstyle = "named"

    supports_default_values = False
    """dialect supports INSERT... DEFAULT VALUES syntax"""

    supports_default_metavalue = False
    """dialect supports INSERT... VALUES (DEFAULT) syntax"""

    default_metavalue_token = "DEFAULT"
    """for INSERT... VALUES (DEFAULT) syntax, the token to put in the
    parenthesis."""

    # not sure if this is a real thing but the compiler will deliver it
    # if this is the only flag enabled.
    supports_empty_insert = True
    """dialect supports INSERT () VALUES ()"""

    supports_multivalues_insert = False

    use_insertmanyvalues: bool = False

    use_insertmanyvalues_wo_returning: bool = False

    insertmanyvalues_implicit_sentinel: InsertmanyvaluesSentinelOpts = (
        InsertmanyvaluesSentinelOpts.NOT_SUPPORTED
    )

    insertmanyvalues_page_size: int = 1000
    insertmanyvalues_max_parameters = 32700

    supports_is_distinct_from = True

    supports_server_side_cursors = False

    server_side_cursors = False

    # extra record-level locking features (#4860)
    supports_for_update_of = False

    # populated by initialize() on first connect
    server_version_info = None

    default_schema_name: Optional[str] = None

    # indicates symbol names are
    # UPPERCASED if they are case insensitive
    # within the database.
    # if this is True, the methods normalize_name()
    # and denormalize_name() must be provided.
    requires_name_normalize = False

    is_async = False

    has_terminate = False

    # TODO: this is not to be part of 2.0. implement rudimentary binary
    # literals for SQLite, PostgreSQL, MySQL only within
    # _Binary.literal_processor
    _legacy_binary_type_literal_encoding = "utf-8"

279 

    @util.deprecated_params(
        empty_in_strategy=(
            "1.4",
            "The :paramref:`_sa.create_engine.empty_in_strategy` keyword is "
            "deprecated, and no longer has any effect. All IN expressions "
            "are now rendered using "
            'the "expanding parameter" strategy which renders a set of bound'
            'expressions, or an "empty set" SELECT, at statement execution'
            "time.",
        ),
        server_side_cursors=(
            "1.4",
            "The :paramref:`_sa.create_engine.server_side_cursors` parameter "
            "is deprecated and will be removed in a future release. Please "
            "use the "
            ":paramref:`_engine.Connection.execution_options.stream_results` "
            "parameter.",
        ),
    )
    def __init__(
        self,
        paramstyle: Optional[_ParamStyle] = None,
        isolation_level: Optional[IsolationLevel] = None,
        dbapi: Optional[DBAPIModule] = None,
        implicit_returning: Literal[True] = True,
        supports_native_boolean: Optional[bool] = None,
        max_identifier_length: Optional[int] = None,
        label_length: Optional[int] = None,
        insertmanyvalues_page_size: Union[_NoArg, int] = _NoArg.NO_ARG,
        use_insertmanyvalues: Optional[bool] = None,
        # util.deprecated_params decorator cannot render the
        # Linting.NO_LINTING constant
        compiler_linting: Linting = int(compiler.NO_LINTING),  # type: ignore
        server_side_cursors: bool = False,
        skip_autocommit_rollback: bool = False,
        **kwargs: Any,
    ):
        # legacy flag: fail fast if the dialect can't do server side
        # cursors at all, otherwise flip the instance-level default
        if server_side_cursors:
            if not self.supports_server_side_cursors:
                raise exc.ArgumentError(
                    "Dialect %s does not support server side cursors" % self
                )
            else:
                self.server_side_cursors = True

        # migrate the legacy use_setinputsizes flag onto bind_typing
        if getattr(self, "use_setinputsizes", False):
            util.warn_deprecated(
                "The dialect-level use_setinputsizes attribute is "
                "deprecated.  Please use "
                "bind_typing = BindTyping.SETINPUTSIZES",
                "2.0",
            )
            self.bind_typing = interfaces.BindTyping.SETINPUTSIZES

        self.positional = False
        self._ischema = None

        self.dbapi = dbapi

        self.skip_autocommit_rollback = skip_autocommit_rollback

        # paramstyle resolution order: explicit argument, then the
        # DBAPI module's declared style, then the class default
        if paramstyle is not None:
            self.paramstyle = paramstyle
        elif self.dbapi is not None:
            self.paramstyle = self.dbapi.paramstyle
        else:
            self.paramstyle = self.default_paramstyle
        self.positional = self.paramstyle in (
            "qmark",
            "format",
            "numeric",
            "numeric_dollar",
        )
        self.identifier_preparer = self.preparer(self)
        self._on_connect_isolation_level = isolation_level

        # honor a legacy "type_compiler" class-level attribute if a
        # dialect still defines one, falling back to type_compiler_cls
        legacy_tt_callable = getattr(self, "type_compiler", None)
        if legacy_tt_callable is not None:
            tt_callable = cast(
                Type[compiler.GenericTypeCompiler],
                self.type_compiler,
            )
        else:
            tt_callable = self.type_compiler_cls

        self.type_compiler_instance = self.type_compiler = tt_callable(self)

        if supports_native_boolean is not None:
            self.supports_native_boolean = supports_native_boolean

        # a user-supplied max_identifier_length overrides any value the
        # server would report later in initialize()
        self._user_defined_max_identifier_length = max_identifier_length
        if self._user_defined_max_identifier_length:
            self.max_identifier_length = (
                self._user_defined_max_identifier_length
            )
        self.label_length = label_length
        self.compiler_linting = compiler_linting

        if use_insertmanyvalues is not None:
            self.use_insertmanyvalues = use_insertmanyvalues

        if insertmanyvalues_page_size is not _NoArg.NO_ARG:
            self.insertmanyvalues_page_size = insertmanyvalues_page_size

    @property
    @util.deprecated(
        "2.0",
        "full_returning is deprecated, please use insert_returning, "
        "update_returning, delete_returning",
    )
    def full_returning(self):
        # legacy aggregate flag: True only when all three per-statement
        # RETURNING flags are enabled
        return (
            self.insert_returning
            and self.update_returning
            and self.delete_returning
        )

396 

    @util.memoized_property
    def insert_executemany_returning(self):
        """Default implementation for insert_executemany_returning, if not
        otherwise overridden by the specific dialect.

        The default dialect determines "insert_executemany_returning" is
        available if the dialect in use has opted into using the
        "use_insertmanyvalues" feature. If they haven't opted into that, then
        this attribute is False, unless the dialect in question overrides this
        and provides some other implementation (such as the Oracle Database
        dialects).

        """
        return self.insert_returning and self.use_insertmanyvalues

411 

    @util.memoized_property
    def insert_executemany_returning_sort_by_parameter_order(self):
        """Default implementation for
        insert_executemany_returning_deterministic_order, if not otherwise
        overridden by the specific dialect.

        The default dialect determines "insert_executemany_returning" can have
        deterministic order only if the dialect in use has opted into using the
        "use_insertmanyvalues" feature, which implements deterministic ordering
        using client side sentinel columns only by default.  The
        "insertmanyvalues" feature also features alternate forms that can
        use server-generated PK values as "sentinels", but those are only
        used if the :attr:`.Dialect.insertmanyvalues_implicit_sentinel`
        bitflag enables those alternate SQL forms, which are disabled
        by default.

        If the dialect in use hasn't opted into that, then this attribute is
        False, unless the dialect in question overrides this and provides some
        other implementation (such as the Oracle Database dialects).

        """
        return self.insert_returning and self.use_insertmanyvalues

434 

    # executemany() combined with RETURNING for UPDATE / DELETE is off by
    # default; individual dialects opt in
    update_executemany_returning = False
    delete_executemany_returning = False

437 

438 @util.memoized_property 

439 def loaded_dbapi(self) -> DBAPIModule: 

440 if self.dbapi is None: 

441 raise exc.InvalidRequestError( 

442 f"Dialect {self} does not have a Python DBAPI established " 

443 "and cannot be used for actual database interaction" 

444 ) 

445 return self.dbapi 

446 

447 @util.memoized_property 

448 def _bind_typing_render_casts(self): 

449 return self.bind_typing is interfaces.BindTyping.RENDER_CASTS 

450 

451 def _ensure_has_table_connection(self, arg: Connection) -> None: 

452 if not isinstance(arg, Connection): 

453 raise exc.ArgumentError( 

454 "The argument passed to Dialect.has_table() should be a " 

455 "%s, got %s. " 

456 "Additionally, the Dialect.has_table() method is for " 

457 "internal dialect " 

458 "use only; please use " 

459 "``inspect(some_engine).has_table(<tablename>>)`` " 

460 "for public API use." % (Connection, type(arg)) 

461 ) 

462 

    @util.memoized_property
    def _supports_statement_cache(self):
        # read the flag from this class's own __dict__ (not via normal
        # attribute lookup) so that a dialect which merely inherits the
        # base value without explicitly opting in still warns below
        ssc = self.__class__.__dict__.get("supports_statement_cache", None)
        if ssc is None:
            util.warn(
                "Dialect %s:%s will not make use of SQL compilation caching "
                "as it does not set the 'supports_statement_cache' attribute "
                "to ``True``. This can have "
                "significant performance implications including some "
                "performance degradations in comparison to prior SQLAlchemy "
                "versions. Dialect maintainers should seek to set this "
                "attribute to True after appropriate development and testing "
                "for SQLAlchemy 1.4 caching support. Alternatively, this "
                "attribute may be set to False which will disable this "
                "warning." % (self.name, self.driver),
                code="cprf",
            )

        return bool(ssc)

482 

    @util.memoized_property
    def _type_memos(self):
        # per-dialect memoization cache; weak keys so entries are dropped
        # when the keying objects are garbage collected
        # (presumably keyed by TypeEngine objects — confirm at call sites)
        return weakref.WeakKeyDictionary()

486 

487 @property 

488 def dialect_description(self): # type: ignore[override] 

489 return self.name + "+" + self.driver 

490 

    @property
    def supports_sane_rowcount_returning(self):
        """True if this dialect supports sane rowcount even if RETURNING is
        in use.

        For dialects that don't support RETURNING, this is synonymous with
        ``supports_sane_rowcount``.

        """
        # default: assume RETURNING does not change rowcount behavior
        return self.supports_sane_rowcount

501 

502 @classmethod 

503 def get_pool_class(cls, url: URL) -> Type[Pool]: 

504 default: Type[pool.Pool] 

505 if cls.is_async: 

506 default = pool.AsyncAdaptedQueuePool 

507 else: 

508 default = pool.QueuePool 

509 

510 return getattr(cls, "poolclass", default) 

511 

    def get_dialect_pool_class(self, url: URL) -> Type[Pool]:
        # instance-level hook; defaults to the classmethod implementation
        return self.get_pool_class(url)

514 

515 @classmethod 

516 def load_provisioning(cls): 

517 package = ".".join(cls.__module__.split(".")[0:-1]) 

518 try: 

519 __import__(package + ".provision") 

520 except ImportError: 

521 pass 

522 

523 def _builtin_onconnect(self) -> Optional[_ListenerFnType]: 

524 if self._on_connect_isolation_level is not None: 

525 

526 def builtin_connect(dbapi_conn, conn_rec): 

527 self._assert_and_set_isolation_level( 

528 dbapi_conn, self._on_connect_isolation_level 

529 ) 

530 

531 return builtin_connect 

532 else: 

533 return None 

534 

    def initialize(self, connection: Connection) -> None:
        """Probe the database on first connect and populate
        server-derived dialect state.

        Each probe tolerates :exc:`NotImplementedError` from dialects
        that don't implement it, falling back to ``None``.
        """
        try:
            self.server_version_info = self._get_server_version_info(
                connection
            )
        except NotImplementedError:
            self.server_version_info = None
        try:
            self.default_schema_name = self._get_default_schema_name(
                connection
            )
        except NotImplementedError:
            self.default_schema_name = None

        try:
            self.default_isolation_level = self.get_default_isolation_level(
                connection.connection.dbapi_connection
            )
        except NotImplementedError:
            self.default_isolation_level = None

        # a server-reported identifier length is applied only when the
        # user did not configure one explicitly
        if not self._user_defined_max_identifier_length:
            max_ident_length = self._check_max_identifier_length(connection)
            if max_ident_length:
                self.max_identifier_length = max_ident_length

        # label_length can only be validated once the effective
        # max_identifier_length is known
        if (
            self.label_length
            and self.label_length > self.max_identifier_length
        ):
            raise exc.ArgumentError(
                "Label length of %d is greater than this dialect's"
                " maximum identifier length of %d"
                % (self.label_length, self.max_identifier_length)
            )

570 

    def on_connect(self) -> Optional[Callable[[Any], None]]:
        # inherits the docstring from interfaces.Dialect.on_connect
        # default: no per-connection setup callable
        return None

574 

    def _check_max_identifier_length(self, connection):
        """Perform a connection / server version specific check to determine
        the max_identifier_length.

        If the dialect's class level max_identifier_length should be used,
        can return None.

        """
        # default: no server-side probe; use the class-level value
        return None

584 

    def get_default_isolation_level(self, dbapi_conn):
        """Given a DBAPI connection, return its isolation level, or
        a default isolation level if one cannot be retrieved.

        May be overridden by subclasses in order to provide a
        "fallback" isolation level for databases that cannot reliably
        retrieve the actual isolation level.

        By default, calls the :meth:`_engine.Interfaces.get_isolation_level`
        method, propagating any exceptions raised.

        """
        return self.get_isolation_level(dbapi_conn)

598 

    def type_descriptor(self, typeobj):
        """Provide a database-specific :class:`.TypeEngine` object, given
        the generic object which comes from the types module.

        This method looks for a dictionary called
        ``colspecs`` as a class or instance-level variable,
        and passes on to :func:`_types.adapt_type`.

        """
        return type_api.adapt_type(typeobj, self.colspecs)

609 

610 def has_index(self, connection, table_name, index_name, schema=None, **kw): 

611 if not self.has_table(connection, table_name, schema=schema, **kw): 

612 return False 

613 for idx in self.get_indexes( 

614 connection, table_name, schema=schema, **kw 

615 ): 

616 if idx["name"] == index_name: 

617 return True 

618 else: 

619 return False 

620 

621 def has_schema( 

622 self, connection: Connection, schema_name: str, **kw: Any 

623 ) -> bool: 

624 return schema_name in self.get_schema_names(connection, **kw) 

625 

626 def validate_identifier(self, ident: str) -> None: 

627 if len(ident) > self.max_identifier_length: 

628 raise exc.IdentifierError( 

629 "Identifier '%s' exceeds maximum length of %d characters" 

630 % (ident, self.max_identifier_length) 

631 ) 

632 

    def connect(self, *cargs: Any, **cparams: Any) -> DBAPIConnection:
        # inherits the docstring from interfaces.Dialect.connect
        # pass args straight through to the DBAPI module's connect()
        return self.loaded_dbapi.connect(*cargs, **cparams)  # type: ignore[no-any-return]  # NOQA: E501

636 

637 def create_connect_args(self, url: URL) -> ConnectArgsType: 

638 # inherits the docstring from interfaces.Dialect.create_connect_args 

639 opts = url.translate_connect_args() 

640 opts.update(url.query) 

641 return ([], opts) 

642 

    def set_engine_execution_options(
        self, engine: Engine, opts: Mapping[str, Any]
    ) -> None:
        """Apply connection-characteristic execution options at the
        engine level by registering an engine_connect listener."""
        supported_names = set(self.connection_characteristics).intersection(
            opts
        )
        if supported_names:
            characteristics: Mapping[str, Any] = util.immutabledict(
                (name, opts[name]) for name in supported_names
            )

            # applied lazily to each connection checked out from this
            # engine, rather than eagerly here
            @event.listens_for(engine, "engine_connect")
            def set_connection_characteristics(connection):
                self._set_connection_characteristics(
                    connection, characteristics
                )

    def set_connection_execution_options(
        self, connection: Connection, opts: Mapping[str, Any]
    ) -> None:
        """Apply connection-characteristic execution options directly to
        the given connection."""
        supported_names = set(self.connection_characteristics).intersection(
            opts
        )
        if supported_names:
            characteristics: Mapping[str, Any] = util.immutabledict(
                (name, opts[name]) for name in supported_names
            )
            self._set_connection_characteristics(connection, characteristics)

671 

    def _set_connection_characteristics(self, connection, characteristics):
        """Apply the named characteristics to *connection* and arrange
        for them to be reset when the connection is returned to the pool.

        Raises InvalidRequestError when a transactional characteristic
        is changed while a transaction is already in progress.
        """
        characteristic_values = [
            (name, self.connection_characteristics[name], value)
            for name, value in characteristics.items()
        ]

        if connection.in_transaction():
            trans_objs = [
                (name, obj)
                for name, obj, _ in characteristic_values
                if obj.transactional
            ]
            if trans_objs:
                raise exc.InvalidRequestError(
                    "This connection has already initialized a SQLAlchemy "
                    "Transaction() object via begin() or autobegin; "
                    "%s may not be altered unless rollback() or commit() "
                    "is called first."
                    % (", ".join(name for name, obj in trans_objs))
                )

        dbapi_connection = connection.connection.dbapi_connection
        for _, characteristic, value in characteristic_values:
            characteristic.set_connection_characteristic(
                self, connection, dbapi_connection, value
            )
        # restore defaults when the pooled connection is finalized
        connection.connection._connection_record.finalize_callback.append(
            functools.partial(self._reset_characteristics, characteristics)
        )

701 

702 def _reset_characteristics(self, characteristics, dbapi_connection): 

703 for characteristic_name in characteristics: 

704 characteristic = self.connection_characteristics[ 

705 characteristic_name 

706 ] 

707 characteristic.reset_characteristic(self, dbapi_connection) 

708 

    def do_begin(self, dbapi_connection):
        # DBAPI connections begin transactions implicitly; nothing to do
        # by default
        pass

711 

712 def do_rollback(self, dbapi_connection): 

713 if self.skip_autocommit_rollback and self.detect_autocommit_setting( 

714 dbapi_connection 

715 ): 

716 return 

717 dbapi_connection.rollback() 

718 

    def do_commit(self, dbapi_connection):
        # delegate straight to the DBAPI connection
        dbapi_connection.commit()

721 

    def do_terminate(self, dbapi_connection):
        # default "terminate" is an ordinary close; dialects that set
        # has_terminate may override with a harder drop
        self.do_close(dbapi_connection)

724 

    def do_close(self, dbapi_connection):
        # delegate straight to the DBAPI connection
        dbapi_connection.close()

727 

    @util.memoized_property
    def _dialect_specific_select_one(self):
        # "SELECT 1" rendered through this dialect's compiler; used by
        # the default do_ping() implementation
        return str(expression.select(1).compile(dialect=self))

731 

    def _do_ping_w_event(self, dbapi_connection: DBAPIConnection) -> bool:
        """Run do_ping(), routing DBAPI errors through the error-handling
        event system.

        Returns False when the error indicates a dead connection;
        re-raises otherwise.
        """
        try:
            return self.do_ping(dbapi_connection)
        except self.loaded_dbapi.Error as err:
            is_disconnect = self.is_disconnect(err, dbapi_connection, None)

            if self._has_events:
                try:
                    Connection._handle_dbapi_exception_noconnection(
                        err,
                        self,
                        is_disconnect=is_disconnect,
                        invalidate_pool_on_disconnect=False,
                        is_pre_ping=True,
                    )
                except exc.StatementError as new_err:
                    # a handler may have re-characterized the error;
                    # trust its verdict on connection invalidation
                    is_disconnect = new_err.connection_invalidated

            if is_disconnect:
                return False
            else:
                raise

    def do_ping(self, dbapi_connection: DBAPIConnection) -> bool:
        # execute the dialect's "SELECT 1"; any DBAPI error propagates
        # to _do_ping_w_event() which decides whether it's a disconnect
        cursor = dbapi_connection.cursor()
        try:
            cursor.execute(self._dialect_specific_select_one)
        finally:
            cursor.close()
        return True

762 

763 def create_xid(self): 

764 """Create a random two-phase transaction ID. 

765 

766 This id will be passed to do_begin_twophase(), do_rollback_twophase(), 

767 do_commit_twophase(). Its format is unspecified. 

768 """ 

769 

770 return "_sa_%032x" % random.randint(0, 2**128) 

771 

    def do_savepoint(self, connection, name):
        # emit SAVEPOINT <name> through the SQL expression layer
        connection.execute(expression.SavepointClause(name))

774 

    def do_rollback_to_savepoint(self, connection, name):
        # emit ROLLBACK TO SAVEPOINT <name> through the expression layer
        connection.execute(expression.RollbackToSavepointClause(name))

777 

    def do_release_savepoint(self, connection, name):
        # emit RELEASE SAVEPOINT <name> through the expression layer
        connection.execute(expression.ReleaseSavepointClause(name))

780 

    def _deliver_insertmanyvalues_batches(
        self,
        connection,
        cursor,
        statement,
        parameters,
        generic_setinputsizes,
        context,
    ):
        """Generator driving the "insertmanyvalues" feature: yields one
        rewritten statement/parameter batch at a time, and, for RETURNING
        statements, collects each batch's rows after the caller has
        executed the yielded batch.

        Rows are accumulated onto ``context._insertmanyvalues_rows``;
        when sentinel columns are in play they are re-ordered to match
        the original parameter-set order.
        """
        context = cast(DefaultExecutionContext, context)
        compiled = cast(SQLCompiler, context.compiled)

        # sentinel result processors are resolved lazily from the first
        # batch's cursor.description, then reused for later batches
        _composite_sentinel_proc: Sequence[
            Optional[_ResultProcessorType[Any]]
        ] = ()
        _scalar_sentinel_proc: Optional[_ResultProcessorType[Any]] = None
        _sentinel_proc_initialized: bool = False

        compiled_parameters = context.compiled_parameters

        imv = compiled._insertmanyvalues
        assert imv is not None

        is_returning: Final[bool] = bool(compiled.effective_returning)
        # execution option overrides the dialect-level page size
        batch_size = context.execution_options.get(
            "insertmanyvalues_page_size", self.insertmanyvalues_page_size
        )

        if compiled.schema_translate_map:
            schema_translate_map = context.execution_options.get(
                "schema_translate_map", {}
            )
        else:
            schema_translate_map = None

        if is_returning:
            result: Optional[List[Any]] = []
            context._insertmanyvalues_rows = result

            sort_by_parameter_order = imv.sort_by_parameter_order

        else:
            sort_by_parameter_order = False
            result = None

        for imv_batch in compiled._deliver_insertmanyvalues_batches(
            statement,
            parameters,
            compiled_parameters,
            generic_setinputsizes,
            batch_size,
            sort_by_parameter_order,
            schema_translate_map,
        ):
            # caller executes this batch, then control returns here so
            # RETURNING rows can be harvested for it
            yield imv_batch

            if is_returning:

                try:
                    rows = context.fetchall_for_returning(cursor)
                except BaseException as be:
                    connection._handle_dbapi_exception(
                        be,
                        sql_util._long_statement(imv_batch.replaced_statement),
                        imv_batch.replaced_parameters,
                        None,
                        context,
                        is_sub_exec=True,
                    )

                # I would have thought "is_returning: Final[bool]"
                # would have assured this but pylance thinks not
                assert result is not None

                if imv.num_sentinel_columns and not imv_batch.is_downgraded:
                    composite_sentinel = imv.num_sentinel_columns > 1
                    if imv.implicit_sentinel:
                        # for implicit sentinel, which is currently single-col
                        # integer autoincrement, do a simple sort.
                        assert not composite_sentinel
                        result.extend(
                            sorted(rows, key=operator.itemgetter(-1))
                        )
                        continue

                    # otherwise, create dictionaries to match up batches
                    # with parameters
                    assert imv.sentinel_param_keys
                    assert imv.sentinel_columns

                    _nsc = imv.num_sentinel_columns

                    if not _sentinel_proc_initialized:
                        if composite_sentinel:
                            _composite_sentinel_proc = [
                                col.type._cached_result_processor(
                                    self, cursor_desc[1]
                                )
                                for col, cursor_desc in zip(
                                    imv.sentinel_columns,
                                    cursor.description[-_nsc:],
                                )
                            ]
                        else:
                            _scalar_sentinel_proc = (
                                imv.sentinel_columns[0]
                            ).type._cached_result_processor(
                                self, cursor.description[-1][1]
                            )
                        _sentinel_proc_initialized = True

                    rows_by_sentinel: Union[
                        Dict[Tuple[Any, ...], Any],
                        Dict[Any, Any],
                    ]

                    # key each returned row by its (processed) sentinel
                    # value(s), which occupy the trailing columns
                    if composite_sentinel:
                        rows_by_sentinel = {
                            tuple(
                                (proc(val) if proc else val)
                                for val, proc in zip(
                                    row[-_nsc:], _composite_sentinel_proc
                                )
                            ): row
                            for row in rows
                        }
                    elif _scalar_sentinel_proc:
                        rows_by_sentinel = {
                            _scalar_sentinel_proc(row[-1]): row for row in rows
                        }
                    else:
                        rows_by_sentinel = {row[-1]: row for row in rows}

                    if len(rows_by_sentinel) != len(imv_batch.batch):
                        # see test_insert_exec.py::
                        # IMVSentinelTest::test_sentinel_incorrect_rowcount
                        # for coverage / demonstration
                        raise exc.InvalidRequestError(
                            f"Sentinel-keyed result set did not produce "
                            f"correct number of rows {len(imv_batch.batch)}; "
                            "produced "
                            f"{len(rows_by_sentinel)}. Please ensure the "
                            "sentinel column is fully unique and populated in "
                            "all cases."
                        )

                    try:
                        ordered_rows = [
                            rows_by_sentinel[sentinel_keys]
                            for sentinel_keys in imv_batch.sentinel_values
                        ]
                    except KeyError as ke:
                        # see test_insert_exec.py::
                        # IMVSentinelTest::test_sentinel_cant_match_keys
                        # for coverage / demonstration
                        raise exc.InvalidRequestError(
                            f"Can't match sentinel values in result set to "
                            f"parameter sets; key {ke.args[0]!r} was not "
                            "found. "
                            "There may be a mismatch between the datatype "
                            "passed to the DBAPI driver vs. that which it "
                            "returns in a result row. Ensure the given "
                            "Python value matches the expected result type "
                            "*exactly*, taking care to not rely upon implicit "
                            "conversions which may occur such as when using "
                            "strings in place of UUID or integer values, etc. "
                        ) from ke

                    result.extend(ordered_rows)

                else:
                    # no sentinel ordering required; append as delivered
                    result.extend(rows)

953 

954 def do_executemany(self, cursor, statement, parameters, context=None): 

955 cursor.executemany(statement, parameters) 

956 

957 def do_execute(self, cursor, statement, parameters, context=None): 

958 cursor.execute(statement, parameters) 

959 

960 def do_execute_no_params(self, cursor, statement, context=None): 

961 cursor.execute(statement) 

962 

    def is_disconnect(
        self,
        e: DBAPIModule.Error,
        connection: Union[
            pool.PoolProxiedConnection, interfaces.DBAPIConnection, None
        ],
        cursor: Optional[interfaces.DBAPICursor],
    ) -> bool:
        """Return True if the given DBAPI error indicates the database
        connection has been lost.

        The base dialect has no knowledge of driver-specific disconnect
        conditions, so it always returns False; dialects override this
        with driver-specific error inspection.
        """
        return False

972 

    @util.memoized_instancemethod
    def _gen_allowed_isolation_levels(self, dbapi_conn):
        # Compute, once per dialect instance, the tuple of isolation
        # level names this dialect accepts, or None if the dialect does
        # not implement get_isolation_level_values().
        try:
            raw_levels = list(self.get_isolation_level_values(dbapi_conn))
        except NotImplementedError:
            # dialect publishes no list of levels; validation is skipped
            return None
        else:
            # enforce the documented naming convention for level names:
            # UPPERCASE words separated by spaces, not underscores
            normalized_levels = [
                level.replace("_", " ").upper() for level in raw_levels
            ]
            if raw_levels != normalized_levels:
                raise ValueError(
                    f"Dialect {self.name!r} get_isolation_level_values() "
                    f"method should return names as UPPERCASE using spaces, "
                    f"not underscores; got "
                    f"{sorted(set(raw_levels).difference(normalized_levels))}"
                )
            return tuple(normalized_levels)

991 

    def _assert_and_set_isolation_level(self, dbapi_conn, level):
        # normalize user input to the canonical "UPPERCASE WITH SPACES"
        # form before validating against the dialect's accepted levels
        level = level.replace("_", " ").upper()

        _allowed_isolation_levels = self._gen_allowed_isolation_levels(
            dbapi_conn
        )
        # a None/empty result means the dialect does not publish its
        # valid levels; in that case skip validation entirely
        if (
            _allowed_isolation_levels
            and level not in _allowed_isolation_levels
        ):
            raise exc.ArgumentError(
                f"Invalid value {level!r} for isolation_level. "
                f"Valid isolation levels for {self.name!r} are "
                f"{', '.join(_allowed_isolation_levels)}"
            )

        self.set_isolation_level(dbapi_conn, level)

1009 

    def reset_isolation_level(self, dbapi_conn):
        # Restore the connection's isolation level to its baseline.
        if self._on_connect_isolation_level is not None:
            # an isolation level was applied on connect; restore that
            # one.  it is expected to be either AUTOCOMMIT or the
            # server-default level
            assert (
                self._on_connect_isolation_level == "AUTOCOMMIT"
                or self._on_connect_isolation_level
                == self.default_isolation_level
            )
            self._assert_and_set_isolation_level(
                dbapi_conn, self._on_connect_isolation_level
            )
        else:
            # otherwise restore the server default detected at
            # first-connect time
            assert self.default_isolation_level is not None
            self._assert_and_set_isolation_level(
                dbapi_conn,
                self.default_isolation_level,
            )

1026 

1027 def normalize_name(self, name): 

1028 if name is None: 

1029 return None 

1030 

1031 name_lower = name.lower() 

1032 name_upper = name.upper() 

1033 

1034 if name_upper == name_lower: 

1035 # name has no upper/lower conversion, e.g. non-european characters. 

1036 # return unchanged 

1037 return name 

1038 elif name_upper == name and not ( 

1039 self.identifier_preparer._requires_quotes 

1040 )(name_lower): 

1041 # name is all uppercase and doesn't require quoting; normalize 

1042 # to all lower case 

1043 return name_lower 

1044 elif name_lower == name: 

1045 # name is all lower case, which if denormalized means we need to 

1046 # force quoting on it 

1047 return quoted_name(name, quote=True) 

1048 else: 

1049 # name is mixed case, means it will be quoted in SQL when used 

1050 # later, no normalizes 

1051 return name 

1052 

1053 def denormalize_name(self, name): 

1054 if name is None: 

1055 return None 

1056 

1057 name_lower = name.lower() 

1058 name_upper = name.upper() 

1059 

1060 if name_upper == name_lower: 

1061 # name has no upper/lower conversion, e.g. non-european characters. 

1062 # return unchanged 

1063 return name 

1064 elif name_lower == name and not ( 

1065 self.identifier_preparer._requires_quotes 

1066 )(name_lower): 

1067 name = name_upper 

1068 return name 

1069 

1070 def get_driver_connection(self, connection: DBAPIConnection) -> Any: 

1071 return connection 

1072 

    def _overrides_default(self, method):
        # True if this dialect class has overridden the named method
        # relative to DefaultDialect, detected by comparing the
        # underlying code objects (identity, not equality)
        return (
            getattr(type(self), method).__code__
            is not getattr(DefaultDialect, method).__code__
        )

1078 

    def _default_multi_reflect(
        self,
        single_tbl_method,
        connection,
        kind,
        schema,
        filter_names,
        scope,
        **kw,
    ):
        """Generic "multi object" reflection: determine the candidate
        table/view names for the requested kind and scope, then invoke
        ``single_tbl_method`` once per name, yielding
        ``((schema, name), result)`` pairs.

        Serves as the default implementation backing all the
        ``get_multi_*`` methods of this class.
        """
        # collect the name-listing callables matching the requested kinds,
        # split by default vs. temporary scope
        names_fns = []
        temp_names_fns = []
        if ObjectKind.TABLE in kind:
            names_fns.append(self.get_table_names)
            temp_names_fns.append(self.get_temp_table_names)
        if ObjectKind.VIEW in kind:
            names_fns.append(self.get_view_names)
            temp_names_fns.append(self.get_temp_view_names)
        if ObjectKind.MATERIALIZED_VIEW in kind:
            names_fns.append(self.get_materialized_view_names)
            # no temp materialized view at the moment
            # temp_names_fns.append(self.get_temp_materialized_view_names)

        # optional dict supplied by the caller to collect per-table
        # reflection failures
        unreflectable = kw.pop("unreflectable", {})

        if (
            filter_names
            and scope is ObjectScope.ANY
            and kind is ObjectKind.ANY
        ):
            # if names are given and no qualification on type of table
            # (i.e. the Table(..., autoload) case), take the names as given,
            # don't run names queries. If a table does not exit
            # NoSuchTableError is raised and it's skipped

            # this also suits the case for mssql where we can reflect
            # individual temp tables but there's no temp_names_fn
            names = filter_names
        else:
            # otherwise query the backend for the candidate names in the
            # requested scope(s)
            names = []
            name_kw = {"schema": schema, **kw}
            fns = []
            if ObjectScope.DEFAULT in scope:
                fns.extend(names_fns)
            if ObjectScope.TEMPORARY in scope:
                fns.extend(temp_names_fns)

            for fn in fns:
                try:
                    names.extend(fn(connection, **name_kw))
                except NotImplementedError:
                    # dialect doesn't support this object type; skip
                    pass

        if filter_names:
            # set for O(1) membership tests in the loop below
            filter_names = set(filter_names)

        # iterate over all the tables/views and call the single table method
        for table in names:
            if not filter_names or table in filter_names:
                key = (schema, table)
                try:
                    yield (
                        key,
                        single_tbl_method(
                            connection, table, schema=schema, **kw
                        ),
                    )
                except exc.UnreflectableTableError as err:
                    # record the first failure per key for the caller
                    if key not in unreflectable:
                        unreflectable[key] = err
                except exc.NoSuchTableError:
                    # name given in filter_names doesn't exist; skip
                    pass

1151 

    def get_multi_table_options(self, connection, **kw):
        """Default multi-table form: loop the single-table
        ``get_table_options`` over the reflected names."""
        return self._default_multi_reflect(
            self.get_table_options, connection, **kw
        )

1156 

    def get_multi_columns(self, connection, **kw):
        """Default multi-table form: loop the single-table
        ``get_columns`` over the reflected names."""
        return self._default_multi_reflect(self.get_columns, connection, **kw)

1159 

    def get_multi_pk_constraint(self, connection, **kw):
        """Default multi-table form: loop the single-table
        ``get_pk_constraint`` over the reflected names."""
        return self._default_multi_reflect(
            self.get_pk_constraint, connection, **kw
        )

1164 

    def get_multi_foreign_keys(self, connection, **kw):
        """Default multi-table form: loop the single-table
        ``get_foreign_keys`` over the reflected names."""
        return self._default_multi_reflect(
            self.get_foreign_keys, connection, **kw
        )

1169 

    def get_multi_indexes(self, connection, **kw):
        """Default multi-table form: loop the single-table
        ``get_indexes`` over the reflected names."""
        return self._default_multi_reflect(self.get_indexes, connection, **kw)

1172 

    def get_multi_unique_constraints(self, connection, **kw):
        """Default multi-table form: loop the single-table
        ``get_unique_constraints`` over the reflected names."""
        return self._default_multi_reflect(
            self.get_unique_constraints, connection, **kw
        )

1177 

    def get_multi_check_constraints(self, connection, **kw):
        """Default multi-table form: loop the single-table
        ``get_check_constraints`` over the reflected names."""
        return self._default_multi_reflect(
            self.get_check_constraints, connection, **kw
        )

1182 

    def get_multi_table_comment(self, connection, **kw):
        """Default multi-table form: loop the single-table
        ``get_table_comment`` over the reflected names."""
        return self._default_multi_reflect(
            self.get_table_comment, connection, **kw
        )

1187 

1188 

class StrCompileDialect(DefaultDialect):
    """Dialect used for backend-agnostic string compilation of SQL
    constructs (e.g. ``str(statement)``), with no real database
    attached."""

    statement_compiler = compiler.StrSQLCompiler
    ddl_compiler = compiler.DDLCompiler
    type_compiler_cls = compiler.StrSQLTypeCompiler
    preparer = compiler.IdentifierPreparer

    # advertise RETURNING for all DML so such constructs stringify
    insert_returning = True
    update_returning = True
    delete_returning = True

    supports_statement_cache = True

    supports_identity_columns = True

    # sequences render, but are marked optional for this dialect
    supports_sequences = True
    sequences_optional = True
    preexecute_autoincrement_sequences = False

    supports_native_boolean = True

    supports_multivalues_insert = True
    supports_simple_order_by_label = True

1211 

1212 

1213class DefaultExecutionContext(ExecutionContext): 

1214 isinsert = False 

1215 isupdate = False 

1216 isdelete = False 

1217 is_crud = False 

1218 is_text = False 

1219 isddl = False 

1220 

1221 execute_style: ExecuteStyle = ExecuteStyle.EXECUTE 

1222 

1223 compiled: Optional[Compiled] = None 

1224 result_column_struct: Optional[ 

1225 Tuple[List[ResultColumnsEntry], bool, bool, bool, bool] 

1226 ] = None 

1227 returned_default_rows: Optional[Sequence[Row[Unpack[TupleAny]]]] = None 

1228 

1229 execution_options: _ExecuteOptions = util.EMPTY_DICT 

1230 

1231 cursor_fetch_strategy = _cursor._DEFAULT_FETCH 

1232 

1233 invoked_statement: Optional[Executable] = None 

1234 

1235 _is_implicit_returning = False 

1236 _is_explicit_returning = False 

1237 _is_supplemental_returning = False 

1238 _is_server_side = False 

1239 

1240 _soft_closed = False 

1241 

1242 _rowcount: Optional[int] = None 

1243 

1244 # a hook for SQLite's translation of 

1245 # result column names 

1246 # NOTE: pyhive is using this hook, can't remove it :( 

1247 _translate_colname: Optional[ 

1248 Callable[[str], Tuple[str, Optional[str]]] 

1249 ] = None 

1250 

1251 _expanded_parameters: Mapping[str, List[str]] = util.immutabledict() 

1252 """used by set_input_sizes(). 

1253 

1254 This collection comes from ``ExpandedState.parameter_expansion``. 

1255 

1256 """ 

1257 

1258 cache_hit = NO_CACHE_KEY 

1259 

1260 root_connection: Connection 

1261 _dbapi_connection: PoolProxiedConnection 

1262 dialect: Dialect 

1263 unicode_statement: str 

1264 cursor: DBAPICursor 

1265 compiled_parameters: List[_MutableCoreSingleExecuteParams] 

1266 parameters: _DBAPIMultiExecuteParams 

1267 extracted_parameters: Optional[Sequence[BindParameter[Any]]] 

1268 

1269 _empty_dict_params = cast("Mapping[str, Any]", util.EMPTY_DICT) 

1270 

1271 _insertmanyvalues_rows: Optional[List[Tuple[Any, ...]]] = None 

1272 _num_sentinel_cols: int = 0 

1273 

    @classmethod
    def _init_ddl(
        cls,
        dialect: Dialect,
        connection: Connection,
        dbapi_connection: PoolProxiedConnection,
        execution_options: _ExecuteOptions,
        compiled_ddl: DDLCompiler,
    ) -> ExecutionContext:
        """Initialize execution context for an ExecutableDDLElement
        construct."""

        # bypass __init__; contexts are assembled field-by-field by the
        # class-level constructors
        self = cls.__new__(cls)
        self.root_connection = connection
        self._dbapi_connection = dbapi_connection
        self.dialect = connection.dialect

        self.compiled = compiled = compiled_ddl
        self.isddl = True

        self.execution_options = execution_options

        self.unicode_statement = str(compiled)
        if compiled.schema_translate_map:
            # apply per-execution schema name substitution into the
            # rendered DDL string
            schema_translate_map = self.execution_options.get(
                "schema_translate_map", {}
            )

            rst = compiled.preparer._render_schema_translates
            self.unicode_statement = rst(
                self.unicode_statement, schema_translate_map
            )

        self.statement = self.unicode_statement

        self.cursor = self.create_cursor()
        # DDL takes no bound parameters; use the empty structure
        # appropriate to the dialect's paramstyle
        self.compiled_parameters = []

        if dialect.positional:
            self.parameters = [dialect.execute_sequence_format()]
        else:
            self.parameters = [self._empty_dict_params]

        return self

1318 

    @classmethod
    def _init_compiled(
        cls,
        dialect: Dialect,
        connection: Connection,
        dbapi_connection: PoolProxiedConnection,
        execution_options: _ExecuteOptions,
        compiled: SQLCompiler,
        parameters: _CoreMultiExecuteParams,
        invoked_statement: Executable,
        extracted_parameters: Optional[Sequence[BindParameter[Any]]],
        cache_hit: CacheStats = CacheStats.CACHING_DISABLED,
        param_dict: _CoreSingleExecuteParams | None = None,
    ) -> ExecutionContext:
        """Initialize execution context for a Compiled construct."""

        # bypass __init__; contexts are assembled field-by-field
        self = cls.__new__(cls)
        self.root_connection = connection
        self._dbapi_connection = dbapi_connection
        self.dialect = connection.dialect
        self.extracted_parameters = extracted_parameters
        self.invoked_statement = invoked_statement
        self.compiled = compiled
        self.cache_hit = cache_hit

        self.execution_options = execution_options

        # snapshot of compiler metadata later used for cursor/result
        # column matching
        self.result_column_struct = (
            compiled._result_columns,
            compiled._ordered_columns,
            compiled._textual_ordered_columns,
            compiled._ad_hoc_textual,
            compiled._loose_column_name_matching,
        )

        self.isinsert = ii = compiled.isinsert
        self.isupdate = iu = compiled.isupdate
        self.isdelete = id_ = compiled.isdelete
        self.is_text = compiled.isplaintext

        if ii or iu or id_:
            # DML statement: classify RETURNING usage and validate
            # executemany capability against the dialect
            dml_statement = compiled.compile_state.statement  # type: ignore
            if TYPE_CHECKING:
                assert isinstance(dml_statement, UpdateBase)
            self.is_crud = True
            self._is_explicit_returning = ier = bool(dml_statement._returning)
            self._is_implicit_returning = iir = bool(
                compiled.implicit_returning
            )
            if iir and dml_statement._supplemental_returning:
                self._is_supplemental_returning = True

            # dont mix implicit and explicit returning
            assert not (iir and ier)

            if (ier or iir) and compiled.for_executemany:
                # RETURNING + executemany is only legal where the
                # dialect declares support for the combination in use
                if ii and not self.dialect.insert_executemany_returning:
                    raise exc.InvalidRequestError(
                        f"Dialect {self.dialect.dialect_description} with "
                        f"current server capabilities does not support "
                        "INSERT..RETURNING when executemany is used"
                    )
                elif (
                    ii
                    and dml_statement._sort_by_parameter_order
                    and not self.dialect.insert_executemany_returning_sort_by_parameter_order  # noqa: E501
                ):
                    raise exc.InvalidRequestError(
                        f"Dialect {self.dialect.dialect_description} with "
                        f"current server capabilities does not support "
                        "INSERT..RETURNING with deterministic row ordering "
                        "when executemany is used"
                    )
                elif (
                    ii
                    and self.dialect.use_insertmanyvalues
                    and not compiled._insertmanyvalues
                ):
                    raise exc.InvalidRequestError(
                        'Statement does not have "insertmanyvalues" '
                        "enabled, can't use INSERT..RETURNING with "
                        "executemany in this case."
                    )
                elif iu and not self.dialect.update_executemany_returning:
                    raise exc.InvalidRequestError(
                        f"Dialect {self.dialect.dialect_description} with "
                        f"current server capabilities does not support "
                        "UPDATE..RETURNING when executemany is used"
                    )
                elif id_ and not self.dialect.delete_executemany_returning:
                    raise exc.InvalidRequestError(
                        f"Dialect {self.dialect.dialect_description} with "
                        f"current server capabilities does not support "
                        "DELETE..RETURNING when executemany is used"
                    )

        # resolve the compiled statement's bound parameter values into
        # one dict per parameter set
        if not parameters:
            self.compiled_parameters = [
                compiled.construct_params(
                    extracted_parameters=extracted_parameters,
                    escape_names=False,
                    _collected_params=param_dict,
                )
            ]
        else:
            self.compiled_parameters = [
                compiled.construct_params(
                    m,
                    escape_names=False,
                    _group_number=grp,
                    extracted_parameters=extracted_parameters,
                    _collected_params=param_dict,
                )
                for grp, m in enumerate(parameters)
            ]

            if len(parameters) > 1:
                # multiple parameter sets: choose between the
                # "insertmanyvalues" batched INSERT path and plain
                # executemany
                if self.isinsert and compiled._insertmanyvalues:
                    self.execute_style = ExecuteStyle.INSERTMANYVALUES

                    imv = compiled._insertmanyvalues
                    if imv.sentinel_columns is not None:
                        self._num_sentinel_cols = imv.num_sentinel_columns
                else:
                    self.execute_style = ExecuteStyle.EXECUTEMANY

        self.unicode_statement = compiled.string

        self.cursor = self.create_cursor()

        if self.compiled.insert_prefetch or self.compiled.update_prefetch:
            # resolve client-invoked column defaults before execution
            self._process_execute_defaults()

        processors = compiled._bind_processors

        flattened_processors: Mapping[
            str, _BindProcessorType[Any]
        ] = processors  # type: ignore[assignment]

        if compiled.literal_execute_params or compiled.post_compile_params:
            if self.executemany:
                raise exc.InvalidRequestError(
                    "'literal_execute' or 'expanding' parameters can't be "
                    "used with executemany()"
                )

            # render post-compile ("expanding") parameters into the
            # statement string and merge their processors
            expanded_state = compiled._process_parameters_for_postcompile(
                self.compiled_parameters[0]
            )

            # re-assign self.unicode_statement
            self.unicode_statement = expanded_state.statement

            self._expanded_parameters = expanded_state.parameter_expansion

            flattened_processors = dict(processors)  # type: ignore
            flattened_processors.update(expanded_state.processors)
            positiontup = expanded_state.positiontup
        elif compiled.positional:
            positiontup = self.compiled.positiontup
        else:
            positiontup = None

        if compiled.schema_translate_map:
            # apply per-execution schema name substitution
            schema_translate_map = self.execution_options.get(
                "schema_translate_map", {}
            )
            rst = compiled.preparer._render_schema_translates
            self.unicode_statement = rst(
                self.unicode_statement, schema_translate_map
            )

        # final self.unicode_statement is now assigned, encode if needed
        # by dialect
        self.statement = self.unicode_statement

        # Convert the dictionary of bind parameter values
        # into a dict or list to be sent to the DBAPI's
        # execute() or executemany() method.

        if compiled.positional:
            core_positional_parameters: MutableSequence[Sequence[Any]] = []
            assert positiontup is not None
            for compiled_params in self.compiled_parameters:
                # order values per positiontup, applying bind processors
                # where present
                l_param: List[Any] = [
                    (
                        flattened_processors[key](compiled_params[key])
                        if key in flattened_processors
                        else compiled_params[key]
                    )
                    for key in positiontup
                ]
                core_positional_parameters.append(
                    dialect.execute_sequence_format(l_param)
                )

            self.parameters = core_positional_parameters
        else:
            core_dict_parameters: MutableSequence[Dict[str, Any]] = []
            escaped_names = compiled.escaped_bind_names

            # note that currently, "expanded" parameters will be present
            # in self.compiled_parameters in their quoted form. This is
            # slightly inconsistent with the approach taken as of
            # #8056 where self.compiled_parameters is meant to contain unquoted
            # param names.
            d_param: Dict[str, Any]
            for compiled_params in self.compiled_parameters:
                if escaped_names:
                    # remap parameter keys to their DBAPI-escaped names
                    d_param = {
                        escaped_names.get(key, key): (
                            flattened_processors[key](compiled_params[key])
                            if key in flattened_processors
                            else compiled_params[key]
                        )
                        for key in compiled_params
                    }
                else:
                    d_param = {
                        key: (
                            flattened_processors[key](compiled_params[key])
                            if key in flattened_processors
                            else compiled_params[key]
                        )
                        for key in compiled_params
                    }

                core_dict_parameters.append(d_param)

            self.parameters = core_dict_parameters

        return self

1551 

    @classmethod
    def _init_statement(
        cls,
        dialect: Dialect,
        connection: Connection,
        dbapi_connection: PoolProxiedConnection,
        execution_options: _ExecuteOptions,
        statement: str,
        parameters: _DBAPIMultiExecuteParams,
    ) -> ExecutionContext:
        """Initialize execution context for a string SQL statement."""

        # bypass __init__; contexts are assembled field-by-field
        self = cls.__new__(cls)
        self.root_connection = connection
        self._dbapi_connection = dbapi_connection
        self.dialect = connection.dialect
        self.is_text = True

        self.execution_options = execution_options

        # coerce raw parameters into the structure the DBAPI expects:
        # empty placeholder, already-correct sequence/dict forms pass
        # through, anything else is converted per parameter set
        if not parameters:
            if self.dialect.positional:
                self.parameters = [dialect.execute_sequence_format()]
            else:
                self.parameters = [self._empty_dict_params]
        elif isinstance(parameters[0], dialect.execute_sequence_format):
            self.parameters = parameters
        elif isinstance(parameters[0], dict):
            self.parameters = parameters
        else:
            self.parameters = [
                dialect.execute_sequence_format(p) for p in parameters
            ]

        if len(parameters) > 1:
            # multiple parameter sets imply DBAPI executemany()
            self.execute_style = ExecuteStyle.EXECUTEMANY

        self.statement = self.unicode_statement = statement

        self.cursor = self.create_cursor()
        return self

1593 

    @classmethod
    def _init_default(
        cls,
        dialect: Dialect,
        connection: Connection,
        dbapi_connection: PoolProxiedConnection,
        execution_options: _ExecuteOptions,
    ) -> ExecutionContext:
        """Initialize execution context for a ColumnDefault construct."""

        # minimal context: no statement or parameters, just a live
        # cursor for firing off default-generation SQL
        self = cls.__new__(cls)
        self.root_connection = connection
        self._dbapi_connection = dbapi_connection
        self.dialect = connection.dialect

        self.execution_options = execution_options

        self.cursor = self.create_cursor()
        return self

1613 

    def _get_cache_stats(self) -> str:
        """Return a human-readable description of this statement's
        compilation-cache status, used in logging output."""
        if self.compiled is None:
            # plain string SQL never goes through the compiled cache
            return "raw sql"

        now = perf_counter()

        ch = self.cache_hit

        # time at which the compiled form was generated; used to report
        # elapsed time since generation
        gen_time = self.compiled._gen_time
        assert gen_time is not None

        if ch is NO_CACHE_KEY:
            return "no key %.5fs" % (now - gen_time,)
        elif ch is CACHE_HIT:
            return "cached since %.4gs ago" % (now - gen_time,)
        elif ch is CACHE_MISS:
            return "generated in %.5fs" % (now - gen_time,)
        elif ch is CACHING_DISABLED:
            # include the recorded reason for disabling, if one was set
            if "_cache_disable_reason" in self.execution_options:
                return "caching disabled (%s) %.5fs " % (
                    self.execution_options["_cache_disable_reason"],
                    now - gen_time,
                )
            else:
                return "caching disabled %.5fs" % (now - gen_time,)
        elif ch is NO_DIALECT_SUPPORT:
            return "dialect %s+%s does not support caching %.5fs" % (
                self.dialect.name,
                self.dialect.driver,
                now - gen_time,
            )
        else:
            return "unknown"

1647 

    @property
    def executemany(self):  # type: ignore[override]
        # True for both plain executemany() and the "insertmanyvalues"
        # batched form, which also processes multiple parameter sets
        return self.execute_style in (
            ExecuteStyle.EXECUTEMANY,
            ExecuteStyle.INSERTMANYVALUES,
        )

1654 

    @util.memoized_property
    def identifier_preparer(self):
        # prefer the compiled statement's preparer (it already reflects
        # any schema translation applied at compile time); otherwise
        # derive one from the dialect, honoring a per-execution
        # schema_translate_map
        if self.compiled:
            return self.compiled.preparer
        elif "schema_translate_map" in self.execution_options:
            return self.dialect.identifier_preparer._with_schema_translate(
                self.execution_options["schema_translate_map"]
            )
        else:
            return self.dialect.identifier_preparer

1665 

    @util.memoized_property
    def engine(self):
        # the Engine that the owning Connection belongs to
        return self.root_connection.engine

1669 

    @util.memoized_property
    def postfetch_cols(self) -> Optional[Sequence[Column[Any]]]:
        # columns whose server-generated values must be fetched after
        # execution, as determined by the SQL compiler
        if TYPE_CHECKING:
            assert isinstance(self.compiled, SQLCompiler)
        return self.compiled.postfetch

1675 

    @util.memoized_property
    def prefetch_cols(self) -> Optional[Sequence[Column[Any]]]:
        # columns whose defaults are resolved client-side before
        # execution; which set applies depends on the DML type
        if TYPE_CHECKING:
            assert isinstance(self.compiled, SQLCompiler)
        if self.isinsert:
            return self.compiled.insert_prefetch
        elif self.isupdate:
            return self.compiled.update_prefetch
        else:
            # no prefetch for other statement types
            return ()

1686 

    @util.memoized_property
    def no_parameters(self):
        # the "no_parameters" execution option requests that the
        # statement be invoked without any parameter structure
        return self.execution_options.get("no_parameters", False)

1690 

    def _execute_scalar(
        self,
        stmt: str,
        type_: Optional[TypeEngine[Any]],
        parameters: Optional[_DBAPISingleExecuteParams] = None,
    ) -> Any:
        """Execute a string statement on the current cursor, returning a
        scalar result.

        Used to fire off sequences, default phrases, and "select lastrowid"
        types of statements individually or in the context of a parent INSERT
        or UPDATE statement.

        """

        conn = self.root_connection

        if "schema_translate_map" in self.execution_options:
            # apply per-execution schema name substitution to the raw
            # statement string
            schema_translate_map = self.execution_options.get(
                "schema_translate_map", {}
            )

            rst = self.identifier_preparer._render_schema_translates
            stmt = rst(stmt, schema_translate_map)

        if not parameters:
            # supply the empty parameter structure matching the
            # dialect's paramstyle
            if self.dialect.positional:
                parameters = self.dialect.execute_sequence_format()
            else:
                parameters = {}

        conn._cursor_execute(self.cursor, stmt, parameters, context=self)
        # scalar = first column of the first row, or None if no row
        row = self.cursor.fetchone()
        if row is not None:
            r = row[0]
        else:
            r = None
        if type_ is not None:
            # apply type post processors to the result
            proc = type_._cached_result_processor(
                self.dialect, self.cursor.description[0][1]
            )
            if proc:
                return proc(r)
        return r

1736 

    @util.memoized_property
    def connection(self):
        # the Connection on whose behalf this context executes
        return self.root_connection

1740 

    def _use_server_side_cursor(self):
        """Decide whether a server-side cursor should be used for this
        execution, based on dialect capability and execution options."""
        if not self.dialect.supports_server_side_cursors:
            return False

        if self.dialect.server_side_cursors:
            # this is deprecated
            # legacy dialect-wide mode: default to server-side for
            # SELECT-like statements unless stream_results disables it;
            # textual SQL is sniffed via SERVER_SIDE_CURSOR_RE
            use_server_side = self.execution_options.get(
                "stream_results", True
            ) and (
                self.compiled
                and isinstance(self.compiled.statement, expression.Selectable)
                or (
                    (
                        not self.compiled
                        or isinstance(
                            self.compiled.statement, expression.TextClause
                        )
                    )
                    and self.unicode_statement
                    and SERVER_SIDE_CURSOR_RE.match(self.unicode_statement)
                )
            )
        else:
            # modern behavior: opt-in via the stream_results option only
            use_server_side = self.execution_options.get(
                "stream_results", False
            )

        return use_server_side

1769 

    def create_cursor(self) -> DBAPICursor:
        """Create and return the DBAPI cursor for this execution,
        choosing server-side vs. default based on dialect support and
        options."""
        if (
            # inlining initial preference checks for SS cursors
            self.dialect.supports_server_side_cursors
            and (
                self.execution_options.get("stream_results", False)
                or (
                    self.dialect.server_side_cursors
                    and self._use_server_side_cursor()
                )
            )
        ):
            self._is_server_side = True
            return self.create_server_side_cursor()
        else:
            self._is_server_side = False
            return self.create_default_cursor()

1787 

1788 def fetchall_for_returning(self, cursor): 

1789 return cursor.fetchall() 

1790 

1791 def create_default_cursor(self) -> DBAPICursor: 

1792 return self._dbapi_connection.cursor() 

1793 

    def create_server_side_cursor(self) -> DBAPICursor:
        """Produce a server-side cursor; must be implemented by dialects
        that declare ``supports_server_side_cursors``."""
        raise NotImplementedError()

1796 

    def pre_exec(self):
        """Hook invoked just before statement execution; no-op by
        default, available for dialect contexts to override."""
        pass

1799 

1800 def get_out_parameter_values(self, names): 

1801 raise NotImplementedError( 

1802 "This dialect does not support OUT parameters" 

1803 ) 

1804 

    def post_exec(self):
        """Hook invoked just after statement execution; no-op by
        default, available for dialect contexts to override."""
        pass

1807 

1808 def get_result_processor( 

1809 self, type_: TypeEngine[Any], colname: str, coltype: DBAPIType 

1810 ) -> Optional[_ResultProcessorType[Any]]: 

1811 """Return a 'result processor' for a given type as present in 

1812 cursor.description. 

1813 

1814 This has a default implementation that dialects can override 

1815 for context-sensitive result type handling. 

1816 

1817 """ 

1818 return type_._cached_result_processor(self.dialect, coltype) 

1819 

1820 def get_lastrowid(self) -> int: 

1821 """return self.cursor.lastrowid, or equivalent, after an INSERT. 

1822 

1823 This may involve calling special cursor functions, issuing a new SELECT 

1824 on the cursor (or a new one), or returning a stored value that was 

1825 calculated within post_exec(). 

1826 

1827 This function will only be called for dialects which support "implicit" 

1828 primary key generation, keep preexecute_autoincrement_sequences set to 

1829 False, and when no explicit id value was bound to the statement. 

1830 

1831 The function is called once for an INSERT statement that would need to 

1832 return the last inserted primary key for those dialects that make use 

1833 of the lastrowid concept. In these cases, it is called directly after 

1834 :meth:`.ExecutionContext.post_exec`. 

1835 

1836 """ 

1837 return self.cursor.lastrowid 

1838 

    def handle_dbapi_exception(self, e):
        """Hook receiving DBAPI exceptions raised during execution;
        no-op by default, available for dialect contexts to override."""
        pass

1841 

    @util.non_memoized_property
    def rowcount(self) -> int:
        # prefer a rowcount captured earlier (e.g. via the
        # "preserve_rowcount" execution option) over the live cursor
        # attribute, which subsequent cursor operations may change
        if self._rowcount is not None:
            return self._rowcount
        else:
            return self.cursor.rowcount

1848 

    @property
    def _has_rowcount(self):
        # True once a rowcount has been explicitly captured on this
        # context
        return self._rowcount is not None

1852 

1853 def supports_sane_rowcount(self): 

1854 return self.dialect.supports_sane_rowcount 

1855 

1856 def supports_sane_multi_rowcount(self): 

1857 return self.dialect.supports_sane_multi_rowcount 

1858 

    def _setup_result_proxy(self):
        """Build and return the CursorResult for this execution,
        selecting the fetch strategy and applying out-parameter and
        yield_per handling."""
        exec_opt = self.execution_options

        if self._rowcount is None and exec_opt.get("preserve_rowcount", False):
            # capture rowcount immediately, before result fetching can
            # invalidate the cursor attribute
            self._rowcount = self.cursor.rowcount

        yp: Optional[Union[int, bool]]
        if self.is_crud or self.is_text:
            result = self._setup_dml_or_text_result()
            # DML/text results never apply yield_per here
            yp = False
        else:
            yp = exec_opt.get("yield_per", None)
            sr = self._is_server_side or exec_opt.get("stream_results", False)
            strategy = self.cursor_fetch_strategy
            if sr and strategy is _cursor._DEFAULT_FETCH:
                # streaming requested: buffer rows incrementally rather
                # than relying on the default fetch
                strategy = _cursor.BufferedRowCursorFetchStrategy(
                    self.cursor, self.execution_options
                )
            cursor_description: _DBAPICursorDescription = (
                strategy.alternate_cursor_description
                or self.cursor.description
            )
            if cursor_description is None:
                # no result columns at all; use the no-op DQL strategy
                strategy = _cursor._NO_CURSOR_DQL

            result = _cursor.CursorResult(self, strategy, cursor_description)

        compiled = self.compiled

        if (
            compiled
            and not self.isddl
            and cast(SQLCompiler, compiled).has_out_parameters
        ):
            self._setup_out_parameters(result)

        self._soft_closed = result._soft_closed

        if yp:
            result = result.yield_per(yp)

        return result

1901 

    def _setup_out_parameters(self, result):
        """Collect OUT parameter values from the dialect and attach them
        to the result's ``out_parameters`` dict, applying result
        processors."""
        compiled = cast(SQLCompiler, self.compiled)

        # (bindparam, rendered name) pairs for all OUT params in the
        # compiled statement
        out_bindparams = [
            (param, name)
            for param, name in compiled.bind_names.items()
            if param.isoutparam
        ]
        out_parameters = {}

        # pair each bindparam with the raw value the dialect returned
        # for its rendered name
        for bindparam, raw_value in zip(
            [param for param, name in out_bindparams],
            self.get_out_parameter_values(
                [name for param, name in out_bindparams]
            ),
        ):
            type_ = bindparam.type
            impl_type = type_.dialect_impl(self.dialect)
            dbapi_type = impl_type.get_dbapi_type(self.dialect.loaded_dbapi)
            result_processor = impl_type.result_processor(
                self.dialect, dbapi_type
            )
            if result_processor is not None:
                raw_value = result_processor(raw_value)
            # keyed by the bindparam's original key, not the rendered
            # name
            out_parameters[bindparam.key] = raw_value

        result.out_parameters = out_parameters

1929 

def _setup_dml_or_text_result(self):
    """Build the :class:`.CursorResult` for a DML (INSERT / UPDATE /
    DELETE) or textual statement.

    Selects the cursor fetch strategy, collects implicit-RETURNING rows
    and inserted primary key values for INSERT statements, and captures
    ``cursor.rowcount`` where the result returns no rows.

    :return: a :class:`_cursor.CursorResult` ready for the caller.
    """
    compiled = cast(SQLCompiler, self.compiled)

    strategy: ResultFetchStrategy = self.cursor_fetch_strategy

    if self.isinsert:
        if (
            self.execute_style is ExecuteStyle.INSERTMANYVALUES
            and compiled.effective_returning
        ):
            # rows were already accumulated during the insertmanyvalues
            # execution; serve them from a pre-filled buffer
            strategy = _cursor.FullyBufferedCursorFetchStrategy(
                self.cursor,
                initial_buffer=self._insertmanyvalues_rows,
                # maintain alt cursor description if set by the
                # dialect, e.g. mssql preserves it
                alternate_description=(
                    strategy.alternate_cursor_description
                ),
            )

        if compiled.postfetch_lastrowid:
            self.inserted_primary_key_rows = (
                self._setup_ins_pk_from_lastrowid()
            )
        # else if not self._is_implicit_returning,
        # the default inserted_primary_key_rows accessor will
        # return an "empty" primary key collection when accessed.

    if self._is_server_side and strategy is _cursor._DEFAULT_FETCH:
        # server-side cursor requested; buffer rows incrementally
        strategy = _cursor.BufferedRowCursorFetchStrategy(
            self.cursor, self.execution_options
        )

    if strategy is _cursor._NO_CURSOR_DML:
        cursor_description = None
    else:
        cursor_description = (
            strategy.alternate_cursor_description
            or self.cursor.description
        )

    if cursor_description is None:
        # no description -> statement returns no rows at all
        strategy = _cursor._NO_CURSOR_DML
    elif self._num_sentinel_cols:
        assert self.execute_style is ExecuteStyle.INSERTMANYVALUES
        # the sentinel columns are handled in CursorResult._init_metadata
        # using essentially _reduce

    result: _cursor.CursorResult[Any] = _cursor.CursorResult(
        self, strategy, cursor_description
    )

    if self.isinsert:
        if self._is_implicit_returning:
            # fetch all RETURNING rows up front; they feed both
            # returned_default_rows and the inserted PK collection
            rows = result.all()

            self.returned_default_rows = rows

            self.inserted_primary_key_rows = (
                self._setup_ins_pk_from_implicit_returning(result, rows)
            )

            # test that it has a cursor metadata that is accurate. the
            # first row will have been fetched and current assumptions
            # are that the result has only one row, until executemany()
            # support is added here.
            assert result._metadata.returns_rows

            # Insert statement has both return_defaults() and
            # returning(). rewind the result on the list of rows
            # we just used.
            if self._is_supplemental_returning:
                result._rewind(rows)
            else:
                result._soft_close()
        elif not self._is_explicit_returning:
            result._soft_close()

            # we assume here the result does not return any rows.
            # *usually*, this will be true. However, some dialects
            # such as that of MSSQL/pyodbc need to SELECT a post fetch
            # function so this is not necessarily true.
            # assert not result.returns_rows

    elif self._is_implicit_returning:
        # UPDATE/DELETE with implicit RETURNING
        rows = result.all()

        if rows:
            self.returned_default_rows = rows
        # rowcount derived from the RETURNING rows themselves
        self._rowcount = len(rows)

        if self._is_supplemental_returning:
            result._rewind(rows)
        else:
            result._soft_close()

        # test that it has a cursor metadata that is accurate.
        # the rows have all been fetched however.
        assert result._metadata.returns_rows

    elif not result._metadata.returns_rows:
        # no results, get rowcount
        # (which requires open cursor on some drivers)
        if self._rowcount is None:
            self._rowcount = self.cursor.rowcount
        result._soft_close()
    elif self.isupdate or self.isdelete:
        if self._rowcount is None:
            self._rowcount = self.cursor.rowcount
    return result

2040 

@util.memoized_property
def inserted_primary_key_rows(self):
    """Default (memoized) inserted-primary-key collection.

    Only computed when execution did not install a specific value
    (e.g. via lastrowid or implicit RETURNING handling).
    """
    # if no specific "get primary key" strategy was set up
    # during execution, return a "default" primary key based
    # on what's in the compiled_parameters and nothing else.
    return self._setup_ins_pk_from_empty()

2047 

2048 def _setup_ins_pk_from_lastrowid(self): 

2049 getter = cast( 

2050 SQLCompiler, self.compiled 

2051 )._inserted_primary_key_from_lastrowid_getter 

2052 lastrowid = self.get_lastrowid() 

2053 return [getter(lastrowid, self.compiled_parameters[0])] 

2054 

2055 def _setup_ins_pk_from_empty(self): 

2056 getter = cast( 

2057 SQLCompiler, self.compiled 

2058 )._inserted_primary_key_from_lastrowid_getter 

2059 return [getter(None, param) for param in self.compiled_parameters] 

2060 

2061 def _setup_ins_pk_from_implicit_returning(self, result, rows): 

2062 if not rows: 

2063 return [] 

2064 

2065 getter = cast( 

2066 SQLCompiler, self.compiled 

2067 )._inserted_primary_key_from_returning_getter 

2068 compiled_params = self.compiled_parameters 

2069 

2070 return [ 

2071 getter(row, param) for row, param in zip(rows, compiled_params) 

2072 ] 

2073 

def lastrow_has_defaults(self) -> bool:
    """Return True if the last INSERT or UPDATE row left columns to be
    post-fetched (i.e. the compiled statement has ``postfetch`` entries).
    """
    # guard clause: only meaningful for INSERT/UPDATE statements
    if not (self.isinsert or self.isupdate):
        return False
    # string form of cast() is a typing forward reference; runtime no-op
    return bool(cast("SQLCompiler", self.compiled).postfetch)

2078 

def _prepare_set_input_sizes(
    self,
) -> Optional[List[Tuple[str, Any, TypeEngine[Any]]]]:
    """Given a cursor and ClauseParameters, prepare arguments
    in order to call the appropriate
    style of ``setinputsizes()`` on the cursor, using DB-API types
    from the bind parameter's ``TypeEngine`` objects.

    This method only called by those dialects which set the
    :attr:`.Dialect.bind_typing` attribute to
    :attr:`.BindTyping.SETINPUTSIZES`. Python-oracledb and cx_Oracle are
    the only DBAPIs that requires setinputsizes(); pyodbc offers it as an
    option.

    Prior to SQLAlchemy 2.0, the setinputsizes() approach was also used
    for pg8000 and asyncpg, which has been changed to inline rendering
    of casts.

    :return: list of ``(param name, DBAPI type, TypeEngine)`` tuples, or
      ``None`` when setinputsizes does not apply (DDL / textual SQL, or
      no input sizes computed by the compiler).
    """
    # DDL and raw text statements carry no typed bind parameters
    if self.isddl or self.is_text:
        return None

    compiled = cast(SQLCompiler, self.compiled)

    inputsizes = compiled._get_set_input_sizes_lookup()

    if inputsizes is None:
        return None

    dialect = self.dialect

    # all of the rest of this... cython?

    if dialect._has_events:
        # copy before dispatching so event handlers may mutate the dict
        # without affecting the compiler's cached lookup
        inputsizes = dict(inputsizes)
        dialect.dispatch.do_setinputsizes(
            inputsizes, self.cursor, self.statement, self.parameters, self
        )

    if compiled.escaped_bind_names:
        escaped_bind_names = compiled.escaped_bind_names
    else:
        escaped_bind_names = None

    if dialect.positional:
        # positional dialects: order must follow positiontup exactly
        items = [
            (key, compiled.binds[key])
            for key in compiled.positiontup or ()
        ]
    else:
        items = [
            (key, bindparam)
            for bindparam, key in compiled.bind_names.items()
        ]

    generic_inputsizes: List[Tuple[str, Any, TypeEngine[Any]]] = []
    for key, bindparam in items:
        # literal-execute params are rendered inline, not bound
        if bindparam in compiled.literal_execute_params:
            continue

        if key in self._expanded_parameters:
            # "expanding" IN parameter: one entry per expanded name
            if is_tuple_type(bindparam.type):
                # tuple IN: cycle through the tuple's member types
                num = len(bindparam.type.types)
                dbtypes = inputsizes[bindparam]
                generic_inputsizes.extend(
                    (
                        (
                            escaped_bind_names.get(paramname, paramname)
                            if escaped_bind_names is not None
                            else paramname
                        ),
                        dbtypes[idx % num],
                        bindparam.type.types[idx % num],
                    )
                    for idx, paramname in enumerate(
                        self._expanded_parameters[key]
                    )
                )
            else:
                dbtype = inputsizes.get(bindparam, None)
                generic_inputsizes.extend(
                    (
                        (
                            escaped_bind_names.get(paramname, paramname)
                            if escaped_bind_names is not None
                            else paramname
                        ),
                        dbtype,
                        bindparam.type,
                    )
                    for paramname in self._expanded_parameters[key]
                )
        else:
            dbtype = inputsizes.get(bindparam, None)

            escaped_name = (
                escaped_bind_names.get(key, key)
                if escaped_bind_names is not None
                else key
            )

            generic_inputsizes.append(
                (escaped_name, dbtype, bindparam.type)
            )

    return generic_inputsizes

2185 

2186 def _exec_default(self, column, default, type_): 

2187 if default.is_sequence: 

2188 return self.fire_sequence(default, type_) 

2189 elif default.is_callable: 

2190 # this codepath is not normally used as it's inlined 

2191 # into _process_execute_defaults 

2192 self.current_column = column 

2193 return default.arg(self) 

2194 elif default.is_clause_element: 

2195 return self._exec_default_clause_element(column, default, type_) 

2196 else: 

2197 # this codepath is not normally used as it's inlined 

2198 # into _process_execute_defaults 

2199 return default.arg 

2200 

def _exec_default_clause_element(self, column, default, type_):
    """Execute a SQL-expression default and return its scalar value.

    Compiles a ``SELECT <default expression>``, builds the bound
    parameters by hand, and runs it through ``_execute_scalar()``.
    """
    # execute a default that's a complete clause element. Here, we have
    # to re-implement a miniature version of the compile->parameters->
    # cursor.execute() sequence, since we don't want to modify the state
    # of the connection / result in progress or create new connection/
    # result objects etc.
    # .. versionchanged:: 1.4

    if not default._arg_is_typed:
        # coerce the expression to the column's type so result
        # processing matches the target column
        default_arg = expression.type_coerce(default.arg, type_)
    else:
        default_arg = default.arg
    compiled = expression.select(default_arg).compile(dialect=self.dialect)
    compiled_params = compiled.construct_params()
    processors = compiled._bind_processors
    if compiled.positional:
        # positional dialect: parameters ordered per positiontup,
        # wrapped in the dialect's expected sequence type
        parameters = self.dialect.execute_sequence_format(
            [
                (
                    processors[key](compiled_params[key])  # type: ignore
                    if key in processors
                    else compiled_params[key]
                )
                for key in compiled.positiontup or ()
            ]
        )
    else:
        # named-parameter dialect: plain dict of processed values
        parameters = {
            key: (
                processors[key](compiled_params[key])  # type: ignore
                if key in processors
                else compiled_params[key]
            )
            for key in compiled_params
        }
    return self._execute_scalar(
        str(compiled), type_, parameters=parameters
    )

2239 

# Transient per-row parameter dict: assigned and then deleted by
# _process_execute_defaults() while column defaults are being evaluated,
# so it is only populated during default-generation callbacks.
current_parameters: Optional[_CoreSingleExecuteParams] = None
"""A dictionary of parameters applied to the current row.

This attribute is only available in the context of a user-defined default
generation function, e.g. as described at :ref:`context_default_functions`.
It consists of a dictionary which includes entries for each column/value
pair that is to be part of the INSERT or UPDATE statement. The keys of the
dictionary will be the key value of each :class:`_schema.Column`,
which is usually
synonymous with the name.

Note that the :attr:`.DefaultExecutionContext.current_parameters` attribute
does not accommodate for the "multi-values" feature of the
:meth:`_expression.Insert.values` method. The
:meth:`.DefaultExecutionContext.get_current_parameters` method should be
preferred.

.. seealso::

    :meth:`.DefaultExecutionContext.get_current_parameters`

    :ref:`context_default_functions`

"""

2264 

def get_current_parameters(self, isolate_multiinsert_groups=True):
    """Return a dictionary of parameters applied to the current row.

    This method can only be used in the context of a user-defined default
    generation function, e.g. as described at
    :ref:`context_default_functions`. When invoked, a dictionary is
    returned which includes entries for each column/value pair that is part
    of the INSERT or UPDATE statement. The keys of the dictionary will be
    the key value of each :class:`_schema.Column`,
    which is usually synonymous
    with the name.

    :param isolate_multiinsert_groups=True: indicates that multi-valued
     INSERT constructs created using :meth:`_expression.Insert.values`
     should be
     handled by returning only the subset of parameters that are local
     to the current column default invocation. When ``False``, the
     raw parameters of the statement are returned including the
     naming convention used in the case of multi-valued INSERT.

    .. seealso::

        :attr:`.DefaultExecutionContext.current_parameters`

        :ref:`context_default_functions`

    """
    try:
        # these attributes exist only while _process_execute_defaults
        # (or _exec_default) is running a default-generation callback
        parameters = self.current_parameters
        column = self.current_column
    except AttributeError:
        raise exc.InvalidRequestError(
            "get_current_parameters() can only be invoked in the "
            "context of a Python side column default function"
        )
    else:
        assert column is not None
        assert parameters is not None
        compile_state = cast(
            "DMLState", cast(SQLCompiler, self.compiled).compile_state
        )
        assert compile_state is not None
        if (
            isolate_multiinsert_groups
            and dml.isinsert(compile_state)
            and compile_state._has_multi_parameters
        ):
            # multi-values INSERT: extract only the parameter group
            # belonging to the current column's VALUES tuple
            if column._is_multiparam_column:
                # synthetic column for group N; map back to the
                # original column key
                index = column.index + 1
                d = {column.original.key: parameters[column.key]}
            else:
                d = {column.key: parameters[column.key]}
                index = 0
            assert compile_state._dict_parameters is not None
            keys = compile_state._dict_parameters.keys()
            # remaining keys for this group use the "<key>_m<index>"
            # naming convention
            d.update(
                (key, parameters["%s_m%d" % (key, index)]) for key in keys
            )
            return d
        else:
            return parameters

2326 

def get_insert_default(self, column):
    """Return the INSERT default value for *column*, or ``None`` when
    the column declares no default.
    """
    default = column.default
    if default is None:
        return None
    return self._exec_default(column, default, column.type)

2332 

def get_update_default(self, column):
    """Return the ON UPDATE default value for *column*, or ``None`` when
    the column declares no ``onupdate``.
    """
    onupdate = column.onupdate
    if onupdate is None:
        return None
    return self._exec_default(column, onupdate, column.type)

2338 

2339 def _process_execute_defaults(self): 

2340 compiled = cast(SQLCompiler, self.compiled) 

2341 

2342 key_getter = compiled._within_exec_param_key_getter 

2343 

2344 sentinel_counter = 0 

2345 

2346 if compiled.insert_prefetch: 

2347 prefetch_recs = [ 

2348 ( 

2349 c, 

2350 key_getter(c), 

2351 c._default_description_tuple, 

2352 self.get_insert_default, 

2353 ) 

2354 for c in compiled.insert_prefetch 

2355 ] 

2356 elif compiled.update_prefetch: 

2357 prefetch_recs = [ 

2358 ( 

2359 c, 

2360 key_getter(c), 

2361 c._onupdate_description_tuple, 

2362 self.get_update_default, 

2363 ) 

2364 for c in compiled.update_prefetch 

2365 ] 

2366 else: 

2367 prefetch_recs = [] 

2368 

2369 for param in self.compiled_parameters: 

2370 self.current_parameters = param 

2371 

2372 for ( 

2373 c, 

2374 param_key, 

2375 (arg, is_scalar, is_callable, is_sentinel), 

2376 fallback, 

2377 ) in prefetch_recs: 

2378 if is_sentinel: 

2379 param[param_key] = sentinel_counter 

2380 sentinel_counter += 1 

2381 elif is_scalar: 

2382 param[param_key] = arg 

2383 elif is_callable: 

2384 self.current_column = c 

2385 param[param_key] = arg(self) 

2386 else: 

2387 val = fallback(c) 

2388 if val is not None: 

2389 param[param_key] = val 

2390 

2391 del self.current_parameters 

2392 

2393 

# Install DefaultExecutionContext as the execution-context class used by
# DefaultDialect (assigned here because both classes are defined above).
DefaultDialect.execution_ctx_cls = DefaultExecutionContext