Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/default.py: 47%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1098 statements  

1# engine/default.py 

2# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7# mypy: allow-untyped-defs, allow-untyped-calls 

8 

9"""Default implementations of per-dialect sqlalchemy.engine classes. 

10 

11These are semi-private implementation classes which are only of importance 

12to database dialect authors; dialects will usually use the classes here 

13as the base class for their own corresponding classes. 

14 

15""" 

16 

17from __future__ import annotations 

18 

19import functools 

20import operator 

21import random 

22import re 

23from time import perf_counter 

24import typing 

25from typing import Any 

26from typing import Callable 

27from typing import cast 

28from typing import Dict 

29from typing import Final 

30from typing import List 

31from typing import Literal 

32from typing import Mapping 

33from typing import MutableMapping 

34from typing import MutableSequence 

35from typing import Optional 

36from typing import Sequence 

37from typing import Set 

38from typing import Tuple 

39from typing import Type 

40from typing import TYPE_CHECKING 

41from typing import Union 

42import weakref 

43 

44from . import characteristics 

45from . import cursor as _cursor 

46from . import interfaces 

47from . import reflection 

48from .base import Connection 

49from .interfaces import CacheStats 

50from .interfaces import DBAPICursor 

51from .interfaces import Dialect 

52from .interfaces import ExecuteStyle 

53from .interfaces import ExecutionContext 

54from .reflection import ObjectKind 

55from .reflection import ObjectScope 

56from .. import event 

57from .. import exc 

58from .. import pool 

59from .. import util 

60from ..sql import compiler 

61from ..sql import dml 

62from ..sql import expression 

63from ..sql import type_api 

64from ..sql import util as sql_util 

65from ..sql._typing import is_tuple_type 

66from ..sql.base import _NoArg 

67from ..sql.compiler import AggregateOrderByStyle 

68from ..sql.compiler import DDLCompiler 

69from ..sql.compiler import InsertmanyvaluesSentinelOpts 

70from ..sql.compiler import SQLCompiler 

71from ..sql.elements import quoted_name 

72from ..util.typing import TupleAny 

73from ..util.typing import Unpack 

74 

75if typing.TYPE_CHECKING: 

76 from .base import Engine 

77 from .cursor import ResultFetchStrategy 

78 from .interfaces import _CoreMultiExecuteParams 

79 from .interfaces import _CoreSingleExecuteParams 

80 from .interfaces import _DBAPICursorDescription 

81 from .interfaces import _DBAPIMultiExecuteParams 

82 from .interfaces import _DBAPISingleExecuteParams 

83 from .interfaces import _ExecuteOptions 

84 from .interfaces import _MutableCoreSingleExecuteParams 

85 from .interfaces import _ParamStyle 

86 from .interfaces import ConnectArgsType 

87 from .interfaces import DBAPIConnection 

88 from .interfaces import DBAPIModule 

89 from .interfaces import DBAPIType 

90 from .interfaces import IsolationLevel 

91 from .row import Row 

92 from .url import URL 

93 from ..event import _ListenerFnType 

94 from ..pool import Pool 

95 from ..pool import PoolProxiedConnection 

96 from ..sql import Executable 

97 from ..sql.compiler import Compiled 

98 from ..sql.compiler import Linting 

99 from ..sql.compiler import ResultColumnsEntry 

100 from ..sql.dml import DMLState 

101 from ..sql.dml import UpdateBase 

102 from ..sql.elements import BindParameter 

103 from ..sql.schema import Column 

104 from ..sql.sqltypes import _JSON_VALUE 

105 from ..sql.type_api import _BindProcessorType 

106 from ..sql.type_api import _ResultProcessorType 

107 from ..sql.type_api import TypeEngine 

108 

109 

110# When we're handed literal SQL, ensure it's a SELECT query 

111SERVER_SIDE_CURSOR_RE = re.compile(r"\s*SELECT", re.I | re.UNICODE) 

112 

113 

114( 

115 CACHE_HIT, 

116 CACHE_MISS, 

117 CACHING_DISABLED, 

118 NO_CACHE_KEY, 

119 NO_DIALECT_SUPPORT, 

120) = list(CacheStats) 

121 

122 

123class _BackendsMultiReflection(Dialect): 

124 """Mixin providing single-table reflection wrappers that delegate to 

125 the corresponding ``get_multi_*`` methods. 

126 

127 Used by dialects that implement native multi-table reflection 

128 (PostgreSQL, Oracle, MSSQL). 

129 """ 

130 

131 def _value_or_raise(self, data, table, schema): 

132 try: 

133 return dict(data)[(schema, table)] 

134 except KeyError: 

135 raise exc.NoSuchTableError( 

136 f"{schema}.{table}" if schema else table 

137 ) from None 

138 

139 @reflection.cache 

140 def get_columns(self, connection, table_name, schema=None, **kw): 

141 data = self.get_multi_columns( 

142 connection, 

143 schema=schema, 

144 filter_names=[table_name], 

145 scope=ObjectScope.ANY, 

146 kind=ObjectKind.ANY, 

147 **kw, 

148 ) 

149 return self._value_or_raise(data, table_name, schema) 

150 

151 @reflection.cache 

152 def get_table_options(self, connection, table_name, schema=None, **kw): 

153 data = self.get_multi_table_options( 

154 connection, 

155 schema=schema, 

156 filter_names=[table_name], 

157 scope=ObjectScope.ANY, 

158 kind=ObjectKind.ANY, 

159 **kw, 

160 ) 

161 return self._value_or_raise(data, table_name, schema) 

162 

163 @reflection.cache 

164 def get_pk_constraint(self, connection, table_name, schema=None, **kw): 

165 data = self.get_multi_pk_constraint( 

166 connection, 

167 schema=schema, 

168 filter_names=[table_name], 

169 scope=ObjectScope.ANY, 

170 kind=ObjectKind.ANY, 

171 **kw, 

172 ) 

173 return self._value_or_raise(data, table_name, schema) 

174 

175 @reflection.cache 

176 def get_foreign_keys(self, connection, table_name, schema=None, **kw): 

177 data = self.get_multi_foreign_keys( 

178 connection, 

179 schema=schema, 

180 filter_names=[table_name], 

181 scope=ObjectScope.ANY, 

182 kind=ObjectKind.ANY, 

183 **kw, 

184 ) 

185 return self._value_or_raise(data, table_name, schema) 

186 

187 @reflection.cache 

188 def get_indexes(self, connection, table_name, schema=None, **kw): 

189 data = self.get_multi_indexes( 

190 connection, 

191 schema=schema, 

192 filter_names=[table_name], 

193 scope=ObjectScope.ANY, 

194 kind=ObjectKind.ANY, 

195 **kw, 

196 ) 

197 return self._value_or_raise(data, table_name, schema) 

198 

199 @reflection.cache 

200 def get_unique_constraints( 

201 self, connection, table_name, schema=None, **kw 

202 ): 

203 data = self.get_multi_unique_constraints( 

204 connection, 

205 schema=schema, 

206 filter_names=[table_name], 

207 scope=ObjectScope.ANY, 

208 kind=ObjectKind.ANY, 

209 **kw, 

210 ) 

211 return self._value_or_raise(data, table_name, schema) 

212 

213 @reflection.cache 

214 def get_check_constraints(self, connection, table_name, schema=None, **kw): 

215 data = self.get_multi_check_constraints( 

216 connection, 

217 schema=schema, 

218 filter_names=[table_name], 

219 scope=ObjectScope.ANY, 

220 kind=ObjectKind.ANY, 

221 **kw, 

222 ) 

223 return self._value_or_raise(data, table_name, schema) 

224 

225 @reflection.cache 

226 def get_table_comment(self, connection, table_name, schema=None, **kw): 

227 data = self.get_multi_table_comment( 

228 connection, 

229 schema=schema, 

230 filter_names=[table_name], 

231 scope=ObjectScope.ANY, 

232 kind=ObjectKind.ANY, 

233 **kw, 

234 ) 

235 return self._value_or_raise(data, table_name, schema) 

236 

237 

238class DefaultDialect(Dialect): 

239 """Default implementation of Dialect""" 

240 

241 statement_compiler = compiler.SQLCompiler 

242 ddl_compiler = compiler.DDLCompiler 

243 type_compiler_cls = compiler.GenericTypeCompiler 

244 

245 preparer = compiler.IdentifierPreparer 

246 supports_alter = True 

247 supports_comments = False 

248 supports_constraint_comments = False 

249 inline_comments = False 

250 supports_statement_cache = True 

251 

252 div_is_floordiv = True 

253 

254 bind_typing = interfaces.BindTyping.NONE 

255 

256 include_set_input_sizes: Optional[Set[Any]] = None 

257 exclude_set_input_sizes: Optional[Set[Any]] = None 

258 

259 # the first value we'd get for an autoincrement column. 

260 default_sequence_base = 1 

261 

262 # most DBAPIs happy with this for execute(). 

263 # not cx_oracle. 

264 execute_sequence_format = tuple 

265 

266 supports_schemas = True 

267 supports_views = True 

268 supports_sequences = False 

269 sequences_optional = False 

270 preexecute_autoincrement_sequences = False 

271 supports_identity_columns = False 

272 postfetch_lastrowid = True 

273 favor_returning_over_lastrowid = False 

274 insert_null_pk_still_autoincrements = False 

275 update_returning = False 

276 delete_returning = False 

277 update_returning_multifrom = False 

278 delete_returning_multifrom = False 

279 insert_returning = False 

280 

281 aggregate_order_by_style = AggregateOrderByStyle.INLINE 

282 

283 cte_follows_insert = False 

284 

285 supports_native_enum = False 

286 supports_native_boolean = False 

287 supports_native_uuid = False 

288 returns_native_bytes = False 

289 

290 supports_native_json_serialization = False 

291 supports_native_json_deserialization = False 

292 dialect_injects_custom_json_deserializer = False 

293 _json_serializer: Callable[[_JSON_VALUE], str] | None = None 

294 

295 _json_deserializer: Callable[[str], _JSON_VALUE] | None = None 

296 

297 non_native_boolean_check_constraint = True 

298 

299 supports_simple_order_by_label = True 

300 

301 tuple_in_values = False 

302 

303 connection_characteristics = util.immutabledict( 

304 { 

305 "isolation_level": characteristics.IsolationLevelCharacteristic(), 

306 "logging_token": characteristics.LoggingTokenCharacteristic(), 

307 } 

308 ) 

309 

310 engine_config_types: Mapping[str, Any] = util.immutabledict( 

311 { 

312 "pool_timeout": util.asint, 

313 "echo": util.bool_or_str("debug"), 

314 "echo_pool": util.bool_or_str("debug"), 

315 "pool_recycle": util.asint, 

316 "pool_size": util.asint, 

317 "max_overflow": util.asint, 

318 "future": util.asbool, 

319 } 

320 ) 

321 

322 # if the NUMERIC type 

323 # returns decimal.Decimal. 

324 # *not* the FLOAT type however. 

325 supports_native_decimal = False 

326 

327 name = "default" 

328 

329 # length at which to truncate 

330 # any identifier. 

331 max_identifier_length = 9999 

332 _user_defined_max_identifier_length: Optional[int] = None 

333 

334 isolation_level: Optional[str] = None 

335 

336 # sub-categories of max_identifier_length. 

337 # currently these accommodate for MySQL which allows alias names 

338 # of 255 but DDL names only of 64. 

339 max_index_name_length: Optional[int] = None 

340 max_constraint_name_length: Optional[int] = None 

341 

342 supports_sane_rowcount = True 

343 supports_sane_multi_rowcount = True 

344 colspecs: MutableMapping[Type[TypeEngine[Any]], Type[TypeEngine[Any]]] = {} 

345 default_paramstyle = "named" 

346 

347 supports_default_values = False 

348 """dialect supports INSERT... DEFAULT VALUES syntax""" 

349 

350 supports_default_metavalue = False 

351 """dialect supports INSERT... VALUES (DEFAULT) syntax""" 

352 

353 default_metavalue_token = "DEFAULT" 

354 """for INSERT... VALUES (DEFAULT) syntax, the token to put in the 

355 parenthesis.""" 

356 

357 # not sure if this is a real thing but the compiler will deliver it 

358 # if this is the only flag enabled. 

359 supports_empty_insert = True 

360 """dialect supports INSERT () VALUES ()""" 

361 

362 supports_multivalues_insert = False 

363 

364 use_insertmanyvalues: bool = False 

365 

366 use_insertmanyvalues_wo_returning: bool = False 

367 

368 insertmanyvalues_implicit_sentinel: InsertmanyvaluesSentinelOpts = ( 

369 InsertmanyvaluesSentinelOpts.NOT_SUPPORTED 

370 ) 

371 

372 insertmanyvalues_page_size: int = 1000 

373 insertmanyvalues_max_parameters = 32700 

374 

375 supports_is_distinct_from = True 

376 

377 supports_server_side_cursors = False 

378 

379 server_side_cursors = False 

380 

381 # extra record-level locking features (#4860) 

382 supports_for_update_of = False 

383 

384 server_version_info = None 

385 

386 default_schema_name: Optional[str] = None 

387 

388 # indicates symbol names are 

389 # UPPERCASED if they are case insensitive 

390 # within the database. 

391 # if this is True, the methods normalize_name() 

392 # and denormalize_name() must be provided. 

393 requires_name_normalize = False 

394 

395 is_async = False 

396 

397 has_terminate = False 

398 

399 # TODO: this is not to be part of 2.0. implement rudimentary binary 

400 # literals for SQLite, PostgreSQL, MySQL only within 

401 # _Binary.literal_processor 

402 _legacy_binary_type_literal_encoding = "utf-8" 

403 

404 @util.deprecated_params( 

405 empty_in_strategy=( 

406 "1.4", 

407 "The :paramref:`_sa.create_engine.empty_in_strategy` keyword is " 

408 "deprecated, and no longer has any effect. All IN expressions " 

409 "are now rendered using " 

410 'the "expanding parameter" strategy which renders a set of bound' 

411 'expressions, or an "empty set" SELECT, at statement execution' 

412 "time.", 

413 ), 

414 server_side_cursors=( 

415 "1.4", 

416 "The :paramref:`_sa.create_engine.server_side_cursors` parameter " 

417 "is deprecated and will be removed in a future release. Please " 

418 "use the " 

419 ":paramref:`_engine.Connection.execution_options.stream_results` " 

420 "parameter.", 

421 ), 

422 ) 

423 def __init__( 

424 self, 

425 paramstyle: Optional[_ParamStyle] = None, 

426 isolation_level: Optional[IsolationLevel] = None, 

427 dbapi: Optional[DBAPIModule] = None, 

428 implicit_returning: Literal[True] = True, 

429 supports_native_boolean: Optional[bool] = None, 

430 max_identifier_length: Optional[int] = None, 

431 label_length: Optional[int] = None, 

432 insertmanyvalues_page_size: Union[_NoArg, int] = _NoArg.NO_ARG, 

433 use_insertmanyvalues: Optional[bool] = None, 

434 # util.deprecated_params decorator cannot render the 

435 # Linting.NO_LINTING constant 

436 compiler_linting: Linting = int(compiler.NO_LINTING), # type: ignore 

437 server_side_cursors: bool = False, 

438 skip_autocommit_rollback: bool = False, 

439 **kwargs: Any, 

440 ): 

441 if server_side_cursors: 

442 if not self.supports_server_side_cursors: 

443 raise exc.ArgumentError( 

444 "Dialect %s does not support server side cursors" % self 

445 ) 

446 else: 

447 self.server_side_cursors = True 

448 

449 if getattr(self, "use_setinputsizes", False): 

450 util.warn_deprecated( 

451 "The dialect-level use_setinputsizes attribute is " 

452 "deprecated. Please use " 

453 "bind_typing = BindTyping.SETINPUTSIZES", 

454 "2.0", 

455 ) 

456 self.bind_typing = interfaces.BindTyping.SETINPUTSIZES 

457 

458 self.positional = False 

459 self._ischema = None 

460 

461 self.dbapi = dbapi 

462 

463 self.skip_autocommit_rollback = skip_autocommit_rollback 

464 

465 if paramstyle is not None: 

466 self.paramstyle = paramstyle 

467 elif self.dbapi is not None: 

468 self.paramstyle = self.dbapi.paramstyle 

469 else: 

470 self.paramstyle = self.default_paramstyle 

471 self.positional = self.paramstyle in ( 

472 "qmark", 

473 "format", 

474 "numeric", 

475 "numeric_dollar", 

476 ) 

477 self.identifier_preparer = self.preparer(self) 

478 self._on_connect_isolation_level = isolation_level 

479 

480 legacy_tt_callable = getattr(self, "type_compiler", None) 

481 if legacy_tt_callable is not None: 

482 tt_callable = cast( 

483 Type[compiler.GenericTypeCompiler], 

484 self.type_compiler, 

485 ) 

486 else: 

487 tt_callable = self.type_compiler_cls 

488 

489 self.type_compiler_instance = self.type_compiler = tt_callable(self) 

490 

491 if supports_native_boolean is not None: 

492 self.supports_native_boolean = supports_native_boolean 

493 

494 self._user_defined_max_identifier_length = max_identifier_length 

495 if self._user_defined_max_identifier_length: 

496 self.max_identifier_length = ( 

497 self._user_defined_max_identifier_length 

498 ) 

499 self.label_length = label_length 

500 self.compiler_linting = compiler_linting 

501 

502 if use_insertmanyvalues is not None: 

503 self.use_insertmanyvalues = use_insertmanyvalues 

504 

505 if insertmanyvalues_page_size is not _NoArg.NO_ARG: 

506 self.insertmanyvalues_page_size = insertmanyvalues_page_size 

507 

508 @property 

509 @util.deprecated( 

510 "2.0", 

511 "full_returning is deprecated, please use insert_returning, " 

512 "update_returning, delete_returning", 

513 ) 

514 def full_returning(self): 

515 return ( 

516 self.insert_returning 

517 and self.update_returning 

518 and self.delete_returning 

519 ) 

520 

521 @util.memoized_property 

522 def insert_executemany_returning(self): 

523 """Default implementation for insert_executemany_returning, if not 

524 otherwise overridden by the specific dialect. 

525 

526 The default dialect determines "insert_executemany_returning" is 

527 available if the dialect in use has opted into using the 

528 "use_insertmanyvalues" feature. If they haven't opted into that, then 

529 this attribute is False, unless the dialect in question overrides this 

530 and provides some other implementation (such as the Oracle Database 

531 dialects). 

532 

533 """ 

534 return self.insert_returning and self.use_insertmanyvalues 

535 

536 @util.memoized_property 

537 def insert_executemany_returning_sort_by_parameter_order(self): 

538 """Default implementation for 

539 insert_executemany_returning_deterministic_order, if not otherwise 

540 overridden by the specific dialect. 

541 

542 The default dialect determines "insert_executemany_returning" can have 

543 deterministic order only if the dialect in use has opted into using the 

544 "use_insertmanyvalues" feature, which implements deterministic ordering 

545 using client side sentinel columns only by default. The 

546 "insertmanyvalues" feature also features alternate forms that can 

547 use server-generated PK values as "sentinels", but those are only 

548 used if the :attr:`.Dialect.insertmanyvalues_implicit_sentinel` 

549 bitflag enables those alternate SQL forms, which are disabled 

550 by default. 

551 

552 If the dialect in use hasn't opted into that, then this attribute is 

553 False, unless the dialect in question overrides this and provides some 

554 other implementation (such as the Oracle Database dialects). 

555 

556 """ 

557 return self.insert_returning and self.use_insertmanyvalues 

558 

559 update_executemany_returning = False 

560 delete_executemany_returning = False 

561 

562 @util.memoized_property 

563 def loaded_dbapi(self) -> DBAPIModule: 

564 if self.dbapi is None: 

565 raise exc.InvalidRequestError( 

566 f"Dialect {self} does not have a Python DBAPI established " 

567 "and cannot be used for actual database interaction" 

568 ) 

569 return self.dbapi 

570 

571 @util.memoized_property 

572 def _bind_typing_render_casts(self): 

573 return self.bind_typing is interfaces.BindTyping.RENDER_CASTS 

574 

575 def _ensure_has_table_connection(self, arg: Connection) -> None: 

576 if not isinstance(arg, Connection): 

577 raise exc.ArgumentError( 

578 "The argument passed to Dialect.has_table() should be a " 

579 "%s, got %s. " 

580 "Additionally, the Dialect.has_table() method is for " 

581 "internal dialect " 

582 "use only; please use " 

583 "``inspect(some_engine).has_table(<tablename>>)`` " 

584 "for public API use." % (Connection, type(arg)) 

585 ) 

586 

587 @util.memoized_property 

588 def _supports_statement_cache(self): 

589 ssc = self.__class__.__dict__.get("supports_statement_cache", None) 

590 if ssc is None: 

591 util.warn( 

592 "Dialect %s:%s will not make use of SQL compilation caching " 

593 "as it does not set the 'supports_statement_cache' attribute " 

594 "to ``True``. This can have " 

595 "significant performance implications including some " 

596 "performance degradations in comparison to prior SQLAlchemy " 

597 "versions. Dialect maintainers should seek to set this " 

598 "attribute to True after appropriate development and testing " 

599 "for SQLAlchemy 1.4 caching support. Alternatively, this " 

600 "attribute may be set to False which will disable this " 

601 "warning." % (self.name, self.driver), 

602 code="cprf", 

603 ) 

604 

605 return bool(ssc) 

606 

607 @util.memoized_property 

608 def _type_memos(self): 

609 return weakref.WeakKeyDictionary() 

610 

611 @property 

612 def dialect_description(self): # type: ignore[override] 

613 return self.name + "+" + self.driver 

614 

615 @property 

616 def supports_sane_rowcount_returning(self): 

617 """True if this dialect supports sane rowcount even if RETURNING is 

618 in use. 

619 

620 For dialects that don't support RETURNING, this is synonymous with 

621 ``supports_sane_rowcount``. 

622 

623 """ 

624 return self.supports_sane_rowcount 

625 

626 @classmethod 

627 def get_pool_class(cls, url: URL) -> Type[Pool]: 

628 default: Type[pool.Pool] 

629 if cls.is_async: 

630 default = pool.AsyncAdaptedQueuePool 

631 else: 

632 default = pool.QueuePool 

633 

634 return getattr(cls, "poolclass", default) 

635 

636 def get_dialect_pool_class(self, url: URL) -> Type[Pool]: 

637 return self.get_pool_class(url) 

638 

639 @classmethod 

640 def load_provisioning(cls): 

641 package = ".".join(cls.__module__.split(".")[0:-1]) 

642 try: 

643 __import__(package + ".provision") 

644 except ImportError: 

645 pass 

646 

647 def _builtin_onconnect(self) -> Optional[_ListenerFnType]: 

648 if self._on_connect_isolation_level is not None: 

649 

650 def builtin_connect(dbapi_conn, conn_rec): 

651 self._assert_and_set_isolation_level( 

652 dbapi_conn, self._on_connect_isolation_level 

653 ) 

654 

655 return builtin_connect 

656 else: 

657 return None 

658 

659 def initialize(self, connection: Connection) -> None: 

660 try: 

661 self.server_version_info = self._get_server_version_info( 

662 connection 

663 ) 

664 except NotImplementedError: 

665 self.server_version_info = None 

666 try: 

667 self.default_schema_name = self._get_default_schema_name( 

668 connection 

669 ) 

670 except NotImplementedError: 

671 self.default_schema_name = None 

672 

673 try: 

674 self.default_isolation_level = self.get_default_isolation_level( 

675 connection.connection.dbapi_connection 

676 ) 

677 except NotImplementedError: 

678 self.default_isolation_level = None 

679 

680 if not self._user_defined_max_identifier_length: 

681 max_ident_length = self._check_max_identifier_length(connection) 

682 if max_ident_length: 

683 self.max_identifier_length = max_ident_length 

684 

685 if ( 

686 self.label_length 

687 and self.label_length > self.max_identifier_length 

688 ): 

689 raise exc.ArgumentError( 

690 "Label length of %d is greater than this dialect's" 

691 " maximum identifier length of %d" 

692 % (self.label_length, self.max_identifier_length) 

693 ) 

694 

695 def on_connect(self) -> Optional[Callable[[Any], None]]: 

696 # inherits the docstring from interfaces.Dialect.on_connect 

697 return None 

698 

699 def _check_max_identifier_length(self, connection): 

700 """Perform a connection / server version specific check to determine 

701 the max_identifier_length. 

702 

703 If the dialect's class level max_identifier_length should be used, 

704 can return None. 

705 

706 """ 

707 return None 

708 

709 def get_default_isolation_level(self, dbapi_conn): 

710 """Given a DBAPI connection, return its isolation level, or 

711 a default isolation level if one cannot be retrieved. 

712 

713 May be overridden by subclasses in order to provide a 

714 "fallback" isolation level for databases that cannot reliably 

715 retrieve the actual isolation level. 

716 

717 By default, calls the :meth:`_engine.Interfaces.get_isolation_level` 

718 method, propagating any exceptions raised. 

719 

720 """ 

721 return self.get_isolation_level(dbapi_conn) 

722 

723 def type_descriptor(self, typeobj): 

724 """Provide a database-specific :class:`.TypeEngine` object, given 

725 the generic object which comes from the types module. 

726 

727 This method looks for a dictionary called 

728 ``colspecs`` as a class or instance-level variable, 

729 and passes on to :func:`_types.adapt_type`. 

730 

731 """ 

732 return type_api.adapt_type(typeobj, self.colspecs) 

733 

734 def has_index(self, connection, table_name, index_name, schema=None, **kw): 

735 if not self.has_table(connection, table_name, schema=schema, **kw): 

736 return False 

737 for idx in self.get_indexes( 

738 connection, table_name, schema=schema, **kw 

739 ): 

740 if idx["name"] == index_name: 

741 return True 

742 else: 

743 return False 

744 

745 def has_schema( 

746 self, connection: Connection, schema_name: str, **kw: Any 

747 ) -> bool: 

748 return schema_name in self.get_schema_names(connection, **kw) 

749 

750 def validate_identifier(self, ident: str) -> None: 

751 if len(ident) > self.max_identifier_length: 

752 raise exc.IdentifierError( 

753 "Identifier '%s' exceeds maximum length of %d characters" 

754 % (ident, self.max_identifier_length) 

755 ) 

756 

757 def connect(self, *cargs: Any, **cparams: Any) -> DBAPIConnection: 

758 # inherits the docstring from interfaces.Dialect.connect 

759 return self.loaded_dbapi.connect(*cargs, **cparams) # type: ignore[no-any-return] # NOQA: E501 

760 

761 def create_connect_args(self, url: URL) -> ConnectArgsType: 

762 # inherits the docstring from interfaces.Dialect.create_connect_args 

763 opts = url.translate_connect_args() 

764 opts.update(url.query) 

765 return ([], opts) 

766 

767 def set_engine_execution_options( 

768 self, engine: Engine, opts: Mapping[str, Any] 

769 ) -> None: 

770 supported_names = set(self.connection_characteristics).intersection( 

771 opts 

772 ) 

773 if supported_names: 

774 characteristics: Mapping[str, Any] = util.immutabledict( 

775 (name, opts[name]) for name in supported_names 

776 ) 

777 

778 @event.listens_for(engine, "engine_connect") 

779 def set_connection_characteristics(connection): 

780 self._set_connection_characteristics( 

781 connection, characteristics 

782 ) 

783 

784 def set_connection_execution_options( 

785 self, connection: Connection, opts: Mapping[str, Any] 

786 ) -> None: 

787 supported_names = set(self.connection_characteristics).intersection( 

788 opts 

789 ) 

790 if supported_names: 

791 characteristics: Mapping[str, Any] = util.immutabledict( 

792 (name, opts[name]) for name in supported_names 

793 ) 

794 self._set_connection_characteristics(connection, characteristics) 

795 

796 def _set_connection_characteristics(self, connection, characteristics): 

797 characteristic_values = [ 

798 (name, self.connection_characteristics[name], value) 

799 for name, value in characteristics.items() 

800 ] 

801 

802 if connection.in_transaction(): 

803 trans_objs = [ 

804 (name, obj) 

805 for name, obj, _ in characteristic_values 

806 if obj.transactional 

807 ] 

808 if trans_objs: 

809 raise exc.InvalidRequestError( 

810 "This connection has already initialized a SQLAlchemy " 

811 "Transaction() object via begin() or autobegin; " 

812 "%s may not be altered unless rollback() or commit() " 

813 "is called first." 

814 % (", ".join(name for name, obj in trans_objs)) 

815 ) 

816 

817 dbapi_connection = connection.connection.dbapi_connection 

818 for _, characteristic, value in characteristic_values: 

819 characteristic.set_connection_characteristic( 

820 self, connection, dbapi_connection, value 

821 ) 

822 connection.connection._connection_record.finalize_callback.append( 

823 functools.partial(self._reset_characteristics, characteristics) 

824 ) 

825 

826 def _reset_characteristics(self, characteristics, dbapi_connection): 

827 for characteristic_name in characteristics: 

828 characteristic = self.connection_characteristics[ 

829 characteristic_name 

830 ] 

831 characteristic.reset_characteristic(self, dbapi_connection) 

832 

833 def do_begin(self, dbapi_connection): 

834 pass 

835 

836 def do_rollback(self, dbapi_connection): 

837 if self.skip_autocommit_rollback and self.detect_autocommit_setting( 

838 dbapi_connection 

839 ): 

840 return 

841 dbapi_connection.rollback() 

842 

843 def do_commit(self, dbapi_connection): 

844 dbapi_connection.commit() 

845 

846 def do_terminate(self, dbapi_connection): 

847 self.do_close(dbapi_connection) 

848 

849 def do_close(self, dbapi_connection): 

850 dbapi_connection.close() 

851 

852 @util.memoized_property 

853 def _dialect_specific_select_one(self): 

854 return str(expression.select(1).compile(dialect=self)) 

855 

856 def _do_ping_w_event(self, dbapi_connection: DBAPIConnection) -> bool: 

857 try: 

858 return self.do_ping(dbapi_connection) 

859 except self.loaded_dbapi.Error as err: 

860 is_disconnect = self.is_disconnect(err, dbapi_connection, None) 

861 

862 if self._has_events: 

863 try: 

864 Connection._handle_dbapi_exception_noconnection( 

865 err, 

866 self, 

867 is_disconnect=is_disconnect, 

868 invalidate_pool_on_disconnect=False, 

869 is_pre_ping=True, 

870 ) 

871 except exc.StatementError as new_err: 

872 is_disconnect = new_err.connection_invalidated 

873 

874 if is_disconnect: 

875 return False 

876 else: 

877 raise 

878 

879 def do_ping(self, dbapi_connection: DBAPIConnection) -> bool: 

880 cursor = dbapi_connection.cursor() 

881 try: 

882 cursor.execute(self._dialect_specific_select_one) 

883 finally: 

884 cursor.close() 

885 return True 

886 

887 def create_xid(self): 

888 """Create a random two-phase transaction ID. 

889 

890 This id will be passed to do_begin_twophase(), do_rollback_twophase(), 

891 do_commit_twophase(). Its format is unspecified. 

892 """ 

893 

894 return "_sa_%032x" % random.randint(0, 2**128) 

895 

896 def do_savepoint(self, connection, name): 

897 connection.execute(expression.SavepointClause(name)) 

898 

899 def do_rollback_to_savepoint(self, connection, name): 

900 connection.execute(expression.RollbackToSavepointClause(name)) 

901 

902 def do_release_savepoint(self, connection, name): 

903 connection.execute(expression.ReleaseSavepointClause(name)) 

904 

905 def _deliver_insertmanyvalues_batches( 

906 self, 

907 connection, 

908 cursor, 

909 statement, 

910 parameters, 

911 generic_setinputsizes, 

912 context, 

913 ): 

914 context = cast(DefaultExecutionContext, context) 

915 compiled = cast(SQLCompiler, context.compiled) 

916 

917 _composite_sentinel_proc: Sequence[ 

918 Optional[_ResultProcessorType[Any]] 

919 ] = () 

920 _scalar_sentinel_proc: Optional[_ResultProcessorType[Any]] = None 

921 _sentinel_proc_initialized: bool = False 

922 

923 compiled_parameters = context.compiled_parameters 

924 

925 imv = compiled._insertmanyvalues 

926 assert imv is not None 

927 

928 is_returning: Final[bool] = bool(compiled.effective_returning) 

929 batch_size = context.execution_options.get( 

930 "insertmanyvalues_page_size", self.insertmanyvalues_page_size 

931 ) 

932 

933 if compiled.schema_translate_map: 

934 schema_translate_map = context.execution_options.get( 

935 "schema_translate_map", {} 

936 ) 

937 else: 

938 schema_translate_map = None 

939 

940 if is_returning: 

941 result: Optional[List[Any]] = [] 

942 context._insertmanyvalues_rows = result 

943 

944 sort_by_parameter_order = imv.sort_by_parameter_order 

945 

946 else: 

947 sort_by_parameter_order = False 

948 result = None 

949 

950 for imv_batch in compiled._deliver_insertmanyvalues_batches( 

951 statement, 

952 parameters, 

953 compiled_parameters, 

954 generic_setinputsizes, 

955 batch_size, 

956 sort_by_parameter_order, 

957 schema_translate_map, 

958 ): 

959 yield imv_batch 

960 

961 if is_returning: 

962 

963 try: 

964 rows = context.fetchall_for_returning(cursor) 

965 except BaseException as be: 

966 connection._handle_dbapi_exception( 

967 be, 

968 sql_util._long_statement(imv_batch.replaced_statement), 

969 imv_batch.replaced_parameters, 

970 None, 

971 context, 

972 is_sub_exec=True, 

973 ) 

974 

975 # I would have thought "is_returning: Final[bool]" 

976 # would have assured this but pylance thinks not 

977 assert result is not None 

978 

979 if imv.num_sentinel_columns and not imv_batch.is_downgraded: 

980 composite_sentinel = imv.num_sentinel_columns > 1 

981 if imv.implicit_sentinel: 

982 # for implicit sentinel, which is currently single-col 

983 # integer autoincrement, do a simple sort. 

984 assert not composite_sentinel 

985 result.extend( 

986 sorted(rows, key=operator.itemgetter(-1)) 

987 ) 

988 continue 

989 

990 # otherwise, create dictionaries to match up batches 

991 # with parameters 

992 assert imv.sentinel_param_keys 

993 assert imv.sentinel_columns 

994 

995 _nsc = imv.num_sentinel_columns 

996 

997 if not _sentinel_proc_initialized: 

998 if composite_sentinel: 

999 _composite_sentinel_proc = [ 

1000 col.type._cached_result_processor( 

1001 self, cursor_desc[1] 

1002 ) 

1003 for col, cursor_desc in zip( 

1004 imv.sentinel_columns, 

1005 cursor.description[-_nsc:], 

1006 ) 

1007 ] 

1008 else: 

1009 _scalar_sentinel_proc = ( 

1010 imv.sentinel_columns[0] 

1011 ).type._cached_result_processor( 

1012 self, cursor.description[-1][1] 

1013 ) 

1014 _sentinel_proc_initialized = True 

1015 

1016 rows_by_sentinel: Union[ 

1017 Dict[Tuple[Any, ...], Any], 

1018 Dict[Any, Any], 

1019 ] 

1020 

1021 if composite_sentinel: 

1022 rows_by_sentinel = { 

1023 tuple( 

1024 (proc(val) if proc else val) 

1025 for val, proc in zip( 

1026 row[-_nsc:], _composite_sentinel_proc 

1027 ) 

1028 ): row 

1029 for row in rows 

1030 } 

1031 elif _scalar_sentinel_proc: 

1032 rows_by_sentinel = { 

1033 _scalar_sentinel_proc(row[-1]): row for row in rows 

1034 } 

1035 else: 

1036 rows_by_sentinel = {row[-1]: row for row in rows} 

1037 

1038 if len(rows_by_sentinel) != len(imv_batch.batch): 

1039 # see test_insert_exec.py:: 

1040 # IMVSentinelTest::test_sentinel_incorrect_rowcount 

1041 # for coverage / demonstration 

1042 raise exc.InvalidRequestError( 

1043 f"Sentinel-keyed result set did not produce " 

1044 f"correct number of rows {len(imv_batch.batch)}; " 

1045 "produced " 

1046 f"{len(rows_by_sentinel)}. Please ensure the " 

1047 "sentinel column is fully unique and populated in " 

1048 "all cases." 

1049 ) 

1050 

1051 try: 

1052 ordered_rows = [ 

1053 rows_by_sentinel[sentinel_keys] 

1054 for sentinel_keys in imv_batch.sentinel_values 

1055 ] 

1056 except KeyError as ke: 

1057 # see test_insert_exec.py:: 

1058 # IMVSentinelTest::test_sentinel_cant_match_keys 

1059 # for coverage / demonstration 

1060 raise exc.InvalidRequestError( 

1061 f"Can't match sentinel values in result set to " 

1062 f"parameter sets; key {ke.args[0]!r} was not " 

1063 "found. " 

1064 "There may be a mismatch between the datatype " 

1065 "passed to the DBAPI driver vs. that which it " 

1066 "returns in a result row. Ensure the given " 

1067 "Python value matches the expected result type " 

1068 "*exactly*, taking care to not rely upon implicit " 

1069 "conversions which may occur such as when using " 

1070 "strings in place of UUID or integer values, etc. " 

1071 ) from ke 

1072 

1073 result.extend(ordered_rows) 

1074 

1075 else: 

1076 result.extend(rows) 

1077 

1078 def do_executemany(self, cursor, statement, parameters, context=None): 

1079 cursor.executemany(statement, parameters) 

1080 

1081 def do_execute(self, cursor, statement, parameters, context=None): 

1082 cursor.execute(statement, parameters) 

1083 

1084 def do_execute_no_params(self, cursor, statement, context=None): 

1085 cursor.execute(statement) 

1086 

1087 def is_disconnect( 

1088 self, 

1089 e: DBAPIModule.Error, 

1090 connection: Union[ 

1091 pool.PoolProxiedConnection, interfaces.DBAPIConnection, None 

1092 ], 

1093 cursor: Optional[interfaces.DBAPICursor], 

1094 ) -> bool: 

1095 return False 

1096 

1097 @util.memoized_instancemethod 

1098 def _gen_allowed_isolation_levels(self, dbapi_conn): 

1099 try: 

1100 raw_levels = list(self.get_isolation_level_values(dbapi_conn)) 

1101 except NotImplementedError: 

1102 return None 

1103 else: 

1104 normalized_levels = [ 

1105 level.replace("_", " ").upper() for level in raw_levels 

1106 ] 

1107 if raw_levels != normalized_levels: 

1108 raise ValueError( 

1109 f"Dialect {self.name!r} get_isolation_level_values() " 

1110 f"method should return names as UPPERCASE using spaces, " 

1111 f"not underscores; got " 

1112 f"{sorted(set(raw_levels).difference(normalized_levels))}" 

1113 ) 

1114 return tuple(normalized_levels) 

1115 

1116 def _assert_and_set_isolation_level(self, dbapi_conn, level): 

1117 level = level.replace("_", " ").upper() 

1118 

1119 _allowed_isolation_levels = self._gen_allowed_isolation_levels( 

1120 dbapi_conn 

1121 ) 

1122 if ( 

1123 _allowed_isolation_levels 

1124 and level not in _allowed_isolation_levels 

1125 ): 

1126 raise exc.ArgumentError( 

1127 f"Invalid value {level!r} for isolation_level. " 

1128 f"Valid isolation levels for {self.name!r} are " 

1129 f"{', '.join(_allowed_isolation_levels)}" 

1130 ) 

1131 

1132 self.set_isolation_level(dbapi_conn, level) 

1133 

1134 def reset_isolation_level(self, dbapi_conn): 

1135 if self._on_connect_isolation_level is not None: 

1136 assert ( 

1137 self._on_connect_isolation_level == "AUTOCOMMIT" 

1138 or self._on_connect_isolation_level 

1139 == self.default_isolation_level 

1140 ) 

1141 self._assert_and_set_isolation_level( 

1142 dbapi_conn, self._on_connect_isolation_level 

1143 ) 

1144 else: 

1145 assert self.default_isolation_level is not None 

1146 self._assert_and_set_isolation_level( 

1147 dbapi_conn, 

1148 self.default_isolation_level, 

1149 ) 

1150 

1151 def normalize_name(self, name): 

1152 if name is None: 

1153 return None 

1154 

1155 name_lower = name.lower() 

1156 name_upper = name.upper() 

1157 

1158 if name_upper == name_lower: 

1159 # name has no upper/lower conversion, e.g. non-european characters. 

1160 # return unchanged 

1161 return name 

1162 elif name_upper == name and not ( 

1163 self.identifier_preparer._requires_quotes 

1164 )(name_lower): 

1165 # name is all uppercase and doesn't require quoting; normalize 

1166 # to all lower case 

1167 return name_lower 

1168 elif name_lower == name: 

1169 # name is all lower case, which if denormalized means we need to 

1170 # force quoting on it 

1171 return quoted_name(name, quote=True) 

1172 else: 

1173 # name is mixed case, means it will be quoted in SQL when used 

1174 # later, no normalizes 

1175 return name 

1176 

1177 def denormalize_name(self, name): 

1178 if name is None: 

1179 return None 

1180 

1181 name_lower = name.lower() 

1182 name_upper = name.upper() 

1183 

1184 if name_upper == name_lower: 

1185 # name has no upper/lower conversion, e.g. non-european characters. 

1186 # return unchanged 

1187 return name 

1188 elif name_lower == name and not ( 

1189 self.identifier_preparer._requires_quotes 

1190 )(name_lower): 

1191 name = name_upper 

1192 return name 

1193 

1194 def get_driver_connection(self, connection: DBAPIConnection) -> Any: 

1195 return connection 

1196 

1197 def _overrides_default(self, method): 

1198 return ( 

1199 getattr(type(self), method).__code__ 

1200 is not getattr(DefaultDialect, method).__code__ 

1201 ) 

1202 

1203 def _default_multi_reflect( 

1204 self, 

1205 single_tbl_method, 

1206 connection, 

1207 kind, 

1208 schema, 

1209 filter_names, 

1210 scope, 

1211 **kw, 

1212 ): 

1213 names_fns = [] 

1214 temp_names_fns = [] 

1215 if ObjectKind.TABLE in kind: 

1216 names_fns.append(self.get_table_names) 

1217 temp_names_fns.append(self.get_temp_table_names) 

1218 if ObjectKind.VIEW in kind: 

1219 names_fns.append(self.get_view_names) 

1220 temp_names_fns.append(self.get_temp_view_names) 

1221 if ObjectKind.MATERIALIZED_VIEW in kind: 

1222 names_fns.append(self.get_materialized_view_names) 

1223 # no temp materialized view at the moment 

1224 # temp_names_fns.append(self.get_temp_materialized_view_names) 

1225 

1226 unreflectable = kw.pop("unreflectable", {}) 

1227 

1228 if ( 

1229 filter_names 

1230 and scope is ObjectScope.ANY 

1231 and kind is ObjectKind.ANY 

1232 ): 

1233 # if names are given and no qualification on type of table 

1234 # (i.e. the Table(..., autoload) case), take the names as given, 

1235 # don't run names queries. If a table does not exit 

1236 # NoSuchTableError is raised and it's skipped 

1237 

1238 # this also suits the case for mssql where we can reflect 

1239 # individual temp tables but there's no temp_names_fn 

1240 names = filter_names 

1241 else: 

1242 names = [] 

1243 name_kw = {"schema": schema, **kw} 

1244 fns = [] 

1245 if ObjectScope.DEFAULT in scope: 

1246 fns.extend(names_fns) 

1247 if ObjectScope.TEMPORARY in scope: 

1248 fns.extend(temp_names_fns) 

1249 

1250 for fn in fns: 

1251 try: 

1252 names.extend(fn(connection, **name_kw)) 

1253 except NotImplementedError: 

1254 pass 

1255 

1256 if filter_names: 

1257 filter_names = set(filter_names) 

1258 

1259 # iterate over all the tables/views and call the single table method 

1260 for table in names: 

1261 if not filter_names or table in filter_names: 

1262 key = (schema, table) 

1263 try: 

1264 yield ( 

1265 key, 

1266 single_tbl_method( 

1267 connection, table, schema=schema, **kw 

1268 ), 

1269 ) 

1270 except exc.UnreflectableTableError as err: 

1271 if key not in unreflectable: 

1272 unreflectable[key] = err 

1273 except exc.NoSuchTableError: 

1274 pass 

1275 

1276 def get_multi_table_options(self, connection, **kw): 

1277 return self._default_multi_reflect( 

1278 self.get_table_options, connection, **kw 

1279 ) 

1280 

1281 def get_multi_columns(self, connection, **kw): 

1282 return self._default_multi_reflect(self.get_columns, connection, **kw) 

1283 

1284 def get_multi_pk_constraint(self, connection, **kw): 

1285 return self._default_multi_reflect( 

1286 self.get_pk_constraint, connection, **kw 

1287 ) 

1288 

1289 def get_multi_foreign_keys(self, connection, **kw): 

1290 return self._default_multi_reflect( 

1291 self.get_foreign_keys, connection, **kw 

1292 ) 

1293 

1294 def get_multi_indexes(self, connection, **kw): 

1295 return self._default_multi_reflect(self.get_indexes, connection, **kw) 

1296 

1297 def get_multi_unique_constraints(self, connection, **kw): 

1298 return self._default_multi_reflect( 

1299 self.get_unique_constraints, connection, **kw 

1300 ) 

1301 

1302 def get_multi_check_constraints(self, connection, **kw): 

1303 return self._default_multi_reflect( 

1304 self.get_check_constraints, connection, **kw 

1305 ) 

1306 

1307 def get_multi_table_comment(self, connection, **kw): 

1308 return self._default_multi_reflect( 

1309 self.get_table_comment, connection, **kw 

1310 ) 

1311 

1312 

1313class StrCompileDialect(DefaultDialect): 

1314 statement_compiler = compiler.StrSQLCompiler 

1315 ddl_compiler = compiler.DDLCompiler 

1316 type_compiler_cls = compiler.StrSQLTypeCompiler 

1317 preparer = compiler.IdentifierPreparer 

1318 

1319 insert_returning = True 

1320 update_returning = True 

1321 delete_returning = True 

1322 

1323 supports_statement_cache = True 

1324 

1325 supports_identity_columns = True 

1326 

1327 supports_sequences = True 

1328 sequences_optional = True 

1329 preexecute_autoincrement_sequences = False 

1330 

1331 supports_native_boolean = True 

1332 

1333 supports_multivalues_insert = True 

1334 supports_simple_order_by_label = True 

1335 

1336 

1337class DefaultExecutionContext(ExecutionContext): 

1338 isinsert = False 

1339 isupdate = False 

1340 isdelete = False 

1341 is_crud = False 

1342 is_text = False 

1343 isddl = False 

1344 

1345 execute_style: ExecuteStyle = ExecuteStyle.EXECUTE 

1346 

1347 compiled: Optional[Compiled] = None 

1348 result_column_struct: Optional[ 

1349 Tuple[List[ResultColumnsEntry], bool, bool, bool, bool] 

1350 ] = None 

1351 returned_default_rows: Optional[Sequence[Row[Unpack[TupleAny]]]] = None 

1352 

1353 execution_options: _ExecuteOptions = util.EMPTY_DICT 

1354 

1355 cursor_fetch_strategy = _cursor._DEFAULT_FETCH 

1356 

1357 invoked_statement: Optional[Executable] = None 

1358 

1359 _is_implicit_returning = False 

1360 _is_explicit_returning = False 

1361 _is_supplemental_returning = False 

1362 _is_server_side = False 

1363 

1364 _soft_closed = False 

1365 

1366 _rowcount: Optional[int] = None 

1367 

1368 # a hook for SQLite's translation of 

1369 # result column names 

1370 # NOTE: pyhive is using this hook, can't remove it :( 

1371 _translate_colname: Optional[ 

1372 Callable[[str], Tuple[str, Optional[str]]] 

1373 ] = None 

1374 

1375 _expanded_parameters: Mapping[str, List[str]] = util.immutabledict() 

1376 """used by set_input_sizes(). 

1377 

1378 This collection comes from ``ExpandedState.parameter_expansion``. 

1379 

1380 """ 

1381 

1382 cache_hit = NO_CACHE_KEY 

1383 

1384 root_connection: Connection 

1385 _dbapi_connection: PoolProxiedConnection 

1386 dialect: Dialect 

1387 unicode_statement: str 

1388 cursor: DBAPICursor 

1389 compiled_parameters: List[_MutableCoreSingleExecuteParams] 

1390 parameters: _DBAPIMultiExecuteParams 

1391 extracted_parameters: Optional[Sequence[BindParameter[Any]]] 

1392 

1393 _empty_dict_params = cast("Mapping[str, Any]", util.EMPTY_DICT) 

1394 

1395 _insertmanyvalues_rows: Optional[List[Tuple[Any, ...]]] = None 

1396 _num_sentinel_cols: int = 0 

1397 

1398 @classmethod 

1399 def _init_ddl( 

1400 cls, 

1401 dialect: Dialect, 

1402 connection: Connection, 

1403 dbapi_connection: PoolProxiedConnection, 

1404 execution_options: _ExecuteOptions, 

1405 compiled_ddl: DDLCompiler, 

1406 ) -> ExecutionContext: 

1407 """Initialize execution context for an ExecutableDDLElement 

1408 construct.""" 

1409 

1410 self = cls.__new__(cls) 

1411 self.root_connection = connection 

1412 self._dbapi_connection = dbapi_connection 

1413 self.dialect = connection.dialect 

1414 

1415 self.compiled = compiled = compiled_ddl 

1416 self.isddl = True 

1417 

1418 self.execution_options = execution_options 

1419 

1420 self.unicode_statement = str(compiled) 

1421 if compiled.schema_translate_map: 

1422 schema_translate_map = self.execution_options.get( 

1423 "schema_translate_map", {} 

1424 ) 

1425 

1426 rst = compiled.preparer._render_schema_translates 

1427 self.unicode_statement = rst( 

1428 self.unicode_statement, schema_translate_map 

1429 ) 

1430 

1431 self.statement = self.unicode_statement 

1432 

1433 self.cursor = self.create_cursor() 

1434 self.compiled_parameters = [] 

1435 

1436 if dialect.positional: 

1437 self.parameters = [dialect.execute_sequence_format()] 

1438 else: 

1439 self.parameters = [self._empty_dict_params] 

1440 

1441 return self 

1442 

1443 @classmethod 

1444 def _init_compiled( 

1445 cls, 

1446 dialect: Dialect, 

1447 connection: Connection, 

1448 dbapi_connection: PoolProxiedConnection, 

1449 execution_options: _ExecuteOptions, 

1450 compiled: SQLCompiler, 

1451 parameters: _CoreMultiExecuteParams, 

1452 invoked_statement: Executable, 

1453 extracted_parameters: Optional[Sequence[BindParameter[Any]]], 

1454 cache_hit: CacheStats = CacheStats.CACHING_DISABLED, 

1455 param_dict: _CoreSingleExecuteParams | None = None, 

1456 ) -> ExecutionContext: 

1457 """Initialize execution context for a Compiled construct.""" 

1458 

1459 self = cls.__new__(cls) 

1460 self.root_connection = connection 

1461 self._dbapi_connection = dbapi_connection 

1462 self.dialect = connection.dialect 

1463 self.extracted_parameters = extracted_parameters 

1464 self.invoked_statement = invoked_statement 

1465 self.compiled = compiled 

1466 self.cache_hit = cache_hit 

1467 

1468 self.execution_options = execution_options 

1469 

1470 self.result_column_struct = ( 

1471 compiled._result_columns, 

1472 compiled._ordered_columns, 

1473 compiled._textual_ordered_columns, 

1474 compiled._ad_hoc_textual, 

1475 compiled._loose_column_name_matching, 

1476 ) 

1477 

1478 self.isinsert = ii = compiled.isinsert 

1479 self.isupdate = iu = compiled.isupdate 

1480 self.isdelete = id_ = compiled.isdelete 

1481 self.is_text = compiled.isplaintext 

1482 

1483 if ii or iu or id_: 

1484 dml_statement = compiled.compile_state.statement # type: ignore 

1485 if TYPE_CHECKING: 

1486 assert isinstance(dml_statement, UpdateBase) 

1487 self.is_crud = True 

1488 self._is_explicit_returning = ier = bool(dml_statement._returning) 

1489 self._is_implicit_returning = iir = bool( 

1490 compiled.implicit_returning 

1491 ) 

1492 if iir and dml_statement._supplemental_returning: 

1493 self._is_supplemental_returning = True 

1494 

1495 # dont mix implicit and explicit returning 

1496 assert not (iir and ier) 

1497 

1498 if (ier or iir) and compiled.for_executemany: 

1499 if ii and not self.dialect.insert_executemany_returning: 

1500 raise exc.InvalidRequestError( 

1501 f"Dialect {self.dialect.dialect_description} with " 

1502 f"current server capabilities does not support " 

1503 "INSERT..RETURNING when executemany is used" 

1504 ) 

1505 elif ( 

1506 ii 

1507 and dml_statement._sort_by_parameter_order 

1508 and not self.dialect.insert_executemany_returning_sort_by_parameter_order # noqa: E501 

1509 ): 

1510 raise exc.InvalidRequestError( 

1511 f"Dialect {self.dialect.dialect_description} with " 

1512 f"current server capabilities does not support " 

1513 "INSERT..RETURNING with deterministic row ordering " 

1514 "when executemany is used" 

1515 ) 

1516 elif ( 

1517 ii 

1518 and self.dialect.use_insertmanyvalues 

1519 and not compiled._insertmanyvalues 

1520 ): 

1521 raise exc.InvalidRequestError( 

1522 'Statement does not have "insertmanyvalues" ' 

1523 "enabled, can't use INSERT..RETURNING with " 

1524 "executemany in this case." 

1525 ) 

1526 elif iu and not self.dialect.update_executemany_returning: 

1527 raise exc.InvalidRequestError( 

1528 f"Dialect {self.dialect.dialect_description} with " 

1529 f"current server capabilities does not support " 

1530 "UPDATE..RETURNING when executemany is used" 

1531 ) 

1532 elif id_ and not self.dialect.delete_executemany_returning: 

1533 raise exc.InvalidRequestError( 

1534 f"Dialect {self.dialect.dialect_description} with " 

1535 f"current server capabilities does not support " 

1536 "DELETE..RETURNING when executemany is used" 

1537 ) 

1538 

1539 if not parameters: 

1540 self.compiled_parameters = [ 

1541 compiled.construct_params( 

1542 extracted_parameters=extracted_parameters, 

1543 escape_names=False, 

1544 _collected_params=param_dict, 

1545 ) 

1546 ] 

1547 else: 

1548 self.compiled_parameters = [ 

1549 compiled.construct_params( 

1550 m, 

1551 escape_names=False, 

1552 _group_number=grp, 

1553 extracted_parameters=extracted_parameters, 

1554 _collected_params=param_dict, 

1555 ) 

1556 for grp, m in enumerate(parameters) 

1557 ] 

1558 

1559 if len(parameters) > 1: 

1560 if self.isinsert and compiled._insertmanyvalues: 

1561 self.execute_style = ExecuteStyle.INSERTMANYVALUES 

1562 

1563 imv = compiled._insertmanyvalues 

1564 if imv.sentinel_columns is not None: 

1565 self._num_sentinel_cols = imv.num_sentinel_columns 

1566 else: 

1567 self.execute_style = ExecuteStyle.EXECUTEMANY 

1568 

1569 self.unicode_statement = compiled.string 

1570 

1571 self.cursor = self.create_cursor() 

1572 

1573 if self.compiled.insert_prefetch or self.compiled.update_prefetch: 

1574 self._process_execute_defaults() 

1575 

1576 processors = compiled._bind_processors 

1577 

1578 flattened_processors: Mapping[ 

1579 str, _BindProcessorType[Any] 

1580 ] = processors # type: ignore[assignment] 

1581 

1582 if compiled.literal_execute_params or compiled.post_compile_params: 

1583 if self.executemany: 

1584 raise exc.InvalidRequestError( 

1585 "'literal_execute' or 'expanding' parameters can't be " 

1586 "used with executemany()" 

1587 ) 

1588 

1589 expanded_state = compiled._process_parameters_for_postcompile( 

1590 self.compiled_parameters[0] 

1591 ) 

1592 

1593 # re-assign self.unicode_statement 

1594 self.unicode_statement = expanded_state.statement 

1595 

1596 self._expanded_parameters = expanded_state.parameter_expansion 

1597 

1598 flattened_processors = dict(processors) # type: ignore 

1599 flattened_processors.update(expanded_state.processors) 

1600 positiontup = expanded_state.positiontup 

1601 elif compiled.positional: 

1602 positiontup = self.compiled.positiontup 

1603 else: 

1604 positiontup = None 

1605 

1606 if compiled.schema_translate_map: 

1607 schema_translate_map = self.execution_options.get( 

1608 "schema_translate_map", {} 

1609 ) 

1610 rst = compiled.preparer._render_schema_translates 

1611 self.unicode_statement = rst( 

1612 self.unicode_statement, schema_translate_map 

1613 ) 

1614 

1615 # final self.unicode_statement is now assigned, encode if needed 

1616 # by dialect 

1617 self.statement = self.unicode_statement 

1618 

1619 # Convert the dictionary of bind parameter values 

1620 # into a dict or list to be sent to the DBAPI's 

1621 # execute() or executemany() method. 

1622 

1623 if compiled.positional: 

1624 core_positional_parameters: MutableSequence[Sequence[Any]] = [] 

1625 assert positiontup is not None 

1626 for compiled_params in self.compiled_parameters: 

1627 l_param: List[Any] = [ 

1628 ( 

1629 flattened_processors[key](compiled_params[key]) 

1630 if key in flattened_processors 

1631 else compiled_params[key] 

1632 ) 

1633 for key in positiontup 

1634 ] 

1635 core_positional_parameters.append( 

1636 dialect.execute_sequence_format(l_param) 

1637 ) 

1638 

1639 self.parameters = core_positional_parameters 

1640 else: 

1641 core_dict_parameters: MutableSequence[Dict[str, Any]] = [] 

1642 escaped_names = compiled.escaped_bind_names 

1643 

1644 # note that currently, "expanded" parameters will be present 

1645 # in self.compiled_parameters in their quoted form. This is 

1646 # slightly inconsistent with the approach taken as of 

1647 # #8056 where self.compiled_parameters is meant to contain unquoted 

1648 # param names. 

1649 d_param: Dict[str, Any] 

1650 for compiled_params in self.compiled_parameters: 

1651 if escaped_names: 

1652 d_param = { 

1653 escaped_names.get(key, key): ( 

1654 flattened_processors[key](compiled_params[key]) 

1655 if key in flattened_processors 

1656 else compiled_params[key] 

1657 ) 

1658 for key in compiled_params 

1659 } 

1660 else: 

1661 d_param = { 

1662 key: ( 

1663 flattened_processors[key](compiled_params[key]) 

1664 if key in flattened_processors 

1665 else compiled_params[key] 

1666 ) 

1667 for key in compiled_params 

1668 } 

1669 

1670 core_dict_parameters.append(d_param) 

1671 

1672 self.parameters = core_dict_parameters 

1673 

1674 return self 

1675 

1676 @classmethod 

1677 def _init_statement( 

1678 cls, 

1679 dialect: Dialect, 

1680 connection: Connection, 

1681 dbapi_connection: PoolProxiedConnection, 

1682 execution_options: _ExecuteOptions, 

1683 statement: str, 

1684 parameters: _DBAPIMultiExecuteParams, 

1685 ) -> ExecutionContext: 

1686 """Initialize execution context for a string SQL statement.""" 

1687 

1688 self = cls.__new__(cls) 

1689 self.root_connection = connection 

1690 self._dbapi_connection = dbapi_connection 

1691 self.dialect = connection.dialect 

1692 self.is_text = True 

1693 

1694 self.execution_options = execution_options 

1695 

1696 if not parameters: 

1697 if self.dialect.positional: 

1698 self.parameters = [dialect.execute_sequence_format()] 

1699 else: 

1700 self.parameters = [self._empty_dict_params] 

1701 elif isinstance(parameters[0], dialect.execute_sequence_format): 

1702 self.parameters = parameters 

1703 elif isinstance(parameters[0], dict): 

1704 self.parameters = parameters 

1705 else: 

1706 self.parameters = [ 

1707 dialect.execute_sequence_format(p) for p in parameters 

1708 ] 

1709 

1710 if len(parameters) > 1: 

1711 self.execute_style = ExecuteStyle.EXECUTEMANY 

1712 

1713 self.statement = self.unicode_statement = statement 

1714 

1715 self.cursor = self.create_cursor() 

1716 return self 

1717 

1718 @classmethod 

1719 def _init_default( 

1720 cls, 

1721 dialect: Dialect, 

1722 connection: Connection, 

1723 dbapi_connection: PoolProxiedConnection, 

1724 execution_options: _ExecuteOptions, 

1725 ) -> ExecutionContext: 

1726 """Initialize execution context for a ColumnDefault construct.""" 

1727 

1728 self = cls.__new__(cls) 

1729 self.root_connection = connection 

1730 self._dbapi_connection = dbapi_connection 

1731 self.dialect = connection.dialect 

1732 

1733 self.execution_options = execution_options 

1734 

1735 self.cursor = self.create_cursor() 

1736 return self 

1737 

1738 def _get_cache_stats(self) -> str: 

1739 if self.compiled is None: 

1740 return "raw sql" 

1741 

1742 now = perf_counter() 

1743 

1744 ch = self.cache_hit 

1745 

1746 gen_time = self.compiled._gen_time 

1747 assert gen_time is not None 

1748 

1749 if ch is NO_CACHE_KEY: 

1750 return "no key %.5fs" % (now - gen_time,) 

1751 elif ch is CACHE_HIT: 

1752 return "cached since %.4gs ago" % (now - gen_time,) 

1753 elif ch is CACHE_MISS: 

1754 return "generated in %.5fs" % (now - gen_time,) 

1755 elif ch is CACHING_DISABLED: 

1756 if "_cache_disable_reason" in self.execution_options: 

1757 return "caching disabled (%s) %.5fs " % ( 

1758 self.execution_options["_cache_disable_reason"], 

1759 now - gen_time, 

1760 ) 

1761 else: 

1762 return "caching disabled %.5fs" % (now - gen_time,) 

1763 elif ch is NO_DIALECT_SUPPORT: 

1764 return "dialect %s+%s does not support caching %.5fs" % ( 

1765 self.dialect.name, 

1766 self.dialect.driver, 

1767 now - gen_time, 

1768 ) 

1769 else: 

1770 return "unknown" 

1771 

1772 @property 

1773 def executemany(self): # type: ignore[override] 

1774 return self.execute_style in ( 

1775 ExecuteStyle.EXECUTEMANY, 

1776 ExecuteStyle.INSERTMANYVALUES, 

1777 ) 

1778 

1779 @util.memoized_property 

1780 def identifier_preparer(self): 

1781 if self.compiled: 

1782 return self.compiled.preparer 

1783 elif "schema_translate_map" in self.execution_options: 

1784 return self.dialect.identifier_preparer._with_schema_translate( 

1785 self.execution_options["schema_translate_map"] 

1786 ) 

1787 else: 

1788 return self.dialect.identifier_preparer 

1789 

1790 @util.memoized_property 

1791 def engine(self): 

1792 return self.root_connection.engine 

1793 

1794 @util.memoized_property 

1795 def postfetch_cols(self) -> Optional[Sequence[Column[Any]]]: 

1796 if TYPE_CHECKING: 

1797 assert isinstance(self.compiled, SQLCompiler) 

1798 return self.compiled.postfetch 

1799 

1800 @util.memoized_property 

1801 def prefetch_cols(self) -> Optional[Sequence[Column[Any]]]: 

1802 if TYPE_CHECKING: 

1803 assert isinstance(self.compiled, SQLCompiler) 

1804 if self.isinsert: 

1805 return self.compiled.insert_prefetch 

1806 elif self.isupdate: 

1807 return self.compiled.update_prefetch 

1808 else: 

1809 return () 

1810 

1811 @util.memoized_property 

1812 def no_parameters(self): 

1813 return self.execution_options.get("no_parameters", False) 

1814 

1815 def _execute_scalar( 

1816 self, 

1817 stmt: str, 

1818 type_: Optional[TypeEngine[Any]], 

1819 parameters: Optional[_DBAPISingleExecuteParams] = None, 

1820 ) -> Any: 

1821 """Execute a string statement on the current cursor, returning a 

1822 scalar result. 

1823 

1824 Used to fire off sequences, default phrases, and "select lastrowid" 

1825 types of statements individually or in the context of a parent INSERT 

1826 or UPDATE statement. 

1827 

1828 """ 

1829 

1830 conn = self.root_connection 

1831 

1832 if "schema_translate_map" in self.execution_options: 

1833 schema_translate_map = self.execution_options.get( 

1834 "schema_translate_map", {} 

1835 ) 

1836 

1837 rst = self.identifier_preparer._render_schema_translates 

1838 stmt = rst(stmt, schema_translate_map) 

1839 

1840 if not parameters: 

1841 if self.dialect.positional: 

1842 parameters = self.dialect.execute_sequence_format() 

1843 else: 

1844 parameters = {} 

1845 

1846 conn._cursor_execute(self.cursor, stmt, parameters, context=self) 

1847 row = self.cursor.fetchone() 

1848 if row is not None: 

1849 r = row[0] 

1850 else: 

1851 r = None 

1852 if type_ is not None: 

1853 # apply type post processors to the result 

1854 proc = type_._cached_result_processor( 

1855 self.dialect, self.cursor.description[0][1] 

1856 ) 

1857 if proc: 

1858 return proc(r) 

1859 return r 

1860 

1861 @util.memoized_property 

1862 def connection(self): 

1863 return self.root_connection 

1864 

1865 def _use_server_side_cursor(self): 

1866 if not self.dialect.supports_server_side_cursors: 

1867 return False 

1868 

1869 if self.dialect.server_side_cursors: 

1870 # this is deprecated 

1871 use_server_side = self.execution_options.get( 

1872 "stream_results", True 

1873 ) and ( 

1874 self.compiled 

1875 and isinstance(self.compiled.statement, expression.Selectable) 

1876 or ( 

1877 ( 

1878 not self.compiled 

1879 or isinstance( 

1880 self.compiled.statement, expression.TextClause 

1881 ) 

1882 ) 

1883 and self.unicode_statement 

1884 and SERVER_SIDE_CURSOR_RE.match(self.unicode_statement) 

1885 ) 

1886 ) 

1887 else: 

1888 use_server_side = self.execution_options.get( 

1889 "stream_results", False 

1890 ) 

1891 

1892 return use_server_side 

1893 

1894 def create_cursor(self) -> DBAPICursor: 

1895 if ( 

1896 # inlining initial preference checks for SS cursors 

1897 self.dialect.supports_server_side_cursors 

1898 and ( 

1899 self.execution_options.get("stream_results", False) 

1900 or ( 

1901 self.dialect.server_side_cursors 

1902 and self._use_server_side_cursor() 

1903 ) 

1904 ) 

1905 ): 

1906 self._is_server_side = True 

1907 return self.create_server_side_cursor() 

1908 else: 

1909 self._is_server_side = False 

1910 return self.create_default_cursor() 

1911 

1912 def fetchall_for_returning(self, cursor): 

1913 return cursor.fetchall() 

1914 

1915 def create_default_cursor(self) -> DBAPICursor: 

1916 return self._dbapi_connection.cursor() 

1917 

1918 def create_server_side_cursor(self) -> DBAPICursor: 

1919 raise NotImplementedError() 

1920 

1921 def pre_exec(self): 

1922 pass 

1923 

1924 def get_out_parameter_values(self, names): 

1925 raise NotImplementedError( 

1926 "This dialect does not support OUT parameters" 

1927 ) 

1928 

1929 def post_exec(self): 

1930 pass 

1931 

1932 def get_result_processor( 

1933 self, type_: TypeEngine[Any], colname: str, coltype: DBAPIType 

1934 ) -> Optional[_ResultProcessorType[Any]]: 

1935 """Return a 'result processor' for a given type as present in 

1936 cursor.description. 

1937 

1938 This has a default implementation that dialects can override 

1939 for context-sensitive result type handling. 

1940 

1941 """ 

1942 return type_._cached_result_processor(self.dialect, coltype) 

1943 

1944 def get_lastrowid(self) -> int: 

1945 """return self.cursor.lastrowid, or equivalent, after an INSERT. 

1946 

1947 This may involve calling special cursor functions, issuing a new SELECT 

1948 on the cursor (or a new one), or returning a stored value that was 

1949 calculated within post_exec(). 

1950 

1951 This function will only be called for dialects which support "implicit" 

1952 primary key generation, keep preexecute_autoincrement_sequences set to 

1953 False, and when no explicit id value was bound to the statement. 

1954 

1955 The function is called once for an INSERT statement that would need to 

1956 return the last inserted primary key for those dialects that make use 

1957 of the lastrowid concept. In these cases, it is called directly after 

1958 :meth:`.ExecutionContext.post_exec`. 

1959 

1960 """ 

1961 return self.cursor.lastrowid 

1962 

1963 def handle_dbapi_exception(self, e): 

1964 pass 

1965 

1966 @util.non_memoized_property 

1967 def rowcount(self) -> int: 

1968 if self._rowcount is not None: 

1969 return self._rowcount 

1970 else: 

1971 return self.cursor.rowcount 

1972 

1973 @property 

1974 def _has_rowcount(self): 

1975 return self._rowcount is not None 

1976 

1977 def supports_sane_rowcount(self): 

1978 return self.dialect.supports_sane_rowcount 

1979 

1980 def supports_sane_multi_rowcount(self): 

1981 return self.dialect.supports_sane_multi_rowcount 

1982 

1983 def _setup_result_proxy(self): 

1984 exec_opt = self.execution_options 

1985 

1986 if self._rowcount is None and exec_opt.get("preserve_rowcount", False): 

1987 self._rowcount = self.cursor.rowcount 

1988 

1989 yp: Optional[Union[int, bool]] 

1990 if self.is_crud or self.is_text: 

1991 result = self._setup_dml_or_text_result() 

1992 yp = False 

1993 else: 

1994 yp = exec_opt.get("yield_per", None) 

1995 sr = self._is_server_side or exec_opt.get("stream_results", False) 

1996 strategy = self.cursor_fetch_strategy 

1997 if sr and strategy is _cursor._DEFAULT_FETCH: 

1998 strategy = _cursor.BufferedRowCursorFetchStrategy( 

1999 self.cursor, self.execution_options 

2000 ) 

2001 cursor_description: _DBAPICursorDescription = ( 

2002 strategy.alternate_cursor_description 

2003 or self.cursor.description 

2004 ) 

2005 if cursor_description is None: 

2006 strategy = _cursor._NO_CURSOR_DQL 

2007 

2008 result = _cursor.CursorResult(self, strategy, cursor_description) 

2009 

2010 compiled = self.compiled 

2011 

2012 if ( 

2013 compiled 

2014 and not self.isddl 

2015 and cast(SQLCompiler, compiled).has_out_parameters 

2016 ): 

2017 self._setup_out_parameters(result) 

2018 

2019 self._soft_closed = result._soft_closed 

2020 

2021 if yp: 

2022 result = result.yield_per(yp) 

2023 

2024 return result 

2025 

2026 def _setup_out_parameters(self, result): 

2027 compiled = cast(SQLCompiler, self.compiled) 

2028 

2029 out_bindparams = [ 

2030 (param, name) 

2031 for param, name in compiled.bind_names.items() 

2032 if param.isoutparam 

2033 ] 

2034 out_parameters = {} 

2035 

2036 for bindparam, raw_value in zip( 

2037 [param for param, name in out_bindparams], 

2038 self.get_out_parameter_values( 

2039 [name for param, name in out_bindparams] 

2040 ), 

2041 ): 

2042 type_ = bindparam.type 

2043 impl_type = type_.dialect_impl(self.dialect) 

2044 dbapi_type = impl_type.get_dbapi_type(self.dialect.loaded_dbapi) 

2045 result_processor = impl_type.result_processor( 

2046 self.dialect, dbapi_type 

2047 ) 

2048 if result_processor is not None: 

2049 raw_value = result_processor(raw_value) 

2050 out_parameters[bindparam.key] = raw_value 

2051 

2052 result.out_parameters = out_parameters 

2053 

2054 def _setup_dml_or_text_result(self): 

2055 compiled = cast(SQLCompiler, self.compiled) 

2056 

2057 strategy: ResultFetchStrategy = self.cursor_fetch_strategy 

2058 

2059 if self.isinsert: 

2060 if ( 

2061 self.execute_style is ExecuteStyle.INSERTMANYVALUES 

2062 and compiled.effective_returning 

2063 ): 

2064 strategy = _cursor.FullyBufferedCursorFetchStrategy( 

2065 self.cursor, 

2066 initial_buffer=self._insertmanyvalues_rows, 

2067 # maintain alt cursor description if set by the 

2068 # dialect, e.g. mssql preserves it 

2069 alternate_description=( 

2070 strategy.alternate_cursor_description 

2071 ), 

2072 ) 

2073 

2074 if compiled.postfetch_lastrowid: 

2075 self.inserted_primary_key_rows = ( 

2076 self._setup_ins_pk_from_lastrowid() 

2077 ) 

2078 # else if not self._is_implicit_returning, 

2079 # the default inserted_primary_key_rows accessor will 

2080 # return an "empty" primary key collection when accessed. 

2081 

2082 if self._is_server_side and strategy is _cursor._DEFAULT_FETCH: 

2083 strategy = _cursor.BufferedRowCursorFetchStrategy( 

2084 self.cursor, self.execution_options 

2085 ) 

2086 

2087 if strategy is _cursor._NO_CURSOR_DML: 

2088 cursor_description = None 

2089 else: 

2090 cursor_description = ( 

2091 strategy.alternate_cursor_description 

2092 or self.cursor.description 

2093 ) 

2094 

2095 if cursor_description is None: 

2096 strategy = _cursor._NO_CURSOR_DML 

2097 elif self._num_sentinel_cols: 

2098 assert self.execute_style is ExecuteStyle.INSERTMANYVALUES 

2099 # the sentinel columns are handled in CursorResult._init_metadata 

2100 # using essentially _reduce 

2101 

2102 result: _cursor.CursorResult[Any] = _cursor.CursorResult( 

2103 self, strategy, cursor_description 

2104 ) 

2105 

2106 if self.isinsert: 

2107 if self._is_implicit_returning: 

2108 rows = result.all() 

2109 

2110 self.returned_default_rows = rows 

2111 

2112 self.inserted_primary_key_rows = ( 

2113 self._setup_ins_pk_from_implicit_returning(result, rows) 

2114 ) 

2115 

2116 # test that it has a cursor metadata that is accurate. the 

2117 # first row will have been fetched and current assumptions 

2118 # are that the result has only one row, until executemany() 

2119 # support is added here. 

2120 assert result._metadata.returns_rows 

2121 

2122 # Insert statement has both return_defaults() and 

2123 # returning(). rewind the result on the list of rows 

2124 # we just used. 

2125 if self._is_supplemental_returning: 

2126 result._rewind(rows) 

2127 else: 

2128 result._soft_close() 

2129 elif not self._is_explicit_returning: 

2130 result._soft_close() 

2131 

2132 # we assume here the result does not return any rows. 

2133 # *usually*, this will be true. However, some dialects 

2134 # such as that of MSSQL/pyodbc need to SELECT a post fetch 

2135 # function so this is not necessarily true. 

2136 # assert not result.returns_rows 

2137 

2138 elif self._is_implicit_returning: 

2139 rows = result.all() 

2140 

2141 if rows: 

2142 self.returned_default_rows = rows 

2143 self._rowcount = len(rows) 

2144 

2145 if self._is_supplemental_returning: 

2146 result._rewind(rows) 

2147 else: 

2148 result._soft_close() 

2149 

2150 # test that it has a cursor metadata that is accurate. 

2151 # the rows have all been fetched however. 

2152 assert result._metadata.returns_rows 

2153 

2154 elif not result._metadata.returns_rows: 

2155 # no results, get rowcount 

2156 # (which requires open cursor on some drivers) 

2157 if self._rowcount is None: 

2158 self._rowcount = self.cursor.rowcount 

2159 result._soft_close() 

2160 elif self.isupdate or self.isdelete: 

2161 if self._rowcount is None: 

2162 self._rowcount = self.cursor.rowcount 

2163 return result 

2164 

2165 @util.memoized_property 

2166 def inserted_primary_key_rows(self): 

2167 # if no specific "get primary key" strategy was set up 

2168 # during execution, return a "default" primary key based 

2169 # on what's in the compiled_parameters and nothing else. 

2170 return self._setup_ins_pk_from_empty() 

2171 

2172 def _setup_ins_pk_from_lastrowid(self): 

2173 getter = cast( 

2174 SQLCompiler, self.compiled 

2175 )._inserted_primary_key_from_lastrowid_getter 

2176 lastrowid = self.get_lastrowid() 

2177 return [getter(lastrowid, self.compiled_parameters[0])] 

2178 

2179 def _setup_ins_pk_from_empty(self): 

2180 getter = cast( 

2181 SQLCompiler, self.compiled 

2182 )._inserted_primary_key_from_lastrowid_getter 

2183 return [getter(None, param) for param in self.compiled_parameters] 

2184 

2185 def _setup_ins_pk_from_implicit_returning(self, result, rows): 

2186 if not rows: 

2187 return [] 

2188 

2189 getter = cast( 

2190 SQLCompiler, self.compiled 

2191 )._inserted_primary_key_from_returning_getter 

2192 compiled_params = self.compiled_parameters 

2193 

2194 return [ 

2195 getter(row, param) for row, param in zip(rows, compiled_params) 

2196 ] 

2197 

2198 def lastrow_has_defaults(self) -> bool: 

2199 return (self.isinsert or self.isupdate) and bool( 

2200 cast(SQLCompiler, self.compiled).postfetch 

2201 ) 

2202 

2203 def _prepare_set_input_sizes( 

2204 self, 

2205 ) -> Optional[List[Tuple[str, Any, TypeEngine[Any]]]]: 

2206 """Given a cursor and ClauseParameters, prepare arguments 

2207 in order to call the appropriate 

2208 style of ``setinputsizes()`` on the cursor, using DB-API types 

2209 from the bind parameter's ``TypeEngine`` objects. 

2210 

2211 This method only called by those dialects which set the 

2212 :attr:`.Dialect.bind_typing` attribute to 

2213 :attr:`.BindTyping.SETINPUTSIZES`. Python-oracledb and cx_Oracle are 

2214 the only DBAPIs that requires setinputsizes(); pyodbc offers it as an 

2215 option. 

2216 

2217 Prior to SQLAlchemy 2.0, the setinputsizes() approach was also used 

2218 for pg8000 and asyncpg, which has been changed to inline rendering 

2219 of casts. 

2220 

2221 """ 

2222 if self.isddl or self.is_text: 

2223 return None 

2224 

2225 compiled = cast(SQLCompiler, self.compiled) 

2226 

2227 inputsizes = compiled._get_set_input_sizes_lookup() 

2228 

2229 if inputsizes is None: 

2230 return None 

2231 

2232 dialect = self.dialect 

2233 

2234 # all of the rest of this... cython? 

2235 

2236 if dialect._has_events: 

2237 inputsizes = dict(inputsizes) 

2238 dialect.dispatch.do_setinputsizes( 

2239 inputsizes, self.cursor, self.statement, self.parameters, self 

2240 ) 

2241 

2242 if compiled.escaped_bind_names: 

2243 escaped_bind_names = compiled.escaped_bind_names 

2244 else: 

2245 escaped_bind_names = None 

2246 

2247 if dialect.positional: 

2248 items = [ 

2249 (key, compiled.binds[key]) 

2250 for key in compiled.positiontup or () 

2251 ] 

2252 else: 

2253 items = [ 

2254 (key, bindparam) 

2255 for bindparam, key in compiled.bind_names.items() 

2256 ] 

2257 

2258 generic_inputsizes: List[Tuple[str, Any, TypeEngine[Any]]] = [] 

2259 for key, bindparam in items: 

2260 if bindparam in compiled.literal_execute_params: 

2261 continue 

2262 

2263 if key in self._expanded_parameters: 

2264 if is_tuple_type(bindparam.type): 

2265 num = len(bindparam.type.types) 

2266 dbtypes = inputsizes[bindparam] 

2267 generic_inputsizes.extend( 

2268 ( 

2269 ( 

2270 escaped_bind_names.get(paramname, paramname) 

2271 if escaped_bind_names is not None 

2272 else paramname 

2273 ), 

2274 dbtypes[idx % num], 

2275 bindparam.type.types[idx % num], 

2276 ) 

2277 for idx, paramname in enumerate( 

2278 self._expanded_parameters[key] 

2279 ) 

2280 ) 

2281 else: 

2282 dbtype = inputsizes.get(bindparam, None) 

2283 generic_inputsizes.extend( 

2284 ( 

2285 ( 

2286 escaped_bind_names.get(paramname, paramname) 

2287 if escaped_bind_names is not None 

2288 else paramname 

2289 ), 

2290 dbtype, 

2291 bindparam.type, 

2292 ) 

2293 for paramname in self._expanded_parameters[key] 

2294 ) 

2295 else: 

2296 dbtype = inputsizes.get(bindparam, None) 

2297 

2298 escaped_name = ( 

2299 escaped_bind_names.get(key, key) 

2300 if escaped_bind_names is not None 

2301 else key 

2302 ) 

2303 

2304 generic_inputsizes.append( 

2305 (escaped_name, dbtype, bindparam.type) 

2306 ) 

2307 

2308 return generic_inputsizes 

2309 

2310 def _exec_default(self, column, default, type_): 

2311 if default.is_sequence: 

2312 return self.fire_sequence(default, type_) 

2313 elif default.is_callable: 

2314 # this codepath is not normally used as it's inlined 

2315 # into _process_execute_defaults 

2316 self.current_column = column 

2317 return default.arg(self) 

2318 elif default.is_clause_element: 

2319 return self._exec_default_clause_element(column, default, type_) 

2320 else: 

2321 # this codepath is not normally used as it's inlined 

2322 # into _process_execute_defaults 

2323 return default.arg 

2324 

2325 def _exec_default_clause_element(self, column, default, type_): 

2326 # execute a default that's a complete clause element. Here, we have 

2327 # to re-implement a miniature version of the compile->parameters-> 

2328 # cursor.execute() sequence, since we don't want to modify the state 

2329 # of the connection / result in progress or create new connection/ 

2330 # result objects etc. 

2331 # .. versionchanged:: 1.4 

2332 

2333 if not default._arg_is_typed: 

2334 default_arg = expression.type_coerce(default.arg, type_) 

2335 else: 

2336 default_arg = default.arg 

2337 compiled = expression.select(default_arg).compile(dialect=self.dialect) 

2338 compiled_params = compiled.construct_params() 

2339 processors = compiled._bind_processors 

2340 if compiled.positional: 

2341 parameters = self.dialect.execute_sequence_format( 

2342 [ 

2343 ( 

2344 processors[key](compiled_params[key]) # type: ignore 

2345 if key in processors 

2346 else compiled_params[key] 

2347 ) 

2348 for key in compiled.positiontup or () 

2349 ] 

2350 ) 

2351 else: 

2352 parameters = { 

2353 key: ( 

2354 processors[key](compiled_params[key]) # type: ignore 

2355 if key in processors 

2356 else compiled_params[key] 

2357 ) 

2358 for key in compiled_params 

2359 } 

2360 return self._execute_scalar( 

2361 str(compiled), type_, parameters=parameters 

2362 ) 

2363 

2364 current_parameters: Optional[_CoreSingleExecuteParams] = None 

2365 """A dictionary of parameters applied to the current row. 

2366 

2367 This attribute is only available in the context of a user-defined default 

2368 generation function, e.g. as described at :ref:`context_default_functions`. 

2369 It consists of a dictionary which includes entries for each column/value 

2370 pair that is to be part of the INSERT or UPDATE statement. The keys of the 

2371 dictionary will be the key value of each :class:`_schema.Column`, 

2372 which is usually 

2373 synonymous with the name. 

2374 

2375 Note that the :attr:`.DefaultExecutionContext.current_parameters` attribute 

2376 does not accommodate for the "multi-values" feature of the 

2377 :meth:`_expression.Insert.values` method. The 

2378 :meth:`.DefaultExecutionContext.get_current_parameters` method should be 

2379 preferred. 

2380 

2381 .. seealso:: 

2382 

2383 :meth:`.DefaultExecutionContext.get_current_parameters` 

2384 

2385 :ref:`context_default_functions` 

2386 

2387 """ 

2388 

2389 def get_current_parameters(self, isolate_multiinsert_groups=True): 

2390 """Return a dictionary of parameters applied to the current row. 

2391 

2392 This method can only be used in the context of a user-defined default 

2393 generation function, e.g. as described at 

2394 :ref:`context_default_functions`. When invoked, a dictionary is 

2395 returned which includes entries for each column/value pair that is part 

2396 of the INSERT or UPDATE statement. The keys of the dictionary will be 

2397 the key value of each :class:`_schema.Column`, 

2398 which is usually synonymous 

2399 with the name. 

2400 

2401 :param isolate_multiinsert_groups=True: indicates that multi-valued 

2402 INSERT constructs created using :meth:`_expression.Insert.values` 

2403 should be 

2404 handled by returning only the subset of parameters that are local 

2405 to the current column default invocation. When ``False``, the 

2406 raw parameters of the statement are returned including the 

2407 naming convention used in the case of multi-valued INSERT. 

2408 

2409 .. seealso:: 

2410 

2411 :attr:`.DefaultExecutionContext.current_parameters` 

2412 

2413 :ref:`context_default_functions` 

2414 

2415 """ 

2416 try: 

2417 parameters = self.current_parameters 

2418 column = self.current_column 

2419 except AttributeError: 

2420 raise exc.InvalidRequestError( 

2421 "get_current_parameters() can only be invoked in the " 

2422 "context of a Python side column default function" 

2423 ) 

2424 else: 

2425 assert column is not None 

2426 assert parameters is not None 

2427 compile_state = cast( 

2428 "DMLState", cast(SQLCompiler, self.compiled).compile_state 

2429 ) 

2430 assert compile_state is not None 

2431 if ( 

2432 isolate_multiinsert_groups 

2433 and dml.isinsert(compile_state) 

2434 and compile_state._has_multi_parameters 

2435 ): 

2436 if column._is_multiparam_column: 

2437 index = column.index + 1 

2438 d = {column.original.key: parameters[column.key]} 

2439 else: 

2440 d = {column.key: parameters[column.key]} 

2441 index = 0 

2442 assert compile_state._dict_parameters is not None 

2443 keys = compile_state._dict_parameters.keys() 

2444 d.update( 

2445 (key, parameters["%s_m%d" % (key, index)]) for key in keys 

2446 ) 

2447 return d 

2448 else: 

2449 return parameters 

2450 

2451 def get_insert_default(self, column): 

2452 if column.default is None: 

2453 return None 

2454 else: 

2455 return self._exec_default(column, column.default, column.type) 

2456 

2457 def get_update_default(self, column): 

2458 if column.onupdate is None: 

2459 return None 

2460 else: 

2461 return self._exec_default(column, column.onupdate, column.type) 

2462 

2463 def _process_execute_defaults(self): 

2464 compiled = cast(SQLCompiler, self.compiled) 

2465 

2466 key_getter = compiled._within_exec_param_key_getter 

2467 

2468 sentinel_counter = 0 

2469 

2470 if compiled.insert_prefetch: 

2471 prefetch_recs = [ 

2472 ( 

2473 c, 

2474 key_getter(c), 

2475 c._default_description_tuple, 

2476 self.get_insert_default, 

2477 ) 

2478 for c in compiled.insert_prefetch 

2479 ] 

2480 elif compiled.update_prefetch: 

2481 prefetch_recs = [ 

2482 ( 

2483 c, 

2484 key_getter(c), 

2485 c._onupdate_description_tuple, 

2486 self.get_update_default, 

2487 ) 

2488 for c in compiled.update_prefetch 

2489 ] 

2490 else: 

2491 prefetch_recs = [] 

2492 

2493 for param in self.compiled_parameters: 

2494 self.current_parameters = param 

2495 

2496 for ( 

2497 c, 

2498 param_key, 

2499 (arg, is_scalar, is_callable, is_sentinel), 

2500 fallback, 

2501 ) in prefetch_recs: 

2502 if is_sentinel: 

2503 param[param_key] = sentinel_counter 

2504 sentinel_counter += 1 

2505 elif is_scalar: 

2506 param[param_key] = arg 

2507 elif is_callable: 

2508 self.current_column = c 

2509 param[param_key] = arg(self) 

2510 else: 

2511 val = fallback(c) 

2512 if val is not None: 

2513 param[param_key] = val 

2514 

2515 del self.current_parameters 

2516 

2517 

2518DefaultDialect.execution_ctx_cls = DefaultExecutionContext