Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/sql/base.py: 49%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

938 statements  

1# sql/base.py 

2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7# mypy: allow-untyped-defs, allow-untyped-calls 

8 

9"""Foundational utilities common to many sql modules.""" 

10 

11 

12from __future__ import annotations 

13 

14import collections 

15from enum import Enum 

16import itertools 

17from itertools import zip_longest 

18import operator 

19import re 

20from typing import Any 

21from typing import Callable 

22from typing import cast 

23from typing import Dict 

24from typing import Final 

25from typing import FrozenSet 

26from typing import Generator 

27from typing import Generic 

28from typing import Iterable 

29from typing import Iterator 

30from typing import List 

31from typing import Mapping 

32from typing import MutableMapping 

33from typing import NamedTuple 

34from typing import NoReturn 

35from typing import Optional 

36from typing import overload 

37from typing import Protocol 

38from typing import Sequence 

39from typing import Set 

40from typing import Tuple 

41from typing import Type 

42from typing import TYPE_CHECKING 

43from typing import TypeGuard 

44from typing import TypeVar 

45from typing import Union 

46 

47from . import roles 

48from . import visitors 

49from .cache_key import HasCacheKey # noqa 

50from .cache_key import MemoizedHasCacheKey # noqa 

51from .traversals import HasCopyInternals # noqa 

52from .visitors import ClauseVisitor 

53from .visitors import ExtendedInternalTraversal 

54from .visitors import ExternallyTraversible 

55from .visitors import InternalTraversal 

56from .. import event 

57from .. import exc 

58from .. import util 

59from ..util import EMPTY_DICT 

60from ..util import HasMemoized as HasMemoized 

61from ..util import hybridmethod 

62from ..util.typing import Self 

63from ..util.typing import TypeVarTuple 

64from ..util.typing import Unpack 

65 

66if TYPE_CHECKING: 

67 from . import coercions 

68 from . import elements 

69 from . import type_api 

70 from ._orm_types import DMLStrategyArgument 

71 from ._orm_types import SynchronizeSessionArgument 

72 from ._typing import _CLE 

73 from .cache_key import CacheKey 

74 from .compiler import SQLCompiler 

75 from .dml import Delete 

76 from .dml import Insert 

77 from .dml import Update 

78 from .elements import BindParameter 

79 from .elements import ClauseElement 

80 from .elements import ClauseList 

81 from .elements import ColumnClause # noqa 

82 from .elements import ColumnElement 

83 from .elements import NamedColumn 

84 from .elements import SQLCoreOperations 

85 from .elements import TextClause 

86 from .schema import Column 

87 from .schema import DefaultGenerator 

88 from .selectable import _JoinTargetElement 

89 from .selectable import _SelectIterable 

90 from .selectable import FromClause 

91 from .selectable import Select 

92 from .visitors import anon_map 

93 from ..engine import Connection 

94 from ..engine import CursorResult 

95 from ..engine.interfaces import _CoreMultiExecuteParams 

96 from ..engine.interfaces import _CoreSingleExecuteParams 

97 from ..engine.interfaces import _ExecuteOptions 

98 from ..engine.interfaces import _ImmutableExecuteOptions 

99 from ..engine.interfaces import CacheStats 

100 from ..engine.interfaces import Compiled 

101 from ..engine.interfaces import CompiledCacheType 

102 from ..engine.interfaces import CoreExecuteOptionsParameter 

103 from ..engine.interfaces import Dialect 

104 from ..engine.interfaces import IsolationLevel 

105 from ..engine.interfaces import SchemaTranslateMapType 

106 from ..event import dispatcher 

107 

108if not TYPE_CHECKING: 

109 coercions = None # noqa 

110 elements = None # noqa 

111 type_api = None # noqa 

112 

113 

114_Ts = TypeVarTuple("_Ts") 

115 

116 

class _NoArg(Enum):
    """Single-member enumeration backing the module-level ``NO_ARG``
    constant."""

    NO_ARG = 0

    def __repr__(self) -> str:
        # render as ``_NoArg.<member name>`` instead of the default
        # Enum repr that also includes the member value
        return f"_NoArg.{self.name}"


NO_ARG: Final = _NoArg.NO_ARG

125 

126 

127class _NoneName(Enum): 

128 NONE_NAME = 0 

129 """indicate a 'deferred' name that was ultimately the value None.""" 

130 

131 

132_NONE_NAME: Final = _NoneName.NONE_NAME 

133 

134_T = TypeVar("_T", bound=Any) 

135 

136_Fn = TypeVar("_Fn", bound=Callable[..., Any]) 

137 

138_AmbiguousTableNameMap = MutableMapping[str, str] 

139 

140 

141class _DefaultDescriptionTuple(NamedTuple): 

142 arg: Any 

143 is_scalar: Optional[bool] 

144 is_callable: Optional[bool] 

145 is_sentinel: Optional[bool] 

146 

147 @classmethod 

148 def _from_column_default( 

149 cls, default: Optional[DefaultGenerator] 

150 ) -> _DefaultDescriptionTuple: 

151 return ( 

152 _DefaultDescriptionTuple( 

153 default.arg, # type: ignore 

154 default.is_scalar, 

155 default.is_callable, 

156 default.is_sentinel, 

157 ) 

158 if default 

159 and ( 

160 default.has_arg 

161 or (not default.for_update and default.is_sentinel) 

162 ) 

163 else _DefaultDescriptionTuple(None, None, None, None) 

164 ) 

165 

166 

167_never_select_column: operator.attrgetter[Any] = operator.attrgetter( 

168 "_omit_from_statements" 

169) 

170 

171 

172class _EntityNamespace(Protocol): 

173 def __getattr__(self, key: str) -> SQLCoreOperations[Any]: ... 

174 

175 

class _HasEntityNamespace(Protocol):
    """Structural type for objects exposing a read-only
    ``entity_namespace`` attribute."""

    @util.ro_non_memoized_property
    def entity_namespace(self) -> _EntityNamespace:
        ...

179 

180 

181def _is_has_entity_namespace(element: Any) -> TypeGuard[_HasEntityNamespace]: 

182 return hasattr(element, "entity_namespace") 

183 

184 

185# Remove when https://github.com/python/mypy/issues/14640 will be fixed 

186_Self = TypeVar("_Self", bound=Any) 

187 

188 

class Immutable:
    """mark a ClauseElement as 'immutable' when expressions are cloned.

    "immutable" objects refers to the "mutability" of an object in the
    context of SQL DQL and DML generation.  For example, in DQL one can
    compose a SELECT or subquery of varied forms, but one cannot modify
    the structure of a specific table or column within DQL.
    :class:`.Immutable` is mostly intended to follow this concept, and as
    such the primary "immutable" objects are :class:`.ColumnClause`,
    :class:`.Column`, :class:`.TableClause`, :class:`.Table`.

    Within this class, cloning returns the very same object, copying of
    internals is a no-op, and the parameter-replacement methods raise.

    """

    __slots__ = ()

    _is_immutable: bool = True

    def unique_params(self, *optionaldict: Any, **kwargs: Any) -> NoReturn:
        """Always raise ``NotImplementedError``."""
        raise NotImplementedError("Immutable objects do not support copying")

    def params(self, *optionaldict: Any, **kwargs: Any) -> NoReturn:
        """Always raise ``NotImplementedError``."""
        raise NotImplementedError("Immutable objects do not support copying")

    def _clone(self: _Self, **kw: Any) -> _Self:
        # a "clone" of an immutable object is the object itself
        return self

    def _copy_internals(
        self, *, omit_attrs: Iterable[str] = (), **kw: Any
    ) -> None:
        # nothing to copy; arguments are accepted and ignored
        return None

219 

220 

class SingletonConstant(Immutable):
    """Represent SQL constants like NULL, TRUE, FALSE"""

    _is_singleton_constant: bool = True

    _singleton: SingletonConstant

    def __new__(cls: _T, *arg: Any, **kw: Any) -> _T:
        # construction never allocates; it hands back the instance that
        # _create_singleton() stored on the class
        return cast(_T, cls._singleton)

    @util.non_memoized_property
    def proxy_set(self) -> FrozenSet[ColumnElement[Any]]:
        raise NotImplementedError()

    @classmethod
    def _create_singleton(cls) -> None:
        """Allocate, initialize and install the one instance of ``cls``."""
        # go through object.__new__ directly, since cls.__new__ only
        # returns the (not yet existing) singleton
        singleton = object.__new__(cls)
        singleton.__init__()  # type: ignore

        # for a long time this was an empty frozenset, meaning
        # a SingletonConstant would never be a "corresponding column" in
        # a statement.  This referred to #6259.  However, in #7154 we see
        # that we do in fact need "correspondence" to work when matching cols
        # in result sets, so the non-correspondence was moved to a more
        # specific level when we are actually adapting expressions for SQL
        # render only.
        singleton.proxy_set = frozenset((singleton,))
        cls._singleton = singleton

249 

250 

251def _from_objects( 

252 *elements: Union[ 

253 ColumnElement[Any], FromClause, TextClause, _JoinTargetElement 

254 ] 

255) -> Iterator[FromClause]: 

256 return itertools.chain.from_iterable( 

257 [element._from_objects for element in elements] 

258 ) 

259 

260 

261def _select_iterables( 

262 elements: Iterable[roles.ColumnsClauseRole], 

263) -> _SelectIterable: 

264 """expand tables into individual columns in the 

265 given list of column expressions. 

266 

267 """ 

268 return itertools.chain.from_iterable( 

269 [c._select_iterable for c in elements] 

270 ) 

271 

272 

273_SelfGenerativeType = TypeVar("_SelfGenerativeType", bound="_GenerativeType") 

274 

275 

276class _GenerativeType(Protocol): 

277 def _generate(self) -> Self: ... 

278 

279 

def _generative(fn: _Fn) -> _Fn:
    """non-caching _generative() decorator.

    The decorated method runs against the object returned by
    ``self._generate()`` instead of ``self``, and that object is what the
    caller gets back.  The undecorated function remains reachable as the
    ``non_generative`` attribute of the result.

    """

    @util.decorator
    def _generative(
        fn: _Fn, self: _SelfGenerativeType, *args: Any, **kw: Any
    ) -> _SelfGenerativeType:
        """Mark a method as generative."""

        generated = self._generate()
        returned = fn(generated, *args, **kw)
        assert returned is generated, "generative methods must return self"
        return generated

    # the inner name shadows this function, so this call applies the
    # decorator defined just above
    wrapped = _generative(fn)
    wrapped.non_generative = fn  # type: ignore
    return wrapped

302 

303 

def _exclusive_against(*names: str, **kw: Any) -> Callable[[_Fn], _Fn]:
    """Build a decorator that refuses to run a method once any of the
    named attributes has moved away from its default.

    :param names: attribute paths looked up on the method's ``self``.
    :param kw: may carry ``msgs`` (attribute name -> custom error message)
     and ``defaults`` (attribute name -> expected default; ``None`` when
     absent).  Both are popped from ``kw``.
    :raises InvalidRequestError: from the decorated method, when an
     attribute is not identical (``is``) to its default.

    """
    msgs: Dict[str, str] = kw.pop("msgs", {})

    defaults: Dict[str, str] = kw.pop("defaults", {})

    getters: List[Tuple[str, operator.attrgetter[Any], Optional[str]]] = []
    for attr_name in names:
        getters.append(
            (
                attr_name,
                operator.attrgetter(attr_name),
                defaults.get(attr_name, None),
            )
        )

    @util.decorator
    def check(fn: _Fn, *args: Any, **kw: Any) -> Any:
        # make pylance happy by not including "self" in the argument
        # list
        self, remaining = args[0], args[1:]
        for name, getter, default_ in getters:
            if getter(self) is default_:
                continue
            if name in msgs:
                msg = msgs[name]
            else:
                msg = (
                    "Method %s() has already been invoked on this %s construct"
                    % (fn.__name__, self.__class__)
                )
            raise exc.InvalidRequestError(msg)
        return fn(self, *remaining, **kw)

    return check

331 

332 

333def _clone(element, **kw): 

334 return element._clone(**kw) 

335 

336 

337def _expand_cloned( 

338 elements: Iterable[_CLE], 

339) -> Iterable[_CLE]: 

340 """expand the given set of ClauseElements to be the set of all 'cloned' 

341 predecessors. 

342 

343 """ 

344 # TODO: cython candidate 

345 return itertools.chain(*[x._cloned_set for x in elements]) 

346 

347 

348def _de_clone( 

349 elements: Iterable[_CLE], 

350) -> Iterable[_CLE]: 

351 for x in elements: 

352 while x._is_clone_of is not None: 

353 x = x._is_clone_of 

354 yield x 

355 

356 

357def _cloned_intersection(a: Iterable[_CLE], b: Iterable[_CLE]) -> Set[_CLE]: 

358 """return the intersection of sets a and b, counting 

359 any overlap between 'cloned' predecessors. 

360 

361 The returned set is in terms of the entities present within 'a'. 

362 

363 """ 

364 all_overlap: Set[_CLE] = set(_expand_cloned(a)).intersection( 

365 _expand_cloned(b) 

366 ) 

367 return {elem for elem in a if all_overlap.intersection(elem._cloned_set)} 

368 

369 

370def _cloned_difference(a: Iterable[_CLE], b: Iterable[_CLE]) -> Set[_CLE]: 

371 all_overlap: Set[_CLE] = set(_expand_cloned(a)).intersection( 

372 _expand_cloned(b) 

373 ) 

374 return { 

375 elem for elem in a if not all_overlap.intersection(elem._cloned_set) 

376 } 

377 

378 

379class _DialectArgView(MutableMapping[str, Any]): 

380 """A dictionary view of dialect-level arguments in the form 

381 <dialectname>_<argument_name>. 

382 

383 """ 

384 

385 __slots__ = ("obj",) 

386 

387 def __init__(self, obj: DialectKWArgs) -> None: 

388 self.obj = obj 

389 

390 def _key(self, key: str) -> Tuple[str, str]: 

391 try: 

392 dialect, value_key = key.split("_", 1) 

393 except ValueError as err: 

394 raise KeyError(key) from err 

395 else: 

396 return dialect, value_key 

397 

398 def __getitem__(self, key: str) -> Any: 

399 dialect, value_key = self._key(key) 

400 

401 try: 

402 opt = self.obj.dialect_options[dialect] 

403 except exc.NoSuchModuleError as err: 

404 raise KeyError(key) from err 

405 else: 

406 return opt[value_key] 

407 

408 def __setitem__(self, key: str, value: Any) -> None: 

409 try: 

410 dialect, value_key = self._key(key) 

411 except KeyError as err: 

412 raise exc.ArgumentError( 

413 "Keys must be of the form <dialectname>_<argname>" 

414 ) from err 

415 else: 

416 self.obj.dialect_options[dialect][value_key] = value 

417 

418 def __delitem__(self, key: str) -> None: 

419 dialect, value_key = self._key(key) 

420 del self.obj.dialect_options[dialect][value_key] 

421 

422 def __len__(self) -> int: 

423 return sum( 

424 len(args._non_defaults) 

425 for args in self.obj.dialect_options.values() 

426 ) 

427 

428 def __iter__(self) -> Generator[str, None, None]: 

429 return ( 

430 "%s_%s" % (dialect_name, value_name) 

431 for dialect_name in self.obj.dialect_options 

432 for value_name in self.obj.dialect_options[ 

433 dialect_name 

434 ]._non_defaults 

435 ) 

436 

437 

438class _DialectArgDict(MutableMapping[str, Any]): 

439 """A dictionary view of dialect-level arguments for a specific 

440 dialect. 

441 

442 Maintains a separate collection of user-specified arguments 

443 and dialect-specified default arguments. 

444 

445 """ 

446 

447 def __init__(self) -> None: 

448 self._non_defaults: Dict[str, Any] = {} 

449 self._defaults: Dict[str, Any] = {} 

450 

451 def __len__(self) -> int: 

452 return len(set(self._non_defaults).union(self._defaults)) 

453 

454 def __iter__(self) -> Iterator[str]: 

455 return iter(set(self._non_defaults).union(self._defaults)) 

456 

457 def __getitem__(self, key: str) -> Any: 

458 if key in self._non_defaults: 

459 return self._non_defaults[key] 

460 else: 

461 return self._defaults[key] 

462 

463 def __setitem__(self, key: str, value: Any) -> None: 

464 self._non_defaults[key] = value 

465 

466 def __delitem__(self, key: str) -> None: 

467 del self._non_defaults[key] 

468 

469 

@util.preload_module("sqlalchemy.dialects")
def _kw_reg_for_dialect(dialect_name: str) -> Optional[Dict[Any, Any]]:
    """Load the named dialect and return a ``dict`` copy of its
    ``construct_arguments``, or ``None`` when that attribute is ``None``.
    """
    dialect_cls = util.preloaded.dialects.registry.load(dialect_name)
    construct_arguments = dialect_cls.construct_arguments
    return None if construct_arguments is None else dict(construct_arguments)

476 

477 

class DialectKWArgs:
    """Establish the ability for a class to have dialect-specific arguments
    with defaults and constructor validation.

    The :class:`.DialectKWArgs` interacts with the
    :attr:`.DefaultDialect.construct_arguments` present on a dialect.

    .. seealso::

        :attr:`.DefaultDialect.construct_arguments`

    """

    __slots__ = ()

    _dialect_kwargs_traverse_internals: List[Tuple[str, Any]] = [
        ("dialect_options", InternalTraversal.dp_dialect_options)
    ]

    @classmethod
    def argument_for(
        cls, dialect_name: str, argument_name: str, default: Any
    ) -> None:
        """Add a new kind of dialect-specific keyword argument for this class.

        E.g.::

            Index.argument_for("mydialect", "length", None)

            some_index = Index("a", "b", mydialect_length=5)

        The :meth:`.DialectKWArgs.argument_for` method is a per-argument
        way of adding extra arguments to the
        :attr:`.DefaultDialect.construct_arguments` dictionary. This
        dictionary provides a list of argument names accepted by various
        schema-level constructs on behalf of a dialect.

        New dialects should typically specify this dictionary all at once as a
        data member of the dialect class.  The use case for ad-hoc addition of
        argument names is typically for end-user code that is also using
        a custom compilation scheme which consumes the additional arguments.

        :param dialect_name: name of a dialect.  The dialect must be
         locatable, else a :class:`.NoSuchModuleError` is raised.   The
         dialect must also include an existing
         :attr:`.DefaultDialect.construct_arguments` collection, indicating
         that it participates in the keyword-argument validation and default
         system, else :class:`.ArgumentError` is raised.  If the dialect does
         not include this collection, then any keyword argument can be
         specified on behalf of this dialect already.  All dialects packaged
         within SQLAlchemy include this collection, however for third party
         dialects, support may vary.

        :param argument_name: name of the parameter.

        :param default: default value of the parameter.

        """

        construct_arg_dictionary: Optional[Dict[Any, Any]] = (
            DialectKWArgs._kw_registry[dialect_name]
        )
        if construct_arg_dictionary is None:
            # the registry holds None for dialects whose
            # construct_arguments is None, i.e. dialects that do NOT take
            # part in validation; the message previously claimed the
            # opposite ("does have ... enabled configured")
            raise exc.ArgumentError(
                "Dialect '%s' does not have keyword-argument "
                "validation and defaults enabled" % dialect_name
            )
        if cls not in construct_arg_dictionary:
            construct_arg_dictionary[cls] = {}
        construct_arg_dictionary[cls][argument_name] = default

    @property
    def dialect_kwargs(self) -> _DialectArgView:
        """A collection of keyword arguments specified as dialect-specific
        options to this construct.

        The arguments are present here in their original ``<dialect>_<kwarg>``
        format.  Only arguments that were actually passed are included;
        unlike the :attr:`.DialectKWArgs.dialect_options` collection, which
        contains all options known by this dialect including defaults.

        The collection is also writable; keys are accepted of the
        form ``<dialect>_<kwarg>`` where the value will be assembled
        into the list of options.

        .. seealso::

            :attr:`.DialectKWArgs.dialect_options` - nested dictionary form

        """
        return _DialectArgView(self)

    @property
    def kwargs(self) -> _DialectArgView:
        """A synonym for :attr:`.DialectKWArgs.dialect_kwargs`."""
        return self.dialect_kwargs

    # dialect name -> copy of that dialect's construct_arguments (or None),
    # populated on first access by _kw_reg_for_dialect()
    _kw_registry: util.PopulateDict[str, Optional[Dict[Any, Any]]] = (
        util.PopulateDict(_kw_reg_for_dialect)
    )

    @classmethod
    def _kw_reg_for_dialect_cls(cls, dialect_name: str) -> _DialectArgDict:
        """Build the per-dialect argument dictionary for this class.

        Defaults registered for each class in this class's MRO are merged,
        base classes first, so that more specific classes override them.
        A dialect with no registry gets the single wildcard default
        ``"*"``.
        """
        construct_arg_dictionary = DialectKWArgs._kw_registry[dialect_name]
        d = _DialectArgDict()

        if construct_arg_dictionary is None:
            d._defaults.update({"*": None})
        else:
            # distinct loop name: the original rebound ``cls`` here
            for klass in reversed(cls.__mro__):
                if klass in construct_arg_dictionary:
                    d._defaults.update(construct_arg_dictionary[klass])
        return d

    @util.memoized_property
    def dialect_options(self) -> util.PopulateDict[str, _DialectArgDict]:
        """A collection of keyword arguments specified as dialect-specific
        options to this construct.

        This is a two-level nested registry, keyed to ``<dialect_name>``
        and ``<argument_name>``.  For example, the ``postgresql_where``
        argument would be locatable as::

            arg = my_object.dialect_options["postgresql"]["where"]

        .. versionadded:: 0.9.2

        .. seealso::

            :attr:`.DialectKWArgs.dialect_kwargs` - flat dictionary form

        """

        return util.PopulateDict(self._kw_reg_for_dialect_cls)

    def _validate_dialect_kwargs(self, kwargs: Dict[str, Any]) -> None:
        """Validate ``<dialectname>_<argument>`` keyword arguments and store
        them in :attr:`.dialect_options`.

        :raises TypeError: a key is not of the form
         ``<dialectname>_<argument>``.
        :raises ArgumentError: the dialect is known, has no ``"*"``
         wildcard, and does not list the argument.

        A dialect that cannot be located produces a warning instead, and
        the argument is stored without validation.
        """
        # validate remaining kwargs that they all specify DB prefixes

        if not kwargs:
            return

        for k in kwargs:
            m = re.match("^(.+?)_(.+)$", k)
            if not m:
                raise TypeError(
                    "Additional arguments should be "
                    "named <dialectname>_<argument>, got '%s'" % k
                )
            dialect_name, arg_name = m.group(1, 2)

            try:
                construct_arg_dictionary = self.dialect_options[dialect_name]
            except exc.NoSuchModuleError:
                util.warn(
                    "Can't validate argument %r; can't "
                    "locate any SQLAlchemy dialect named %r"
                    % (k, dialect_name)
                )
                # install an accept-anything dictionary for the unknown
                # dialect and record the value as user-specified
                self.dialect_options[dialect_name] = d = _DialectArgDict()
                d._defaults.update({"*": None})
                d._non_defaults[arg_name] = kwargs[k]
            else:
                if (
                    "*" not in construct_arg_dictionary
                    and arg_name not in construct_arg_dictionary
                ):
                    raise exc.ArgumentError(
                        "Argument %r is not accepted by "
                        "dialect %r on behalf of %r"
                        % (k, dialect_name, self.__class__)
                    )
                else:
                    construct_arg_dictionary[arg_name] = kwargs[k]

651 

652 

class CompileState:
    """Produces additional object state necessary for a statement to be
    compiled.

    the :class:`.CompileState` class is at the base of classes that assemble
    state for a particular statement object that is then used by the
    compiler.   This process is essentially an extension of the process that
    the SQLCompiler.visit_XYZ() method takes, however there is an emphasis
    on converting raw user intent into more organized structures rather than
    producing string output.   The top-level :class:`.CompileState` for the
    statement being executed is also accessible when the execution context
    works with invoking the statement and collecting results.

    The production of :class:`.CompileState` is specific to the compiler,  such
    as within the :meth:`.SQLCompiler.visit_insert`,
    :meth:`.SQLCompiler.visit_select` etc. methods.  These methods are also
    responsible for associating the :class:`.CompileState` with the
    :class:`.SQLCompiler` itself, if the statement is the "toplevel" statement,
    i.e. the outermost SQL statement that's actually being executed.
    There can be other :class:`.CompileState` objects that are not the
    toplevel, such as when a SELECT subquery or CTE-nested
    INSERT/UPDATE/DELETE is generated.

    .. versionadded:: 1.4

    """

    __slots__ = ("statement", "_ambiguous_table_name_map")

    # registry shared by all subclasses:
    # (plugin name, statement plugin target) -> CompileState class
    plugins: Dict[Tuple[str, str], Type[CompileState]] = {}

    _ambiguous_table_name_map: Optional[_AmbiguousTableNameMap]

    @classmethod
    def create_for_statement(
        cls, statement: Executable, compiler: SQLCompiler, **kw: Any
    ) -> CompileState:
        """Factory: build the compile state for ``statement``.

        The class registered for the statement's ``compile_state_plugin``
        is used when there is one, otherwise the class registered under
        the ``"default"`` plugin.  When the selected class is not ``cls``,
        construction is delegated to that class's own factory.
        """
        propagate_attrs = statement._propagate_attrs
        klass = None

        if propagate_attrs:
            plugin_name = propagate_attrs.get(
                "compile_state_plugin", "default"
            )
            klass = cls.plugins.get(
                (plugin_name, statement._effective_plugin_target), None
            )

        if klass is None:
            # no propagated attributes, or nothing registered for the
            # requested plugin
            klass = cls.plugins[
                ("default", statement._effective_plugin_target)
            ]

        if klass is not cls:
            return klass.create_for_statement(statement, compiler, **kw)
        return cls(statement, compiler, **kw)

    def __init__(self, statement, compiler, **kw):
        self.statement = statement

    @classmethod
    def get_plugin_class(
        cls, statement: Executable
    ) -> Optional[Type[CompileState]]:
        """Return the class registered for the statement's plugin, falling
        back to the ``"default"`` registration, or ``None`` when neither
        exists.
        """
        plugin_name = statement._propagate_attrs.get(
            "compile_state_plugin", None
        )

        if plugin_name:
            key = (plugin_name, statement._effective_plugin_target)
            if key in cls.plugins:
                return cls.plugins[key]

        # there's no case where we call upon get_plugin_class() and want
        # to get None back, there should always be a default.   return that
        # if there was no plugin-specific class  (e.g. Insert with "orm"
        # plugin)
        try:
            default_class = cls.plugins[
                ("default", statement._effective_plugin_target)
            ]
        except KeyError:
            default_class = None
        return default_class

    @classmethod
    def _get_plugin_class_for_plugin(
        cls, statement: Executable, plugin_name: str
    ) -> Optional[Type[CompileState]]:
        """Return the class registered for exactly ``plugin_name`` and the
        statement's plugin target, or ``None``; no default fallback.
        """
        try:
            registered = cls.plugins[
                (plugin_name, statement._effective_plugin_target)
            ]
        except KeyError:
            registered = None
        return registered

    @classmethod
    def plugin_for(
        cls, plugin_name: str, visit_name: str
    ) -> Callable[[_Fn], _Fn]:
        """Return a class decorator that registers the decorated class
        under ``(plugin_name, visit_name)`` and returns it unchanged.
        """

        def decorate(cls_to_decorate):
            cls.plugins[(plugin_name, visit_name)] = cls_to_decorate
            return cls_to_decorate

        return decorate

759 

760 

class Generative(HasMemoized):
    """Provide a method-chaining pattern in conjunction with the
    @_generative decorator."""

    def _generate(self) -> Self:
        """Return a new instance of the same class carrying a shallow copy
        of this object's ``__dict__``, minus any memoized keys.

        The new instance is made with ``__new__``; ``__init__`` is not run.
        """
        memoized = self._memoized_keys
        klass = self.__class__
        generated = klass.__new__(klass)

        # snapshot first, then filter the private snapshot, so the
        # filtering never iterates over the live __dict__
        state = self.__dict__.copy()
        if memoized:
            state = {
                key: value
                for key, value in state.items()
                if key not in memoized
            }
        generated.__dict__ = state
        return generated

777 

778 

class InPlaceGenerative(HasMemoized):
    """Provide a method-chaining pattern in conjunction with the
    @_generative decorator that mutates in place."""

    __slots__ = ()

    def _generate(self) -> Self:
        """Drop memoized entries from ``__dict__`` and return ``self``."""
        # note __dict__ needs to be in __slots__ if this is used
        own_state = self.__dict__
        for memoized_key in self._memoized_keys:
            own_state.pop(memoized_key, None)
        return self

791 

792 

class HasCompileState(Generative):
    """A class that has a :class:`.CompileState` associated with it."""

    # no plugin class by default
    _compile_state_plugin: Optional[Type[CompileState]] = None

    # empty immutable mapping by default
    _attributes: util.immutabledict[str, Any] = EMPTY_DICT

    # compile states are produced through the CompileState factory
    _compile_state_factory = CompileState.create_for_statement

801 

802 

803class _MetaOptions(type): 

804 """metaclass for the Options class. 

805 

806 This metaclass is actually necessary despite the availability of the 

807 ``__init_subclass__()`` hook as this type also provides custom class-level 

808 behavior for the ``__add__()`` method. 

809 

810 """ 

811 

812 _cache_attrs: Tuple[str, ...] 

813 

814 def __add__(self, other): 

815 o1 = self() 

816 

817 if set(other).difference(self._cache_attrs): 

818 raise TypeError( 

819 "dictionary contains attributes not covered by " 

820 "Options class %s: %r" 

821 % (self, set(other).difference(self._cache_attrs)) 

822 ) 

823 

824 o1.__dict__.update(other) 

825 return o1 

826 

827 if TYPE_CHECKING: 

828 

829 def __getattr__(self, key: str) -> Any: ... 

830 

831 def __setattr__(self, key: str, value: Any) -> None: ... 

832 

833 def __delattr__(self, key: str) -> None: ... 

834 

835 

class Options(metaclass=_MetaOptions):
    """A cacheable option dictionary with defaults.

    Defaults are the class attributes of a subclass; the names of those
    attributes are collected into ``_cache_attrs`` when the subclass is
    created.  Instance-level overrides live in the instance ``__dict__``.
    """

    __slots__ = ()

    _cache_attrs: Tuple[str, ...]

    def __init_subclass__(cls) -> None:
        # every attribute declared directly on the subclass, other than
        # dunder names and the cache key traversal, is a cacheable option
        dict_ = cls.__dict__
        cls._cache_attrs = tuple(
            sorted(
                d
                for d in dict_
                if not d.startswith("__")
                and d not in ("_cache_key_traversal",)
            )
        )
        super().__init_subclass__()

    def __init__(self, **kw: Any) -> None:
        self.__dict__.update(kw)

    def __add__(self, other):
        """Return a copy of this object with the mapping ``other`` applied.

        Raises ``TypeError`` when ``other`` has keys that are not in
        ``_cache_attrs``.
        """
        o1 = self.__class__.__new__(self.__class__)
        o1.__dict__.update(self.__dict__)

        unknown = set(other).difference(self._cache_attrs)
        if unknown:
            raise TypeError(
                "dictionary contains attributes not covered by "
                "Options class %s: %r" % (self, unknown)
            )

        o1.__dict__.update(other)
        return o1

    def __eq__(self, other):
        """Compare option values pairwise, in ``_cache_attrs`` order.

        Returns ``NotImplemented`` for objects that have no
        ``_cache_attrs``, and ``False`` when the two objects have a
        different number of option attributes.
        """
        # TODO: very inefficient.  This is used only in test suites
        # right now.
        other_attrs = getattr(other, "_cache_attrs", None)
        if other_attrs is None:
            # not an Options-like object; let Python try the reflected
            # comparison instead of failing with AttributeError
            return NotImplemented

        # differing lengths previously paired a name with None and failed
        # in getattr() with TypeError
        if len(self._cache_attrs) != len(other_attrs):
            return False

        for a, b in zip(self._cache_attrs, other_attrs):
            if getattr(self, a) != getattr(other, b):
                return False
        return True

    def __repr__(self) -> str:
        # TODO: fairly inefficient, used only in debugging right now.

        # only values set on the instance are shown, not class defaults
        return "%s(%s)" % (
            self.__class__.__name__,
            ", ".join(
                "%s=%r" % (k, self.__dict__[k])
                for k in self._cache_attrs
                if k in self.__dict__
            ),
        )

    @classmethod
    def isinstance(cls, klass: Type[Any]) -> bool:
        """Return True when this class is a subclass of ``klass``."""
        return issubclass(cls, klass)

    @hybridmethod
    def add_to_element(self, name: str, value: str) -> Any:
        """Return a copy in which attribute ``name`` is its current value
        plus ``value``."""
        return self + {name: getattr(self, name) + value}

    @hybridmethod
    def _state_dict_inst(self) -> Mapping[str, Any]:
        return self.__dict__

    _state_dict_const: util.immutabledict[str, Any] = util.EMPTY_DICT

    @_state_dict_inst.classlevel
    def _state_dict(cls) -> Mapping[str, Any]:
        return cls._state_dict_const

    @classmethod
    def safe_merge(cls, other: "Options") -> Any:
        """Return ``cls`` combined with the state of ``other``.

        Raises ``TypeError`` when ``other`` is of a different class and has
        option attributes that ``cls`` does not.
        """
        d = other._state_dict()

        # only support a merge with another object of our class
        # and which does not have attrs that we don't.   otherwise
        # we risk having state that might not be part of our cache
        # key strategy

        if (
            cls is not other.__class__
            and other._cache_attrs
            and set(other._cache_attrs).difference(cls._cache_attrs)
        ):
            raise TypeError(
                "other element %r is not empty, is not of type %s, "
                "and contains attributes not covered here %r"
                % (
                    other,
                    cls,
                    set(other._cache_attrs).difference(cls._cache_attrs),
                )
            )
        return cls + d

    @classmethod
    def from_execution_options(
        cls,
        key: str,
        attrs: set[str],
        exec_options: Mapping[str, Any],
        statement_exec_options: Mapping[str, Any],
    ) -> Tuple["Options", Mapping[str, Any]]:
        """process Options argument in terms of execution options.


        e.g.::

            (
                load_options,
                execution_options,
            ) = QueryContext.default_load_options.from_execution_options(
                "_sa_orm_load_options",
                {"populate_existing", "autoflush", "yield_per"},
                execution_options,
                statement._execution_options,
            )

        get back the Options and refresh "_sa_orm_load_options" in the
        exec options dict w/ the Options as well

        """

        # common case is that no options we are looking for are
        # in either dictionary, so cancel for that first
        check_argnames = attrs.intersection(
            set(exec_options).union(statement_exec_options)
        )

        existing_options = exec_options.get(key, cls)

        if check_argnames:
            result = {}
            for argname in check_argnames:
                # option "x" is stored on the Options object as "_x";
                # exec_options takes precedence over the statement's
                local = "_" + argname
                if argname in exec_options:
                    result[local] = exec_options[argname]
                elif argname in statement_exec_options:
                    result[local] = statement_exec_options[argname]

            new_options = existing_options + result
            exec_options = util.immutabledict().merge_with(
                exec_options, {key: new_options}
            )
            return new_options, exec_options

        else:
            return existing_options, exec_options

    if TYPE_CHECKING:

        def __getattr__(self, key: str) -> Any: ...

        def __setattr__(self, key: str, value: Any) -> None: ...

        def __delattr__(self, key: str) -> None: ...

996 

997 

class CacheableOptions(Options, HasCacheKey):
    """Options whose cache key depends on whether it is requested from an
    instance or from the class itself."""

    __slots__ = ()

    @hybridmethod
    def _gen_cache_key_inst(
        self, anon_map: Any, bindparams: List[BindParameter[Any]]
    ) -> Optional[Tuple[Any]]:
        # instance level: defer to the HasCacheKey implementation
        return HasCacheKey._gen_cache_key(self, anon_map, bindparams)

    @_gen_cache_key_inst.classlevel
    def _gen_cache_key(
        cls, anon_map: "anon_map", bindparams: List[BindParameter[Any]]
    ) -> Tuple[CacheableOptions, Any]:
        # class level: the key is the class plus an empty tuple
        return (cls, ())

    @hybridmethod
    def _generate_cache_key(self) -> Optional[CacheKey]:
        return HasCacheKey._generate_cache_key(self)

1016 

1017 

class ExecutableOption(HasCopyInternals):
    """Base for the option objects accepted by
    :meth:`.Executable.options`.

    """

    __slots__ = ()

    _annotations: _ImmutableExecuteOptions = util.EMPTY_DICT

    __visit_name__: str = "executable_option"

    _is_has_cache_key: bool = False

    _is_core: bool = True

    def _clone(self, **kw):
        """Return a shallow copy of this ExecutableOption.

        The copy is a new instance of the same class, made without
        running ``__init__``, whose ``__dict__`` is a fresh dictionary
        holding the same values.  Keyword arguments are accepted and
        ignored.

        """
        klass = self.__class__
        duplicate = klass.__new__(klass)
        duplicate.__dict__ = dict(self.__dict__)  # type: ignore
        return duplicate

1034 

1035 

1036_L = TypeVar("_L", bound=str) 

1037 

1038 

class HasSyntaxExtensions(Generic[_L]):
    """Mixin for statements that expose named syntax extension points.

    ``_position_map`` maps each public position name to the name of the
    attribute on the statement that holds the element(s) for that
    position.

    """

    _position_map: Mapping[_L, str]

    @_generative
    def ext(self, extension: SyntaxExtension) -> Self:
        """Applies a SQL syntax extension to this statement.

        SQL syntax extensions are :class:`.ClauseElement` objects that define
        some vendor-specific syntactical construct that take place in specific
        parts of a SQL statement.   Examples include vendor extensions like
        PostgreSQL / SQLite's "ON CONFLICT", MySQL's
        "ON DUPLICATE KEY UPDATE", PostgreSQL's "DISTINCT ON", and MySQL's
        "LIMIT" that can be applied to UPDATE and DELETE statements.

        .. seealso::

            :ref:`examples_syntax_extensions`

            :func:`_mysql.limit` - DML LIMIT for MySQL

            :func:`_postgresql.distinct_on` - DISTINCT ON for PostgreSQL

        .. versionadded:: 2.1

        """
        extension = coercions.expect(
            roles.SyntaxExtensionRole, extension, apply_propagate_attrs=self
        )
        self._apply_syntax_extension_to_self(extension)
        return self

    @util.preload_module("sqlalchemy.sql.elements")
    def apply_syntax_extension_point(
        self,
        apply_fn: Callable[[Sequence[ClauseElement]], Sequence[ClauseElement]],
        position: _L,
    ) -> None:
        """Apply a :class:`.SyntaxExtension` to a known extension point.

        Should be used only internally by :class:`.SyntaxExtension`.

        E.g.::

            class Qualify(SyntaxExtension, ClauseElement):

                # ...

                def apply_to_select(self, select_stmt: Select) -> None:
                    # append self to existing
                    select_stmt.apply_syntax_extension_point(
                        lambda existing: [*existing, self], "post_criteria"
                    )


            class ReplaceExt(SyntaxExtension, ClauseElement):

                # ...

                def apply_to_select(self, select_stmt: Select) -> None:
                    # replace any existing elements regardless of type
                    select_stmt.apply_syntax_extension_point(
                        lambda existing: [self], "post_criteria"
                    )


            class ReplaceOfTypeExt(SyntaxExtension, ClauseElement):

                # ...

                def apply_to_select(self, select_stmt: Select) -> None:
                    # replace any existing elements of the same type
                    select_stmt.apply_syntax_extension_point(
                        self.append_replacing_same_type, "post_criteria"
                    )

        :param apply_fn: callable function that will receive a sequence of
         :class:`.ClauseElement` that is already populating the extension
         point (the sequence is empty if there isn't one), and should return
         a new sequence of :class:`.ClauseElement` that will newly populate
         that point. The function typically can choose to concatenate the
         existing values with the new one, or to replace the values that are
         there with a new one by returning a list of a single element, or
         to perform more complex operations like removing only the same
         type element from the input list or merging already existing
         elements of the same type. Some examples are shown in the examples
         above.  The returned sequence must not be empty.
        :param position: string name of the position to apply to. This
         varies per statement type. IDEs should show the possible values
         for each statement type as it's typed with a ``typing.Literal`` per
         statement.

        :raises ValueError: if ``position`` is not a key of this
         construct's ``_position_map``.

        .. seealso::

            :ref:`examples_syntax_extensions`


        """  # noqa: E501

        try:
            attrname = self._position_map[position]
        except KeyError as ke:
            raise ValueError(
                f"Unknown position {position!r} for {self.__class__} "
                f"construct; known positions: "
                f"{', '.join(repr(k) for k in self._position_map)}"
            ) from ke

        ElementList = util.preloaded.sql_elements.ElementList

        # normalize whatever currently occupies the position into a
        # sequence: nothing -> empty, ElementList -> its clauses,
        # any other single element -> one-element tuple
        existing: Optional[ClauseElement] = getattr(self, attrname, None)
        if existing is None:
            input_seq: Tuple[ClauseElement, ...] = ()
        elif isinstance(existing, ElementList):
            input_seq = existing.clauses
        else:
            input_seq = (existing,)

        new_seq = apply_fn(input_seq)
        assert new_seq, "cannot return empty sequence"

        # a single element is stored bare; several are wrapped together
        new = new_seq[0] if len(new_seq) == 1 else ElementList(new_seq)
        setattr(self, attrname, new)

    def _apply_syntax_extension_to_self(
        self, extension: SyntaxExtension
    ) -> None:
        """Hook called by :meth:`.ext` with the coerced extension; this
        base version raises ``NotImplementedError``."""
        raise NotImplementedError()

    def _get_syntax_extensions_as_dict(self) -> Mapping[_L, SyntaxExtension]:
        """Return a new dict of position name -> current value, leaving
        out positions whose attribute is ``None``."""
        res: Dict[_L, SyntaxExtension] = {}
        for name, attr in self._position_map.items():
            value = getattr(self, attr)
            if value is not None:
                res[name] = value
        return res

    def _set_syntax_extensions(self, **extensions: SyntaxExtension) -> None:
        """Assign each given value to the attribute mapped to its position
        name; a name missing from ``_position_map`` raises ``KeyError``."""
        for name, value in extensions.items():
            setattr(self, self._position_map[name], value)  # type: ignore[index]  # noqa: E501

1176 

1177 

class SyntaxExtension(roles.SyntaxExtensionRole):
    """Defines a unit that when also extending from :class:`.ClauseElement`
    can be applied to SQLAlchemy statements :class:`.Select`,
    :class:`_sql.Insert`, :class:`.Update` and :class:`.Delete` making use of
    pre-established SQL insertion points within these constructs.

    Each ``apply_to_<statement>()`` method below raises
    ``NotImplementedError`` unless overridden, so a subclass implements
    only the ones for the statement types it supports.

    .. versionadded:: 2.1

    .. seealso::

        :ref:`examples_syntax_extensions`

    """

    def append_replacing_same_type(
        self, existing: Sequence[ClauseElement]
    ) -> Sequence[ClauseElement]:
        """Utility function that can be used as
        :paramref:`_sql.HasSyntaxExtensions.apply_syntax_extension_point.apply_fn`
        to remove any other element of the same type from ``existing`` and
        append ``self`` to the list.

        This is equivalent to::

            stmt.apply_syntax_extension_point(
                lambda existing: [
                    *(e for e in existing if not isinstance(e, ReplaceOfTypeExt)),
                    self,
                ],
                "post_criteria",
            )

        :param existing: elements currently populating the extension
         point.
        :return: a new list holding the elements of ``existing`` that are
         not instances of ``type(self)``, in their original order, followed
         by ``self``.

        .. seealso::

            :ref:`examples_syntax_extensions`

            :meth:`_sql.HasSyntaxExtensions.apply_syntax_extension_point`

        """  # noqa: E501
        cls = type(self)
        return [*(e for e in existing if not isinstance(e, cls)), self]  # type: ignore[list-item]  # noqa: E501

    def apply_to_select(self, select_stmt: Select[Unpack[_Ts]]) -> None:
        """Apply this :class:`.SyntaxExtension` to a :class:`.Select`"""
        raise NotImplementedError(
            f"Extension {type(self).__name__} cannot be applied to select"
        )

    def apply_to_update(self, update_stmt: Update) -> None:
        """Apply this :class:`.SyntaxExtension` to an :class:`.Update`"""
        raise NotImplementedError(
            f"Extension {type(self).__name__} cannot be applied to update"
        )

    def apply_to_delete(self, delete_stmt: Delete) -> None:
        """Apply this :class:`.SyntaxExtension` to a :class:`.Delete`"""
        raise NotImplementedError(
            f"Extension {type(self).__name__} cannot be applied to delete"
        )

    def apply_to_insert(self, insert_stmt: Insert) -> None:
        """Apply this :class:`.SyntaxExtension` to an :class:`_sql.Insert`"""
        raise NotImplementedError(
            f"Extension {type(self).__name__} cannot be applied to insert"
        )

1243 

1244 

class Executable(roles.StatementRole):
    """Mark a :class:`_expression.ClauseElement` as supporting execution.

    :class:`.Executable` is a superclass for all "statement" types
    of objects, including :func:`select`, :func:`delete`, :func:`update`,
    :func:`insert`, :func:`text`.

    """

    supports_execution: bool = True
    # class-level defaults are empty immutable values; the generative
    # methods below rebind these names on the object they return
    _execution_options: _ImmutableExecuteOptions = util.EMPTY_DICT
    _is_default_generator: bool = False
    _with_options: Tuple[ExecutableOption, ...] = ()
    _compile_state_funcs: Tuple[
        Tuple[Callable[[CompileState], None], Any], ...
    ] = ()
    # annotation only: no default is assigned at this level
    _compile_options: Optional[Union[Type[CacheableOptions], CacheableOptions]]

    # (attribute name, traversal symbol) pairs for the state held by this
    # class; ExecutableStatement extends this list with "_params"
    _executable_traverse_internals = [
        ("_with_options", InternalTraversal.dp_executable_options),
        (
            "_compile_state_funcs",
            ExtendedInternalTraversal.dp_compile_state_funcs,
        ),
        ("_propagate_attrs", ExtendedInternalTraversal.dp_propagate_attrs),
    ]

    # statement-kind flags, all False at this level
    is_select: bool = False
    is_from_statement: bool = False
    is_update: bool = False
    is_insert: bool = False
    is_text: bool = False
    is_delete: bool = False
    is_dml: bool = False

    if TYPE_CHECKING:
        # typing-only declarations; nothing in this branch exists at
        # runtime on this class
        __visit_name__: str

        def _compile_w_cache(
            self,
            dialect: Dialect,
            *,
            compiled_cache: Optional[CompiledCacheType],
            column_keys: List[str],
            for_executemany: bool = False,
            schema_translate_map: Optional[SchemaTranslateMapType] = None,
            **kw: Any,
        ) -> tuple[
            Compiled,
            Sequence[BindParameter[Any]] | None,
            _CoreSingleExecuteParams | None,
            CacheStats,
        ]: ...

        def _execute_on_connection(
            self,
            connection: Connection,
            distilled_params: _CoreMultiExecuteParams,
            execution_options: CoreExecuteOptionsParameter,
        ) -> CursorResult[Any]: ...

        def _execute_on_scalar(
            self,
            connection: Connection,
            distilled_params: _CoreMultiExecuteParams,
            execution_options: CoreExecuteOptionsParameter,
        ) -> Any: ...

    @util.ro_non_memoized_property
    def _all_selected_columns(self) -> _SelectIterable:
        # no implementation at this level
        raise NotImplementedError()

    @property
    def _effective_plugin_target(self) -> str:
        # at this level the plugin target is simply the visit name
        return self.__visit_name__

    @_generative
    def options(self, *options: ExecutableOption) -> Self:
        """Apply options to this statement.

        In the general sense, options are any kind of Python object
        that can be interpreted by systems that consume the statement outside
        of the regular SQL compiler chain. Specifically, these options are
        the ORM level options that apply "eager load" and other loading
        behaviors to an ORM query.

        For background on specific kinds of options for specific kinds of
        statements, refer to the documentation for those option objects.

        .. versionchanged:: 1.4 - added :meth:`.Executable.options` to
           Core statement objects towards the goal of allowing unified
           Core / ORM querying capabilities.

        .. seealso::

            :ref:`loading_columns` - refers to options specific to the usage
            of ORM queries

            :ref:`relationship_loader_options` - refers to options specific
            to the usage of ORM queries

        """
        # each option is coerced individually, then appended after the
        # options already present
        self._with_options += tuple(
            coercions.expect(roles.ExecutableOptionRole, opt)
            for opt in options
        )
        return self

    @_generative
    def _set_compile_options(self, compile_options: CacheableOptions) -> Self:
        """Assign the compile options to a new value.

        :param compile_options: appropriate CacheableOptions structure

        """

        self._compile_options = compile_options
        return self

    @_generative
    def _update_compile_options(self, options: CacheableOptions) -> Self:
        """update the _compile_options with new keys."""

        # unlike _set_compile_options(), this combines with the existing
        # value, so one has to be present already
        assert self._compile_options is not None
        self._compile_options += options
        return self

    @_generative
    def _add_compile_state_func(
        self,
        callable_: Callable[[CompileState], None],
        cache_args: Any,
    ) -> Self:
        """Add a compile state function to this statement.

        When using the ORM only, these are callable functions that will
        be given the CompileState object upon compilation.

        A second argument cache_args is required, which will be combined with
        the ``__code__`` identity of the function itself in order to produce a
        cache key.

        """
        # stored as a (callable, cache_args) pair appended to the tuple
        self._compile_state_funcs += ((callable_, cache_args),)
        return self

    @overload
    def execution_options(
        self,
        *,
        compiled_cache: Optional[CompiledCacheType] = ...,
        logging_token: str = ...,
        isolation_level: IsolationLevel = ...,
        no_parameters: bool = False,
        stream_results: bool = False,
        max_row_buffer: int = ...,
        yield_per: int = ...,
        driver_column_names: bool = ...,
        insertmanyvalues_page_size: int = ...,
        schema_translate_map: Optional[SchemaTranslateMapType] = ...,
        populate_existing: bool = False,
        autoflush: bool = False,
        synchronize_session: SynchronizeSessionArgument = ...,
        dml_strategy: DMLStrategyArgument = ...,
        render_nulls: bool = ...,
        is_delete_using: bool = ...,
        is_update_from: bool = ...,
        preserve_rowcount: bool = False,
        **opt: Any,
    ) -> Self: ...

    @overload
    def execution_options(self, **opt: Any) -> Self: ...

    @_generative
    def execution_options(self, **kw: Any) -> Self:
        """Set non-SQL options for the statement which take effect during
        execution.

        Execution options can be set at many scopes, including per-statement,
        per-connection, or per execution, using methods such as
        :meth:`_engine.Connection.execution_options` and parameters which
        accept a dictionary of options such as
        :paramref:`_engine.Connection.execute.execution_options` and
        :paramref:`_orm.Session.execute.execution_options`.

        The primary characteristic of an execution option, as opposed to
        other kinds of options such as ORM loader options, is that
        **execution options never affect the compiled SQL of a query, only
        things that affect how the SQL statement itself is invoked or how
        results are fetched**.  That is, execution options are not part of
        what's accommodated by SQL compilation nor are they considered part of
        the cached state of a statement.

        The :meth:`_sql.Executable.execution_options` method is
        :term:`generative`, as
        is the case for the method as applied to the :class:`_engine.Engine`
        and :class:`_orm.Query` objects, which means when the method is called,
        a copy of the object is returned, which applies the given parameters to
        that new copy, but leaves the original unchanged::

            statement = select(table.c.x, table.c.y)
            new_statement = statement.execution_options(my_option=True)

        An exception to this behavior is the :class:`_engine.Connection`
        object, where the :meth:`_engine.Connection.execution_options` method
        is explicitly **not** generative.

        The kinds of options that may be passed to
        :meth:`_sql.Executable.execution_options` and other related methods and
        parameter dictionaries include parameters that are explicitly consumed
        by SQLAlchemy Core or ORM, as well as arbitrary keyword arguments not
        defined by SQLAlchemy, which means the methods and/or parameter
        dictionaries may be used for user-defined parameters that interact with
        custom code, which may access the parameters using methods such as
        :meth:`_sql.Executable.get_execution_options` and
        :meth:`_engine.Connection.get_execution_options`, or within selected
        event hooks using a dedicated ``execution_options`` event parameter
        such as
        :paramref:`_events.ConnectionEvents.before_execute.execution_options`
        or :attr:`_orm.ORMExecuteState.execution_options`, e.g.::

             from sqlalchemy import event


             @event.listens_for(some_engine, "before_execute")
             def _process_opt(conn, statement, multiparams, params, execution_options):
                 "run a SQL function before invoking a statement"

                 if execution_options.get("do_special_thing", False):
                     conn.exec_driver_sql("run_special_function()")

        Within the scope of options that are explicitly recognized by
        SQLAlchemy, most apply to specific classes of objects and not others.
        The most common execution options include:

        * :paramref:`_engine.Connection.execution_options.isolation_level` -
          sets the isolation level for a connection or a class of connections
          via an :class:`_engine.Engine`.  This option is accepted only
          by :class:`_engine.Connection` or :class:`_engine.Engine`.

        * :paramref:`_engine.Connection.execution_options.stream_results` -
          indicates results should be fetched using a server side cursor;
          this option is accepted by :class:`_engine.Connection`, by the
          :paramref:`_engine.Connection.execute.execution_options` parameter
          on :meth:`_engine.Connection.execute`, and additionally by
          :meth:`_sql.Executable.execution_options` on a SQL statement object,
          as well as by ORM constructs like :meth:`_orm.Session.execute`.

        * :paramref:`_engine.Connection.execution_options.compiled_cache` -
          indicates a dictionary that will serve as the
          :ref:`SQL compilation cache <sql_caching>`
          for a :class:`_engine.Connection` or :class:`_engine.Engine`, as
          well as for ORM methods like :meth:`_orm.Session.execute`.
          Can be passed as ``None`` to disable caching for statements.
          This option is not accepted by
          :meth:`_sql.Executable.execution_options` as it is inadvisable to
          carry along a compilation cache within a statement object.

        * :paramref:`_engine.Connection.execution_options.schema_translate_map`
          - a mapping of schema names used by the
          :ref:`Schema Translate Map <schema_translating>` feature, accepted
          by :class:`_engine.Connection`, :class:`_engine.Engine`,
          :class:`_sql.Executable`, as well as by ORM constructs
          like :meth:`_orm.Session.execute`.

        .. seealso::

            :meth:`_engine.Connection.execution_options`

            :paramref:`_engine.Connection.execute.execution_options`

            :paramref:`_orm.Session.execute.execution_options`

            :ref:`orm_queryguide_execution_options` - documentation on all
            ORM-specific execution options

        """  # noqa: E501
        # these two options are refused at the statement level, even
        # though the shared typing overload above lists them
        if "isolation_level" in kw:
            raise exc.ArgumentError(
                "'isolation_level' execution option may only be specified "
                "on Connection.execution_options(), or "
                "per-engine using the isolation_level "
                "argument to create_engine()."
            )
        if "compiled_cache" in kw:
            raise exc.ArgumentError(
                "'compiled_cache' execution option may only be specified "
                "on Connection.execution_options(), not per statement."
            )
        # combine the given options with those already present
        self._execution_options = self._execution_options.union(kw)
        return self

    def get_execution_options(self) -> _ExecuteOptions:
        """Get the non-SQL options which will take effect during execution.

        .. seealso::

            :meth:`.Executable.execution_options`
        """
        return self._execution_options

1546 

1547 

class ExecutableStatement(Executable):
    """Executable subclass that implements a lightweight version of ``params``
    that avoids a full cloned traverse.

    The given values are kept in the ``_params`` mapping on the statement,
    which is also registered with the traversal internals.

    .. versionadded:: 2.1

    """

    _params: util.immutabledict[str, Any] = EMPTY_DICT

    _executable_traverse_internals = (
        Executable._executable_traverse_internals
        + [("_params", InternalTraversal.dp_params)]
    )

    @_generative
    def params(
        self,
        __optionaldict: _CoreSingleExecuteParams | None = None,
        /,
        **kwargs: Any,
    ) -> Self:
        """Return a copy with the provided bindparam values.

        Values may be given as a single positional dictionary, as keyword
        arguments, or both; they are layered over any values set by an
        earlier call::

          >>> clause = column("x") + bindparam("foo")
          >>> print(clause.compile().params)
          {'foo': None}
          >>> print(clause.params({"foo": 7}).compile().params)
          {'foo': 7}

        """
        # the positional dictionary is applied on top of the keyword
        # arguments, so it wins when the same name appears in both
        if __optionaldict:
            kwargs.update(__optionaldict)

        if self._params:
            self._params = self._params | kwargs
        else:
            self._params = util.immutabledict(kwargs)
        return self

1590 

1591 

class SchemaEventTarget(event.EventTarget):
    """Base class for elements that are the targets of :class:`.DDLEvents`
    events.

    This includes :class:`.SchemaItem` as well as :class:`.SchemaType`.

    """

    dispatch: dispatcher[SchemaEventTarget]

    def _set_parent(self, parent: SchemaEventTarget, **kw: Any) -> None:
        """Associate with this SchemaEvent's parent object."""
        # intentionally empty at this level

    def _set_parent_with_dispatch(
        self, parent: SchemaEventTarget, **kw: Any
    ) -> None:
        """Call :meth:`._set_parent`, firing ``before_parent_attach`` ahead
        of it and ``after_parent_attach`` once it has returned.

        Keyword arguments go to ``_set_parent()`` only; the two events
        receive just this object and ``parent``.

        """
        self.dispatch.before_parent_attach(self, parent)
        self._set_parent(parent, **kw)
        self.dispatch.after_parent_attach(self, parent)

1611 

1612 

class SchemaVisitable(SchemaEventTarget, visitors.Visitable):
    """Base class for elements that are targets of a :class:`.SchemaVisitor`.

    Adds no members of its own; it combines :class:`.SchemaEventTarget`
    with ``visitors.Visitable``.

    .. versionadded:: 2.0.41

    """

1619 

1620 

class SchemaVisitor(ClauseVisitor):
    """Define the visiting for ``SchemaItem`` and more
    generally ``SchemaVisitable`` objects.

    """

    # traversal options for this visitor type.
    # NOTE(review): the code that reads "schema_visitor" is outside this
    # chunk -- confirm how the flag is consumed before changing it.
    __traverse_options__: Dict[str, Any] = {"schema_visitor": True}

1628 

1629 

class _SentinelDefaultCharacterization(Enum):
    # String-valued enumeration; it is the type of the
    # ``default_characterization`` field of
    # ``_SentinelColumnCharacterization``, where ``NONE`` is the default.
    # NOTE(review): the exact meaning of each member is decided by code
    # outside this chunk -- consult the callers before relying on one.
    NONE = "none"
    UNKNOWN = "unknown"
    CLIENTSIDE = "clientside"
    SENTINEL_DEFAULT = "sentinel_default"
    SERVERSIDE = "serverside"
    IDENTITY = "identity"
    SEQUENCE = "sequence"

1638 

1639 

class _SentinelColumnCharacterization(NamedTuple):
    # Immutable record with four fields, every one of which has a
    # default, so it can be constructed with no arguments at all.
    # NOTE(review): presumably describes the "sentinel" column(s) of a
    # table; field semantics are set by callers outside this chunk.

    # the columns, or None when there are none recorded
    columns: Optional[Sequence[Column[Any]]] = None
    is_explicit: bool = False
    is_autoinc: bool = False
    default_characterization: _SentinelDefaultCharacterization = (
        _SentinelDefaultCharacterization.NONE
    )

1647 

1648 

1649_COLKEY = TypeVar("_COLKEY", Union[None, str], str) 

1650 

1651_COL_co = TypeVar("_COL_co", bound="ColumnElement[Any]", covariant=True) 

1652_COL = TypeVar("_COL", bound="ColumnElement[Any]") 

1653 

1654 

class _ColumnMetrics(Generic[_COL_co]):
    """Bookkeeping entry kept next to each column inside a
    :class:`.ColumnCollection`.

    It holds the column and keeps the collection's ``_proxy_index`` in
    step: the entry registers itself under every member of the column's
    expanded proxy set when created, and removes itself again in
    :meth:`.dispose`.

    """

    __slots__ = ("column",)

    column: _COL_co

    def __init__(
        self, collection: ColumnCollection[Any, _COL_co], col: _COL_co
    ) -> None:
        self.column = col

        # an empty proxy index has not been initialized yet, in which
        # case there is nothing to keep up to date
        proxy_index = collection._proxy_index
        if not proxy_index:
            return

        for proxied in col._expanded_proxy_set:
            proxy_index[proxied].add(self)

    def get_expanded_proxy_set(self) -> FrozenSet[ColumnElement[Any]]:
        """Return the expanded proxy set of the wrapped column."""
        return self.column._expanded_proxy_set

    def dispose(self, collection: ColumnCollection[_COLKEY, _COL_co]) -> None:
        """Remove this entry from ``collection._proxy_index``, dropping any
        key whose set of entries is empty afterwards."""
        proxy_index = collection._proxy_index
        if not proxy_index:
            return

        for proxied in self.column._expanded_proxy_set:
            # .get() so that a missing key is not created as a side effect
            members = proxy_index.get(proxied, None)
            if members is None:
                continue
            if members:
                members.discard(self)
            if not members:
                del proxy_index[proxied]

    def embedded(
        self,
        target_set: Union[
            Set[ColumnElement[Any]], FrozenSet[ColumnElement[Any]]
        ],
    ) -> bool:
        """Return True if every member of ``target_set`` is either in the
        column's expanded proxy set or has a clone-expanded form that
        intersects it."""
        own_proxies = self.column._expanded_proxy_set
        return all(
            own_proxies.intersection(_expand_cloned([candidate]))
            for candidate in target_set.difference(own_proxies)
        )

1697 

1698 

1699class ColumnCollection(Generic[_COLKEY, _COL_co]): 

1700 """Collection of :class:`_expression.ColumnElement` instances, 

1701 typically for 

1702 :class:`_sql.FromClause` objects. 

1703 

1704 The :class:`_sql.ColumnCollection` object is most commonly available 

1705 as the :attr:`_schema.Table.c` or :attr:`_schema.Table.columns` collection 

1706 on the :class:`_schema.Table` object, introduced at 

1707 :ref:`metadata_tables_and_columns`. 

1708 

1709 The :class:`_expression.ColumnCollection` has both mapping- and sequence- 

1710 like behaviors. A :class:`_expression.ColumnCollection` usually stores 

1711 :class:`_schema.Column` objects, which are then accessible both via mapping 

1712 style access as well as attribute access style. 

1713 

1714 To access :class:`_schema.Column` objects using ordinary attribute-style 

1715 access, specify the name like any other object attribute, such as below 

1716 a column named ``employee_name`` is accessed:: 

1717 

1718 >>> employee_table.c.employee_name 

1719 

1720 To access columns that have names with special characters or spaces, 

1721 index-style access is used, such as below which illustrates a column named 

1722 ``employee ' payment`` is accessed:: 

1723 

1724 >>> employee_table.c["employee ' payment"] 

1725 

1726 As the :class:`_sql.ColumnCollection` object provides a Python dictionary 

1727 interface, common dictionary method names like 

1728 :meth:`_sql.ColumnCollection.keys`, :meth:`_sql.ColumnCollection.values`, 

1729 and :meth:`_sql.ColumnCollection.items` are available, which means that 

1730 database columns that are keyed under these names also need to use indexed 

1731 access:: 

1732 

1733 >>> employee_table.c["values"] 

1734 

1735 

1736 The name for which a :class:`_schema.Column` would be present is normally 

1737 that of the :paramref:`_schema.Column.key` parameter. In some contexts, 

1738 such as a :class:`_sql.Select` object that uses a label style set 

1739 using the :meth:`_sql.Select.set_label_style` method, a column of a certain 

1740 key may instead be represented under a particular label name such 

1741 as ``tablename_columnname``:: 

1742 

1743 >>> from sqlalchemy import select, column, table 

1744 >>> from sqlalchemy import LABEL_STYLE_TABLENAME_PLUS_COL 

1745 >>> t = table("t", column("c")) 

1746 >>> stmt = select(t).set_label_style(LABEL_STYLE_TABLENAME_PLUS_COL) 

1747 >>> subq = stmt.subquery() 

1748 >>> subq.c.t_c 

1749 <sqlalchemy.sql.elements.ColumnClause at 0x7f59dcf04fa0; t_c> 

1750 

1751 :class:`.ColumnCollection` also indexes the columns in order and allows 

1752 them to be accessible by their integer position:: 

1753 

1754 >>> cc[0] 

1755 Column('x', Integer(), table=None) 

1756 >>> cc[1] 

1757 Column('y', Integer(), table=None) 

1758 

1759 .. versionadded:: 1.4 :class:`_expression.ColumnCollection` 

1760 allows integer-based 

1761 index access to the collection. 

1762 

1763 Iterating the collection yields the column expressions in order:: 

1764 

1765 >>> list(cc) 

1766 [Column('x', Integer(), table=None), 

1767 Column('y', Integer(), table=None)] 

1768 

1769 The base :class:`_expression.ColumnCollection` object can store 

1770 duplicates, which can 

1771 mean either two columns with the same key, in which case the column 

1772 returned by key access is **arbitrary**:: 

1773 

1774 >>> x1, x2 = Column("x", Integer), Column("x", Integer) 

1775 >>> cc = ColumnCollection(columns=[(x1.name, x1), (x2.name, x2)]) 

1776 >>> list(cc) 

1777 [Column('x', Integer(), table=None), 

1778 Column('x', Integer(), table=None)] 

1779 >>> cc["x"] is x1 

1780 False 

1781 >>> cc["x"] is x2 

1782 True 

1783 

1784 Or it can also mean the same column multiple times. These cases are 

1785 supported as :class:`_expression.ColumnCollection` 

1786 is used to represent the columns in 

1787 a SELECT statement which may include duplicates. 

1788 

1789 A special subclass :class:`.DedupeColumnCollection` exists which instead 

1790 maintains SQLAlchemy's older behavior of not allowing duplicates; this 

1791 collection is used for schema level objects like :class:`_schema.Table` 

1792 and 

1793 :class:`.PrimaryKeyConstraint` where this deduping is helpful. The 

1794 :class:`.DedupeColumnCollection` class also has additional mutation methods 

1795 as the schema constructs have more use cases that require removal and 

1796 replacement of columns. 

1797 

1798 .. versionchanged:: 1.4 :class:`_expression.ColumnCollection` 

1799 now stores duplicate 

1800 column keys as well as the same column in multiple positions. The 

1801 :class:`.DedupeColumnCollection` class is added to maintain the 

1802 former behavior in those cases where deduplication as well as 

1803 additional replace/remove operations are needed. 

1804 

1805 

1806 """ 

1807 

1808 __slots__ = ("_collection", "_index", "_colset", "_proxy_index") 

1809 

1810 _collection: List[Tuple[_COLKEY, _COL_co, _ColumnMetrics[_COL_co]]] 

1811 _index: Dict[Union[None, str, int], Tuple[_COLKEY, _COL_co]] 

1812 _proxy_index: Dict[ColumnElement[Any], Set[_ColumnMetrics[_COL_co]]] 

1813 _colset: Set[_COL_co] 

1814 

1815 def __init__( 

1816 self, columns: Optional[Iterable[Tuple[_COLKEY, _COL_co]]] = None 

1817 ): 

1818 object.__setattr__(self, "_colset", set()) 

1819 object.__setattr__(self, "_index", {}) 

1820 object.__setattr__( 

1821 self, "_proxy_index", collections.defaultdict(util.OrderedSet) 

1822 ) 

1823 object.__setattr__(self, "_collection", []) 

1824 if columns: 

1825 self._initial_populate(columns) 

1826 

1827 @util.preload_module("sqlalchemy.sql.elements") 

1828 def __clause_element__(self) -> ClauseList: 

1829 elements = util.preloaded.sql_elements 

1830 

1831 return elements.ClauseList( 

1832 _literal_as_text_role=roles.ColumnsClauseRole, 

1833 group=False, 

1834 *self._all_columns, 

1835 ) 

1836 

1837 def _initial_populate( 

1838 self, iter_: Iterable[Tuple[_COLKEY, _COL_co]] 

1839 ) -> None: 

1840 self._populate_separate_keys(iter_) 

1841 

1842 @property 

1843 def _all_columns(self) -> List[_COL_co]: 

1844 return [col for (_, col, _) in self._collection] 

1845 

1846 def keys(self) -> List[_COLKEY]: 

1847 """Return a sequence of string key names for all columns in this 

1848 collection.""" 

1849 return [k for (k, _, _) in self._collection] 

1850 

1851 def values(self) -> List[_COL_co]: 

1852 """Return a sequence of :class:`_sql.ColumnClause` or 

1853 :class:`_schema.Column` objects for all columns in this 

1854 collection.""" 

1855 return [col for (_, col, _) in self._collection] 

1856 

def items(self) -> List[Tuple[_COLKEY, _COL_co]]:
    """Return ``(key, column)`` tuples for every column in this
    collection, as a new list in collection order.

    Each tuple pairs the key with its :class:`_sql.ColumnClause` or
    :class:`_schema.Column` object.

    """
    return [(entry[0], entry[1]) for entry in self._collection]

1865 

def __bool__(self) -> bool:
    """True when the collection holds at least one column."""
    return True if self._collection else False

1868 

def __len__(self) -> int:
    """Number of column entries held, duplicates of a key included."""
    entries = self._collection
    return len(entries)

1871 

def __iter__(self) -> Iterator[_COL_co]:
    """Iterate over the column objects in collection order."""
    # iterate a snapshot list, so that changes made to the collection
    # during iteration do not affect the iterator
    snapshot = [entry[1] for entry in self._collection]
    return iter(snapshot)

1875 

@overload
def __getitem__(self, key: Union[str, int]) -> _COL_co: ...


@overload
def __getitem__(
    self, key: Tuple[Union[str, int], ...]
) -> ReadOnlyColumnCollection[_COLKEY, _COL_co]: ...


@overload
def __getitem__(
    self, key: slice
) -> ReadOnlyColumnCollection[_COLKEY, _COL_co]: ...


def __getitem__(
    self, key: Union[str, int, slice, Tuple[Union[str, int], ...]]
) -> Union[ReadOnlyColumnCollection[_COLKEY, _COL_co], _COL_co]:
    """Look up one column, or a read-only sub-collection of columns.

    :param key: a string key or integer position returns a single
     column; a slice or a tuple of keys / positions returns a new
     read-only collection of the selected columns.

    :raises IndexError: if a missing lookup key is an integer.
    :raises KeyError: if a missing lookup key is not an integer.

    """
    try:
        if isinstance(key, slice):
            pairs = ((k, col) for (k, col, _) in self._collection[key])
        elif isinstance(key, tuple):
            pairs = (self._index[member] for member in key)
        else:
            return self._index[key][1]

        # the generators above are consumed here, inside the try, so
        # lookup failures for tuple members are translated below too
        return ColumnCollection(pairs).as_readonly()
    except KeyError as err:
        missing = err.args[0]
        if not isinstance(missing, int):
            raise
        raise IndexError(missing) from err

1910 

def __getattr__(self, key: str) -> _COL_co:
    """Attribute-style access to a column by its key.

    :raises AttributeError: if ``key`` is not present in the index.

    """
    try:
        entry = self._index[key]
    except KeyError as err:
        raise AttributeError(key) from err
    return entry[1]

1916 

def __contains__(self, key: str) -> bool:
    """Return True if ``key`` is present in this collection's index.

    :raises ArgumentError: if ``key`` is not found and is also not a
     string.

    """
    if key in self._index:
        return True
    if isinstance(key, str):
        return False
    raise exc.ArgumentError("__contains__ requires a string argument")

1926 

def compare(self, other: ColumnCollection[_COLKEY, _COL_co]) -> bool:
    """Compare this :class:`_expression.ColumnCollection` to another.

    Both collections are iterated in step; the result is True only if
    every position yields the identical (``is``) column object in each.
    A shorter collection is padded with ``None`` for the comparison.

    """
    return all(
        mine is theirs for mine, theirs in zip_longest(self, other)
    )

1936 

def __eq__(self, other: Any) -> bool:
    """Equality is delegated to :meth:`compare`."""
    outcome = self.compare(other)
    return outcome

1939 

@overload
def get(self, key: str, default: None = None) -> Optional[_COL_co]: ...


@overload
def get(self, key: str, default: _COL) -> Union[_COL_co, _COL]: ...


def get(
    self, key: str, default: Optional[_COL] = None
) -> Optional[Union[_COL_co, _COL]]:
    """Return the :class:`_sql.ColumnClause` or :class:`_schema.Column`
    stored under ``key`` in this :class:`_expression.ColumnCollection`,
    or ``default`` when the key is not present."""
    try:
        entry = self._index[key]
    except KeyError:
        return default
    return entry[1]

1957 

def __str__(self) -> str:
    """Render as ``ClassName(col1, col2, ...)`` using ``str()`` of each
    column yielded by iterating this collection."""
    rendered = ", ".join([str(col) for col in self])
    return f"{self.__class__.__name__}({rendered})"

1963 

def __setitem__(self, key: str, value: Any) -> NoReturn:
    """Item assignment is not implemented; always raises
    ``NotImplementedError``."""
    raise NotImplementedError()

1966 

def __delitem__(self, key: str) -> NoReturn:
    """Item deletion is not implemented; always raises
    ``NotImplementedError``."""
    raise NotImplementedError()

1969 

def __setattr__(self, key: str, obj: Any) -> NoReturn:
    """Attribute assignment is not implemented; always raises
    ``NotImplementedError``."""
    raise NotImplementedError()

1972 

def clear(self) -> NoReturn:
    """Not implemented: the dictionary-style ``clear()`` operation is
    unavailable on :class:`_sql.ColumnCollection` and always raises
    ``NotImplementedError``."""
    raise NotImplementedError()

1977 

def remove(self, column: Any) -> NoReturn:
    """Column removal is not implemented here; always raises
    ``NotImplementedError``."""
    raise NotImplementedError()

1980 

def update(self, iter_: Any) -> NoReturn:
    """Not implemented: the dictionary-style ``update()`` operation is
    unavailable on :class:`_sql.ColumnCollection` and always raises
    ``NotImplementedError``."""
    raise NotImplementedError()

1985 

1986 # https://github.com/python/mypy/issues/4266 

1987 __hash__: Optional[int] = None # type: ignore 

1988 

def _populate_separate_keys(
    self, iter_: Iterable[Tuple[_COLKEY, _COL_co]]
) -> None:
    """Populate this collection from an iterable of ``(key, column)``
    tuples.

    The ordered collection is replaced wholesale; the column set and the
    index are updated with the new entries.

    """
    entries = [(k, c, _ColumnMetrics(self, c)) for k, c in iter_]
    self._collection[:] = entries

    self._colset.update(c._deannotate() for _, c, _ in entries)

    by_position = {pos: (k, c) for pos, (k, c, _) in enumerate(entries)}

    # walk the entries backwards so that, for a key appearing more than
    # once, the first column in collection order is the one kept
    by_key = {k: (k, c) for k, c, _ in reversed(entries)}

    self._index.update(by_position)
    self._index.update(by_key)

2002 

def add(
    self,
    column: ColumnElement[Any],
    key: Optional[_COLKEY] = None,
) -> None:
    """Add a column to this :class:`_sql.ColumnCollection`.

    The column is appended to the end of the collection and indexed by
    its integer position.  It is indexed by key only if that key is not
    already present, so an earlier column keeps ownership of its key.

    :param column: the column to add.
    :param key: key to store the column under; when ``None``, the
     column's own ``.key`` is used.

    .. note::

        This method is **not normally used by user-facing code**, as the
        :class:`_sql.ColumnCollection` is usually part of an existing
        object such as a :class:`_schema.Table`. To add a
        :class:`_schema.Column` to an existing :class:`_schema.Table`
        object, use the :meth:`_schema.Table.append_column` method.

    """
    colkey: _COLKEY = (
        column.key if key is None else key  # type: ignore
    )

    # position the new entry will occupy once appended
    position = len(self._collection)

    # don't really know how this part is supposed to work w/ the
    # covariant thing
    _column = cast(_COL_co, column)

    entry = (colkey, _column, _ColumnMetrics(self, _column))
    self._collection.append(entry)
    self._colset.add(_column._deannotate())

    self._index[position] = (colkey, _column)
    self._index.setdefault(colkey, (colkey, _column))

2041 

def __getstate__(self) -> Dict[str, Any]:
    """Return the pickle state: the ``(key, column)`` pairs of the
    collection, without their metrics objects, plus the index."""
    pairs = [(entry[0], entry[1]) for entry in self._collection]
    return {"_collection": pairs, "_index": self._index}

2047 

def __setstate__(self, state: Dict[str, Any]) -> None:
    """Restore from the state produced by ``__getstate__()``.

    The index is taken from the state as-is, the proxy index starts out
    empty, and a new metrics object is created for every restored
    column.

    """
    install = object.__setattr__

    install(self, "_index", state["_index"])
    install(self, "_proxy_index", collections.defaultdict(util.OrderedSet))

    rebuilt = [
        (k, c, _ColumnMetrics(self, c)) for (k, c) in state["_collection"]
    ]
    install(self, "_collection", rebuilt)
    install(self, "_colset", {col for _, col, _ in rebuilt})

2064 

def contains_column(self, col: ColumnElement[Any]) -> bool:
    """Check whether a column object is a member of this collection.

    :raises ArgumentError: if ``col`` is not present and is a string;
     string keys are tested with ``col_name in table.c`` instead.

    """
    if col in self._colset:
        return True
    if isinstance(col, str):
        raise exc.ArgumentError(
            "contains_column cannot be used with string arguments. "
            "Use ``col_name in table.c`` instead."
        )
    return False

2076 

def as_readonly(self) -> ReadOnlyColumnCollection[_COLKEY, _COL_co]:
    """Return a "read only" form of this
    :class:`_sql.ColumnCollection`, as a
    ``ReadOnlyColumnCollection`` constructed around this collection."""
    readonly_view = ReadOnlyColumnCollection(self)
    return readonly_view

2082 

def _init_proxy_index(self) -> None:
    """Fill the "proxy index" if it is currently empty.

    The proxy index was added in 2.0 so that corresponding_column() can
    operate more efficiently.

    To save both construction time for new .c collections and memory
    when there are many large .c collections, the index is filled only
    when corresponding_column() is called.  Once filled it stays filled,
    and _ColumnMetrics objects created afterwards add their own data to
    it.  That situation should be unusual, if it occurs at all, since it
    means a .c collection was mutated after corresponding_column() had
    been used; it is nonetheless tested in test/base/test_utils.py.

    """
    proxy_index = self._proxy_index
    if not proxy_index:
        for entry in self._collection:
            metrics = entry[2]
            # register the metrics under each member of its column's
            # expanded proxy set
            for proxied in metrics.column._expanded_proxy_set:
                proxy_index[proxied].add(metrics)

2108 

def corresponding_column(
    self, column: _COL, require_embedded: bool = False
) -> Optional[Union[_COL, _COL_co]]:
    """Given a :class:`_expression.ColumnElement`, return the exported
    :class:`_expression.ColumnElement` object from this
    :class:`_expression.ColumnCollection` which corresponds to that
    original :class:`_expression.ColumnElement` via a common ancestor
    column.

    :param column: the target :class:`_expression.ColumnElement`
     to be matched.

    :param require_embedded: only return corresponding columns for
     the given :class:`_expression.ColumnElement`, if the given
     :class:`_expression.ColumnElement` is actually present within a
     sub-element of this :class:`_expression.Selectable`.
     Normally the column will match if it merely shares a common
     ancestor with one of the exported columns of this
     :class:`_expression.Selectable`.

    :return: the best matching column, or ``None`` if no column of this
     collection corresponds to the target.

    .. seealso::

        :meth:`_expression.Selectable.corresponding_column`
        - invokes this method
        against the collection returned by
        :attr:`_expression.Selectable.exported_columns`.

    .. versionchanged:: 1.4 the implementation for ``corresponding_column``
       was moved onto the :class:`_expression.ColumnCollection` itself.

    """
    # TODO: cython candidate

    # a column that is directly a member needs no searching
    if column in self._colset:
        return column

    target_set = column.proxy_set

    proxy_index = self._proxy_index
    if not proxy_index:
        # filled in place; ``proxy_index`` refers to the same mapping
        self._init_proxy_index()

    def weighted_distance(metrics):
        # sum of "weight" annotations (default 1) over those members of
        # the column's proxy list that share lineage with the target
        return sum(
            proxied._annotations.get("weight", 1)
            for proxied in metrics.column._uncached_proxy_list()
            if proxied.shares_lineage(column)
        )

    best_metrics = None
    best_overlap = None

    for target in target_set:
        if target not in proxy_index:
            continue

        for candidate in proxy_index[target]:
            if require_embedded and not candidate.embedded(target_set):
                continue

            if best_metrics is None:
                # nothing chosen so far; take the first candidate
                best_metrics = candidate
                continue

            overlap = target_set.intersection(
                candidate.column._expanded_proxy_set
            )
            if best_overlap is None:
                # computed lazily, only once a second candidate shows up
                best_overlap = target_set.intersection(
                    best_metrics.column._expanded_proxy_set
                )

            if len(overlap) > len(best_overlap):
                # the candidate has a larger field of correspondence
                # than the current choice, i.e.
                # selectable.c.a1_x->a1.c.x->table.c.x matches
                # a1.c.x->table.c.x better than
                # selectable.c.x->table.c.x does.
                best_metrics, best_overlap = candidate, overlap
            elif overlap == best_overlap:
                # same field of correspondence.  prefer the column whose
                # proxy list is shorter, meaning a closer relationship
                # with the root column.  the "weight" annotation, which
                # CompoundSelect() uses to give higher precedence to
                # columns based on vertical position in the compound
                # statement, is taken into account, and columns with no
                # reference to the target column (also occurs with
                # CompoundSelect) are discarded from the sum.
                best_distance = weighted_distance(best_metrics)
                if weighted_distance(candidate) < best_distance:
                    best_metrics, best_overlap = candidate, overlap

    return best_metrics.column if best_metrics else None

2214 

2215 

2216_NAMEDCOL = TypeVar("_NAMEDCOL", bound="NamedColumn[Any]") 

2217 

2218 

class DedupeColumnCollection(ColumnCollection[str, _NAMEDCOL]):
    """A :class:`_expression.ColumnCollection`
    that maintains deduplicating behavior.

    This is useful by schema level objects such as :class:`_schema.Table` and
    :class:`.PrimaryKeyConstraint`.   The collection includes more
    sophisticated mutator methods as well to suit schema objects which
    require mutable column collections.

    .. versionadded:: 1.4

    """

    def add(  # type: ignore[override]
        self,
        column: _NAMEDCOL,
        key: Optional[str] = None,
        *,
        index: Optional[int] = None,
    ) -> None:
        """Add a column under its own ``.key``, replacing any different
        column already stored under that key.

        Adding the very same column object again is a no-op.

        :param column: the column to add.
        :param key: optional; if given it must equal ``column.key``.
        :param index: optional position for the column; passed through
         to :meth:`replace` or ``_append_new_column()``.

        :raises ArgumentError: if ``key`` differs from ``column.key``,
         or if the column has no key.

        """
        if key is not None and column.key != key:
            raise exc.ArgumentError(
                "DedupeColumnCollection requires columns be under "
                "the same key as their .key"
            )
        key = column.key

        if key is None:
            raise exc.ArgumentError(
                "Can't add unnamed column to column collection"
            )

        if key in self._index:
            existing = self._index[key][1]

            if existing is column:
                return

            self.replace(column, index=index)

            # pop out memoized proxy_set as this
            # operation may very well be occurring
            # in a _make_proxy operation
            util.memoized_property.reset(column, "proxy_set")
        else:
            self._append_new_column(key, column, index=index)

    def _append_new_column(
        self, key: str, named_column: _NAMEDCOL, *, index: Optional[int] = None
    ) -> None:
        """Store a column whose key is not yet present.

        :param key: key to index the column under.
        :param named_column: the column to store.
        :param index: position to insert at, with ``list.insert()``
         semantics: negative values count from the end, and out-of-range
         values are clamped to the start or end of the collection.
         ``None`` appends.

        """
        collection_length = len(self._collection)
        entry = (key, named_column, _ColumnMetrics(self, named_column))

        if index is None:
            position = collection_length
            self._collection.append(entry)
        else:
            # normalize to the position list.insert() will really use, so
            # that the integer entries of _index match _collection; an
            # unclamped oversized index would otherwise be recorded at a
            # position that does not exist in the list
            if index < 0:
                position = max(0, collection_length + index)
            else:
                position = min(index, collection_length)
            self._collection.insert(position, entry)

        self._colset.add(named_column._deannotate())

        if index is not None:
            # shift the integer entries at and after the insertion point
            # up by one, working from the end so nothing is overwritten
            for idx in reversed(range(position, collection_length)):
                self._index[idx + 1] = self._index[idx]

        self._index[position] = (key, named_column)
        self._index[key] = (key, named_column)

    def _populate_separate_keys(
        self, iter_: Iterable[Tuple[str, _NAMEDCOL]]
    ) -> None:
        """Populate from an iterator of ``(key, column)``.

        Columns whose key, or whose name when it differs from the key, is
        already indexed are handed to :meth:`replace` after all other
        columns have been appended.

        :raises ArgumentError: if a tuple's key differs from the column's
         ``.key``.

        """
        cols = list(iter_)

        replace_col = []
        for k, col in cols:
            if col.key != k:
                raise exc.ArgumentError(
                    "DedupeColumnCollection requires columns be under "
                    "the same key as their .key"
                )
            if col.name in self._index and col.key != col.name:
                replace_col.append(col)
            elif col.key in self._index:
                replace_col.append(col)
            else:
                self._index[k] = (k, col)
                self._collection.append((k, col, _ColumnMetrics(self, col)))
        self._colset.update(c._deannotate() for (k, c, _) in self._collection)

        self._index.update(
            (idx, (k, c)) for idx, (k, c, _) in enumerate(self._collection)
        )
        for col in replace_col:
            self.replace(col)

    def extend(self, iter_: Iterable[_NAMEDCOL]) -> None:
        """Add each column from ``iter_`` under its own ``.key``."""
        self._populate_separate_keys((col.key, col) for col in iter_)

    def remove(self, column: _NAMEDCOL) -> None:  # type: ignore[override]
        """Remove a column from this collection.

        :raises ValueError: if the column is not in this collection.

        """
        if column not in self._colset:
            raise ValueError(
                "Can't remove column %r; column is not in this collection"
                % column
            )
        del self._index[column.key]
        self._colset.remove(column)
        self._collection[:] = [
            (k, c, metrics)
            for (k, c, metrics) in self._collection
            if c is not column
        ]
        for metrics in self._proxy_index.get(column, ()):
            metrics.dispose(self)

        # renumber the integer entries for the shortened collection
        self._index.update(
            {idx: (k, col) for idx, (k, col, _) in enumerate(self._collection)}
        )
        # delete higher index
        del self._index[len(self._collection)]

    def replace(
        self,
        column: _NAMEDCOL,
        *,
        extra_remove: Optional[Iterable[_NAMEDCOL]] = None,
        index: Optional[int] = None,
    ) -> None:
        """add the given column to this collection, removing unaliased
        versions of this column as well as existing columns with the
        same key.

        e.g.::

            t = Table("sometable", metadata, Column("col1", Integer))
            t.columns.replace(Column("col1", Integer, key="columnone"))

        will remove the original 'col1' from the collection, and add
        the new column under the name 'columnone'.

        Used by schema.Column to override columns during table reflection.

        :param column: the column to add.
        :param extra_remove: optional additional columns to remove.
        :param index: optional position for the new column; when omitted
         the column takes the position of the first removed column, or is
         appended if none of the removed columns was in the collection.

        """

        if extra_remove:
            remove_col = set(extra_remove)
        else:
            remove_col = set()
        # remove up to two columns based on matches of name as well as key
        if column.name in self._index and column.key != column.name:
            other = self._index[column.name][1]
            if other.name == other.key:
                remove_col.add(other)

        if column.key in self._index:
            remove_col.add(self._index[column.key][1])

        if not remove_col:
            self._append_new_column(column.key, column, index=index)
            return
        new_cols: List[Tuple[str, _NAMEDCOL, _ColumnMetrics[_NAMEDCOL]]] = []
        replace_index = None

        # rebuild the collection; the first removed column's slot receives
        # the new column, any further removed columns are simply dropped
        for idx, (k, col, metrics) in enumerate(self._collection):
            if col in remove_col:
                if replace_index is None:
                    replace_index = idx
                    new_cols.append(
                        (column.key, column, _ColumnMetrics(self, column))
                    )
            else:
                new_cols.append((k, col, metrics))

        self._colset.difference_update(remove_col)

        for rc in remove_col:
            for metrics in self._proxy_index.get(rc, ()):
                metrics.dispose(self)

        if replace_index is None:
            # none of the columns to remove was actually in the
            # collection, so the new column has not been placed yet
            if index is not None:
                new_cols.insert(
                    index, (column.key, column, _ColumnMetrics(self, column))
                )

            else:
                new_cols.append(
                    (column.key, column, _ColumnMetrics(self, column))
                )
        elif index is not None:
            # move the already-placed entry to the requested position:
            # insert a second reference, then delete the original, whose
            # position shifted by one if the insert landed before it
            to_move = new_cols[replace_index]
            effective_positive_index = (
                index if index >= 0 else max(0, len(new_cols) + index)
            )
            new_cols.insert(index, to_move)
            if replace_index > effective_positive_index:
                del new_cols[replace_index + 1]
            else:
                del new_cols[replace_index]

        self._colset.add(column._deannotate())
        self._collection[:] = new_cols

        # the index is rebuilt from scratch from the new collection
        self._index.clear()

        self._index.update(
            {idx: (k, col) for idx, (k, col, _) in enumerate(self._collection)}
        )
        self._index.update({k: (k, col) for (k, col, _) in self._collection})

2438 

2439 

class ReadOnlyColumnCollection(
    util.ReadOnlyContainer, ColumnCollection[_COLKEY, _COL_co]
):
    """A view of a :class:`_sql.ColumnCollection` whose mutator methods
    call ``_readonly()`` instead of changing the collection.

    The view shares the parent collection's internal containers rather
    than copying them.

    """

    __slots__ = ("_parent",)

    def __init__(self, collection: ColumnCollection[_COLKEY, _COL_co]):
        """Wrap ``collection``, keeping a reference to it as
        ``_parent``."""
        bind = object.__setattr__
        bind(self, "_parent", collection)

        # reference the parent's own containers; no copies are made
        for shared in ("_colset", "_index", "_collection", "_proxy_index"):
            bind(self, shared, getattr(collection, shared))

    def __getstate__(self) -> Dict[str, _COL_co]:
        """Pickle state consists of the parent collection only."""
        return {"_parent": self._parent}

    def __setstate__(self, state: Dict[str, Any]) -> None:
        """Restore by re-running ``__init__`` against the stored
        parent."""
        self.__init__(state["_parent"])  # type: ignore

    def add(self, column: Any, key: Any = ...) -> Any:
        """Not available on the read-only form; defers to
        ``_readonly()``."""
        self._readonly()

    def extend(self, elements: Any) -> NoReturn:
        """Not available on the read-only form; defers to
        ``_readonly()``."""
        self._readonly()

    def remove(self, item: Any) -> NoReturn:
        """Not available on the read-only form; defers to
        ``_readonly()``."""
        self._readonly()

2467 

2468 

class ColumnSet(util.OrderedSet["ColumnClause[Any]"]):
    """An ordered set of columns with column-oriented helpers."""

    def contains_column(self, col: ColumnClause[Any]) -> bool:
        """Return True if ``col`` is a member of this set."""
        return col in self

    def extend(self, cols: Iterable[Any]) -> None:
        """Add every column from ``cols`` to this set, in order."""
        for member in cols:
            self.add(member)

    def __eq__(self, other):
        """Build a conjunction of ``==`` comparisons rather than a
        boolean.

        Each column of ``other`` is compared against every column of this
        set that it shares lineage with; the comparisons are combined
        with ``and_()``.

        """
        comparisons = [
            theirs == mine
            for theirs in other
            for mine in self
            if theirs.shares_lineage(mine)
        ]
        return elements.and_(*comparisons)

    def __hash__(self) -> int:  # type: ignore[override]
        """Hash of the tuple of members in iteration order."""
        return hash(tuple(self))

2487 

2488 

def _entity_namespace(
    entity: Union[_HasEntityNamespace, ExternallyTraversible],
) -> _EntityNamespace:
    """Return the nearest .entity_namespace for the given entity.

    When the entity itself has no ``entity_namespace`` attribute, its
    sub-elements are iterated and the namespace of the first one that
    has it is returned.

    :raises AttributeError: the original error, if no sub-element
     provides a namespace either.

    """
    try:
        return cast("_HasEntityNamespace", entity).entity_namespace
    except AttributeError:
        traversible = cast("ExternallyTraversible", entity)
        for elem in visitors.iterate(traversible):
            if _is_has_entity_namespace(elem):
                return elem.entity_namespace
        # nothing found; re-raise the AttributeError being handled
        raise

2506 

2507 

@overload
def _entity_namespace_key(
    entity: Union[_HasEntityNamespace, ExternallyTraversible],
    key: str,
) -> SQLCoreOperations[Any]: ...


@overload
def _entity_namespace_key(
    entity: Union[_HasEntityNamespace, ExternallyTraversible],
    key: str,
    default: _NoArg,
) -> SQLCoreOperations[Any]: ...


@overload
def _entity_namespace_key(
    entity: Union[_HasEntityNamespace, ExternallyTraversible],
    key: str,
    default: _T,
) -> Union[SQLCoreOperations[Any], _T]: ...


def _entity_namespace_key(
    entity: Union[_HasEntityNamespace, ExternallyTraversible],
    key: str,
    default: Union[SQLCoreOperations[Any], _T, _NoArg] = NO_ARG,
) -> Union[SQLCoreOperations[Any], _T]:
    """Return an entry from an entity_namespace.

    :param entity: object whose namespace is located with
     ``_entity_namespace()``.
    :param key: attribute name to fetch from the namespace.
    :param default: value returned when the namespace lacks ``key``;
     when left as ``NO_ARG`` a missing key is an error.

    :raises InvalidRequestError: raised in place of the underlying
     ``AttributeError`` when the namespace or the key is not found.

    """

    try:
        namespace = _entity_namespace(entity)
        if default is NO_ARG:
            return getattr(namespace, key)  # type: ignore
        return getattr(namespace, key, default)
    except AttributeError as err:
        raise exc.InvalidRequestError(
            'Entity namespace for "%s" has no property "%s"' % (entity, key)
        ) from err

2553 ) from err