Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/sql/base.py: 48%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

957 statements  

1# sql/base.py 

2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7# mypy: allow-untyped-defs, allow-untyped-calls 

8 

9"""Foundational utilities common to many sql modules.""" 

10 

11 

12from __future__ import annotations 

13 

14import collections 

15from enum import Enum 

16import itertools 

17from itertools import zip_longest 

18import operator 

19import re 

20from typing import Any 

21from typing import Callable 

22from typing import cast 

23from typing import Collection 

24from typing import Dict 

25from typing import Final 

26from typing import FrozenSet 

27from typing import Generator 

28from typing import Generic 

29from typing import Iterable 

30from typing import Iterator 

31from typing import List 

32from typing import Mapping 

33from typing import MutableMapping 

34from typing import NamedTuple 

35from typing import NoReturn 

36from typing import Optional 

37from typing import overload 

38from typing import Protocol 

39from typing import Sequence 

40from typing import Set 

41from typing import Tuple 

42from typing import Type 

43from typing import TYPE_CHECKING 

44from typing import TypeGuard 

45from typing import TypeVar 

46from typing import Union 

47 

48from . import roles 

49from . import visitors 

50from .cache_key import HasCacheKey # noqa 

51from .cache_key import MemoizedHasCacheKey # noqa 

52from .traversals import HasCopyInternals # noqa 

53from .visitors import ClauseVisitor 

54from .visitors import ExtendedInternalTraversal 

55from .visitors import ExternallyTraversible 

56from .visitors import InternalTraversal 

57from .. import event 

58from .. import exc 

59from .. import util 

60from ..util import EMPTY_DICT 

61from ..util import HasMemoized as HasMemoized 

62from ..util import hybridmethod 

63from ..util.typing import Self 

64from ..util.typing import TypeVarTuple 

65from ..util.typing import Unpack 

66 

67if TYPE_CHECKING: 

68 from . import coercions 

69 from . import elements 

70 from . import type_api 

71 from ._orm_types import DMLStrategyArgument 

72 from ._orm_types import SynchronizeSessionArgument 

73 from ._typing import _CLE 

74 from .cache_key import CacheKey 

75 from .compiler import SQLCompiler 

76 from .dml import Delete 

77 from .dml import Insert 

78 from .dml import Update 

79 from .elements import BindParameter 

80 from .elements import ClauseElement 

81 from .elements import ClauseList 

82 from .elements import ColumnClause # noqa 

83 from .elements import ColumnElement 

84 from .elements import NamedColumn 

85 from .elements import SQLCoreOperations 

86 from .elements import TextClause 

87 from .schema import Column 

88 from .schema import DefaultGenerator 

89 from .selectable import _JoinTargetElement 

90 from .selectable import _SelectIterable 

91 from .selectable import FromClause 

92 from .selectable import Select 

93 from .visitors import anon_map 

94 from ..engine import Connection 

95 from ..engine import CursorResult 

96 from ..engine.interfaces import _CoreMultiExecuteParams 

97 from ..engine.interfaces import _CoreSingleExecuteParams 

98 from ..engine.interfaces import _ExecuteOptions 

99 from ..engine.interfaces import _ImmutableExecuteOptions 

100 from ..engine.interfaces import CacheStats 

101 from ..engine.interfaces import Compiled 

102 from ..engine.interfaces import CompiledCacheType 

103 from ..engine.interfaces import CoreExecuteOptionsParameter 

104 from ..engine.interfaces import Dialect 

105 from ..engine.interfaces import IsolationLevel 

106 from ..engine.interfaces import SchemaTranslateMapType 

107 from ..event import dispatcher 

108 

109if not TYPE_CHECKING: 

110 coercions = None # noqa 

111 elements = None # noqa 

112 type_api = None # noqa 

113 

114 

115_Ts = TypeVarTuple("_Ts") 

116 

117 

class _NoArg(Enum):
    """Single-member enumeration whose member is exported as ``NO_ARG``."""

    NO_ARG = 0

    def __repr__(self) -> str:
        # render as "_NoArg.<member name>" rather than the default Enum repr
        return f"_NoArg.{self.name}"


NO_ARG: Final = _NoArg.NO_ARG

126 

127 

class _NoneName(Enum):
    """Single-member enumeration whose member is exported as ``_NONE_NAME``."""

    NONE_NAME = 0
    """indicate a 'deferred' name that was ultimately the value None."""


_NONE_NAME: Final = _NoneName.NONE_NAME

134 

# general purpose type variable; the ``Any`` bound places no restriction
# on what it may be solved to
_T = TypeVar("_T", bound=Any)

# type variable restricted to callables of any signature
_Fn = TypeVar("_Fn", bound=Callable[..., Any])

# alias for a mutable mapping of string keys to string values
_AmbiguousTableNameMap = MutableMapping[str, str]

140 

141 

class _DefaultDescriptionTuple(NamedTuple):
    """Four-field summary of a default generator.

    The fields mirror the ``arg``, ``is_scalar``, ``is_callable`` and
    ``is_sentinel`` attributes of the object passed to
    :meth:`._from_column_default`.
    """

    arg: Any
    is_scalar: Optional[bool]
    is_callable: Optional[bool]
    is_sentinel: Optional[bool]

    @classmethod
    def _from_column_default(
        cls, default: Optional[DefaultGenerator]
    ) -> _DefaultDescriptionTuple:
        """Build a description tuple from ``default``.

        An all-``None`` tuple is returned when ``default`` is falsy, or
        when it neither has an argument nor is a non-"for update"
        sentinel.
        """
        if not default:
            return _DefaultDescriptionTuple(None, None, None, None)

        if default.has_arg or (
            not default.for_update and default.is_sentinel
        ):
            return _DefaultDescriptionTuple(
                default.arg,  # type: ignore
                default.is_scalar,
                default.is_callable,
                default.is_sentinel,
            )

        return _DefaultDescriptionTuple(None, None, None, None)

166 

167 

# attribute getter that reads ``_omit_from_statements`` off of the object
# it is called with
_never_select_column: operator.attrgetter[Any] = operator.attrgetter(
    "_omit_from_statements"
)

171 

172 

class _EntityNamespace(Protocol):
    """Structural type for objects whose attribute lookups by string key
    are typed as returning SQL core operations."""

    def __getattr__(self, key: str) -> SQLCoreOperations[Any]: ...

175 

176 

class _HasEntityNamespace(Protocol):
    """Structural type for objects that expose an ``entity_namespace``
    property."""

    @util.ro_non_memoized_property
    def entity_namespace(self) -> _EntityNamespace: ...

180 

181 

182def _is_has_entity_namespace(element: Any) -> TypeGuard[_HasEntityNamespace]: 

183 return hasattr(element, "entity_namespace") 

184 

185 

# Remove when https://github.com/python/mypy/issues/14640 will be fixed
_Self = TypeVar("_Self", bound=Any)


class Immutable:
    """mark a ClauseElement as 'immutable' when expressions are cloned.

    "immutable" objects refers to the "mutability" of an object in the
    context of SQL DQL and DML generation.   Such as, in DQL, one can
    compose a SELECT or subquery of varied forms, but one cannot modify
    the structure of a specific table or column within DQL.
    :class:`.Immutable` is mostly intended to follow this concept, and as
    such the primary "immutable" objects are :class:`.ColumnClause`,
    :class:`.Column`, :class:`.TableClause`, :class:`.Table`.

    """

    __slots__ = ()

    _is_immutable: bool = True

    def unique_params(self, *optionaldict: Any, **kwargs: Any) -> NoReturn:
        """Unsupported; always raises ``NotImplementedError``."""
        raise NotImplementedError("Immutable objects do not support copying")

    def params(self, *optionaldict: Any, **kwargs: Any) -> NoReturn:
        """Unsupported; always raises ``NotImplementedError``."""
        raise NotImplementedError("Immutable objects do not support copying")

    def _clone(self: _Self, **kw: Any) -> _Self:
        """Return this same object; keyword arguments are ignored."""
        return self

    def _copy_internals(
        self, *, omit_attrs: Iterable[str] = (), **kw: Any
    ) -> None:
        """Do nothing; all arguments are ignored."""
        return None

220 

221 

class SingletonConstant(Immutable):
    """Represent SQL constants like NULL, TRUE, FALSE"""

    _is_singleton_constant: bool = True

    _singleton: SingletonConstant

    def __new__(cls: _T, *arg: Any, **kw: Any) -> _T:
        # no new object is allocated here; the instance stored on the
        # class as ``_singleton`` is handed back for every call
        return cast(_T, cls._singleton)

    @util.non_memoized_property
    def proxy_set(self) -> FrozenSet[ColumnElement[Any]]:
        raise NotImplementedError()

    @classmethod
    def _create_singleton(cls) -> None:
        """Build the one instance of ``cls`` and store it as
        ``cls._singleton``."""
        # go through object.__new__ directly, since our own __new__
        # returns the not-yet-existing singleton
        instance = object.__new__(cls)
        instance.__init__()  # type: ignore

        # for a long time this was an empty frozenset, meaning
        # a SingletonConstant would never be a "corresponding column" in
        # a statement.  This referred to #6259.  However, in #7154 we see
        # that we do in fact need "correspondence" to work when matching cols
        # in result sets, so the non-correspondence was moved to a more
        # specific level when we are actually adapting expressions for SQL
        # render only.
        instance.proxy_set = frozenset([instance])
        cls._singleton = instance

250 

251 

def _from_objects(
    *elements: Union[
        ColumnElement[Any], FromClause, TextClause, _JoinTargetElement
    ]
) -> Iterator[FromClause]:
    """Chain together the ``_from_objects`` collection of each element.

    The ``_from_objects`` attribute of every element is read up front;
    the returned iterator then yields their members in argument order.
    """
    per_element = [element._from_objects for element in elements]
    return itertools.chain.from_iterable(per_element)

260 

261 

def _select_iterables(
    elements: Iterable[roles.ColumnsClauseRole],
) -> _SelectIterable:
    """expand tables into individual columns in the
    given list of column expressions.

    The ``_select_iterable`` attribute of every element is read up front
    and the results are chained into a single iterator.

    """
    expanded = [element._select_iterable for element in elements]
    return itertools.chain.from_iterable(expanded)

272 

273 

# type variable bound to the protocol declared just below
_SelfGenerativeType = TypeVar("_SelfGenerativeType", bound="_GenerativeType")


class _GenerativeType(Protocol):
    """Structural type for objects providing a ``_generate()`` method
    typed as returning their own type."""

    def _generate(self) -> Self: ...

279 

280 

def _generative(fn: _Fn) -> _Fn:
    """non-caching _generative() decorator.

    This is basically the legacy decorator that copies the object and
    runs a method on the new copy.

    The undecorated function remains reachable as the
    ``non_generative`` attribute of the returned callable.

    """

    @util.decorator
    def _generative(
        fn: _Fn, self: _SelfGenerativeType, *args: Any, **kw: Any
    ) -> _SelfGenerativeType:
        """Mark a method as generative."""

        # from here on "self" is whatever _generate() handed back; the
        # wrapped method is run against that object
        self = self._generate()
        returned = fn(self, *args, **kw)
        assert returned is self, "generative methods must return self"
        return self

    wrapped = _generative(fn)
    wrapped.non_generative = fn  # type: ignore
    return wrapped

303 

304 

def _exclusive_against(*names: str, **kw: Any) -> Callable[[_Fn], _Fn]:
    """Produce a decorator that refuses to run a method once any of the
    named attributes has moved away from its default.

    :param names: attribute paths looked up on ``self`` before each call.
    :param kw: may include ``msgs``, a dict of attribute name to custom
     error message, and ``defaults``, a dict of attribute name to the
     value that counts as "not set" (``None`` when absent).  The
     comparison against the default is by identity.
    :raises InvalidRequestError: from the decorated method, when an
     attribute is not its default.
    """
    msgs: Dict[str, str] = kw.pop("msgs", {})
    defaults: Dict[str, str] = kw.pop("defaults", {})

    getters: List[Tuple[str, operator.attrgetter[Any], Optional[str]]] = [
        (attr_name, operator.attrgetter(attr_name), defaults.get(attr_name, None))
        for attr_name in names
    ]

    @util.decorator
    def check(fn: _Fn, *args: Any, **kw: Any) -> Any:
        # make pylance happy by not including "self" in the argument
        # list
        self, positional = args[0], args[1:]

        for name, getter, default_ in getters:
            if getter(self) is default_:
                continue
            msg = msgs.get(
                name,
                "Method %s() has already been invoked on this %s construct"
                % (fn.__name__, self.__class__),
            )
            raise exc.InvalidRequestError(msg)

        return fn(self, *positional, **kw)

    return check

332 

333 

def _clone(element: Any, **kw: Any) -> Any:
    """Call ``element._clone()`` with the given keyword arguments and
    return its result."""
    return element._clone(**kw)

336 

337 

def _expand_cloned(
    elements: Iterable[_CLE],
) -> Iterable[_CLE]:
    """expand the given set of ClauseElements to be the set of all 'cloned'
    predecessors.

    The ``_cloned_set`` of every element is read up front; the returned
    iterator yields the members of each in turn.

    """
    # TODO: cython candidate
    cloned_sets = [element._cloned_set for element in elements]
    return itertools.chain.from_iterable(cloned_sets)

347 

348 

def _de_clone(
    elements: Iterable[_CLE],
) -> Iterable[_CLE]:
    """Yield, for each element, the object at the end of its
    ``_is_clone_of`` chain (the element itself when that attribute is
    ``None``)."""
    for element in elements:
        # walk back until an object that is not a clone of anything
        while (parent := element._is_clone_of) is not None:
            element = parent
        yield element

356 

357 

def _cloned_intersection(a: Iterable[_CLE], b: Iterable[_CLE]) -> Set[_CLE]:
    """return the intersection of sets a and b, counting
    any overlap between 'cloned' predecessors.

    The returned set is in terms of the entities present within 'a'.

    """
    expanded_a: Set[_CLE] = set(_expand_cloned(a))
    all_overlap: Set[_CLE] = expanded_a.intersection(_expand_cloned(b))

    # keep the members of "a" whose cloned set touches the overlap
    return {elem for elem in a if all_overlap.intersection(elem._cloned_set)}

369 

370 

def _cloned_difference(a: Iterable[_CLE], b: Iterable[_CLE]) -> Set[_CLE]:
    """Return the members of ``a`` that share no 'cloned' predecessor
    with any member of ``b``.

    The returned set is in terms of the entities present within 'a'.
    """
    expanded_a: Set[_CLE] = set(_expand_cloned(a))
    all_overlap: Set[_CLE] = expanded_a.intersection(_expand_cloned(b))

    # keep the members of "a" whose cloned set is disjoint from the overlap
    return {elem for elem in a if all_overlap.isdisjoint(elem._cloned_set)}

378 

379 

class _DialectArgView(MutableMapping[str, Any]):
    """A dictionary view of dialect-level arguments in the form
    <dialectname>_<argument_name>.

    All reads and writes are forwarded to the ``dialect_options``
    collection of the wrapped object.

    """

    __slots__ = ("obj",)

    def __init__(self, obj: DialectKWArgs) -> None:
        self.obj = obj

    def _key(self, key: str) -> Tuple[str, str]:
        """Split ``key`` at its first underscore into
        ``(dialect, argument)``; raise ``KeyError`` if there is none."""
        try:
            dialect, value_key = key.split("_", 1)
        except ValueError as err:
            raise KeyError(key) from err
        return dialect, value_key

    def __getitem__(self, key: str) -> Any:
        dialect, value_key = self._key(key)

        try:
            dialect_args = self.obj.dialect_options[dialect]
        except exc.NoSuchModuleError as err:
            # an unlocatable dialect is reported as a missing key
            raise KeyError(key) from err
        return dialect_args[value_key]

    def __setitem__(self, key: str, value: Any) -> None:
        try:
            dialect, value_key = self._key(key)
        except KeyError as err:
            raise exc.ArgumentError(
                "Keys must be of the form <dialectname>_<argname>"
            ) from err
        self.obj.dialect_options[dialect][value_key] = value

    def __delitem__(self, key: str) -> None:
        dialect, value_key = self._key(key)
        del self.obj.dialect_options[dialect][value_key]

    def __len__(self) -> int:
        # only the non-default arguments of each dialect are counted
        return sum(
            len(dialect_args._non_defaults)
            for dialect_args in self.obj.dialect_options.values()
        )

    def __iter__(self) -> Generator[str, None, None]:
        # only the non-default arguments of each dialect are produced
        return (
            "%s_%s" % (dialect_name, value_name)
            for dialect_name, dialect_args in self.obj.dialect_options.items()
            for value_name in dialect_args._non_defaults
        )

437 

438 

class _DialectArgDict(MutableMapping[str, Any]):
    """A dictionary view of dialect-level arguments for a specific
    dialect.

    Maintains a separate collection of user-specified arguments
    and dialect-specified default arguments.

    Reads prefer the user-specified value and fall back to the default;
    writes and deletes only ever touch the user-specified collection.

    """

    def __init__(self) -> None:
        self._non_defaults: Dict[str, Any] = {}
        self._defaults: Dict[str, Any] = {}

    def __len__(self) -> int:
        # a key present in both collections is counted once
        combined_keys = set(self._non_defaults).union(self._defaults)
        return len(combined_keys)

    def __iter__(self) -> Iterator[str]:
        combined_keys = set(self._non_defaults).union(self._defaults)
        return iter(combined_keys)

    def __getitem__(self, key: str) -> Any:
        source = (
            self._non_defaults
            if key in self._non_defaults
            else self._defaults
        )
        return source[key]

    def __setitem__(self, key: str, value: Any) -> None:
        self._non_defaults[key] = value

    def __delitem__(self, key: str) -> None:
        del self._non_defaults[key]

469 

470 

@util.preload_module("sqlalchemy.dialects")
def _kw_reg_for_dialect(dialect_name: str) -> Optional[Dict[Any, Any]]:
    """Load the named dialect class and return a dict copy of its
    ``construct_arguments``, or ``None`` when the dialect defines
    none."""
    dialect_cls = util.preloaded.dialects.registry.load(dialect_name)
    construct_arguments = dialect_cls.construct_arguments
    return None if construct_arguments is None else dict(construct_arguments)

477 

478 

class DialectKWArgs:
    """Establish the ability for a class to have dialect-specific arguments
    with defaults and constructor validation.

    The :class:`.DialectKWArgs` interacts with the
    :attr:`.DefaultDialect.construct_arguments` present on a dialect.

    .. seealso::

        :attr:`.DefaultDialect.construct_arguments`

    """

    __slots__ = ()

    _dialect_kwargs_traverse_internals: List[Tuple[str, Any]] = [
        ("dialect_options", InternalTraversal.dp_dialect_options)
    ]

    @classmethod
    def argument_for(
        cls, dialect_name: str, argument_name: str, default: Any
    ) -> None:
        """Add a new kind of dialect-specific keyword argument for this class.

        E.g.::

            Index.argument_for("mydialect", "length", None)

            some_index = Index("a", "b", mydialect_length=5)

        The :meth:`.DialectKWArgs.argument_for` method is a per-argument
        way adding extra arguments to the
        :attr:`.DefaultDialect.construct_arguments` dictionary. This
        dictionary provides a list of argument names accepted by various
        schema-level constructs on behalf of a dialect.

        New dialects should typically specify this dictionary all at once as a
        data member of the dialect class.  The use case for ad-hoc addition of
        argument names is typically for end-user code that is also using
        a custom compilation scheme which consumes the additional arguments.

        :param dialect_name: name of a dialect.  The dialect must be
         locatable, else a :class:`.NoSuchModuleError` is raised.   The
         dialect must also include an existing
         :attr:`.DefaultDialect.construct_arguments` collection, indicating
         that it participates in the keyword-argument validation and default
         system, else :class:`.ArgumentError` is raised.  If the dialect does
         not include this collection, then any keyword argument can be
         specified on behalf of this dialect already.  All dialects packaged
         within SQLAlchemy include this collection, however for third party
         dialects, support may vary.

        :param argument_name: name of the parameter.

        :param default: default value of the parameter.

        """

        construct_arg_dictionary: Optional[Dict[Any, Any]] = (
            DialectKWArgs._kw_registry[dialect_name]
        )
        if construct_arg_dictionary is None:
            # a None registry entry means the dialect has no
            # construct_arguments collection, i.e. it does NOT take part
            # in validation; the message previously stated the opposite
            raise exc.ArgumentError(
                "Dialect '%s' does not have keyword-argument "
                "validation and defaults enabled" % dialect_name
            )
        construct_arg_dictionary.setdefault(cls, {})[argument_name] = default

    @property
    def dialect_kwargs(self) -> _DialectArgView:
        """A collection of keyword arguments specified as dialect-specific
        options to this construct.

        The arguments are present here in their original ``<dialect>_<kwarg>``
        format.  Only arguments that were actually passed are included;
        unlike the :attr:`.DialectKWArgs.dialect_options` collection, which
        contains all options known by this dialect including defaults.

        The collection is also writable; keys are accepted of the
        form ``<dialect>_<kwarg>`` where the value will be assembled
        into the list of options.

        .. seealso::

            :attr:`.DialectKWArgs.dialect_options` - nested dictionary form

        """
        return _DialectArgView(self)

    @property
    def kwargs(self) -> _DialectArgView:
        """A synonym for :attr:`.DialectKWArgs.dialect_kwargs`."""
        return self.dialect_kwargs

    # maps a dialect name to the dict produced by _kw_reg_for_dialect()
    # (or None); entries are populated on first lookup
    _kw_registry: util.PopulateDict[str, Optional[Dict[Any, Any]]] = (
        util.PopulateDict(_kw_reg_for_dialect)
    )

    @classmethod
    def _kw_reg_for_dialect_cls(cls, dialect_name: str) -> _DialectArgDict:
        """Build the per-dialect argument dictionary for this class.

        When the dialect has no registry entry, a ``"*"`` default is
        installed; ``_validate_dialect_kwargs()`` treats that key as
        "accept any argument name".  Otherwise the defaults registered
        for each class in this class's MRO are merged, base classes
        first so that more specific classes override them.
        """
        construct_arg_dictionary = DialectKWArgs._kw_registry[dialect_name]
        d = _DialectArgDict()

        if construct_arg_dictionary is None:
            d._defaults.update({"*": None})
        else:
            # use a distinct loop name so that "cls" is not rebound
            for klass in reversed(cls.__mro__):
                if klass in construct_arg_dictionary:
                    d._defaults.update(construct_arg_dictionary[klass])
        return d

    @util.memoized_property
    def dialect_options(self) -> util.PopulateDict[str, _DialectArgDict]:
        """A collection of keyword arguments specified as dialect-specific
        options to this construct.

        This is a two-level nested registry, keyed to ``<dialect_name>``
        and ``<argument_name>``.  For example, the ``postgresql_where``
        argument would be locatable as::

            arg = my_object.dialect_options["postgresql"]["where"]

        .. versionadded:: 0.9.2

        .. seealso::

            :attr:`.DialectKWArgs.dialect_kwargs` - flat dictionary form

        """

        return util.PopulateDict(self._kw_reg_for_dialect_cls)

    def _validate_dialect_kwargs(self, kwargs: Dict[str, Any]) -> None:
        """Validate ``<dialectname>_<argument>`` keyword arguments and
        store them in :attr:`.dialect_options`.

        :raises TypeError: if a key is not of the form
         ``<dialectname>_<argument>``.
        :raises ArgumentError: if the dialect is known and does not accept
         the argument for this class.

        A warning is emitted, and the argument accepted, when the dialect
        itself cannot be located.
        """
        # validate remaining kwargs that they all specify DB prefixes

        if not kwargs:
            return

        for k, value in kwargs.items():
            m = re.match("^(.+?)_(.+)$", k)
            if not m:
                raise TypeError(
                    "Additional arguments should be "
                    "named <dialectname>_<argument>, got '%s'" % k
                )
            dialect_name, arg_name = m.group(1, 2)

            try:
                construct_arg_dictionary = self.dialect_options[dialect_name]
            except exc.NoSuchModuleError:
                util.warn(
                    "Can't validate argument %r; can't "
                    "locate any SQLAlchemy dialect named %r"
                    % (k, dialect_name)
                )
                # install a permissive dictionary for the unknown dialect
                self.dialect_options[dialect_name] = d = _DialectArgDict()
                d._defaults.update({"*": None})
                d._non_defaults[arg_name] = value
            else:
                if (
                    "*" not in construct_arg_dictionary
                    and arg_name not in construct_arg_dictionary
                ):
                    raise exc.ArgumentError(
                        "Argument %r is not accepted by "
                        "dialect %r on behalf of %r"
                        % (k, dialect_name, self.__class__)
                    )
                else:
                    construct_arg_dictionary[arg_name] = value

652 

653 

class CompileState:
    """Produces additional object state necessary for a statement to be
    compiled.

    the :class:`.CompileState` class is at the base of classes that assemble
    state for a particular statement object that is then used by the
    compiler.   This process is essentially an extension of the process that
    the SQLCompiler.visit_XYZ() method takes, however there is an emphasis
    on converting raw user intent into more organized structures rather than
    producing string output.   The top-level :class:`.CompileState` for the
    statement being executed is also accessible when the execution context
    works with invoking the statement and collecting results.

    The production of :class:`.CompileState` is specific to the compiler,  such
    as within the :meth:`.SQLCompiler.visit_insert`,
    :meth:`.SQLCompiler.visit_select` etc. methods.  These methods are also
    responsible for associating the :class:`.CompileState` with the
    :class:`.SQLCompiler` itself, if the statement is the "toplevel" statement,
    i.e. the outermost SQL statement that's actually being executed.
    There can be other :class:`.CompileState` objects that are not the
    toplevel, such as when a SELECT subquery or CTE-nested
    INSERT/UPDATE/DELETE is generated.

    .. versionadded:: 1.4

    """

    __slots__ = ("statement", "_ambiguous_table_name_map")

    # registry keyed by (plugin name, plugin target); filled in by the
    # plugin_for() decorator
    plugins: Dict[Tuple[str, str], Type[CompileState]] = {}

    _ambiguous_table_name_map: Optional[_AmbiguousTableNameMap]

    @classmethod
    def create_for_statement(
        cls, statement: Executable, compiler: SQLCompiler, **kw: Any
    ) -> CompileState:
        """Construct the compile state registered for ``statement``.

        The class registered for the statement's ``compile_state_plugin``
        is used when there is one, otherwise the ``"default"`` plugin for
        the statement's plugin target.  If the chosen class is not ``cls``
        itself, construction is delegated to that class's own
        ``create_for_statement()``.
        """
        # factory construction.

        propagate_attrs = statement._propagate_attrs
        klass = None

        if propagate_attrs:
            plugin_name = propagate_attrs.get(
                "compile_state_plugin", "default"
            )
            klass = cls.plugins.get(
                (plugin_name, statement._effective_plugin_target), None
            )

        if klass is None:
            # no propagated attributes, or nothing registered for the
            # named plugin
            klass = cls.plugins[
                ("default", statement._effective_plugin_target)
            ]

        if klass is cls:
            return cls(statement, compiler, **kw)
        return klass.create_for_statement(statement, compiler, **kw)

    def __init__(self, statement, compiler, **kw):
        self.statement = statement

    @classmethod
    def get_plugin_class(
        cls, statement: Executable
    ) -> Optional[Type[CompileState]]:
        """Return the class registered for the statement's plugin, falling
        back to the ``"default"`` plugin, or ``None`` if neither exists."""
        plugin_name = statement._propagate_attrs.get(
            "compile_state_plugin", None
        )

        if plugin_name:
            key = (plugin_name, statement._effective_plugin_target)
            if key in cls.plugins:
                return cls.plugins[key]

        # there's no case where we call upon get_plugin_class() and want
        # to get None back, there should always be a default.  return that
        # if there was no plugin-specific class  (e.g. Insert with "orm"
        # plugin)
        default_key = ("default", statement._effective_plugin_target)
        return cls.plugins.get(default_key, None)

    @classmethod
    def _get_plugin_class_for_plugin(
        cls, statement: Executable, plugin_name: str
    ) -> Optional[Type[CompileState]]:
        """Return the class registered for exactly ``plugin_name`` and the
        statement's plugin target, or ``None``; no default fallback."""
        key = (plugin_name, statement._effective_plugin_target)
        return cls.plugins.get(key, None)

    @classmethod
    def plugin_for(
        cls, plugin_name: str, visit_name: str
    ) -> Callable[[_Fn], _Fn]:
        """Return a class decorator that registers the decorated class in
        ``plugins`` under ``(plugin_name, visit_name)``."""

        def decorate(cls_to_decorate):
            cls.plugins[(plugin_name, visit_name)] = cls_to_decorate
            return cls_to_decorate

        return decorate

760 

761 

class Generative(HasMemoized):
    """Provide a method-chaining pattern in conjunction with the
    @_generative decorator."""

    def _generate(self) -> Self:
        """Return a new instance of the same class carrying a shallow
        copy of this object's ``__dict__``, minus any keys listed in
        ``_memoized_keys``.  ``__init__`` is not invoked."""
        memoized = self._memoized_keys
        klass = self.__class__
        duplicate = klass.__new__(klass)

        # ensure this iteration remains atomic: take the snapshot first,
        # then filter the private copy
        state = self.__dict__.copy()
        if memoized:
            state = {k: v for k, v in state.items() if k not in memoized}

        duplicate.__dict__ = state
        return duplicate

778 

779 

class InPlaceGenerative(HasMemoized):
    """Provide a method-chaining pattern in conjunction with the
    @_generative decorator that mutates in place."""

    __slots__ = ()

    def _generate(self) -> Self:
        """Discard the entries named by ``_memoized_keys`` from this
        object's ``__dict__`` and return this same object."""
        # note __dict__ needs to be in __slots__ if this is used
        for memoized_key in self._memoized_keys:
            self.__dict__.pop(memoized_key, None)
        return self

792 

793 

class HasCompileState(Generative):
    """A class that has a :class:`.CompileState` associated with it."""

    # no plugin class by default
    _compile_state_plugin: Optional[Type[CompileState]] = None

    # starts out as the shared empty immutable dictionary
    _attributes: util.immutabledict[str, Any] = util.EMPTY_DICT

    # the base CompileState factory classmethod
    _compile_state_factory = CompileState.create_for_statement

802 

803 

class _MetaOptions(type):
    """metaclass for the Options class.

    This metaclass is actually necessary despite the availability of the
    ``__init_subclass__()`` hook as this type also provides custom class-level
    behavior for the ``__add__()`` method.

    """

    _cache_attrs: Tuple[str, ...]

    def __add__(self, other):
        """Instantiate the class and apply the ``other`` mapping to the
        new instance's ``__dict__``.

        :raises TypeError: if ``other`` has keys not present in the
         class's ``_cache_attrs``.
        """
        instance = self()

        unknown = set(other).difference(self._cache_attrs)
        if unknown:
            raise TypeError(
                "dictionary contains attributes not covered by "
                "Options class %s: %r" % (self, unknown)
            )

        instance.__dict__.update(other)
        return instance

    if TYPE_CHECKING:

        def __getattr__(self, key: str) -> Any: ...

        def __setattr__(self, key: str, value: Any) -> None: ...

        def __delattr__(self, key: str) -> None: ...

835 

836 

class Options(metaclass=_MetaOptions):
    """A cacheable option dictionary with defaults.

    Each subclass gets a ``_cache_attrs`` tuple naming the attributes
    declared directly in its class body; instances and classes can be
    combined with a plain mapping using ``+``.
    """

    __slots__ = ()

    _cache_attrs: Tuple[str, ...]

    def __init_subclass__(cls) -> None:
        # collect, in sorted order, the names declared directly on the
        # subclass, leaving out dunder names and "_cache_key_traversal"
        dict_ = cls.__dict__
        cls._cache_attrs = tuple(
            sorted(
                d
                for d in dict_
                if not d.startswith("__")
                and d not in ("_cache_key_traversal",)
            )
        )
        super().__init_subclass__()

    def __init__(self, **kw: Any) -> None:
        self.__dict__.update(kw)

    def __add__(self, other):
        """Return a copy of this object with the ``other`` mapping
        applied to its ``__dict__``.

        :raises TypeError: if ``other`` has keys not present in
         ``_cache_attrs``.
        """
        o1 = self.__class__.__new__(self.__class__)
        o1.__dict__.update(self.__dict__)

        unknown = set(other).difference(self._cache_attrs)
        if unknown:
            raise TypeError(
                "dictionary contains attributes not covered by "
                "Options class %s: %r" % (self, unknown)
            )

        o1.__dict__.update(other)
        return o1

    def __eq__(self, other):
        """Compare the two objects' cache attributes pairwise, in
        ``_cache_attrs`` order."""
        # TODO: very inefficient.  This is used only in test suites
        # right now.
        for a, b in zip_longest(self._cache_attrs, other._cache_attrs):
            if a is None or b is None:
                # zip_longest pads the shorter side with None; a differing
                # number of cache attributes means "not equal" rather than
                # a TypeError from getattr(obj, None)
                return False
            if getattr(self, a) != getattr(other, b):
                return False
        return True

    def __repr__(self) -> str:
        # TODO: fairly inefficient, used only in debugging right now.

        # only attributes set on the instance itself are rendered
        return "%s(%s)" % (
            self.__class__.__name__,
            ", ".join(
                "%s=%r" % (k, self.__dict__[k])
                for k in self._cache_attrs
                if k in self.__dict__
            ),
        )

    @classmethod
    def isinstance(cls, klass: Type[Any]) -> bool:
        """Return True if this class is a subclass of ``klass``."""
        return issubclass(cls, klass)

    @hybridmethod
    def add_to_element(self, name: str, value: str) -> Any:
        """Return a new object in which attribute ``name`` is its current
        value with ``value`` added to it."""
        return self + {name: getattr(self, name) + value}

    @hybridmethod
    def _state_dict_inst(self) -> Mapping[str, Any]:
        return self.__dict__

    _state_dict_const: util.immutabledict[str, Any] = util.EMPTY_DICT

    @_state_dict_inst.classlevel
    def _state_dict(cls) -> Mapping[str, Any]:
        return cls._state_dict_const

    @classmethod
    def safe_merge(cls, other: "Options") -> Any:
        """Return this class combined with the state of ``other``.

        :raises TypeError: if ``other`` is of a different class and has
         cache attributes this class does not have.
        """
        d = other._state_dict()

        # only support a merge with another object of our class
        # and which does not have attrs that we don't.   otherwise
        # we risk having state that might not be part of our cache
        # key strategy

        if (
            cls is not other.__class__
            and other._cache_attrs
            and set(other._cache_attrs).difference(cls._cache_attrs)
        ):
            raise TypeError(
                "other element %r is not empty, is not of type %s, "
                "and contains attributes not covered here %r"
                % (
                    other,
                    cls,
                    set(other._cache_attrs).difference(cls._cache_attrs),
                )
            )
        return cls + d

    @classmethod
    def from_execution_options(
        cls,
        key: str,
        attrs: set[str],
        exec_options: Mapping[str, Any],
        statement_exec_options: Mapping[str, Any],
    ) -> Tuple["Options", Mapping[str, Any]]:
        """process Options argument in terms of execution options.


        e.g.::

            (
                load_options,
                execution_options,
            ) = QueryContext.default_load_options.from_execution_options(
                "_sa_orm_load_options",
                {"populate_existing", "autoflush", "yield_per"},
                execution_options,
                statement._execution_options,
            )

        get back the Options and refresh "_sa_orm_load_options" in the
        exec options dict w/ the Options as well

        """

        # common case is that no options we are looking for are
        # in either dictionary, so cancel for that first
        check_argnames = attrs.intersection(
            set(exec_options).union(statement_exec_options)
        )

        existing_options = exec_options.get(key, cls)

        if check_argnames:
            result = {}
            for argname in check_argnames:
                # option "x" is stored on the Options object as "_x";
                # exec_options takes precedence over the statement's
                local = "_" + argname
                if argname in exec_options:
                    result[local] = exec_options[argname]
                elif argname in statement_exec_options:
                    result[local] = statement_exec_options[argname]

            new_options = existing_options + result
            exec_options = util.immutabledict().merge_with(
                exec_options, {key: new_options}
            )
            return new_options, exec_options

        else:
            return existing_options, exec_options

    if TYPE_CHECKING:

        def __getattr__(self, key: str) -> Any: ...

        def __setattr__(self, key: str, value: Any) -> None: ...

        def __delattr__(self, key: str) -> None: ...

997 

998 

class CacheableOptions(Options, HasCacheKey):
    """An :class:`.Options` that also takes part in cache key
    generation, both as an instance and as a class."""

    __slots__ = ()

    @hybridmethod
    def _gen_cache_key_inst(
        self, anon_map: Any, bindparams: List[BindParameter[Any]]
    ) -> Optional[Tuple[Any]]:
        # instance level: defer to the HasCacheKey implementation
        return HasCacheKey._gen_cache_key(self, anon_map, bindparams)

    @_gen_cache_key_inst.classlevel
    def _gen_cache_key(
        cls, anon_map: "anon_map", bindparams: List[BindParameter[Any]]
    ) -> Tuple[CacheableOptions, Any]:
        # class level: the key is the class paired with an empty tuple
        return cls, ()

    @hybridmethod
    def _generate_cache_key(self) -> Optional[CacheKey]:
        return HasCacheKey._generate_cache_key(self)

1017 

1018 

class ExecutableOption(HasCopyInternals):
    """Base for option objects accepted by :meth:`.Executable.options`."""

    __slots__ = ()

    _annotations: _ImmutableExecuteOptions = util.EMPTY_DICT

    __visit_name__: str = "executable_option"

    _is_has_cache_key: bool = False

    _is_core: bool = True

    def _clone(self, **kw):
        """Return a shallow copy of this ExecutableOption.

        The copy is made without invoking ``__init__``; it receives its own
        ``__dict__`` holding the same values as the original.  Keyword
        arguments are accepted and ignored.

        """
        klass = self.__class__
        duplicate = klass.__new__(klass)
        duplicate.__dict__ = dict(self.__dict__)  # type: ignore
        return duplicate

1035 

1036 

# string-bound type variable for the position names that key
# ``HasSyntaxExtensions._position_map``
_L = TypeVar("_L", bound=str)

1038 

1039 

class HasSyntaxExtensions(Generic[_L]):
    """Mixin for statements that accept :class:`.SyntaxExtension` objects
    at named positions.

    Implementing classes supply ``_position_map`` and
    ``_apply_syntax_extension_to_self()``.

    """

    # position name -> name of the attribute on ``self`` that stores the
    # element(s) occupying that position
    _position_map: Mapping[_L, str]

    @_generative
    def ext(self, extension: SyntaxExtension) -> Self:
        """Applies a SQL syntax extension to this statement.

        SQL syntax extensions are :class:`.ClauseElement` objects that define
        some vendor-specific syntactical construct that take place in specific
        parts of a SQL statement.   Examples include vendor extensions like
        MySQL's "ON DUPLICATE KEY UPDATE", PostgreSQL's
        "DISTINCT ON", and MySQL's "LIMIT" that can be applied to UPDATE
        and DELETE statements.

        .. seealso::

            :ref:`examples_syntax_extensions`

            :func:`_mysql.limit` - DML LIMIT for MySQL

            :func:`_postgresql.distinct_on` - DISTINCT ON for PostgreSQL

        .. versionadded:: 2.1

        """
        extension = coercions.expect(
            roles.SyntaxExtensionRole, extension, apply_propagate_attrs=self
        )
        self._apply_syntax_extension_to_self(extension)
        return self

    @util.preload_module("sqlalchemy.sql.elements")
    def apply_syntax_extension_point(
        self,
        apply_fn: Callable[[Sequence[ClauseElement]], Sequence[ClauseElement]],
        position: _L,
    ) -> None:
        """Apply a :class:`.SyntaxExtension` to a known extension point.

        Should be used only internally by :class:`.SyntaxExtension`.

        E.g.::

            class Qualify(SyntaxExtension, ClauseElement):

                # ...

                def apply_to_select(self, select_stmt: Select) -> None:
                    # append self to existing
                    select_stmt.apply_syntax_extension_point(
                        lambda existing: [*existing, self], "post_criteria"
                    )


            class ReplaceExt(SyntaxExtension, ClauseElement):

                # ...

                def apply_to_select(self, select_stmt: Select) -> None:
                    # replace any existing elements regardless of type
                    select_stmt.apply_syntax_extension_point(
                        lambda existing: [self], "post_criteria"
                    )


            class ReplaceOfTypeExt(SyntaxExtension, ClauseElement):

                # ...

                def apply_to_select(self, select_stmt: Select) -> None:
                    # replace any existing elements of the same type
                    select_stmt.apply_syntax_extension_point(
                        self.append_replacing_same_type, "post_criteria"
                    )

        :param apply_fn: callable function that will receive a sequence of
         :class:`.ClauseElement` that is already populating the extension
         point (the sequence is empty if there isn't one), and should return
         a new sequence of :class:`.ClauseElement` that will newly populate
         that point. The function typically can choose to concatenate the
         existing values with the new one, or to replace the values that are
         there with a new one by returning a list of a single element, or
         to perform more complex operations like removing only the same
         type element from the input list or merging already existing elements
         of the same type. Some examples are shown in the examples above.
         The returned sequence may not be empty.
        :param position: string name of the position to apply to.  This
         varies per statement type.  IDEs should show the possible values
         for each statement type as it's typed with a ``typing.Literal`` per
         statement.

        :raises ValueError: if ``position`` is not a key of this
         construct's ``_position_map``.
        :raises AssertionError: if ``apply_fn`` returns an empty sequence.

        .. seealso::

            :ref:`examples_syntax_extensions`


        """  # noqa: E501

        try:
            attrname = self._position_map[position]
        except KeyError as ke:
            raise ValueError(
                f"Unknown position {position!r} for {self.__class__} "
                f"construct; known positions: "
                f"{', '.join(repr(k) for k in self._position_map)}"
            ) from ke
        else:
            ElementList = util.preloaded.sql_elements.ElementList
            existing: Optional[ClauseElement] = getattr(self, attrname, None)

            # normalize whatever is stored at the position into a tuple:
            # nothing -> (), an ElementList -> its clauses, anything else
            # -> a one-element tuple
            if existing is None:
                input_seq: Tuple[ClauseElement, ...] = ()
            elif isinstance(existing, ElementList):
                input_seq = existing.clauses
            else:
                input_seq = (existing,)

            new_seq = apply_fn(input_seq)
            if not new_seq:
                # raised explicitly rather than via ``assert`` so that the
                # check is still enforced when Python runs with -O
                raise AssertionError("cannot return empty sequence")

            # a single element is stored directly; several are wrapped in
            # an ElementList
            new = new_seq[0] if len(new_seq) == 1 else ElementList(new_seq)
            setattr(self, attrname, new)

    def _apply_syntax_extension_to_self(
        self, extension: SyntaxExtension
    ) -> None:
        """Hook invoked by :meth:`.ext` with the coerced extension.

        Not implemented on this base class.

        """
        raise NotImplementedError()

    def _get_syntax_extensions_as_dict(self) -> Mapping[_L, SyntaxExtension]:
        """Return a dictionary of position name to the value stored at that
        position, omitting positions whose value is ``None``."""
        res: Dict[_L, SyntaxExtension] = {}
        for name, attr in self._position_map.items():
            value = getattr(self, attr)
            if value is not None:
                res[name] = value
        return res

    def _set_syntax_extensions(self, **extensions: SyntaxExtension) -> None:
        """Store each given value on the attribute that ``_position_map``
        associates with its keyword name.

        A keyword that is not a known position raises ``KeyError``.

        """
        for name, value in extensions.items():
            setattr(self, self._position_map[name], value)  # type: ignore[index]  # noqa: E501

1177 

1178 

class SyntaxExtension(roles.SyntaxExtensionRole):
    """Defines a unit that when also extending from :class:`.ClauseElement`
    can be applied to SQLAlchemy statements :class:`.Select`,
    :class:`_sql.Insert`, :class:`.Update` and :class:`.Delete` making use of
    pre-established SQL insertion points within these constructs.

    .. versionadded:: 2.1

    .. seealso::

        :ref:`examples_syntax_extensions`

    """

    def append_replacing_same_type(
        self, existing: Sequence[ClauseElement]
    ) -> Sequence[ClauseElement]:
        """Utility function that can be used as
        :paramref:`_sql.HasSyntaxExtensions.apply_syntax_extension_point.apply_fn`
        to remove any other element of the same type in existing and appending
        ``self`` to the list.

        This is equivalent to::

            stmt.apply_syntax_extension_point(
                lambda existing: [
                    *(e for e in existing if not isinstance(e, ReplaceOfTypeExt)),
                    self,
                ],
                "post_criteria",
            )

        .. seealso::

            :ref:`examples_syntax_extensions`

            :meth:`_sql.HasSyntaxExtensions.apply_syntax_extension_point`

        """  # noqa: E501
        # "same type" is an isinstance() check against type(self), so
        # instances of subclasses of that type are removed as well
        cls = type(self)
        return [*(e for e in existing if not isinstance(e, cls)), self]  # type: ignore[list-item]  # noqa: E501

    # The apply_to_* methods below all raise NotImplementedError on this
    # base class, naming the extension type and the statement kind.

    def apply_to_select(self, select_stmt: Select[Unpack[_Ts]]) -> None:
        """Apply this :class:`.SyntaxExtension` to a :class:`.Select`"""
        raise NotImplementedError(
            f"Extension {type(self).__name__} cannot be applied to select"
        )

    def apply_to_update(self, update_stmt: Update) -> None:
        """Apply this :class:`.SyntaxExtension` to an :class:`.Update`"""
        raise NotImplementedError(
            f"Extension {type(self).__name__} cannot be applied to update"
        )

    def apply_to_delete(self, delete_stmt: Delete) -> None:
        """Apply this :class:`.SyntaxExtension` to a :class:`.Delete`"""
        raise NotImplementedError(
            f"Extension {type(self).__name__} cannot be applied to delete"
        )

    def apply_to_insert(self, insert_stmt: Insert) -> None:
        """Apply this :class:`.SyntaxExtension` to an :class:`_sql.Insert`"""
        raise NotImplementedError(
            f"Extension {type(self).__name__} cannot be applied to insert"
        )

1244 

1245 

class Executable(roles.StatementRole):
    """Mark a :class:`_expression.ClauseElement` as supporting execution.

    :class:`.Executable` is a superclass for all "statement" types
    of objects, including :func:`select`, :func:`delete`, :func:`update`,
    :func:`insert`, :func:`text`.

    """

    # class-level defaults; the generative methods below assign
    # per-instance values on the object they operate on
    supports_execution: bool = True
    _execution_options: _ImmutableExecuteOptions = util.EMPTY_DICT
    _is_default_generator: bool = False
    _with_options: Tuple[ExecutableOption, ...] = ()
    _compile_state_funcs: Tuple[
        Tuple[Callable[[CompileState], None], Any], ...
    ] = ()
    # annotation only: no default is assigned on this class
    _compile_options: Optional[Union[Type[CacheableOptions], CacheableOptions]]

    # (attribute name, traversal symbol) pairs
    _executable_traverse_internals = [
        ("_with_options", InternalTraversal.dp_executable_options),
        (
            "_compile_state_funcs",
            ExtendedInternalTraversal.dp_compile_state_funcs,
        ),
        ("_propagate_attrs", ExtendedInternalTraversal.dp_propagate_attrs),
    ]

    # statement-kind flags; all False on this base class
    is_select: bool = False
    is_from_statement: bool = False
    is_update: bool = False
    is_insert: bool = False
    is_text: bool = False
    is_delete: bool = False
    is_dml: bool = False

    # declarations for static type checkers only; this class does not
    # define these members at runtime
    if TYPE_CHECKING:
        __visit_name__: str

        def _compile_w_cache(
            self,
            dialect: Dialect,
            *,
            compiled_cache: Optional[CompiledCacheType],
            column_keys: List[str],
            for_executemany: bool = False,
            schema_translate_map: Optional[SchemaTranslateMapType] = None,
            **kw: Any,
        ) -> tuple[
            Compiled,
            Sequence[BindParameter[Any]] | None,
            _CoreSingleExecuteParams | None,
            CacheStats,
        ]: ...

        def _execute_on_connection(
            self,
            connection: Connection,
            distilled_params: _CoreMultiExecuteParams,
            execution_options: CoreExecuteOptionsParameter,
        ) -> CursorResult[Any]: ...

        def _execute_on_scalar(
            self,
            connection: Connection,
            distilled_params: _CoreMultiExecuteParams,
            execution_options: CoreExecuteOptionsParameter,
        ) -> Any: ...

    # not provided by this base class
    @util.ro_non_memoized_property
    def _all_selected_columns(self) -> _SelectIterable:
        raise NotImplementedError()

    # the plugin target is this object's ``__visit_name__``
    @property
    def _effective_plugin_target(self) -> str:
        return self.__visit_name__

    @_generative
    def options(self, *options: ExecutableOption) -> Self:
        """Apply options to this statement.

        In the general sense, options are any kind of Python object
        that can be interpreted by systems that consume the statement outside
        of the regular SQL compiler chain. Specifically, these options are
        the ORM level options that apply "eager load" and other loading
        behaviors to an ORM query.

        For background on specific kinds of options for specific kinds of
        statements, refer to the documentation for those option objects.

        .. versionchanged:: 1.4 - added :meth:`.Executable.options` to
           Core statement objects towards the goal of allowing unified
           Core / ORM querying capabilities.

        .. seealso::

            :ref:`loading_columns` - refers to options specific to the usage
            of ORM queries

            :ref:`relationship_loader_options` - refers to options specific
            to the usage of ORM queries

        """
        # each option is coerced, then appended after any options already
        # present
        self._with_options += tuple(
            coercions.expect(roles.ExecutableOptionRole, opt)
            for opt in options
        )
        return self

    @_generative
    def _set_compile_options(self, compile_options: CacheableOptions) -> Self:
        """Assign the compile options to a new value.

        :param compile_options: appropriate CacheableOptions structure

        """

        self._compile_options = compile_options
        return self

    @_generative
    def _update_compile_options(self, options: CacheableOptions) -> Self:
        """update the _compile_options with new keys."""

        assert self._compile_options is not None
        self._compile_options += options
        return self

    @_generative
    def _add_compile_state_func(
        self,
        callable_: Callable[[CompileState], None],
        cache_args: Any,
    ) -> Self:
        """Add a compile state function to this statement.

        When using the ORM only, these are callable functions that will
        be given the CompileState object upon compilation.

        A second argument cache_args is required, which will be combined with
        the ``__code__`` identity of the function itself in order to produce a
        cache key.

        """
        # stored as a (callable, cache_args) pair appended to the tuple
        self._compile_state_funcs += ((callable_, cache_args),)
        return self

    @overload
    def execution_options(
        self,
        *,
        compiled_cache: Optional[CompiledCacheType] = ...,
        logging_token: str = ...,
        isolation_level: IsolationLevel = ...,
        no_parameters: bool = False,
        stream_results: bool = False,
        max_row_buffer: int = ...,
        yield_per: int = ...,
        driver_column_names: bool = ...,
        insertmanyvalues_page_size: int = ...,
        schema_translate_map: Optional[SchemaTranslateMapType] = ...,
        populate_existing: bool = False,
        autoflush: bool = False,
        synchronize_session: SynchronizeSessionArgument = ...,
        dml_strategy: DMLStrategyArgument = ...,
        render_nulls: bool = ...,
        is_delete_using: bool = ...,
        is_update_from: bool = ...,
        preserve_rowcount: bool = False,
        **opt: Any,
    ) -> Self: ...

    @overload
    def execution_options(self, **opt: Any) -> Self: ...

    @_generative
    def execution_options(self, **kw: Any) -> Self:
        """Set non-SQL options for the statement which take effect during
        execution.

        Execution options can be set at many scopes, including per-statement,
        per-connection, or per execution, using methods such as
        :meth:`_engine.Connection.execution_options` and parameters which
        accept a dictionary of options such as
        :paramref:`_engine.Connection.execute.execution_options` and
        :paramref:`_orm.Session.execute.execution_options`.

        The primary characteristic of an execution option, as opposed to
        other kinds of options such as ORM loader options, is that
        **execution options never affect the compiled SQL of a query, only
        things that affect how the SQL statement itself is invoked or how
        results are fetched**.  That is, execution options are not part of
        what's accommodated by SQL compilation nor are they considered part of
        the cached state of a statement.

        The :meth:`_sql.Executable.execution_options` method is
        :term:`generative`, as
        is the case for the method as applied to the :class:`_engine.Engine`
        and :class:`_orm.Query` objects, which means when the method is called,
        a copy of the object is returned, which applies the given parameters to
        that new copy, but leaves the original unchanged::

            statement = select(table.c.x, table.c.y)
            new_statement = statement.execution_options(my_option=True)

        An exception to this behavior is the :class:`_engine.Connection`
        object, where the :meth:`_engine.Connection.execution_options` method
        is explicitly **not** generative.

        The kinds of options that may be passed to
        :meth:`_sql.Executable.execution_options` and other related methods and
        parameter dictionaries include parameters that are explicitly consumed
        by SQLAlchemy Core or ORM, as well as arbitrary keyword arguments not
        defined by SQLAlchemy, which means the methods and/or parameter
        dictionaries may be used for user-defined parameters that interact with
        custom code, which may access the parameters using methods such as
        :meth:`_sql.Executable.get_execution_options` and
        :meth:`_engine.Connection.get_execution_options`, or within selected
        event hooks using a dedicated ``execution_options`` event parameter
        such as
        :paramref:`_events.ConnectionEvents.before_execute.execution_options`
        or :attr:`_orm.ORMExecuteState.execution_options`, e.g.::

             from sqlalchemy import event


             @event.listens_for(some_engine, "before_execute")
             def _process_opt(conn, statement, multiparams, params, execution_options):
                 "run a SQL function before invoking a statement"

                 if execution_options.get("do_special_thing", False):
                     conn.exec_driver_sql("run_special_function()")

        Within the scope of options that are explicitly recognized by
        SQLAlchemy, most apply to specific classes of objects and not others.
        The most common execution options include:

        * :paramref:`_engine.Connection.execution_options.isolation_level` -
          sets the isolation level for a connection or a class of connections
          via an :class:`_engine.Engine`.  This option is accepted only
          by :class:`_engine.Connection` or :class:`_engine.Engine`.

        * :paramref:`_engine.Connection.execution_options.stream_results` -
          indicates results should be fetched using a server side cursor;
          this option is accepted by :class:`_engine.Connection`, by the
          :paramref:`_engine.Connection.execute.execution_options` parameter
          on :meth:`_engine.Connection.execute`, and additionally by
          :meth:`_sql.Executable.execution_options` on a SQL statement object,
          as well as by ORM constructs like :meth:`_orm.Session.execute`.

        * :paramref:`_engine.Connection.execution_options.compiled_cache` -
          indicates a dictionary that will serve as the
          :ref:`SQL compilation cache <sql_caching>`
          for a :class:`_engine.Connection` or :class:`_engine.Engine`, as
          well as for ORM methods like :meth:`_orm.Session.execute`.
          Can be passed as ``None`` to disable caching for statements.
          This option is not accepted by
          :meth:`_sql.Executable.execution_options` as it is inadvisable to
          carry along a compilation cache within a statement object.

        * :paramref:`_engine.Connection.execution_options.schema_translate_map`
          - a mapping of schema names used by the
          :ref:`Schema Translate Map <schema_translating>` feature, accepted
          by :class:`_engine.Connection`, :class:`_engine.Engine`,
          :class:`_sql.Executable`, as well as by ORM constructs
          like :meth:`_orm.Session.execute`.

        .. seealso::

            :meth:`_engine.Connection.execution_options`

            :paramref:`_engine.Connection.execute.execution_options`

            :paramref:`_orm.Session.execute.execution_options`

            :ref:`orm_queryguide_execution_options` - documentation on all
            ORM-specific execution options

        """  # noqa: E501
        # these two options are rejected at statement level
        if "isolation_level" in kw:
            raise exc.ArgumentError(
                "'isolation_level' execution option may only be specified "
                "on Connection.execution_options(), or "
                "per-engine using the isolation_level "
                "argument to create_engine()."
            )
        if "compiled_cache" in kw:
            raise exc.ArgumentError(
                "'compiled_cache' execution option may only be specified "
                "on Connection.execution_options(), not per statement."
            )
        # combine the given options with those already present
        self._execution_options = self._execution_options.union(kw)
        return self

    def get_execution_options(self) -> _ExecuteOptions:
        """Get the non-SQL options which will take effect during execution.

        .. seealso::

            :meth:`.Executable.execution_options`
        """
        return self._execution_options

1547 

1548 

class ExecutableStatement(Executable):
    """:class:`.Executable` subclass providing a lightweight ``params``
    implementation that avoids a full cloned traverse.

    .. versionadded:: 2.1

    """

    _params: util.immutabledict[str, Any] = EMPTY_DICT

    # the inherited traversal entries, followed by one for ``_params``
    _executable_traverse_internals = [
        *Executable._executable_traverse_internals,
        ("_params", InternalTraversal.dp_params),
    ]

    @_generative
    def params(
        self,
        __optionaldict: _CoreSingleExecuteParams | None = None,
        /,
        **kwargs: Any,
    ) -> Self:
        """Return a copy with the provided bindparam values.

        Values may be given as a single positional dictionary, as keyword
        arguments, or both; where a name appears in both, the value from
        the positional dictionary is the one used.  Values given here are
        combined with any established by an earlier call::

          >>> clause = column("x") + bindparam("foo")
          >>> print(clause.compile().params)
          {'foo': None}
          >>> print(clause.params({"foo": 7}).compile().params)
          {'foo': 7}

        """
        if __optionaldict:
            kwargs.update(__optionaldict)

        if self._params:
            combined = self._params | kwargs
        else:
            combined = util.immutabledict(kwargs)
        self._params = combined
        return self

1591 

1592 

class SchemaEventTarget(event.EventTarget):
    """Base class for elements that are the targets of :class:`.DDLEvents`
    events.

    This includes :class:`.SchemaItem` as well as :class:`.SchemaType`.

    """

    dispatch: dispatcher[SchemaEventTarget]

    def _set_parent(self, parent: SchemaEventTarget, **kw: Any) -> None:
        """Associate with this SchemaEvent's parent object."""

    def _set_parent_with_dispatch(
        self, parent: SchemaEventTarget, **kw: Any
    ) -> None:
        """Call :meth:`._set_parent`, dispatching ``before_parent_attach``
        beforehand and ``after_parent_attach`` afterwards.

        Keyword arguments are passed along to :meth:`._set_parent` only.

        """
        self.dispatch.before_parent_attach(self, parent)
        self._set_parent(parent, **kw)
        self.dispatch.after_parent_attach(self, parent)

1612 

1613 

class SchemaVisitable(SchemaEventTarget, visitors.Visitable):
    """Base class for elements that are targets of a :class:`.SchemaVisitor`.

    Combines :class:`.SchemaEventTarget` with ``visitors.Visitable`` and
    defines no members of its own.

    .. versionadded:: 2.0.41

    """

1620 

1621 

class SchemaVisitor(ClauseVisitor):
    """Define the visiting for ``SchemaItem`` and more
    generally ``SchemaVisitable`` objects.

    """

    # traversal options declared by this visitor class
    __traverse_options__: Dict[str, Any] = {"schema_visitor": True}

1629 

1630 

class _SentinelDefaultCharacterization(Enum):
    """String-valued enumeration used for the ``default_characterization``
    field of ``_SentinelColumnCharacterization``."""

    NONE = "none"
    UNKNOWN = "unknown"
    CLIENTSIDE = "clientside"
    SENTINEL_DEFAULT = "sentinel_default"
    SERVERSIDE = "serverside"
    IDENTITY = "identity"
    SEQUENCE = "sequence"
    # NOTE: the value is "monotonic", shorter than the member name
    MONOTONIC_FUNCTION = "monotonic"

1640 

1641 

class _SentinelColumnCharacterization(NamedTuple):
    """Named tuple of sentinel column information.

    Every field has a default, so the tuple may be built with no arguments.

    """

    # the column(s) concerned, or None
    columns: Optional[Sequence[Column[Any]]] = None
    is_explicit: bool = False
    is_autoinc: bool = False
    default_characterization: _SentinelDefaultCharacterization = (
        _SentinelDefaultCharacterization.NONE
    )

1649 

1650 

# key type of a ColumnCollection, constrained to either
# ``Union[None, str]`` or plain ``str``
_COLKEY = TypeVar("_COLKEY", Union[None, str], str)

# element type of a ColumnCollection, in covariant and invariant forms;
# both are bound to ColumnElement
_COL_co = TypeVar("_COL_co", bound="ColumnElement[Any]", covariant=True)
_COL = TypeVar("_COL", bound="ColumnElement[Any]")

1655 

1656 

class _ColumnMetrics(Generic[_COL_co]):
    """Per-column record stored alongside each column of a
    :class:`.ColumnCollection`, which also maintains that collection's
    ``_proxy_index`` as the record is created and disposed."""

    __slots__ = ("column",)

    column: _COL_co

    def __init__(
        self, collection: ColumnCollection[Any, _COL_co], col: _COL_co
    ) -> None:
        """Record ``col`` and, if the collection's proxy index has already
        been populated, register this object in it."""
        self.column = col

        # an empty proxy index has not been initialized yet and is left
        # alone; a non-empty one has to be kept up to date
        proxy_index = collection._proxy_index
        if not proxy_index:
            return
        for proxied in col._expanded_proxy_set:
            proxy_index[proxied].add(self)

    def get_expanded_proxy_set(self) -> FrozenSet[ColumnElement[Any]]:
        """Return the ``_expanded_proxy_set`` of the recorded column."""
        return self.column._expanded_proxy_set

    def dispose(self, collection: ColumnCollection[_COLKEY, _COL_co]) -> None:
        """Remove this object from the collection's proxy index, dropping
        any index entry that is left empty."""
        proxy_index = collection._proxy_index
        if not proxy_index:
            return
        for proxied in self.column._expanded_proxy_set:
            members = proxy_index.get(proxied, None)
            if members is None:
                continue
            if members:
                members.discard(self)
            if not members:
                del proxy_index[proxied]

    def embedded(
        self,
        target_set: Union[
            Set[ColumnElement[Any]], FrozenSet[ColumnElement[Any]]
        ],
    ) -> bool:
        """Return True if every member of ``target_set`` is either in the
        recorded column's expanded proxy set, or intersects with it once
        passed through ``_expand_cloned``."""
        own_proxies = self.column._expanded_proxy_set
        return all(
            own_proxies.intersection(_expand_cloned([candidate]))
            for candidate in target_set.difference(own_proxies)
        )

1699 

1700 

1701class ColumnCollection(Generic[_COLKEY, _COL_co]): 

1702 """Collection of :class:`_expression.ColumnElement` instances, 

1703 typically for 

1704 :class:`_sql.FromClause` objects. 

1705 

1706 The :class:`_sql.ColumnCollection` object is most commonly available 

1707 as the :attr:`_schema.Table.c` or :attr:`_schema.Table.columns` collection 

1708 on the :class:`_schema.Table` object, introduced at 

1709 :ref:`metadata_tables_and_columns`. 

1710 

1711 The :class:`_expression.ColumnCollection` has both mapping- and sequence- 

1712 like behaviors. A :class:`_expression.ColumnCollection` usually stores 

1713 :class:`_schema.Column` objects, which are then accessible both via mapping 

1714 style access as well as attribute access style. 

1715 

1716 To access :class:`_schema.Column` objects using ordinary attribute-style 

1717 access, specify the name like any other object attribute, such as below 

1718 a column named ``employee_name`` is accessed:: 

1719 

1720 >>> employee_table.c.employee_name 

1721 

1722 To access columns that have names with special characters or spaces, 

1723 index-style access is used, such as below which illustrates a column named 

1724 ``employee ' payment`` is accessed:: 

1725 

1726 >>> employee_table.c["employee ' payment"] 

1727 

1728 As the :class:`_sql.ColumnCollection` object provides a Python dictionary 

1729 interface, common dictionary method names like 

1730 :meth:`_sql.ColumnCollection.keys`, :meth:`_sql.ColumnCollection.values`, 

1731 and :meth:`_sql.ColumnCollection.items` are available, which means that 

1732 database columns that are keyed under these names also need to use indexed 

1733 access:: 

1734 

1735 >>> employee_table.c["values"] 

1736 

1737 

1738 The name for which a :class:`_schema.Column` would be present is normally 

1739 that of the :paramref:`_schema.Column.key` parameter. In some contexts, 

1740 such as a :class:`_sql.Select` object that uses a label style set 

1741 using the :meth:`_sql.Select.set_label_style` method, a column of a certain 

1742 key may instead be represented under a particular label name such 

1743 as ``tablename_columnname``:: 

1744 

1745 >>> from sqlalchemy import select, column, table 

1746 >>> from sqlalchemy import LABEL_STYLE_TABLENAME_PLUS_COL 

1747 >>> t = table("t", column("c")) 

1748 >>> stmt = select(t).set_label_style(LABEL_STYLE_TABLENAME_PLUS_COL) 

1749 >>> subq = stmt.subquery() 

1750 >>> subq.c.t_c 

1751 <sqlalchemy.sql.elements.ColumnClause at 0x7f59dcf04fa0; t_c> 

1752 

1753 :class:`.ColumnCollection` also indexes the columns in order and allows 

1754 them to be accessible by their integer position:: 

1755 

1756 >>> cc[0] 

1757 Column('x', Integer(), table=None) 

1758 >>> cc[1] 

1759 Column('y', Integer(), table=None) 

1760 

1761 .. versionadded:: 1.4 :class:`_expression.ColumnCollection` 

1762 allows integer-based 

1763 index access to the collection. 

1764 

1765 Iterating the collection yields the column expressions in order:: 

1766 

1767 >>> list(cc) 

1768 [Column('x', Integer(), table=None), 

1769 Column('y', Integer(), table=None)] 

1770 

1771 The base :class:`_expression.ColumnCollection` object can store 

1772 duplicates, which can 

1773 mean either two columns with the same key, in which case the column 

1774 returned by key access is **arbitrary**:: 

1775 

1776 >>> x1, x2 = Column("x", Integer), Column("x", Integer) 

1777 >>> cc = ColumnCollection(columns=[(x1.name, x1), (x2.name, x2)]) 

1778 >>> list(cc) 

1779 [Column('x', Integer(), table=None), 

1780 Column('x', Integer(), table=None)] 

1781 >>> cc["x"] is x1 

1782 False 

1783 >>> cc["x"] is x2 

1784 True 

1785 

1786 Or it can also mean the same column multiple times. These cases are 

1787 supported as :class:`_expression.ColumnCollection` 

1788 is used to represent the columns in 

1789 a SELECT statement which may include duplicates. 

1790 

1791 A special subclass :class:`.DedupeColumnCollection` exists which instead 

1792 maintains SQLAlchemy's older behavior of not allowing duplicates; this 

1793 collection is used for schema level objects like :class:`_schema.Table` 

1794 and 

1795 :class:`.PrimaryKeyConstraint` where this deduping is helpful. The 

1796 :class:`.DedupeColumnCollection` class also has additional mutation methods 

1797 as the schema constructs have more use cases that require removal and 

1798 replacement of columns. 

1799 

1800 .. versionchanged:: 1.4 :class:`_expression.ColumnCollection` 

1801 now stores duplicate 

1802 column keys as well as the same column in multiple positions. The 

1803 :class:`.DedupeColumnCollection` class is added to maintain the 

1804 former behavior in those cases where deduplication as well as 

1805 additional replace/remove operations are needed. 

1806 

1807 

1808 """ 

1809 

1810 __slots__ = ("_collection", "_index", "_colset", "_proxy_index") 

1811 

1812 _collection: List[Tuple[_COLKEY, _COL_co, _ColumnMetrics[_COL_co]]] 

1813 _index: Dict[Union[None, str, int], Tuple[_COLKEY, _COL_co]] 

1814 _proxy_index: Dict[ColumnElement[Any], Set[_ColumnMetrics[_COL_co]]] 

1815 _colset: Set[_COL_co] 

1816 

def __init__(
    self, columns: Optional[Iterable[Tuple[_COLKEY, _COL_co]]] = None
):
    """Set up empty internal state, then load ``columns`` if given.

    :param columns: optional iterable of ``(key, column)`` pairs.

    """
    # the attributes are assigned through object.__setattr__ rather than
    # plain assignment -- presumably so that a subclass overriding
    # __setattr__ does not get in the way; confirm against subclasses
    initial_state = (
        ("_colset", set()),
        ("_index", {}),
        ("_proxy_index", collections.defaultdict(util.OrderedSet)),
        ("_collection", []),
    )
    for attr_name, attr_value in initial_state:
        object.__setattr__(self, attr_name, attr_value)

    if columns:
        self._initial_populate(columns)

1828 

@util.preload_module("sqlalchemy.sql.elements")
def __clause_element__(self) -> ClauseList:
    """Return an ungrouped ``ClauseList`` of all columns in this
    collection, using the columns-clause role."""
    clause_list_cls = util.preloaded.sql_elements.ClauseList
    all_columns = self._all_columns

    return clause_list_cls(
        *all_columns,
        _literal_as_text_role=roles.ColumnsClauseRole,
        group=False,
    )

1838 

def _initial_populate(
    self, iter_: Iterable[Tuple[_COLKEY, _COL_co]]
) -> None:
    """Load the ``(key, column)`` pairs given at construction time by
    handing them to ``_populate_separate_keys()``."""
    populate = self._populate_separate_keys
    populate(iter_)

1843 

1844 @property 

1845 def _all_columns(self) -> List[_COL_co]: 

1846 return [col for (_, col, _) in self._collection] 

1847 

1848 def keys(self) -> List[_COLKEY]: 

1849 """Return a sequence of string key names for all columns in this 

1850 collection.""" 

1851 return [k for (k, _, _) in self._collection] 

1852 

1853 def values(self) -> List[_COL_co]: 

1854 """Return a sequence of :class:`_sql.ColumnClause` or 

1855 :class:`_schema.Column` objects for all columns in this 

1856 collection.""" 

1857 return [col for (_, col, _) in self._collection] 

1858 

def items(self) -> List[Tuple[_COLKEY, _COL_co]]:
    """Return a new list of ``(key, column)`` tuples, one per entry in
    this collection, where the column is a :class:`_sql.ColumnClause`
    or :class:`_schema.Column` object.
    """
    return [
        (entry_key, column)
        for entry_key, column, _metrics in self._collection
    ]

1867 

1868 def __bool__(self) -> bool: 

1869 return bool(self._collection) 

1870 

1871 def __len__(self) -> int: 

1872 return len(self._collection) 

1873 

1874 def __iter__(self) -> Iterator[_COL_co]: 

1875 # turn to a list first to maintain over a course of changes 

1876 return iter([col for _, col, _ in self._collection]) 

1877 

1878 @overload 

1879 def __getitem__(self, key: Union[str, int]) -> _COL_co: ... 

1880 

1881 @overload 

1882 def __getitem__( 

1883 self, key: Tuple[Union[str, int], ...] 

1884 ) -> ReadOnlyColumnCollection[_COLKEY, _COL_co]: ... 

1885 

1886 @overload 

1887 def __getitem__( 

1888 self, key: slice 

1889 ) -> ReadOnlyColumnCollection[_COLKEY, _COL_co]: ... 

1890 

1891 def __getitem__( 

1892 self, key: Union[str, int, slice, Tuple[Union[str, int], ...]] 

1893 ) -> Union[ReadOnlyColumnCollection[_COLKEY, _COL_co], _COL_co]: 

1894 try: 

1895 if isinstance(key, (tuple, slice)): 

1896 if isinstance(key, slice): 

1897 cols = ( 

1898 (sub_key, col) 

1899 for (sub_key, col, _) in self._collection[key] 

1900 ) 

1901 else: 

1902 cols = (self._index[sub_key] for sub_key in key) 

1903 

1904 return ColumnCollection(cols).as_readonly() 

1905 else: 

1906 return self._index[key][1] 

1907 except KeyError as err: 

1908 if isinstance(err.args[0], int): 

1909 raise IndexError(err.args[0]) from err 

1910 else: 

1911 raise 

1912 

1913 def __getattr__(self, key: str) -> _COL_co: 

1914 try: 

1915 return self._index[key][1] 

1916 except KeyError as err: 

1917 raise AttributeError(key) from err 

1918 

1919 def __contains__(self, key: str) -> bool: 

1920 if key not in self._index: 

1921 if not isinstance(key, str): 

1922 raise exc.ArgumentError( 

1923 "__contains__ requires a string argument" 

1924 ) 

1925 return False 

1926 else: 

1927 return True 

1928 

def compare(self, other: ColumnCollection[_COLKEY, _COL_co]) -> bool:
    """Compare this :class:`_expression.ColumnCollection` to another.

    Both collections are iterated in parallel; the result is True only
    if every position holds the identical (``is``) column object in
    both and the two have the same length.

    """
    return all(
        left is right for left, right in zip_longest(self, other)
    )

1938 

1939 def __eq__(self, other: Any) -> bool: 

1940 return self.compare(other) 

1941 

@overload
def get(self, key: str, default: None = None) -> Optional[_COL_co]: ...

@overload
def get(self, key: str, default: _COL) -> Union[_COL_co, _COL]: ...

def get(
    self, key: str, default: Optional[_COL] = None
) -> Optional[Union[_COL_co, _COL]]:
    """Return the :class:`_sql.ColumnClause` or :class:`_schema.Column`
    stored under the string key ``key`` in this
    :class:`_expression.ColumnCollection`, or ``default`` when the key
    is not present."""

    try:
        return self._index[key][1]
    except KeyError:
        return default

1959 

1960 def __str__(self) -> str: 

1961 return "%s(%s)" % ( 

1962 self.__class__.__name__, 

1963 ", ".join(str(c) for c in self), 

1964 ) 

1965 

1966 def __setitem__(self, key: str, value: Any) -> NoReturn: 

1967 raise NotImplementedError() 

1968 

1969 def __delitem__(self, key: str) -> NoReturn: 

1970 raise NotImplementedError() 

1971 

1972 def __setattr__(self, key: str, obj: Any) -> NoReturn: 

1973 raise NotImplementedError() 

1974 

def clear(self) -> NoReturn:
    """Not implemented for :class:`_sql.ColumnCollection`; the
    dictionary-style ``clear()`` always raises
    ``NotImplementedError``."""
    raise NotImplementedError()

1979 

def remove(self, column: Any) -> NoReturn:
    """Not implemented on this class; always raises
    ``NotImplementedError``."""
    raise NotImplementedError()

1982 

def update(self, iter_: Any) -> NoReturn:
    """Not implemented for :class:`_sql.ColumnCollection`; the
    dictionary-style ``update()`` always raises
    ``NotImplementedError``."""
    raise NotImplementedError()

1987 

# Instances are deliberately unhashable (the class defines its own
# ``__eq__``).  The ``type: ignore`` works around
# https://github.com/python/mypy/issues/4266
__hash__: Optional[int] = None  # type: ignore

1990 

def _populate_separate_keys(
    self, iter_: Iterable[Tuple[_COLKEY, _COL_co]]
) -> None:
    """Replace the stored entries with those from an iterator of
    ``(key, column)`` pairs and index them.

    When the same key occurs more than once, the key-based index entry
    refers to the first occurrence.

    """
    entries = [(k, c, _ColumnMetrics(self, c)) for k, c in iter_]
    self._collection[:] = entries

    self._colset.update(c._deannotate() for _, c, _ in entries)

    positional = {idx: (k, c) for idx, (k, c, _) in enumerate(entries)}
    self._index.update(positional)

    # walking the entries backwards lets the earliest entry for a
    # duplicated key be written last, so it is the one that remains
    by_key = {k: (k, col) for k, col, _ in reversed(entries)}
    self._index.update(by_key)

2004 

def add(
    self,
    column: ColumnElement[Any],
    key: Optional[_COLKEY] = None,
) -> None:
    """Add a column to this :class:`_sql.ColumnCollection`.

    The column is appended at the end.  It is stored under ``key`` when
    given, otherwise under ``column.key``.  If that key is already
    present in the index, the existing key lookup is left untouched and
    the new column is reachable by position and iteration only.

    .. note::

        This method is **not normally used by user-facing code**, as the
        :class:`_sql.ColumnCollection` is usually part of an existing
        object such as a :class:`_schema.Table`. To add a
        :class:`_schema.Column` to an existing :class:`_schema.Table`
        object, use the :meth:`_schema.Table.append_column` method.

    """
    colkey: _COLKEY = column.key if key is None else key  # type: ignore

    # position the new entry will occupy once appended
    position = len(self._collection)

    # don't really know how this part is supposed to work w/ the
    # covariant thing
    _column = cast(_COL_co, column)

    entry = (colkey, _column, _ColumnMetrics(self, _column))
    self._collection.append(entry)
    self._colset.add(_column._deannotate())

    self._index[position] = (colkey, _column)
    # first column registered under a key keeps the key lookup
    self._index.setdefault(colkey, (colkey, _column))

2043 

2044 def __getstate__(self) -> Dict[str, Any]: 

2045 return { 

2046 "_collection": [(k, c) for k, c, _ in self._collection], 

2047 "_index": self._index, 

2048 } 

2049 

def __setstate__(self, state: Dict[str, Any]) -> None:
    """Restore the collection from the state produced by
    ``__getstate__()``.

    The index is taken from the state as-is, the proxy index starts out
    empty, and a new metrics object is created for every restored
    ``(key, column)`` pair.

    """
    # regular attribute assignment raises on this class
    assign = object.__setattr__

    assign(self, "_index", state["_index"])
    assign(
        self, "_proxy_index", collections.defaultdict(util.OrderedSet)
    )

    restored = [
        (k, c, _ColumnMetrics(self, c)) for (k, c) in state["_collection"]
    ]
    assign(self, "_collection", restored)
    assign(self, "_colset", {col for k, col, _ in self._collection})

2066 

def contains_column(self, col: ColumnElement[Any]) -> bool:
    """Checks if a column object exists in this collection.

    :raises ArgumentError: if ``col`` is not found and is a string;
     string keys are tested with ``col_name in table.c`` instead.

    """
    if col in self._colset:
        return True

    # the string check is only reached on a miss
    if isinstance(col, str):
        raise exc.ArgumentError(
            "contains_column cannot be used with string arguments. "
            "Use ``col_name in table.c`` instead."
        )
    return False

2078 

def as_readonly(self) -> ReadOnlyColumnCollection[_COLKEY, _COL_co]:
    """Return a "read only" form of this
    :class:`_sql.ColumnCollection`, as a new
    ``ReadOnlyColumnCollection`` constructed from it."""
    readonly_view = ReadOnlyColumnCollection(self)
    return readonly_view

2084 

2085 def _init_proxy_index(self) -> None: 

2086 """populate the "proxy index", if empty. 

2087 

2088 proxy index is added in 2.0 to provide more efficient operation 

2089 for the corresponding_column() method. 

2090 

2091 For reasons of both time to construct new .c collections as well as 

2092 memory conservation for large numbers of large .c collections, the 

2093 proxy_index is only filled if corresponding_column() is called. once 

2094 filled it stays that way, and new _ColumnMetrics objects created after 

2095 that point will populate it with new data. Note this case would be 

2096 unusual, if not nonexistent, as it means a .c collection is being 

2097 mutated after corresponding_column() were used, however it is tested in 

2098 test/base/test_utils.py. 

2099 

2100 """ 

2101 pi = self._proxy_index 

2102 if pi: 

2103 return 

2104 

2105 for _, _, metrics in self._collection: 

2106 eps = metrics.column._expanded_proxy_set 

2107 

2108 for eps_col in eps: 

2109 pi[eps_col].add(metrics) 

2110 

def corresponding_column(
    self, column: _COL, require_embedded: bool = False
) -> Optional[Union[_COL, _COL_co]]:
    """Given a :class:`_expression.ColumnElement`, return the exported
    :class:`_expression.ColumnElement` object from this
    :class:`_expression.ColumnCollection` which corresponds to that
    original :class:`_expression.ColumnElement` via a common ancestor
    column, or ``None`` if there is no such column.

    :param column: the target :class:`_expression.ColumnElement`
     to be matched.

    :param require_embedded: only return corresponding columns for
     the given :class:`_expression.ColumnElement`, if the given
     :class:`_expression.ColumnElement` is actually present within a
     sub-element of this :class:`_expression.Selectable`.
     Normally the column will match if it merely shares a common
     ancestor with one of the exported columns of this
     :class:`_expression.Selectable`.

    .. seealso::

        :meth:`_expression.Selectable.corresponding_column`
        - invokes this method
        against the collection returned by
        :attr:`_expression.Selectable.exported_columns`.

    .. versionchanged:: 1.4 the implementation for ``corresponding_column``
       was moved onto the :class:`_expression.ColumnCollection` itself.

    """
    # TODO: cython candidate

    # a column that is itself a member needs no searching
    if column in self._colset:
        return column

    def _lineage_weight(metrics):
        # sum of the "weight" annotation (default 1) over the entries
        # of the candidate's proxy list that share lineage with the
        # target column
        return sum(
            sc._annotations.get("weight", 1)
            for sc in metrics.column._uncached_proxy_list()
            if sc.shares_lineage(column)
        )

    best_metrics = None
    best_intersection = None
    target_set = column.proxy_set

    # the proxy index is built on first use
    pi = self._proxy_index
    if not pi:
        self._init_proxy_index()

    # membership is tested before indexing ``pi`` so that no entries
    # are created for unknown columns
    candidates = (mm for ts in target_set if ts in pi for mm in pi[ts])

    for candidate in candidates:
        if require_embedded and not candidate.embedded(target_set):
            continue

        if best_metrics is None:
            # no corresponding column yet, pick this one.
            best_metrics = candidate
            continue

        candidate_intersection = target_set.intersection(
            candidate.column._expanded_proxy_set
        )
        if best_intersection is None:
            best_intersection = target_set.intersection(
                best_metrics.column._expanded_proxy_set
            )

        if len(candidate_intersection) > len(best_intersection):
            # the candidate has a larger field of correspondence than
            # the current pick.  i.e. selectable.c.a1_x->a1.c.x->table.c.x
            # matches a1.c.x->table.c.x better than
            # selectable.c.x->table.c.x does.
            best_metrics = candidate
            best_intersection = candidate_intersection
        elif candidate_intersection == best_intersection:
            # they have the same field of correspondence. see
            # which proxy_set has fewer columns in it, which
            # indicates a closer relationship with the root
            # column. Also take into account the "weight"
            # attribute which CompoundSelect() uses to give
            # higher precedence to columns based on vertical
            # position in the compound statement, and discard
            # columns that have no reference to the target
            # column (also occurs with CompoundSelect)
            best_distance = _lineage_weight(best_metrics)
            candidate_distance = _lineage_weight(candidate)
            if candidate_distance < best_distance:
                best_metrics = candidate
                best_intersection = candidate_intersection

    return best_metrics.column if best_metrics else None

2216 

2217 

# Type variable for the column type held by DedupeColumnCollection; bound
# to NamedColumn.
_NAMEDCOL = TypeVar("_NAMEDCOL", bound="NamedColumn[Any]")

2219 

2220 

class DedupeColumnCollection(ColumnCollection[str, _NAMEDCOL]):
    """A :class:`_expression.ColumnCollection`
    that maintains deduplicating behavior.

    This is useful by schema level objects such as :class:`_schema.Table` and
    :class:`.PrimaryKeyConstraint`.    The collection includes more
    sophisticated mutator methods as well to suit schema objects which
    require mutable column collections.

    .. versionadded:: 1.4

    """

    def add(  # type: ignore[override]
        self,
        column: _NAMEDCOL,
        key: Optional[str] = None,
        *,
        index: Optional[int] = None,
    ) -> None:
        """Add ``column`` under its own ``.key``.

        If the key is already present, the existing column is replaced
        (unless it is this very column object, in which case nothing
        happens); otherwise the column is added as a new entry.

        :param column: the column to add.
        :param key: optional; if given it must equal ``column.key``.
        :param index: optional position for the column; when omitted a
         new column is appended at the end.

        :raises ArgumentError: if ``key`` differs from ``column.key`` or
         if ``column.key`` is None.

        """
        if key is not None and column.key != key:
            raise exc.ArgumentError(
                "DedupeColumnCollection requires columns be under "
                "the same key as their .key"
            )
        key = column.key

        if key is None:
            raise exc.ArgumentError(
                "Can't add unnamed column to column collection"
            )

        if key in self._index:
            existing = self._index[key][1]

            if existing is column:
                return

            self.replace(column, index=index)

            # pop out memoized proxy_set as this
            # operation may very well be occurring
            # in a _make_proxy operation
            util.memoized_property.reset(column, "proxy_set")
        else:
            self._append_new_column(key, column, index=index)

    def _append_new_column(
        self, key: str, named_column: _NAMEDCOL, *, index: Optional[int] = None
    ) -> None:
        """Store a column whose key is not yet present.

        With ``index`` omitted the column is appended; otherwise it is
        inserted at that position and the integer entries of ``_index``
        for the following columns are shifted up by one.  ``index`` is
        clamped to the range ``0..len`` in both directions, matching
        where ``list.insert`` actually places the element.

        """
        collection_length = len(self._collection)

        if index is None:
            l = collection_length
        else:
            if index < 0:
                index = max(0, collection_length + index)
            else:
                # list.insert() places an out-of-range index at the end;
                # clamp so the integer key written to _index below is the
                # position the column really occupies
                index = min(index, collection_length)
            l = index

        if index is None:
            self._collection.append(
                (key, named_column, _ColumnMetrics(self, named_column))
            )
        else:
            self._collection.insert(
                index, (key, named_column, _ColumnMetrics(self, named_column))
            )

        self._colset.add(named_column._deannotate())

        if index is not None:
            # move positional entries up, highest first, so that no
            # entry is overwritten before it has been copied
            for idx in reversed(range(index, collection_length)):
                self._index[idx + 1] = self._index[idx]

        self._index[l] = (key, named_column)
        self._index[key] = (key, named_column)

    def _populate_separate_keys(
        self, iter_: Iterable[Tuple[str, _NAMEDCOL]]
    ) -> None:
        """populate from an iterator of (key, column)

        Columns whose name or key collides with an entry already in the
        index are set aside and passed to ``replace()`` after the
        non-colliding columns have been stored.

        :raises ArgumentError: if a pair's key differs from the column's
         ``.key``.

        """
        cols = list(iter_)

        replace_col = []
        for k, col in cols:
            if col.key != k:
                raise exc.ArgumentError(
                    "DedupeColumnCollection requires columns be under "
                    "the same key as their .key"
                )
            if col.name in self._index and col.key != col.name:
                replace_col.append(col)
            elif col.key in self._index:
                replace_col.append(col)
            else:
                self._index[k] = (k, col)
                self._collection.append((k, col, _ColumnMetrics(self, col)))
        self._colset.update(c._deannotate() for (k, c, _) in self._collection)

        # rewrite the positional entries for the whole collection
        self._index.update(
            (idx, (k, c)) for idx, (k, c, _) in enumerate(self._collection)
        )
        for col in replace_col:
            self.replace(col)

    def extend(self, iter_: Iterable[_NAMEDCOL]) -> None:
        """Add every column from ``iter_``, each under its own ``.key``."""
        self._populate_separate_keys((col.key, col) for col in iter_)

    def remove(self, column: _NAMEDCOL) -> None:  # type: ignore[override]
        """Remove ``column`` from the collection.

        :raises ValueError: if the column is not a member.

        """
        if column not in self._colset:
            raise ValueError(
                "Can't remove column %r; column is not in this collection"
                % column
            )
        del self._index[column.key]
        self._colset.remove(column)
        self._collection[:] = [
            (k, c, metrics)
            for (k, c, metrics) in self._collection
            if c is not column
        ]
        for metrics in self._proxy_index.get(column, ()):
            metrics.dispose(self)

        # renumber the positional entries for the remaining columns
        self._index.update(
            {idx: (k, col) for idx, (k, col, _) in enumerate(self._collection)}
        )
        # delete higher index
        del self._index[len(self._collection)]

    def replace(
        self,
        column: _NAMEDCOL,
        *,
        extra_remove: Optional[Iterable[_NAMEDCOL]] = None,
        index: Optional[int] = None,
    ) -> None:
        """add the given column to this collection, removing unaliased
        versions of this column as well as existing columns with the
        same key.

        e.g.::

            t = Table("sometable", metadata, Column("col1", Integer))
            t.columns.replace(Column("col1", Integer, key="columnone"))

        will remove the original 'col1' from the collection, and add
        the new column under the name 'columnone'.

        Used by schema.Column to override columns during table reflection.

        :param column: the column to store.
        :param extra_remove: optional additional columns to remove.
        :param index: optional position for the stored column; when
         omitted the column takes the position of the first column it
         replaces, or is appended if it replaces none.

        """

        if extra_remove:
            remove_col = set(extra_remove)
        else:
            remove_col = set()
        # remove up to two columns based on matches of name as well as key
        if column.name in self._index and column.key != column.name:
            other = self._index[column.name][1]
            if other.name == other.key:
                remove_col.add(other)

        if column.key in self._index:
            remove_col.add(self._index[column.key][1])

        if not remove_col:
            self._append_new_column(column.key, column, index=index)
            return
        new_cols: List[Tuple[str, _NAMEDCOL, _ColumnMetrics[_NAMEDCOL]]] = []
        replace_index = None

        # rebuild the entry list: the first removed column's slot is
        # taken by the new column, further removed columns are dropped
        for idx, (k, col, metrics) in enumerate(self._collection):
            if col in remove_col:
                if replace_index is None:
                    replace_index = idx
                    new_cols.append(
                        (column.key, column, _ColumnMetrics(self, column))
                    )
            else:
                new_cols.append((k, col, metrics))

        if remove_col:
            self._colset.difference_update(remove_col)

            for rc in remove_col:
                for metrics in self._proxy_index.get(rc, ()):
                    metrics.dispose(self)

        if replace_index is None:
            # none of the columns to remove was found among the entries
            if index is not None:
                new_cols.insert(
                    index, (column.key, column, _ColumnMetrics(self, column))
                )

            else:
                new_cols.append(
                    (column.key, column, _ColumnMetrics(self, column))
                )
        elif index is not None:
            # move the entry from the replaced slot to the requested
            # position: insert it there, then delete the old copy, whose
            # position has shifted by one if the insert landed before it
            to_move = new_cols[replace_index]
            effective_positive_index = (
                index if index >= 0 else max(0, len(new_cols) + index)
            )
            new_cols.insert(index, to_move)
            if replace_index > effective_positive_index:
                del new_cols[replace_index + 1]
            else:
                del new_cols[replace_index]

        self._colset.add(column._deannotate())
        self._collection[:] = new_cols

        # rebuild the index from scratch: positions first, then keys
        self._index.clear()

        self._index.update(
            {idx: (k, col) for idx, (k, col, _) in enumerate(self._collection)}
        )
        self._index.update({k: (k, col) for (k, col, _) in self._collection})

2440 

2441 

class ReadOnlyColumnCollection(
    util.ReadOnlyContainer, ColumnCollection[_COLKEY, _COL_co]
):
    """A :class:`ColumnCollection` built on top of another collection.

    The instance references the parent's internal structures rather
    than copying them; the mutator methods delegate to ``_readonly()``.

    """

    __slots__ = ("_parent",)

    def __init__(self, collection: ColumnCollection[_COLKEY, _COL_co]):
        # regular attribute assignment raises on ColumnCollection
        assign = object.__setattr__
        assign(self, "_parent", collection)
        # reference (not copy) the parent's structures
        for shared in ("_colset", "_index", "_collection", "_proxy_index"):
            assign(self, shared, getattr(collection, shared))

    def __getstate__(self) -> Dict[str, _COL_co]:
        """Pickle state consists of the parent collection only."""
        return {"_parent": self._parent}

    def __setstate__(self, state: Dict[str, Any]) -> None:
        """Restore by re-running ``__init__`` against the parent."""
        self.__init__(state["_parent"])  # type: ignore

    def add(self, column: Any, key: Any = ...) -> Any:
        """Mutation entry point; delegates to ``_readonly()``."""
        self._readonly()

    def extend(self, elements: Any) -> NoReturn:
        """Mutation entry point; delegates to ``_readonly()``."""
        self._readonly()

    def remove(self, item: Any) -> NoReturn:
        """Mutation entry point; delegates to ``_readonly()``."""
        self._readonly()

2469 

2470 

class ColumnSet(util.OrderedSet["ColumnClause[Any]"]):
    """An ordered set of columns with a few collection-style helpers."""

    def contains_column(self, col: ColumnClause[Any]) -> bool:
        """Return True if ``col`` is a member of this set."""
        return col in self

    def extend(self, cols: Iterable[Any]) -> None:
        """Add each column from ``cols`` to the set."""
        for col in cols:
            self.add(col)

    @util.preload_module("sqlalchemy.sql.elements")
    def __eq__(self, other):
        """Build an ``and_()`` of ``c == local`` comparisons.

        One comparison is produced for every pairing of a column ``c``
        from ``other`` with a column ``local`` from this set for which
        ``c.shares_lineage(local)`` is true.

        """
        # resolve the elements module through the preload registry at
        # call time instead of relying on a module-level ``elements``
        # name being importable at runtime
        sql_elements = util.preloaded.sql_elements

        comparisons = [
            c == local
            for c in other
            for local in self
            if c.shares_lineage(local)
        ]
        return sql_elements.and_(*comparisons)

    def __hash__(self) -> int:  # type: ignore[override]
        """Hash of the members taken as a tuple in iteration order."""
        return hash(tuple(self))

2489 

2490 

def _entity_namespace(
    entity: Union[_HasEntityNamespace, ExternallyTraversible],
) -> _EntityNamespace:
    """Return the nearest .entity_namespace for the given entity.

    The entity's own ``entity_namespace`` is used when present.  If
    reading it raises ``AttributeError``, the elements produced by
    ``visitors.iterate()`` are searched and the namespace of the first
    one that has an entity namespace is returned; when none does, the
    original ``AttributeError`` is re-raised.

    """
    try:
        return cast(_HasEntityNamespace, entity).entity_namespace
    except AttributeError:
        traversible = cast(ExternallyTraversible, entity)
        for sub_element in visitors.iterate(traversible):
            if _is_has_entity_namespace(sub_element):
                return sub_element.entity_namespace
        # nothing found: surface the original AttributeError
        raise

2508 

2509 

@overload
def _entity_namespace_key(
    entity: Union[_HasEntityNamespace, ExternallyTraversible],
    key: str,
) -> SQLCoreOperations[Any]: ...


@overload
def _entity_namespace_key(
    entity: Union[_HasEntityNamespace, ExternallyTraversible],
    key: str,
    default: _NoArg,
) -> SQLCoreOperations[Any]: ...


@overload
def _entity_namespace_key(
    entity: Union[_HasEntityNamespace, ExternallyTraversible],
    key: str,
    default: _T,
) -> Union[SQLCoreOperations[Any], _T]: ...


def _entity_namespace_key(
    entity: Union[_HasEntityNamespace, ExternallyTraversible],
    key: str,
    default: Union[SQLCoreOperations[Any], _T, _NoArg] = NO_ARG,
) -> Union[SQLCoreOperations[Any], _T]:
    """Return an entry from an entity_namespace.

    When ``default`` is supplied it is returned if the namespace has no
    attribute named ``key``.

    Raises :class:`_exc.InvalidRequestError` rather than attribute error
    on not found.

    """

    try:
        namespace = _entity_namespace(entity)
        if default is NO_ARG:
            return getattr(namespace, key)  # type: ignore
        return getattr(namespace, key, default)
    except AttributeError as err:
        raise exc.InvalidRequestError(
            'Entity namespace for "%s" has no property "%s"' % (entity, key)
        ) from err

2556 

2557 

def _entity_namespace_key_search_all(
    entities: Collection[Any],
    key: str,
) -> SQLCoreOperations[Any]:
    """Search multiple entities for a key, raise if ambiguous or not found.

    This is used by filter_by() to search across all FROM clause entities
    when a single entity doesn't have the requested attribute.

    .. versionadded:: 2.1

    Raises:
        AmbiguousColumnError: If key exists in multiple entities
        InvalidRequestError: If key doesn't exist in any entity
    """

    def _describe_entities() -> str:
        # the first three entities, plus a total count when there are more
        described = ", ".join(str(e) for e in list(entities)[:3])
        if len(entities) > 3:
            described += f", ... ({len(entities)} total)"
        return described

    found: SQLCoreOperations[Any] | None = None

    for candidate in entities:
        namespace = _entity_namespace(candidate)
        if not hasattr(namespace, key):
            continue

        if found is not None:
            # an earlier entity already supplied this attribute
            entity_desc = _describe_entities()
            raise exc.AmbiguousColumnError(
                f'Attribute name "{key}" is ambiguous; it exists in '
                f"multiple FROM clause entities ({entity_desc}). "
                f"Use filter() with explicit column references instead "
                f"of filter_by()."
            )
        found = getattr(namespace, key)

    if found is None:
        # No entity has this attribute
        entity_desc = _describe_entities()
        raise exc.InvalidRequestError(
            f'None of the FROM clause entities have a property "{key}". '
            f"Searched entities: {entity_desc}"
        )

    return found