Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/sql/base.py: 49%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

927 statements  

1# sql/base.py 

2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7# mypy: allow-untyped-defs, allow-untyped-calls 

8 

9"""Foundational utilities common to many sql modules.""" 

10 

11 

12from __future__ import annotations 

13 

14import collections 

15from enum import Enum 

16import itertools 

17from itertools import zip_longest 

18import operator 

19import re 

20from typing import Any 

21from typing import Callable 

22from typing import cast 

23from typing import Dict 

24from typing import Final 

25from typing import FrozenSet 

26from typing import Generator 

27from typing import Generic 

28from typing import Iterable 

29from typing import Iterator 

30from typing import List 

31from typing import Mapping 

32from typing import MutableMapping 

33from typing import NamedTuple 

34from typing import NoReturn 

35from typing import Optional 

36from typing import overload 

37from typing import Protocol 

38from typing import Sequence 

39from typing import Set 

40from typing import Tuple 

41from typing import Type 

42from typing import TYPE_CHECKING 

43from typing import TypeVar 

44from typing import Union 

45 

46from . import roles 

47from . import visitors 

48from .cache_key import HasCacheKey # noqa 

49from .cache_key import MemoizedHasCacheKey # noqa 

50from .traversals import HasCopyInternals # noqa 

51from .visitors import ClauseVisitor 

52from .visitors import ExtendedInternalTraversal 

53from .visitors import ExternallyTraversible 

54from .visitors import InternalTraversal 

55from .. import event 

56from .. import exc 

57from .. import util 

58from ..util import HasMemoized as HasMemoized 

59from ..util import hybridmethod 

60from ..util.typing import Self 

61from ..util.typing import TypeGuard 

62from ..util.typing import TypeVarTuple 

63from ..util.typing import Unpack 

64 

65if TYPE_CHECKING: 

66 from . import coercions 

67 from . import elements 

68 from . import type_api 

69 from ._orm_types import DMLStrategyArgument 

70 from ._orm_types import SynchronizeSessionArgument 

71 from ._typing import _CLE 

72 from .cache_key import CacheKey 

73 from .compiler import SQLCompiler 

74 from .dml import Delete 

75 from .dml import Insert 

76 from .dml import Update 

77 from .elements import BindParameter 

78 from .elements import ClauseElement 

79 from .elements import ClauseList 

80 from .elements import ColumnClause # noqa 

81 from .elements import ColumnElement 

82 from .elements import NamedColumn 

83 from .elements import SQLCoreOperations 

84 from .elements import TextClause 

85 from .schema import Column 

86 from .schema import DefaultGenerator 

87 from .selectable import _JoinTargetElement 

88 from .selectable import _SelectIterable 

89 from .selectable import FromClause 

90 from .selectable import Select 

91 from .visitors import anon_map 

92 from ..engine import Connection 

93 from ..engine import CursorResult 

94 from ..engine.interfaces import _CoreMultiExecuteParams 

95 from ..engine.interfaces import _ExecuteOptions 

96 from ..engine.interfaces import _ImmutableExecuteOptions 

97 from ..engine.interfaces import CacheStats 

98 from ..engine.interfaces import Compiled 

99 from ..engine.interfaces import CompiledCacheType 

100 from ..engine.interfaces import CoreExecuteOptionsParameter 

101 from ..engine.interfaces import Dialect 

102 from ..engine.interfaces import IsolationLevel 

103 from ..engine.interfaces import SchemaTranslateMapType 

104 from ..event import dispatcher 

105 

106if not TYPE_CHECKING: 

107 coercions = None # noqa 

108 elements = None # noqa 

109 type_api = None # noqa 

110 

111 

112_Ts = TypeVarTuple("_Ts") 

113 

114 

115class _NoArg(Enum): 

116 NO_ARG = 0 

117 

118 def __repr__(self): 

119 return f"_NoArg.{self.name}" 

120 

121 

122NO_ARG: Final = _NoArg.NO_ARG 

123 

124 

125class _NoneName(Enum): 

126 NONE_NAME = 0 

127 """indicate a 'deferred' name that was ultimately the value None.""" 

128 

129 

130_NONE_NAME: Final = _NoneName.NONE_NAME 

131 

132_T = TypeVar("_T", bound=Any) 

133 

134_Fn = TypeVar("_Fn", bound=Callable[..., Any]) 

135 

136_AmbiguousTableNameMap = MutableMapping[str, str] 

137 

138 

139class _DefaultDescriptionTuple(NamedTuple): 

140 arg: Any 

141 is_scalar: Optional[bool] 

142 is_callable: Optional[bool] 

143 is_sentinel: Optional[bool] 

144 

145 @classmethod 

146 def _from_column_default( 

147 cls, default: Optional[DefaultGenerator] 

148 ) -> _DefaultDescriptionTuple: 

149 return ( 

150 _DefaultDescriptionTuple( 

151 default.arg, # type: ignore 

152 default.is_scalar, 

153 default.is_callable, 

154 default.is_sentinel, 

155 ) 

156 if default 

157 and ( 

158 default.has_arg 

159 or (not default.for_update and default.is_sentinel) 

160 ) 

161 else _DefaultDescriptionTuple(None, None, None, None) 

162 ) 

163 

164 

165_never_select_column: operator.attrgetter[Any] = operator.attrgetter( 

166 "_omit_from_statements" 

167) 

168 

169 

170class _EntityNamespace(Protocol): 

171 def __getattr__(self, key: str) -> SQLCoreOperations[Any]: ... 

172 

173 

174class _HasEntityNamespace(Protocol): 

175 @util.ro_non_memoized_property 

176 def entity_namespace(self) -> _EntityNamespace: ... 

177 

178 

179def _is_has_entity_namespace(element: Any) -> TypeGuard[_HasEntityNamespace]: 

180 return hasattr(element, "entity_namespace") 

181 

182 

183# Remove when https://github.com/python/mypy/issues/14640 will be fixed 

184_Self = TypeVar("_Self", bound=Any) 

185 

186 

187class Immutable: 

188 """mark a ClauseElement as 'immutable' when expressions are cloned. 

189 

190 "immutable" objects refers to the "mutability" of an object in the 

191 context of SQL DQL and DML generation. Such as, in DQL, one can 

192 compose a SELECT or subquery of varied forms, but one cannot modify 

193 the structure of a specific table or column within DQL. 

194 :class:`.Immutable` is mostly intended to follow this concept, and as 

195 such the primary "immutable" objects are :class:`.ColumnClause`, 

196 :class:`.Column`, :class:`.TableClause`, :class:`.Table`. 

197 

198 """ 

199 

200 __slots__ = () 

201 

202 _is_immutable: bool = True 

203 

204 def unique_params(self, *optionaldict: Any, **kwargs: Any) -> NoReturn: 

205 raise NotImplementedError("Immutable objects do not support copying") 

206 

207 def params(self, *optionaldict: Any, **kwargs: Any) -> NoReturn: 

208 raise NotImplementedError("Immutable objects do not support copying") 

209 

210 def _clone(self: _Self, **kw: Any) -> _Self: 

211 return self 

212 

213 def _copy_internals( 

214 self, *, omit_attrs: Iterable[str] = (), **kw: Any 

215 ) -> None: 

216 pass 

217 

218 

219class SingletonConstant(Immutable): 

220 """Represent SQL constants like NULL, TRUE, FALSE""" 

221 

222 _is_singleton_constant: bool = True 

223 

224 _singleton: SingletonConstant 

225 

226 def __new__(cls: _T, *arg: Any, **kw: Any) -> _T: 

227 return cast(_T, cls._singleton) 

228 

229 @util.non_memoized_property 

230 def proxy_set(self) -> FrozenSet[ColumnElement[Any]]: 

231 raise NotImplementedError() 

232 

233 @classmethod 

234 def _create_singleton(cls) -> None: 

235 obj = object.__new__(cls) 

236 obj.__init__() # type: ignore 

237 

238 # for a long time this was an empty frozenset, meaning 

239 # a SingletonConstant would never be a "corresponding column" in 

240 # a statement. This referred to #6259. However, in #7154 we see 

241 # that we do in fact need "correspondence" to work when matching cols 

242 # in result sets, so the non-correspondence was moved to a more 

243 # specific level when we are actually adapting expressions for SQL 

244 # render only. 

245 obj.proxy_set = frozenset([obj]) 

246 cls._singleton = obj 

247 

248 

249def _from_objects( 

250 *elements: Union[ 

251 ColumnElement[Any], FromClause, TextClause, _JoinTargetElement 

252 ] 

253) -> Iterator[FromClause]: 

254 return itertools.chain.from_iterable( 

255 [element._from_objects for element in elements] 

256 ) 

257 

258 

259def _select_iterables( 

260 elements: Iterable[roles.ColumnsClauseRole], 

261) -> _SelectIterable: 

262 """expand tables into individual columns in the 

263 given list of column expressions. 

264 

265 """ 

266 return itertools.chain.from_iterable( 

267 [c._select_iterable for c in elements] 

268 ) 

269 

270 

271_SelfGenerativeType = TypeVar("_SelfGenerativeType", bound="_GenerativeType") 

272 

273 

274class _GenerativeType(Protocol): 

275 def _generate(self) -> Self: ... 

276 

277 

278def _generative(fn: _Fn) -> _Fn: 

279 """non-caching _generative() decorator. 

280 

281 This is basically the legacy decorator that copies the object and 

282 runs a method on the new copy. 

283 

284 """ 

285 

286 @util.decorator 

287 def _generative( 

288 fn: _Fn, self: _SelfGenerativeType, *args: Any, **kw: Any 

289 ) -> _SelfGenerativeType: 

290 """Mark a method as generative.""" 

291 

292 self = self._generate() 

293 x = fn(self, *args, **kw) 

294 assert x is self, "generative methods must return self" 

295 return self 

296 

297 decorated = _generative(fn) 

298 decorated.non_generative = fn # type: ignore 

299 return decorated 

300 

301 

302def _exclusive_against(*names: str, **kw: Any) -> Callable[[_Fn], _Fn]: 

303 msgs: Dict[str, str] = kw.pop("msgs", {}) 

304 

305 defaults: Dict[str, str] = kw.pop("defaults", {}) 

306 

307 getters: List[Tuple[str, operator.attrgetter[Any], Optional[str]]] = [ 

308 (name, operator.attrgetter(name), defaults.get(name, None)) 

309 for name in names 

310 ] 

311 

312 @util.decorator 

313 def check(fn: _Fn, *args: Any, **kw: Any) -> Any: 

314 # make pylance happy by not including "self" in the argument 

315 # list 

316 self = args[0] 

317 args = args[1:] 

318 for name, getter, default_ in getters: 

319 if getter(self) is not default_: 

320 msg = msgs.get( 

321 name, 

322 "Method %s() has already been invoked on this %s construct" 

323 % (fn.__name__, self.__class__), 

324 ) 

325 raise exc.InvalidRequestError(msg) 

326 return fn(self, *args, **kw) 

327 

328 return check 

329 

330 

331def _clone(element, **kw): 

332 return element._clone(**kw) 

333 

334 

335def _expand_cloned( 

336 elements: Iterable[_CLE], 

337) -> Iterable[_CLE]: 

338 """expand the given set of ClauseElements to be the set of all 'cloned' 

339 predecessors. 

340 

341 """ 

342 # TODO: cython candidate 

343 return itertools.chain(*[x._cloned_set for x in elements]) 

344 

345 

346def _de_clone( 

347 elements: Iterable[_CLE], 

348) -> Iterable[_CLE]: 

349 for x in elements: 

350 while x._is_clone_of is not None: 

351 x = x._is_clone_of 

352 yield x 

353 

354 

355def _cloned_intersection(a: Iterable[_CLE], b: Iterable[_CLE]) -> Set[_CLE]: 

356 """return the intersection of sets a and b, counting 

357 any overlap between 'cloned' predecessors. 

358 

359 The returned set is in terms of the entities present within 'a'. 

360 

361 """ 

362 all_overlap: Set[_CLE] = set(_expand_cloned(a)).intersection( 

363 _expand_cloned(b) 

364 ) 

365 return {elem for elem in a if all_overlap.intersection(elem._cloned_set)} 

366 

367 

368def _cloned_difference(a: Iterable[_CLE], b: Iterable[_CLE]) -> Set[_CLE]: 

369 all_overlap: Set[_CLE] = set(_expand_cloned(a)).intersection( 

370 _expand_cloned(b) 

371 ) 

372 return { 

373 elem for elem in a if not all_overlap.intersection(elem._cloned_set) 

374 } 

375 

376 

377class _DialectArgView(MutableMapping[str, Any]): 

378 """A dictionary view of dialect-level arguments in the form 

379 <dialectname>_<argument_name>. 

380 

381 """ 

382 

383 __slots__ = ("obj",) 

384 

385 def __init__(self, obj: DialectKWArgs) -> None: 

386 self.obj = obj 

387 

388 def _key(self, key: str) -> Tuple[str, str]: 

389 try: 

390 dialect, value_key = key.split("_", 1) 

391 except ValueError as err: 

392 raise KeyError(key) from err 

393 else: 

394 return dialect, value_key 

395 

396 def __getitem__(self, key: str) -> Any: 

397 dialect, value_key = self._key(key) 

398 

399 try: 

400 opt = self.obj.dialect_options[dialect] 

401 except exc.NoSuchModuleError as err: 

402 raise KeyError(key) from err 

403 else: 

404 return opt[value_key] 

405 

406 def __setitem__(self, key: str, value: Any) -> None: 

407 try: 

408 dialect, value_key = self._key(key) 

409 except KeyError as err: 

410 raise exc.ArgumentError( 

411 "Keys must be of the form <dialectname>_<argname>" 

412 ) from err 

413 else: 

414 self.obj.dialect_options[dialect][value_key] = value 

415 

416 def __delitem__(self, key: str) -> None: 

417 dialect, value_key = self._key(key) 

418 del self.obj.dialect_options[dialect][value_key] 

419 

420 def __len__(self) -> int: 

421 return sum( 

422 len(args._non_defaults) 

423 for args in self.obj.dialect_options.values() 

424 ) 

425 

426 def __iter__(self) -> Generator[str, None, None]: 

427 return ( 

428 "%s_%s" % (dialect_name, value_name) 

429 for dialect_name in self.obj.dialect_options 

430 for value_name in self.obj.dialect_options[ 

431 dialect_name 

432 ]._non_defaults 

433 ) 

434 

435 

436class _DialectArgDict(MutableMapping[str, Any]): 

437 """A dictionary view of dialect-level arguments for a specific 

438 dialect. 

439 

440 Maintains a separate collection of user-specified arguments 

441 and dialect-specified default arguments. 

442 

443 """ 

444 

445 def __init__(self) -> None: 

446 self._non_defaults: Dict[str, Any] = {} 

447 self._defaults: Dict[str, Any] = {} 

448 

449 def __len__(self) -> int: 

450 return len(set(self._non_defaults).union(self._defaults)) 

451 

452 def __iter__(self) -> Iterator[str]: 

453 return iter(set(self._non_defaults).union(self._defaults)) 

454 

455 def __getitem__(self, key: str) -> Any: 

456 if key in self._non_defaults: 

457 return self._non_defaults[key] 

458 else: 

459 return self._defaults[key] 

460 

461 def __setitem__(self, key: str, value: Any) -> None: 

462 self._non_defaults[key] = value 

463 

464 def __delitem__(self, key: str) -> None: 

465 del self._non_defaults[key] 

466 

467 

468@util.preload_module("sqlalchemy.dialects") 

469def _kw_reg_for_dialect(dialect_name: str) -> Optional[Dict[Any, Any]]: 

470 dialect_cls = util.preloaded.dialects.registry.load(dialect_name) 

471 if dialect_cls.construct_arguments is None: 

472 return None 

473 return dict(dialect_cls.construct_arguments) 

474 

475 

476class DialectKWArgs: 

477 """Establish the ability for a class to have dialect-specific arguments 

478 with defaults and constructor validation. 

479 

480 The :class:`.DialectKWArgs` interacts with the 

481 :attr:`.DefaultDialect.construct_arguments` present on a dialect. 

482 

483 .. seealso:: 

484 

485 :attr:`.DefaultDialect.construct_arguments` 

486 

487 """ 

488 

489 __slots__ = () 

490 

491 _dialect_kwargs_traverse_internals: List[Tuple[str, Any]] = [ 

492 ("dialect_options", InternalTraversal.dp_dialect_options) 

493 ] 

494 

495 @classmethod 

496 def argument_for( 

497 cls, dialect_name: str, argument_name: str, default: Any 

498 ) -> None: 

499 """Add a new kind of dialect-specific keyword argument for this class. 

500 

501 E.g.:: 

502 

503 Index.argument_for("mydialect", "length", None) 

504 

505 some_index = Index("a", "b", mydialect_length=5) 

506 

507 The :meth:`.DialectKWArgs.argument_for` method is a per-argument 

508 way adding extra arguments to the 

509 :attr:`.DefaultDialect.construct_arguments` dictionary. This 

510 dictionary provides a list of argument names accepted by various 

511 schema-level constructs on behalf of a dialect. 

512 

513 New dialects should typically specify this dictionary all at once as a 

514 data member of the dialect class. The use case for ad-hoc addition of 

515 argument names is typically for end-user code that is also using 

516 a custom compilation scheme which consumes the additional arguments. 

517 

518 :param dialect_name: name of a dialect. The dialect must be 

519 locatable, else a :class:`.NoSuchModuleError` is raised. The 

520 dialect must also include an existing 

521 :attr:`.DefaultDialect.construct_arguments` collection, indicating 

522 that it participates in the keyword-argument validation and default 

523 system, else :class:`.ArgumentError` is raised. If the dialect does 

524 not include this collection, then any keyword argument can be 

525 specified on behalf of this dialect already. All dialects packaged 

526 within SQLAlchemy include this collection, however for third party 

527 dialects, support may vary. 

528 

529 :param argument_name: name of the parameter. 

530 

531 :param default: default value of the parameter. 

532 

533 """ 

534 

535 construct_arg_dictionary: Optional[Dict[Any, Any]] = ( 

536 DialectKWArgs._kw_registry[dialect_name] 

537 ) 

538 if construct_arg_dictionary is None: 

539 raise exc.ArgumentError( 

540 "Dialect '%s' does have keyword-argument " 

541 "validation and defaults enabled configured" % dialect_name 

542 ) 

543 if cls not in construct_arg_dictionary: 

544 construct_arg_dictionary[cls] = {} 

545 construct_arg_dictionary[cls][argument_name] = default 

546 

547 @property 

548 def dialect_kwargs(self) -> _DialectArgView: 

549 """A collection of keyword arguments specified as dialect-specific 

550 options to this construct. 

551 

552 The arguments are present here in their original ``<dialect>_<kwarg>`` 

553 format. Only arguments that were actually passed are included; 

554 unlike the :attr:`.DialectKWArgs.dialect_options` collection, which 

555 contains all options known by this dialect including defaults. 

556 

557 The collection is also writable; keys are accepted of the 

558 form ``<dialect>_<kwarg>`` where the value will be assembled 

559 into the list of options. 

560 

561 .. seealso:: 

562 

563 :attr:`.DialectKWArgs.dialect_options` - nested dictionary form 

564 

565 """ 

566 return _DialectArgView(self) 

567 

568 @property 

569 def kwargs(self) -> _DialectArgView: 

570 """A synonym for :attr:`.DialectKWArgs.dialect_kwargs`.""" 

571 return self.dialect_kwargs 

572 

573 _kw_registry: util.PopulateDict[str, Optional[Dict[Any, Any]]] = ( 

574 util.PopulateDict(_kw_reg_for_dialect) 

575 ) 

576 

577 @classmethod 

578 def _kw_reg_for_dialect_cls(cls, dialect_name: str) -> _DialectArgDict: 

579 construct_arg_dictionary = DialectKWArgs._kw_registry[dialect_name] 

580 d = _DialectArgDict() 

581 

582 if construct_arg_dictionary is None: 

583 d._defaults.update({"*": None}) 

584 else: 

585 for cls in reversed(cls.__mro__): 

586 if cls in construct_arg_dictionary: 

587 d._defaults.update(construct_arg_dictionary[cls]) 

588 return d 

589 

590 @util.memoized_property 

591 def dialect_options(self) -> util.PopulateDict[str, _DialectArgDict]: 

592 """A collection of keyword arguments specified as dialect-specific 

593 options to this construct. 

594 

595 This is a two-level nested registry, keyed to ``<dialect_name>`` 

596 and ``<argument_name>``. For example, the ``postgresql_where`` 

597 argument would be locatable as:: 

598 

599 arg = my_object.dialect_options["postgresql"]["where"] 

600 

601 .. versionadded:: 0.9.2 

602 

603 .. seealso:: 

604 

605 :attr:`.DialectKWArgs.dialect_kwargs` - flat dictionary form 

606 

607 """ 

608 

609 return util.PopulateDict(self._kw_reg_for_dialect_cls) 

610 

611 def _validate_dialect_kwargs(self, kwargs: Dict[str, Any]) -> None: 

612 # validate remaining kwargs that they all specify DB prefixes 

613 

614 if not kwargs: 

615 return 

616 

617 for k in kwargs: 

618 m = re.match("^(.+?)_(.+)$", k) 

619 if not m: 

620 raise TypeError( 

621 "Additional arguments should be " 

622 "named <dialectname>_<argument>, got '%s'" % k 

623 ) 

624 dialect_name, arg_name = m.group(1, 2) 

625 

626 try: 

627 construct_arg_dictionary = self.dialect_options[dialect_name] 

628 except exc.NoSuchModuleError: 

629 util.warn( 

630 "Can't validate argument %r; can't " 

631 "locate any SQLAlchemy dialect named %r" 

632 % (k, dialect_name) 

633 ) 

634 self.dialect_options[dialect_name] = d = _DialectArgDict() 

635 d._defaults.update({"*": None}) 

636 d._non_defaults[arg_name] = kwargs[k] 

637 else: 

638 if ( 

639 "*" not in construct_arg_dictionary 

640 and arg_name not in construct_arg_dictionary 

641 ): 

642 raise exc.ArgumentError( 

643 "Argument %r is not accepted by " 

644 "dialect %r on behalf of %r" 

645 % (k, dialect_name, self.__class__) 

646 ) 

647 else: 

648 construct_arg_dictionary[arg_name] = kwargs[k] 

649 

650 

651class CompileState: 

652 """Produces additional object state necessary for a statement to be 

653 compiled. 

654 

655 the :class:`.CompileState` class is at the base of classes that assemble 

656 state for a particular statement object that is then used by the 

657 compiler. This process is essentially an extension of the process that 

658 the SQLCompiler.visit_XYZ() method takes, however there is an emphasis 

659 on converting raw user intent into more organized structures rather than 

660 producing string output. The top-level :class:`.CompileState` for the 

661 statement being executed is also accessible when the execution context 

662 works with invoking the statement and collecting results. 

663 

664 The production of :class:`.CompileState` is specific to the compiler, such 

665 as within the :meth:`.SQLCompiler.visit_insert`, 

666 :meth:`.SQLCompiler.visit_select` etc. methods. These methods are also 

667 responsible for associating the :class:`.CompileState` with the 

668 :class:`.SQLCompiler` itself, if the statement is the "toplevel" statement, 

669 i.e. the outermost SQL statement that's actually being executed. 

670 There can be other :class:`.CompileState` objects that are not the 

671 toplevel, such as when a SELECT subquery or CTE-nested 

672 INSERT/UPDATE/DELETE is generated. 

673 

674 .. versionadded:: 1.4 

675 

676 """ 

677 

678 __slots__ = ("statement", "_ambiguous_table_name_map") 

679 

680 plugins: Dict[Tuple[str, str], Type[CompileState]] = {} 

681 

682 _ambiguous_table_name_map: Optional[_AmbiguousTableNameMap] 

683 

684 @classmethod 

685 def create_for_statement( 

686 cls, statement: Executable, compiler: SQLCompiler, **kw: Any 

687 ) -> CompileState: 

688 # factory construction. 

689 

690 if statement._propagate_attrs: 

691 plugin_name = statement._propagate_attrs.get( 

692 "compile_state_plugin", "default" 

693 ) 

694 klass = cls.plugins.get( 

695 (plugin_name, statement._effective_plugin_target), None 

696 ) 

697 if klass is None: 

698 klass = cls.plugins[ 

699 ("default", statement._effective_plugin_target) 

700 ] 

701 

702 else: 

703 klass = cls.plugins[ 

704 ("default", statement._effective_plugin_target) 

705 ] 

706 

707 if klass is cls: 

708 return cls(statement, compiler, **kw) 

709 else: 

710 return klass.create_for_statement(statement, compiler, **kw) 

711 

712 def __init__(self, statement, compiler, **kw): 

713 self.statement = statement 

714 

715 @classmethod 

716 def get_plugin_class( 

717 cls, statement: Executable 

718 ) -> Optional[Type[CompileState]]: 

719 plugin_name = statement._propagate_attrs.get( 

720 "compile_state_plugin", None 

721 ) 

722 

723 if plugin_name: 

724 key = (plugin_name, statement._effective_plugin_target) 

725 if key in cls.plugins: 

726 return cls.plugins[key] 

727 

728 # there's no case where we call upon get_plugin_class() and want 

729 # to get None back, there should always be a default. return that 

730 # if there was no plugin-specific class (e.g. Insert with "orm" 

731 # plugin) 

732 try: 

733 return cls.plugins[("default", statement._effective_plugin_target)] 

734 except KeyError: 

735 return None 

736 

737 @classmethod 

738 def _get_plugin_class_for_plugin( 

739 cls, statement: Executable, plugin_name: str 

740 ) -> Optional[Type[CompileState]]: 

741 try: 

742 return cls.plugins[ 

743 (plugin_name, statement._effective_plugin_target) 

744 ] 

745 except KeyError: 

746 return None 

747 

748 @classmethod 

749 def plugin_for( 

750 cls, plugin_name: str, visit_name: str 

751 ) -> Callable[[_Fn], _Fn]: 

752 def decorate(cls_to_decorate): 

753 cls.plugins[(plugin_name, visit_name)] = cls_to_decorate 

754 return cls_to_decorate 

755 

756 return decorate 

757 

758 

759class Generative(HasMemoized): 

760 """Provide a method-chaining pattern in conjunction with the 

761 @_generative decorator.""" 

762 

763 def _generate(self) -> Self: 

764 skip = self._memoized_keys 

765 cls = self.__class__ 

766 s = cls.__new__(cls) 

767 if skip: 

768 # ensure this iteration remains atomic 

769 s.__dict__ = { 

770 k: v for k, v in self.__dict__.copy().items() if k not in skip 

771 } 

772 else: 

773 s.__dict__ = self.__dict__.copy() 

774 return s 

775 

776 

777class InPlaceGenerative(HasMemoized): 

778 """Provide a method-chaining pattern in conjunction with the 

779 @_generative decorator that mutates in place.""" 

780 

781 __slots__ = () 

782 

783 def _generate(self): 

784 skip = self._memoized_keys 

785 # note __dict__ needs to be in __slots__ if this is used 

786 for k in skip: 

787 self.__dict__.pop(k, None) 

788 return self 

789 

790 

791class HasCompileState(Generative): 

792 """A class that has a :class:`.CompileState` associated with it.""" 

793 

794 _compile_state_plugin: Optional[Type[CompileState]] = None 

795 

796 _attributes: util.immutabledict[str, Any] = util.EMPTY_DICT 

797 

798 _compile_state_factory = CompileState.create_for_statement 

799 

800 

801class _MetaOptions(type): 

802 """metaclass for the Options class. 

803 

804 This metaclass is actually necessary despite the availability of the 

805 ``__init_subclass__()`` hook as this type also provides custom class-level 

806 behavior for the ``__add__()`` method. 

807 

808 """ 

809 

810 _cache_attrs: Tuple[str, ...] 

811 

812 def __add__(self, other): 

813 o1 = self() 

814 

815 if set(other).difference(self._cache_attrs): 

816 raise TypeError( 

817 "dictionary contains attributes not covered by " 

818 "Options class %s: %r" 

819 % (self, set(other).difference(self._cache_attrs)) 

820 ) 

821 

822 o1.__dict__.update(other) 

823 return o1 

824 

825 if TYPE_CHECKING: 

826 

827 def __getattr__(self, key: str) -> Any: ... 

828 

829 def __setattr__(self, key: str, value: Any) -> None: ... 

830 

831 def __delattr__(self, key: str) -> None: ... 

832 

833 

834class Options(metaclass=_MetaOptions): 

835 """A cacheable option dictionary with defaults.""" 

836 

837 __slots__ = () 

838 

839 _cache_attrs: Tuple[str, ...] 

840 

841 def __init_subclass__(cls) -> None: 

842 dict_ = cls.__dict__ 

843 cls._cache_attrs = tuple( 

844 sorted( 

845 d 

846 for d in dict_ 

847 if not d.startswith("__") 

848 and d not in ("_cache_key_traversal",) 

849 ) 

850 ) 

851 super().__init_subclass__() 

852 

853 def __init__(self, **kw: Any) -> None: 

854 self.__dict__.update(kw) 

855 

856 def __add__(self, other): 

857 o1 = self.__class__.__new__(self.__class__) 

858 o1.__dict__.update(self.__dict__) 

859 

860 if set(other).difference(self._cache_attrs): 

861 raise TypeError( 

862 "dictionary contains attributes not covered by " 

863 "Options class %s: %r" 

864 % (self, set(other).difference(self._cache_attrs)) 

865 ) 

866 

867 o1.__dict__.update(other) 

868 return o1 

869 

870 def __eq__(self, other): 

871 # TODO: very inefficient. This is used only in test suites 

872 # right now. 

873 for a, b in zip_longest(self._cache_attrs, other._cache_attrs): 

874 if getattr(self, a) != getattr(other, b): 

875 return False 

876 return True 

877 

878 def __repr__(self) -> str: 

879 # TODO: fairly inefficient, used only in debugging right now. 

880 

881 return "%s(%s)" % ( 

882 self.__class__.__name__, 

883 ", ".join( 

884 "%s=%r" % (k, self.__dict__[k]) 

885 for k in self._cache_attrs 

886 if k in self.__dict__ 

887 ), 

888 ) 

889 

890 @classmethod 

891 def isinstance(cls, klass: Type[Any]) -> bool: 

892 return issubclass(cls, klass) 

893 

894 @hybridmethod 

895 def add_to_element(self, name: str, value: str) -> Any: 

896 return self + {name: getattr(self, name) + value} 

897 

898 @hybridmethod 

899 def _state_dict_inst(self) -> Mapping[str, Any]: 

900 return self.__dict__ 

901 

902 _state_dict_const: util.immutabledict[str, Any] = util.EMPTY_DICT 

903 

904 @_state_dict_inst.classlevel 

905 def _state_dict(cls) -> Mapping[str, Any]: 

906 return cls._state_dict_const 

907 

908 @classmethod 

909 def safe_merge(cls, other: "Options") -> Any: 

910 d = other._state_dict() 

911 

912 # only support a merge with another object of our class 

913 # and which does not have attrs that we don't. otherwise 

914 # we risk having state that might not be part of our cache 

915 # key strategy 

916 

917 if ( 

918 cls is not other.__class__ 

919 and other._cache_attrs 

920 and set(other._cache_attrs).difference(cls._cache_attrs) 

921 ): 

922 raise TypeError( 

923 "other element %r is not empty, is not of type %s, " 

924 "and contains attributes not covered here %r" 

925 % ( 

926 other, 

927 cls, 

928 set(other._cache_attrs).difference(cls._cache_attrs), 

929 ) 

930 ) 

931 return cls + d 

932 

933 @classmethod 

934 def from_execution_options( 

935 cls, 

936 key: str, 

937 attrs: set[str], 

938 exec_options: Mapping[str, Any], 

939 statement_exec_options: Mapping[str, Any], 

940 ) -> Tuple["Options", Mapping[str, Any]]: 

941 """process Options argument in terms of execution options. 

942 

943 

944 e.g.:: 

945 

946 ( 

947 load_options, 

948 execution_options, 

949 ) = QueryContext.default_load_options.from_execution_options( 

950 "_sa_orm_load_options", 

951 {"populate_existing", "autoflush", "yield_per"}, 

952 execution_options, 

953 statement._execution_options, 

954 ) 

955 

956 get back the Options and refresh "_sa_orm_load_options" in the 

957 exec options dict w/ the Options as well 

958 

959 """ 

960 

961 # common case is that no options we are looking for are 

962 # in either dictionary, so cancel for that first 

963 check_argnames = attrs.intersection( 

964 set(exec_options).union(statement_exec_options) 

965 ) 

966 

967 existing_options = exec_options.get(key, cls) 

968 

969 if check_argnames: 

970 result = {} 

971 for argname in check_argnames: 

972 local = "_" + argname 

973 if argname in exec_options: 

974 result[local] = exec_options[argname] 

975 elif argname in statement_exec_options: 

976 result[local] = statement_exec_options[argname] 

977 

978 new_options = existing_options + result 

979 exec_options = util.immutabledict().merge_with( 

980 exec_options, {key: new_options} 

981 ) 

982 return new_options, exec_options 

983 

984 else: 

985 return existing_options, exec_options 

986 

987 if TYPE_CHECKING: 

988 

989 def __getattr__(self, key: str) -> Any: ... 

990 

991 def __setattr__(self, key: str, value: Any) -> None: ... 

992 

993 def __delattr__(self, key: str) -> None: ... 

994 

995 

996class CacheableOptions(Options, HasCacheKey): 

997 __slots__ = () 

998 

999 @hybridmethod 

1000 def _gen_cache_key_inst( 

1001 self, anon_map: Any, bindparams: List[BindParameter[Any]] 

1002 ) -> Optional[Tuple[Any]]: 

1003 return HasCacheKey._gen_cache_key(self, anon_map, bindparams) 

1004 

1005 @_gen_cache_key_inst.classlevel 

1006 def _gen_cache_key( 

1007 cls, anon_map: "anon_map", bindparams: List[BindParameter[Any]] 

1008 ) -> Tuple[CacheableOptions, Any]: 

1009 return (cls, ()) 

1010 

1011 @hybridmethod 

1012 def _generate_cache_key(self) -> Optional[CacheKey]: 

1013 return HasCacheKey._generate_cache_key_for_object(self) 

1014 

1015 

1016class ExecutableOption(HasCopyInternals): 

1017 __slots__ = () 

1018 

1019 _annotations: _ImmutableExecuteOptions = util.EMPTY_DICT 

1020 

1021 __visit_name__: str = "executable_option" 

1022 

1023 _is_has_cache_key: bool = False 

1024 

1025 _is_core: bool = True 

1026 

1027 def _clone(self, **kw): 

1028 """Create a shallow copy of this ExecutableOption.""" 

1029 c = self.__class__.__new__(self.__class__) 

1030 c.__dict__ = dict(self.__dict__) # type: ignore 

1031 return c 

1032 

1033 

1034_L = TypeVar("_L", bound=str) 

1035 

1036 

1037class HasSyntaxExtensions(Generic[_L]): 

1038 

1039 _position_map: Mapping[_L, str] 

1040 

1041 @_generative 

1042 def ext(self, extension: SyntaxExtension) -> Self: 

1043 """Applies a SQL syntax extension to this statement. 

1044 

1045 SQL syntax extensions are :class:`.ClauseElement` objects that define 

1046 some vendor-specific syntactical construct that take place in specific 

1047 parts of a SQL statement. Examples include vendor extensions like 

1048 PostgreSQL / SQLite's "ON DUPLICATE KEY UPDATE", PostgreSQL's 

1049 "DISTINCT ON", and MySQL's "LIMIT" that can be applied to UPDATE 

1050 and DELETE statements. 

1051 

1052 .. seealso:: 

1053 

1054 :ref:`examples_syntax_extensions` 

1055 

1056 :func:`_mysql.limit` - DML LIMIT for MySQL 

1057 

1058 :func:`_postgresql.distinct_on` - DISTINCT ON for PostgreSQL 

1059 

1060 .. versionadded:: 2.1 

1061 

1062 """ 

1063 extension = coercions.expect( 

1064 roles.SyntaxExtensionRole, extension, apply_propagate_attrs=self 

1065 ) 

1066 self._apply_syntax_extension_to_self(extension) 

1067 return self 

1068 

1069 @util.preload_module("sqlalchemy.sql.elements") 

1070 def apply_syntax_extension_point( 

1071 self, 

1072 apply_fn: Callable[[Sequence[ClauseElement]], Sequence[ClauseElement]], 

1073 position: _L, 

1074 ) -> None: 

1075 """Apply a :class:`.SyntaxExtension` to a known extension point. 

1076 

1077 Should be used only internally by :class:`.SyntaxExtension`. 

1078 

1079 E.g.:: 

1080 

1081 class Qualify(SyntaxExtension, ClauseElement): 

1082 

1083 # ... 

1084 

1085 def apply_to_select(self, select_stmt: Select) -> None: 

1086 # append self to existing 

1087 select_stmt.apply_extension_point( 

1088 lambda existing: [*existing, self], "post_criteria" 

1089 ) 

1090 

1091 

1092 class ReplaceExt(SyntaxExtension, ClauseElement): 

1093 

1094 # ... 

1095 

1096 def apply_to_select(self, select_stmt: Select) -> None: 

1097 # replace any existing elements regardless of type 

1098 select_stmt.apply_extension_point( 

1099 lambda existing: [self], "post_criteria" 

1100 ) 

1101 

1102 

1103 class ReplaceOfTypeExt(SyntaxExtension, ClauseElement): 

1104 

1105 # ... 

1106 

1107 def apply_to_select(self, select_stmt: Select) -> None: 

1108 # replace any existing elements of the same type 

1109 select_stmt.apply_extension_point( 

1110 self.append_replacing_same_type, "post_criteria" 

1111 ) 

1112 

1113 :param apply_fn: callable function that will receive a sequence of 

1114 :class:`.ClauseElement` that is already populating the extension 

1115 point (the sequence is empty if there isn't one), and should return 

1116 a new sequence of :class:`.ClauseElement` that will newly populate 

1117 that point. The function typically can choose to concatenate the 

1118 existing values with the new one, or to replace the values that are 

1119 there with a new one by returning a list of a single element, or 

1120 to perform more complex operations like removing only the same 

1121 type element from the input list of merging already existing elements 

1122 of the same type. Some examples are shown in the examples above 

1123 :param position: string name of the position to apply to. This 

1124 varies per statement type. IDEs should show the possible values 

1125 for each statement type as it's typed with a ``typing.Literal`` per 

1126 statement. 

1127 

1128 .. seealso:: 

1129 

1130 :ref:`examples_syntax_extensions` 

1131 

1132 

1133 """ # noqa: E501 

1134 

1135 try: 

1136 attrname = self._position_map[position] 

1137 except KeyError as ke: 

1138 raise ValueError( 

1139 f"Unknown position {position!r} for {self.__class__} " 

1140 f"construct; known positions: " 

1141 f"{', '.join(repr(k) for k in self._position_map)}" 

1142 ) from ke 

1143 else: 

1144 ElementList = util.preloaded.sql_elements.ElementList 

1145 existing: Optional[ClauseElement] = getattr(self, attrname, None) 

1146 if existing is None: 

1147 input_seq: Tuple[ClauseElement, ...] = () 

1148 elif isinstance(existing, ElementList): 

1149 input_seq = existing.clauses 

1150 else: 

1151 input_seq = (existing,) 

1152 

1153 new_seq = apply_fn(input_seq) 

1154 assert new_seq, "cannot return empty sequence" 

1155 new = new_seq[0] if len(new_seq) == 1 else ElementList(new_seq) 

1156 setattr(self, attrname, new) 

1157 

1158 def _apply_syntax_extension_to_self( 

1159 self, extension: SyntaxExtension 

1160 ) -> None: 

1161 raise NotImplementedError() 

1162 

1163 def _get_syntax_extensions_as_dict(self) -> Mapping[_L, SyntaxExtension]: 

1164 res: Dict[_L, SyntaxExtension] = {} 

1165 for name, attr in self._position_map.items(): 

1166 value = getattr(self, attr) 

1167 if value is not None: 

1168 res[name] = value 

1169 return res 

1170 

1171 def _set_syntax_extensions(self, **extensions: SyntaxExtension) -> None: 

1172 for name, value in extensions.items(): 

1173 setattr(self, self._position_map[name], value) # type: ignore[index] # noqa: E501 

1174 

1175 

1176class SyntaxExtension(roles.SyntaxExtensionRole): 

1177 """Defines a unit that when also extending from :class:`.ClauseElement` 

1178 can be applied to SQLAlchemy statements :class:`.Select`, 

1179 :class:`_sql.Insert`, :class:`.Update` and :class:`.Delete` making use of 

1180 pre-established SQL insertion points within these constructs. 

1181 

1182 .. versionadded:: 2.1 

1183 

1184 .. seealso:: 

1185 

1186 :ref:`examples_syntax_extensions` 

1187 

1188 """ 

1189 

1190 def append_replacing_same_type( 

1191 self, existing: Sequence[ClauseElement] 

1192 ) -> Sequence[ClauseElement]: 

1193 """Utility function that can be used as 

1194 :paramref:`_sql.HasSyntaxExtensions.apply_extension_point.apply_fn` 

1195 to remove any other element of the same type in existing and appending 

1196 ``self`` to the list. 

1197 

1198 This is equivalent to:: 

1199 

1200 stmt.apply_extension_point( 

1201 lambda existing: [ 

1202 *(e for e in existing if not isinstance(e, ReplaceOfTypeExt)), 

1203 self, 

1204 ], 

1205 "post_criteria", 

1206 ) 

1207 

1208 .. seealso:: 

1209 

1210 :ref:`examples_syntax_extensions` 

1211 

1212 :meth:`_sql.HasSyntaxExtensions.apply_syntax_extension_point` 

1213 

1214 """ # noqa: E501 

1215 cls = type(self) 

1216 return [*(e for e in existing if not isinstance(e, cls)), self] # type: ignore[list-item] # noqa: E501 

1217 

1218 def apply_to_select(self, select_stmt: Select[Unpack[_Ts]]) -> None: 

1219 """Apply this :class:`.SyntaxExtension` to a :class:`.Select`""" 

1220 raise NotImplementedError( 

1221 f"Extension {type(self).__name__} cannot be applied to select" 

1222 ) 

1223 

1224 def apply_to_update(self, update_stmt: Update) -> None: 

1225 """Apply this :class:`.SyntaxExtension` to an :class:`.Update`""" 

1226 raise NotImplementedError( 

1227 f"Extension {type(self).__name__} cannot be applied to update" 

1228 ) 

1229 

1230 def apply_to_delete(self, delete_stmt: Delete) -> None: 

1231 """Apply this :class:`.SyntaxExtension` to a :class:`.Delete`""" 

1232 raise NotImplementedError( 

1233 f"Extension {type(self).__name__} cannot be applied to delete" 

1234 ) 

1235 

1236 def apply_to_insert(self, insert_stmt: Insert) -> None: 

1237 """Apply this :class:`.SyntaxExtension` to an :class:`_sql.Insert`""" 

1238 raise NotImplementedError( 

1239 f"Extension {type(self).__name__} cannot be applied to insert" 

1240 ) 

1241 

1242 

1243class Executable(roles.StatementRole): 

1244 """Mark a :class:`_expression.ClauseElement` as supporting execution. 

1245 

1246 :class:`.Executable` is a superclass for all "statement" types 

1247 of objects, including :func:`select`, :func:`delete`, :func:`update`, 

1248 :func:`insert`, :func:`text`. 

1249 

1250 """ 

1251 

1252 supports_execution: bool = True 

1253 _execution_options: _ImmutableExecuteOptions = util.EMPTY_DICT 

1254 _is_default_generator: bool = False 

1255 _with_options: Tuple[ExecutableOption, ...] = () 

1256 _compile_state_funcs: Tuple[ 

1257 Tuple[Callable[[CompileState], None], Any], ... 

1258 ] = () 

1259 _compile_options: Optional[Union[Type[CacheableOptions], CacheableOptions]] 

1260 

1261 _executable_traverse_internals = [ 

1262 ("_with_options", InternalTraversal.dp_executable_options), 

1263 ( 

1264 "_compile_state_funcs", 

1265 ExtendedInternalTraversal.dp_compile_state_funcs, 

1266 ), 

1267 ("_propagate_attrs", ExtendedInternalTraversal.dp_propagate_attrs), 

1268 ] 

1269 

1270 is_select: bool = False 

1271 is_from_statement: bool = False 

1272 is_update: bool = False 

1273 is_insert: bool = False 

1274 is_text: bool = False 

1275 is_delete: bool = False 

1276 is_dml: bool = False 

1277 

1278 if TYPE_CHECKING: 

1279 __visit_name__: str 

1280 

1281 def _compile_w_cache( 

1282 self, 

1283 dialect: Dialect, 

1284 *, 

1285 compiled_cache: Optional[CompiledCacheType], 

1286 column_keys: List[str], 

1287 for_executemany: bool = False, 

1288 schema_translate_map: Optional[SchemaTranslateMapType] = None, 

1289 **kw: Any, 

1290 ) -> Tuple[ 

1291 Compiled, Optional[Sequence[BindParameter[Any]]], CacheStats 

1292 ]: ... 

1293 

1294 def _execute_on_connection( 

1295 self, 

1296 connection: Connection, 

1297 distilled_params: _CoreMultiExecuteParams, 

1298 execution_options: CoreExecuteOptionsParameter, 

1299 ) -> CursorResult[Any]: ... 

1300 

1301 def _execute_on_scalar( 

1302 self, 

1303 connection: Connection, 

1304 distilled_params: _CoreMultiExecuteParams, 

1305 execution_options: CoreExecuteOptionsParameter, 

1306 ) -> Any: ... 

1307 

1308 @util.ro_non_memoized_property 

1309 def _all_selected_columns(self) -> _SelectIterable: 

1310 raise NotImplementedError() 

1311 

1312 @property 

1313 def _effective_plugin_target(self) -> str: 

1314 return self.__visit_name__ 

1315 

1316 @_generative 

1317 def options(self, *options: ExecutableOption) -> Self: 

1318 """Apply options to this statement. 

1319 

1320 In the general sense, options are any kind of Python object 

1321 that can be interpreted by systems that consume the statement outside 

1322 of the regular SQL compiler chain. Specifically, these options are 

1323 the ORM level options that apply "eager load" and other loading 

1324 behaviors to an ORM query. 

1325 

1326 For background on specific kinds of options for specific kinds of 

1327 statements, refer to the documentation for those option objects. 

1328 

1329 .. versionchanged:: 1.4 - added :meth:`.Executable.options` to 

1330 Core statement objects towards the goal of allowing unified 

1331 Core / ORM querying capabilities. 

1332 

1333 .. seealso:: 

1334 

1335 :ref:`loading_columns` - refers to options specific to the usage 

1336 of ORM queries 

1337 

1338 :ref:`relationship_loader_options` - refers to options specific 

1339 to the usage of ORM queries 

1340 

1341 """ 

1342 self._with_options += tuple( 

1343 coercions.expect(roles.ExecutableOptionRole, opt) 

1344 for opt in options 

1345 ) 

1346 return self 

1347 

1348 @_generative 

1349 def _set_compile_options(self, compile_options: CacheableOptions) -> Self: 

1350 """Assign the compile options to a new value. 

1351 

1352 :param compile_options: appropriate CacheableOptions structure 

1353 

1354 """ 

1355 

1356 self._compile_options = compile_options 

1357 return self 

1358 

1359 @_generative 

1360 def _update_compile_options(self, options: CacheableOptions) -> Self: 

1361 """update the _compile_options with new keys.""" 

1362 

1363 assert self._compile_options is not None 

1364 self._compile_options += options 

1365 return self 

1366 

1367 @_generative 

1368 def _add_compile_state_func( 

1369 self, 

1370 callable_: Callable[[CompileState], None], 

1371 cache_args: Any, 

1372 ) -> Self: 

1373 """Add a compile state function to this statement. 

1374 

1375 When using the ORM only, these are callable functions that will 

1376 be given the CompileState object upon compilation. 

1377 

1378 A second argument cache_args is required, which will be combined with 

1379 the ``__code__`` identity of the function itself in order to produce a 

1380 cache key. 

1381 

1382 """ 

1383 self._compile_state_funcs += ((callable_, cache_args),) 

1384 return self 

1385 

1386 @overload 

1387 def execution_options( 

1388 self, 

1389 *, 

1390 compiled_cache: Optional[CompiledCacheType] = ..., 

1391 logging_token: str = ..., 

1392 isolation_level: IsolationLevel = ..., 

1393 no_parameters: bool = False, 

1394 stream_results: bool = False, 

1395 max_row_buffer: int = ..., 

1396 yield_per: int = ..., 

1397 driver_column_names: bool = ..., 

1398 insertmanyvalues_page_size: int = ..., 

1399 schema_translate_map: Optional[SchemaTranslateMapType] = ..., 

1400 populate_existing: bool = False, 

1401 autoflush: bool = False, 

1402 synchronize_session: SynchronizeSessionArgument = ..., 

1403 dml_strategy: DMLStrategyArgument = ..., 

1404 render_nulls: bool = ..., 

1405 is_delete_using: bool = ..., 

1406 is_update_from: bool = ..., 

1407 preserve_rowcount: bool = False, 

1408 **opt: Any, 

1409 ) -> Self: ... 

1410 

1411 @overload 

1412 def execution_options(self, **opt: Any) -> Self: ... 

1413 

1414 @_generative 

1415 def execution_options(self, **kw: Any) -> Self: 

1416 """Set non-SQL options for the statement which take effect during 

1417 execution. 

1418 

1419 Execution options can be set at many scopes, including per-statement, 

1420 per-connection, or per execution, using methods such as 

1421 :meth:`_engine.Connection.execution_options` and parameters which 

1422 accept a dictionary of options such as 

1423 :paramref:`_engine.Connection.execute.execution_options` and 

1424 :paramref:`_orm.Session.execute.execution_options`. 

1425 

1426 The primary characteristic of an execution option, as opposed to 

1427 other kinds of options such as ORM loader options, is that 

1428 **execution options never affect the compiled SQL of a query, only 

1429 things that affect how the SQL statement itself is invoked or how 

1430 results are fetched**. That is, execution options are not part of 

1431 what's accommodated by SQL compilation nor are they considered part of 

1432 the cached state of a statement. 

1433 

1434 The :meth:`_sql.Executable.execution_options` method is 

1435 :term:`generative`, as 

1436 is the case for the method as applied to the :class:`_engine.Engine` 

1437 and :class:`_orm.Query` objects, which means when the method is called, 

1438 a copy of the object is returned, which applies the given parameters to 

1439 that new copy, but leaves the original unchanged:: 

1440 

1441 statement = select(table.c.x, table.c.y) 

1442 new_statement = statement.execution_options(my_option=True) 

1443 

1444 An exception to this behavior is the :class:`_engine.Connection` 

1445 object, where the :meth:`_engine.Connection.execution_options` method 

1446 is explicitly **not** generative. 

1447 

1448 The kinds of options that may be passed to 

1449 :meth:`_sql.Executable.execution_options` and other related methods and 

1450 parameter dictionaries include parameters that are explicitly consumed 

1451 by SQLAlchemy Core or ORM, as well as arbitrary keyword arguments not 

1452 defined by SQLAlchemy, which means the methods and/or parameter 

1453 dictionaries may be used for user-defined parameters that interact with 

1454 custom code, which may access the parameters using methods such as 

1455 :meth:`_sql.Executable.get_execution_options` and 

1456 :meth:`_engine.Connection.get_execution_options`, or within selected 

1457 event hooks using a dedicated ``execution_options`` event parameter 

1458 such as 

1459 :paramref:`_events.ConnectionEvents.before_execute.execution_options` 

1460 or :attr:`_orm.ORMExecuteState.execution_options`, e.g.:: 

1461 

1462 from sqlalchemy import event 

1463 

1464 

1465 @event.listens_for(some_engine, "before_execute") 

1466 def _process_opt(conn, statement, multiparams, params, execution_options): 

1467 "run a SQL function before invoking a statement" 

1468 

1469 if execution_options.get("do_special_thing", False): 

1470 conn.exec_driver_sql("run_special_function()") 

1471 

1472 Within the scope of options that are explicitly recognized by 

1473 SQLAlchemy, most apply to specific classes of objects and not others. 

1474 The most common execution options include: 

1475 

1476 * :paramref:`_engine.Connection.execution_options.isolation_level` - 

1477 sets the isolation level for a connection or a class of connections 

1478 via an :class:`_engine.Engine`. This option is accepted only 

1479 by :class:`_engine.Connection` or :class:`_engine.Engine`. 

1480 

1481 * :paramref:`_engine.Connection.execution_options.stream_results` - 

1482 indicates results should be fetched using a server side cursor; 

1483 this option is accepted by :class:`_engine.Connection`, by the 

1484 :paramref:`_engine.Connection.execute.execution_options` parameter 

1485 on :meth:`_engine.Connection.execute`, and additionally by 

1486 :meth:`_sql.Executable.execution_options` on a SQL statement object, 

1487 as well as by ORM constructs like :meth:`_orm.Session.execute`. 

1488 

1489 * :paramref:`_engine.Connection.execution_options.compiled_cache` - 

1490 indicates a dictionary that will serve as the 

1491 :ref:`SQL compilation cache <sql_caching>` 

1492 for a :class:`_engine.Connection` or :class:`_engine.Engine`, as 

1493 well as for ORM methods like :meth:`_orm.Session.execute`. 

1494 Can be passed as ``None`` to disable caching for statements. 

1495 This option is not accepted by 

1496 :meth:`_sql.Executable.execution_options` as it is inadvisable to 

1497 carry along a compilation cache within a statement object. 

1498 

1499 * :paramref:`_engine.Connection.execution_options.schema_translate_map` 

1500 - a mapping of schema names used by the 

1501 :ref:`Schema Translate Map <schema_translating>` feature, accepted 

1502 by :class:`_engine.Connection`, :class:`_engine.Engine`, 

1503 :class:`_sql.Executable`, as well as by ORM constructs 

1504 like :meth:`_orm.Session.execute`. 

1505 

1506 .. seealso:: 

1507 

1508 :meth:`_engine.Connection.execution_options` 

1509 

1510 :paramref:`_engine.Connection.execute.execution_options` 

1511 

1512 :paramref:`_orm.Session.execute.execution_options` 

1513 

1514 :ref:`orm_queryguide_execution_options` - documentation on all 

1515 ORM-specific execution options 

1516 

1517 """ # noqa: E501 

1518 if "isolation_level" in kw: 

1519 raise exc.ArgumentError( 

1520 "'isolation_level' execution option may only be specified " 

1521 "on Connection.execution_options(), or " 

1522 "per-engine using the isolation_level " 

1523 "argument to create_engine()." 

1524 ) 

1525 if "compiled_cache" in kw: 

1526 raise exc.ArgumentError( 

1527 "'compiled_cache' execution option may only be specified " 

1528 "on Connection.execution_options(), not per statement." 

1529 ) 

1530 self._execution_options = self._execution_options.union(kw) 

1531 return self 

1532 

1533 def get_execution_options(self) -> _ExecuteOptions: 

1534 """Get the non-SQL options which will take effect during execution. 

1535 

1536 .. seealso:: 

1537 

1538 :meth:`.Executable.execution_options` 

1539 """ 

1540 return self._execution_options 

1541 

1542 

1543class SchemaEventTarget(event.EventTarget): 

1544 """Base class for elements that are the targets of :class:`.DDLEvents` 

1545 events. 

1546 

1547 This includes :class:`.SchemaItem` as well as :class:`.SchemaType`. 

1548 

1549 """ 

1550 

1551 dispatch: dispatcher[SchemaEventTarget] 

1552 

1553 def _set_parent(self, parent: SchemaEventTarget, **kw: Any) -> None: 

1554 """Associate with this SchemaEvent's parent object.""" 

1555 

1556 def _set_parent_with_dispatch( 

1557 self, parent: SchemaEventTarget, **kw: Any 

1558 ) -> None: 

1559 self.dispatch.before_parent_attach(self, parent) 

1560 self._set_parent(parent, **kw) 

1561 self.dispatch.after_parent_attach(self, parent) 

1562 

1563 

1564class SchemaVisitable(SchemaEventTarget, visitors.Visitable): 

1565 """Base class for elements that are targets of a :class:`.SchemaVisitor`. 

1566 

1567 .. versionadded:: 2.0.41 

1568 

1569 """ 

1570 

1571 

1572class SchemaVisitor(ClauseVisitor): 

1573 """Define the visiting for ``SchemaItem`` and more 

1574 generally ``SchemaVisitable`` objects. 

1575 

1576 """ 

1577 

1578 __traverse_options__: Dict[str, Any] = {"schema_visitor": True} 

1579 

1580 

1581class _SentinelDefaultCharacterization(Enum): 

1582 NONE = "none" 

1583 UNKNOWN = "unknown" 

1584 CLIENTSIDE = "clientside" 

1585 SENTINEL_DEFAULT = "sentinel_default" 

1586 SERVERSIDE = "serverside" 

1587 IDENTITY = "identity" 

1588 SEQUENCE = "sequence" 

1589 

1590 

1591class _SentinelColumnCharacterization(NamedTuple): 

1592 columns: Optional[Sequence[Column[Any]]] = None 

1593 is_explicit: bool = False 

1594 is_autoinc: bool = False 

1595 default_characterization: _SentinelDefaultCharacterization = ( 

1596 _SentinelDefaultCharacterization.NONE 

1597 ) 

1598 

1599 

1600_COLKEY = TypeVar("_COLKEY", Union[None, str], str) 

1601 

1602_COL_co = TypeVar("_COL_co", bound="ColumnElement[Any]", covariant=True) 

1603_COL = TypeVar("_COL", bound="ColumnElement[Any]") 

1604 

1605 

1606class _ColumnMetrics(Generic[_COL_co]): 

1607 __slots__ = ("column",) 

1608 

1609 column: _COL_co 

1610 

1611 def __init__( 

1612 self, collection: ColumnCollection[Any, _COL_co], col: _COL_co 

1613 ) -> None: 

1614 self.column = col 

1615 

1616 # proxy_index being non-empty means it was initialized. 

1617 # so we need to update it 

1618 pi = collection._proxy_index 

1619 if pi: 

1620 for eps_col in col._expanded_proxy_set: 

1621 pi[eps_col].add(self) 

1622 

1623 def get_expanded_proxy_set(self) -> FrozenSet[ColumnElement[Any]]: 

1624 return self.column._expanded_proxy_set 

1625 

1626 def dispose(self, collection: ColumnCollection[_COLKEY, _COL_co]) -> None: 

1627 pi = collection._proxy_index 

1628 if not pi: 

1629 return 

1630 for col in self.column._expanded_proxy_set: 

1631 colset = pi.get(col, None) 

1632 if colset: 

1633 colset.discard(self) 

1634 if colset is not None and not colset: 

1635 del pi[col] 

1636 

1637 def embedded( 

1638 self, 

1639 target_set: Union[ 

1640 Set[ColumnElement[Any]], FrozenSet[ColumnElement[Any]] 

1641 ], 

1642 ) -> bool: 

1643 expanded_proxy_set = self.column._expanded_proxy_set 

1644 for t in target_set.difference(expanded_proxy_set): 

1645 if not expanded_proxy_set.intersection(_expand_cloned([t])): 

1646 return False 

1647 return True 

1648 

1649 

1650class ColumnCollection(Generic[_COLKEY, _COL_co]): 

1651 """Collection of :class:`_expression.ColumnElement` instances, 

1652 typically for 

1653 :class:`_sql.FromClause` objects. 

1654 

1655 The :class:`_sql.ColumnCollection` object is most commonly available 

1656 as the :attr:`_schema.Table.c` or :attr:`_schema.Table.columns` collection 

1657 on the :class:`_schema.Table` object, introduced at 

1658 :ref:`metadata_tables_and_columns`. 

1659 

1660 The :class:`_expression.ColumnCollection` has both mapping- and sequence- 

1661 like behaviors. A :class:`_expression.ColumnCollection` usually stores 

1662 :class:`_schema.Column` objects, which are then accessible both via mapping 

1663 style access as well as attribute access style. 

1664 

1665 To access :class:`_schema.Column` objects using ordinary attribute-style 

1666 access, specify the name like any other object attribute, such as below 

1667 a column named ``employee_name`` is accessed:: 

1668 

1669 >>> employee_table.c.employee_name 

1670 

1671 To access columns that have names with special characters or spaces, 

1672 index-style access is used, such as below which illustrates a column named 

1673 ``employee ' payment`` is accessed:: 

1674 

1675 >>> employee_table.c["employee ' payment"] 

1676 

1677 As the :class:`_sql.ColumnCollection` object provides a Python dictionary 

1678 interface, common dictionary method names like 

1679 :meth:`_sql.ColumnCollection.keys`, :meth:`_sql.ColumnCollection.values`, 

1680 and :meth:`_sql.ColumnCollection.items` are available, which means that 

1681 database columns that are keyed under these names also need to use indexed 

1682 access:: 

1683 

1684 >>> employee_table.c["values"] 

1685 

1686 

1687 The name for which a :class:`_schema.Column` would be present is normally 

1688 that of the :paramref:`_schema.Column.key` parameter. In some contexts, 

1689 such as a :class:`_sql.Select` object that uses a label style set 

1690 using the :meth:`_sql.Select.set_label_style` method, a column of a certain 

1691 key may instead be represented under a particular label name such 

1692 as ``tablename_columnname``:: 

1693 

1694 >>> from sqlalchemy import select, column, table 

1695 >>> from sqlalchemy import LABEL_STYLE_TABLENAME_PLUS_COL 

1696 >>> t = table("t", column("c")) 

1697 >>> stmt = select(t).set_label_style(LABEL_STYLE_TABLENAME_PLUS_COL) 

1698 >>> subq = stmt.subquery() 

1699 >>> subq.c.t_c 

1700 <sqlalchemy.sql.elements.ColumnClause at 0x7f59dcf04fa0; t_c> 

1701 

1702 :class:`.ColumnCollection` also indexes the columns in order and allows 

1703 them to be accessible by their integer position:: 

1704 

1705 >>> cc[0] 

1706 Column('x', Integer(), table=None) 

1707 >>> cc[1] 

1708 Column('y', Integer(), table=None) 

1709 

1710 .. versionadded:: 1.4 :class:`_expression.ColumnCollection` 

1711 allows integer-based 

1712 index access to the collection. 

1713 

1714 Iterating the collection yields the column expressions in order:: 

1715 

1716 >>> list(cc) 

1717 [Column('x', Integer(), table=None), 

1718 Column('y', Integer(), table=None)] 

1719 

1720 The base :class:`_expression.ColumnCollection` object can store 

1721 duplicates, which can 

1722 mean either two columns with the same key, in which case the column 

1723 returned by key access is **arbitrary**:: 

1724 

1725 >>> x1, x2 = Column("x", Integer), Column("x", Integer) 

1726 >>> cc = ColumnCollection(columns=[(x1.name, x1), (x2.name, x2)]) 

1727 >>> list(cc) 

1728 [Column('x', Integer(), table=None), 

1729 Column('x', Integer(), table=None)] 

1730 >>> cc["x"] is x1 

1731 False 

1732 >>> cc["x"] is x2 

1733 True 

1734 

1735 Or it can also mean the same column multiple times. These cases are 

1736 supported as :class:`_expression.ColumnCollection` 

1737 is used to represent the columns in 

1738 a SELECT statement which may include duplicates. 

1739 

1740 A special subclass :class:`.DedupeColumnCollection` exists which instead 

1741 maintains SQLAlchemy's older behavior of not allowing duplicates; this 

1742 collection is used for schema level objects like :class:`_schema.Table` 

1743 and 

1744 :class:`.PrimaryKeyConstraint` where this deduping is helpful. The 

1745 :class:`.DedupeColumnCollection` class also has additional mutation methods 

1746 as the schema constructs have more use cases that require removal and 

1747 replacement of columns. 

1748 

1749 .. versionchanged:: 1.4 :class:`_expression.ColumnCollection` 

1750 now stores duplicate 

1751 column keys as well as the same column in multiple positions. The 

1752 :class:`.DedupeColumnCollection` class is added to maintain the 

1753 former behavior in those cases where deduplication as well as 

1754 additional replace/remove operations are needed. 

1755 

1756 

1757 """ 

1758 

1759 __slots__ = ("_collection", "_index", "_colset", "_proxy_index") 

1760 

1761 _collection: List[Tuple[_COLKEY, _COL_co, _ColumnMetrics[_COL_co]]] 

1762 _index: Dict[Union[None, str, int], Tuple[_COLKEY, _COL_co]] 

1763 _proxy_index: Dict[ColumnElement[Any], Set[_ColumnMetrics[_COL_co]]] 

1764 _colset: Set[_COL_co] 

1765 

1766 def __init__( 

1767 self, columns: Optional[Iterable[Tuple[_COLKEY, _COL_co]]] = None 

1768 ): 

1769 object.__setattr__(self, "_colset", set()) 

1770 object.__setattr__(self, "_index", {}) 

1771 object.__setattr__( 

1772 self, "_proxy_index", collections.defaultdict(util.OrderedSet) 

1773 ) 

1774 object.__setattr__(self, "_collection", []) 

1775 if columns: 

1776 self._initial_populate(columns) 

1777 

1778 @util.preload_module("sqlalchemy.sql.elements") 

1779 def __clause_element__(self) -> ClauseList: 

1780 elements = util.preloaded.sql_elements 

1781 

1782 return elements.ClauseList( 

1783 _literal_as_text_role=roles.ColumnsClauseRole, 

1784 group=False, 

1785 *self._all_columns, 

1786 ) 

1787 

1788 def _initial_populate( 

1789 self, iter_: Iterable[Tuple[_COLKEY, _COL_co]] 

1790 ) -> None: 

1791 self._populate_separate_keys(iter_) 

1792 

1793 @property 

1794 def _all_columns(self) -> List[_COL_co]: 

1795 return [col for (_, col, _) in self._collection] 

1796 

1797 def keys(self) -> List[_COLKEY]: 

1798 """Return a sequence of string key names for all columns in this 

1799 collection.""" 

1800 return [k for (k, _, _) in self._collection] 

1801 

1802 def values(self) -> List[_COL_co]: 

1803 """Return a sequence of :class:`_sql.ColumnClause` or 

1804 :class:`_schema.Column` objects for all columns in this 

1805 collection.""" 

1806 return [col for (_, col, _) in self._collection] 

1807 

1808 def items(self) -> List[Tuple[_COLKEY, _COL_co]]: 

1809 """Return a sequence of (key, column) tuples for all columns in this 

1810 collection each consisting of a string key name and a 

1811 :class:`_sql.ColumnClause` or 

1812 :class:`_schema.Column` object. 

1813 """ 

1814 

1815 return [(k, col) for (k, col, _) in self._collection] 

1816 

1817 def __bool__(self) -> bool: 

1818 return bool(self._collection) 

1819 

1820 def __len__(self) -> int: 

1821 return len(self._collection) 

1822 

1823 def __iter__(self) -> Iterator[_COL_co]: 

1824 # turn to a list first to maintain over a course of changes 

1825 return iter([col for _, col, _ in self._collection]) 

1826 

1827 @overload 

1828 def __getitem__(self, key: Union[str, int]) -> _COL_co: ... 

1829 

1830 @overload 

1831 def __getitem__( 

1832 self, key: Tuple[Union[str, int], ...] 

1833 ) -> ReadOnlyColumnCollection[_COLKEY, _COL_co]: ... 

1834 

1835 @overload 

1836 def __getitem__( 

1837 self, key: slice 

1838 ) -> ReadOnlyColumnCollection[_COLKEY, _COL_co]: ... 

1839 

1840 def __getitem__( 

1841 self, key: Union[str, int, slice, Tuple[Union[str, int], ...]] 

1842 ) -> Union[ReadOnlyColumnCollection[_COLKEY, _COL_co], _COL_co]: 

1843 try: 

1844 if isinstance(key, (tuple, slice)): 

1845 if isinstance(key, slice): 

1846 cols = ( 

1847 (sub_key, col) 

1848 for (sub_key, col, _) in self._collection[key] 

1849 ) 

1850 else: 

1851 cols = (self._index[sub_key] for sub_key in key) 

1852 

1853 return ColumnCollection(cols).as_readonly() 

1854 else: 

1855 return self._index[key][1] 

1856 except KeyError as err: 

1857 if isinstance(err.args[0], int): 

1858 raise IndexError(err.args[0]) from err 

1859 else: 

1860 raise 

1861 

1862 def __getattr__(self, key: str) -> _COL_co: 

1863 try: 

1864 return self._index[key][1] 

1865 except KeyError as err: 

1866 raise AttributeError(key) from err 

1867 

1868 def __contains__(self, key: str) -> bool: 

1869 if key not in self._index: 

1870 if not isinstance(key, str): 

1871 raise exc.ArgumentError( 

1872 "__contains__ requires a string argument" 

1873 ) 

1874 return False 

1875 else: 

1876 return True 

1877 

1878 def compare(self, other: ColumnCollection[_COLKEY, _COL_co]) -> bool: 

1879 """Compare this :class:`_expression.ColumnCollection` to another 

1880 based on the names of the keys""" 

1881 

1882 for l, r in zip_longest(self, other): 

1883 if l is not r: 

1884 return False 

1885 else: 

1886 return True 

1887 

1888 def __eq__(self, other: Any) -> bool: 

1889 return self.compare(other) 

1890 

1891 @overload 

1892 def get(self, key: str, default: None = None) -> Optional[_COL_co]: ... 

1893 

1894 @overload 

1895 def get(self, key: str, default: _COL) -> Union[_COL_co, _COL]: ... 

1896 

1897 def get( 

1898 self, key: str, default: Optional[_COL] = None 

1899 ) -> Optional[Union[_COL_co, _COL]]: 

1900 """Get a :class:`_sql.ColumnClause` or :class:`_schema.Column` object 

1901 based on a string key name from this 

1902 :class:`_expression.ColumnCollection`.""" 

1903 

1904 if key in self._index: 

1905 return self._index[key][1] 

1906 else: 

1907 return default 

1908 

1909 def __str__(self) -> str: 

1910 return "%s(%s)" % ( 

1911 self.__class__.__name__, 

1912 ", ".join(str(c) for c in self), 

1913 ) 

1914 

1915 def __setitem__(self, key: str, value: Any) -> NoReturn: 

1916 raise NotImplementedError() 

1917 

1918 def __delitem__(self, key: str) -> NoReturn: 

1919 raise NotImplementedError() 

1920 

1921 def __setattr__(self, key: str, obj: Any) -> NoReturn: 

1922 raise NotImplementedError() 

1923 

1924 def clear(self) -> NoReturn: 

1925 """Dictionary clear() is not implemented for 

1926 :class:`_sql.ColumnCollection`.""" 

1927 raise NotImplementedError() 

1928 

1929 def remove(self, column: Any) -> NoReturn: 

1930 raise NotImplementedError() 

1931 

1932 def update(self, iter_: Any) -> NoReturn: 

1933 """Dictionary update() is not implemented for 

1934 :class:`_sql.ColumnCollection`.""" 

1935 raise NotImplementedError() 

1936 

1937 # https://github.com/python/mypy/issues/4266 

1938 __hash__: Optional[int] = None # type: ignore 

1939 

1940 def _populate_separate_keys( 

1941 self, iter_: Iterable[Tuple[_COLKEY, _COL_co]] 

1942 ) -> None: 

1943 """populate from an iterator of (key, column)""" 

1944 

1945 self._collection[:] = collection = [ 

1946 (k, c, _ColumnMetrics(self, c)) for k, c in iter_ 

1947 ] 

1948 self._colset.update(c._deannotate() for _, c, _ in collection) 

1949 self._index.update( 

1950 {idx: (k, c) for idx, (k, c, _) in enumerate(collection)} 

1951 ) 

1952 self._index.update({k: (k, col) for k, col, _ in reversed(collection)}) 

1953 

1954 def add( 

1955 self, 

1956 column: ColumnElement[Any], 

1957 key: Optional[_COLKEY] = None, 

1958 ) -> None: 

1959 """Add a column to this :class:`_sql.ColumnCollection`. 

1960 

1961 .. note:: 

1962 

1963 This method is **not normally used by user-facing code**, as the 

1964 :class:`_sql.ColumnCollection` is usually part of an existing 

1965 object such as a :class:`_schema.Table`. To add a 

1966 :class:`_schema.Column` to an existing :class:`_schema.Table` 

1967 object, use the :meth:`_schema.Table.append_column` method. 

1968 

1969 """ 

1970 colkey: _COLKEY 

1971 

1972 if key is None: 

1973 colkey = column.key # type: ignore 

1974 else: 

1975 colkey = key 

1976 

1977 l = len(self._collection) 

1978 

1979 # don't really know how this part is supposed to work w/ the 

1980 # covariant thing 

1981 

1982 _column = cast(_COL_co, column) 

1983 

1984 self._collection.append( 

1985 (colkey, _column, _ColumnMetrics(self, _column)) 

1986 ) 

1987 self._colset.add(_column._deannotate()) 

1988 

1989 self._index[l] = (colkey, _column) 

1990 if colkey not in self._index: 

1991 self._index[colkey] = (colkey, _column) 

1992 

1993 def __getstate__(self) -> Dict[str, Any]: 

1994 return { 

1995 "_collection": [(k, c) for k, c, _ in self._collection], 

1996 "_index": self._index, 

1997 } 

1998 

1999 def __setstate__(self, state: Dict[str, Any]) -> None: 

2000 object.__setattr__(self, "_index", state["_index"]) 

2001 object.__setattr__( 

2002 self, "_proxy_index", collections.defaultdict(util.OrderedSet) 

2003 ) 

2004 object.__setattr__( 

2005 self, 

2006 "_collection", 

2007 [ 

2008 (k, c, _ColumnMetrics(self, c)) 

2009 for (k, c) in state["_collection"] 

2010 ], 

2011 ) 

2012 object.__setattr__( 

2013 self, "_colset", {col for k, col, _ in self._collection} 

2014 ) 

2015 

2016 def contains_column(self, col: ColumnElement[Any]) -> bool: 

2017 """Checks if a column object exists in this collection""" 

2018 if col not in self._colset: 

2019 if isinstance(col, str): 

2020 raise exc.ArgumentError( 

2021 "contains_column cannot be used with string arguments. " 

2022 "Use ``col_name in table.c`` instead." 

2023 ) 

2024 return False 

2025 else: 

2026 return True 

2027 

2028 def as_readonly(self) -> ReadOnlyColumnCollection[_COLKEY, _COL_co]: 

2029 """Return a "read only" form of this 

2030 :class:`_sql.ColumnCollection`.""" 

2031 

2032 return ReadOnlyColumnCollection(self) 

2033 

2034 def _init_proxy_index(self) -> None: 

2035 """populate the "proxy index", if empty. 

2036 

2037 proxy index is added in 2.0 to provide more efficient operation 

2038 for the corresponding_column() method. 

2039 

2040 For reasons of both time to construct new .c collections as well as 

2041 memory conservation for large numbers of large .c collections, the 

2042 proxy_index is only filled if corresponding_column() is called. once 

2043 filled it stays that way, and new _ColumnMetrics objects created after 

2044 that point will populate it with new data. Note this case would be 

2045 unusual, if not nonexistent, as it means a .c collection is being 

2046 mutated after corresponding_column() were used, however it is tested in 

2047 test/base/test_utils.py. 

2048 

2049 """ 

2050 pi = self._proxy_index 

2051 if pi: 

2052 return 

2053 

2054 for _, _, metrics in self._collection: 

2055 eps = metrics.column._expanded_proxy_set 

2056 

2057 for eps_col in eps: 

2058 pi[eps_col].add(metrics) 

2059 

2060 def corresponding_column( 

2061 self, column: _COL, require_embedded: bool = False 

2062 ) -> Optional[Union[_COL, _COL_co]]: 

2063 """Given a :class:`_expression.ColumnElement`, return the exported 

2064 :class:`_expression.ColumnElement` object from this 

2065 :class:`_expression.ColumnCollection` 

2066 which corresponds to that original :class:`_expression.ColumnElement` 

2067 via a common 

2068 ancestor column. 

2069 

2070 :param column: the target :class:`_expression.ColumnElement` 

2071 to be matched. 

2072 

2073 :param require_embedded: only return corresponding columns for 

2074 the given :class:`_expression.ColumnElement`, if the given 

2075 :class:`_expression.ColumnElement` 

2076 is actually present within a sub-element 

2077 of this :class:`_expression.Selectable`. 

2078 Normally the column will match if 

2079 it merely shares a common ancestor with one of the exported 

2080 columns of this :class:`_expression.Selectable`. 

2081 

2082 .. seealso:: 

2083 

2084 :meth:`_expression.Selectable.corresponding_column` 

2085 - invokes this method 

2086 against the collection returned by 

2087 :attr:`_expression.Selectable.exported_columns`. 

2088 

2089 .. versionchanged:: 1.4 the implementation for ``corresponding_column`` 

2090 was moved onto the :class:`_expression.ColumnCollection` itself. 

2091 

2092 """ 

2093 # TODO: cython candidate 

2094 

2095 # don't dig around if the column is locally present 

2096 if column in self._colset: 

2097 return column 

2098 

2099 selected_intersection, selected_metrics = None, None 

2100 target_set = column.proxy_set 

2101 

2102 pi = self._proxy_index 

2103 if not pi: 

2104 self._init_proxy_index() 

2105 

2106 for current_metrics in ( 

2107 mm for ts in target_set if ts in pi for mm in pi[ts] 

2108 ): 

2109 if not require_embedded or current_metrics.embedded(target_set): 

2110 if selected_metrics is None: 

2111 # no corresponding column yet, pick this one. 

2112 selected_metrics = current_metrics 

2113 continue 

2114 

2115 current_intersection = target_set.intersection( 

2116 current_metrics.column._expanded_proxy_set 

2117 ) 

2118 if selected_intersection is None: 

2119 selected_intersection = target_set.intersection( 

2120 selected_metrics.column._expanded_proxy_set 

2121 ) 

2122 

2123 if len(current_intersection) > len(selected_intersection): 

2124 # 'current' has a larger field of correspondence than 

2125 # 'selected'. i.e. selectable.c.a1_x->a1.c.x->table.c.x 

2126 # matches a1.c.x->table.c.x better than 

2127 # selectable.c.x->table.c.x does. 

2128 

2129 selected_metrics = current_metrics 

2130 selected_intersection = current_intersection 

2131 elif current_intersection == selected_intersection: 

2132 # they have the same field of correspondence. see 

2133 # which proxy_set has fewer columns in it, which 

2134 # indicates a closer relationship with the root 

2135 # column. Also take into account the "weight" 

2136 # attribute which CompoundSelect() uses to give 

2137 # higher precedence to columns based on vertical 

2138 # position in the compound statement, and discard 

2139 # columns that have no reference to the target 

2140 # column (also occurs with CompoundSelect) 

2141 

2142 selected_col_distance = sum( 

2143 [ 

2144 sc._annotations.get("weight", 1) 

2145 for sc in ( 

2146 selected_metrics.column._uncached_proxy_list() 

2147 ) 

2148 if sc.shares_lineage(column) 

2149 ], 

2150 ) 

2151 current_col_distance = sum( 

2152 [ 

2153 sc._annotations.get("weight", 1) 

2154 for sc in ( 

2155 current_metrics.column._uncached_proxy_list() 

2156 ) 

2157 if sc.shares_lineage(column) 

2158 ], 

2159 ) 

2160 if current_col_distance < selected_col_distance: 

2161 selected_metrics = current_metrics 

2162 selected_intersection = current_intersection 

2163 

2164 return selected_metrics.column if selected_metrics else None 

2165 

2166 

2167_NAMEDCOL = TypeVar("_NAMEDCOL", bound="NamedColumn[Any]") 

2168 

2169 

2170class DedupeColumnCollection(ColumnCollection[str, _NAMEDCOL]): 

2171 """A :class:`_expression.ColumnCollection` 

2172 that maintains deduplicating behavior. 

2173 

2174 This is useful by schema level objects such as :class:`_schema.Table` and 

2175 :class:`.PrimaryKeyConstraint`. The collection includes more 

2176 sophisticated mutator methods as well to suit schema objects which 

2177 require mutable column collections. 

2178 

2179 .. versionadded:: 1.4 

2180 

2181 """ 

2182 

2183 def add( # type: ignore[override] 

2184 self, 

2185 column: _NAMEDCOL, 

2186 key: Optional[str] = None, 

2187 *, 

2188 index: Optional[int] = None, 

2189 ) -> None: 

2190 if key is not None and column.key != key: 

2191 raise exc.ArgumentError( 

2192 "DedupeColumnCollection requires columns be under " 

2193 "the same key as their .key" 

2194 ) 

2195 key = column.key 

2196 

2197 if key is None: 

2198 raise exc.ArgumentError( 

2199 "Can't add unnamed column to column collection" 

2200 ) 

2201 

2202 if key in self._index: 

2203 existing = self._index[key][1] 

2204 

2205 if existing is column: 

2206 return 

2207 

2208 self.replace(column, index=index) 

2209 

2210 # pop out memoized proxy_set as this 

2211 # operation may very well be occurring 

2212 # in a _make_proxy operation 

2213 util.memoized_property.reset(column, "proxy_set") 

2214 else: 

2215 self._append_new_column(key, column, index=index) 

2216 

2217 def _append_new_column( 

2218 self, key: str, named_column: _NAMEDCOL, *, index: Optional[int] = None 

2219 ) -> None: 

2220 collection_length = len(self._collection) 

2221 

2222 if index is None: 

2223 l = collection_length 

2224 else: 

2225 if index < 0: 

2226 index = max(0, collection_length + index) 

2227 l = index 

2228 

2229 if index is None: 

2230 self._collection.append( 

2231 (key, named_column, _ColumnMetrics(self, named_column)) 

2232 ) 

2233 else: 

2234 self._collection.insert( 

2235 index, (key, named_column, _ColumnMetrics(self, named_column)) 

2236 ) 

2237 

2238 self._colset.add(named_column._deannotate()) 

2239 

2240 if index is not None: 

2241 for idx in reversed(range(index, collection_length)): 

2242 self._index[idx + 1] = self._index[idx] 

2243 

2244 self._index[l] = (key, named_column) 

2245 self._index[key] = (key, named_column) 

2246 

2247 def _populate_separate_keys( 

2248 self, iter_: Iterable[Tuple[str, _NAMEDCOL]] 

2249 ) -> None: 

2250 """populate from an iterator of (key, column)""" 

2251 cols = list(iter_) 

2252 

2253 replace_col = [] 

2254 for k, col in cols: 

2255 if col.key != k: 

2256 raise exc.ArgumentError( 

2257 "DedupeColumnCollection requires columns be under " 

2258 "the same key as their .key" 

2259 ) 

2260 if col.name in self._index and col.key != col.name: 

2261 replace_col.append(col) 

2262 elif col.key in self._index: 

2263 replace_col.append(col) 

2264 else: 

2265 self._index[k] = (k, col) 

2266 self._collection.append((k, col, _ColumnMetrics(self, col))) 

2267 self._colset.update(c._deannotate() for (k, c, _) in self._collection) 

2268 

2269 self._index.update( 

2270 (idx, (k, c)) for idx, (k, c, _) in enumerate(self._collection) 

2271 ) 

2272 for col in replace_col: 

2273 self.replace(col) 

2274 

2275 def extend(self, iter_: Iterable[_NAMEDCOL]) -> None: 

2276 self._populate_separate_keys((col.key, col) for col in iter_) 

2277 

2278 def remove(self, column: _NAMEDCOL) -> None: # type: ignore[override] 

2279 if column not in self._colset: 

2280 raise ValueError( 

2281 "Can't remove column %r; column is not in this collection" 

2282 % column 

2283 ) 

2284 del self._index[column.key] 

2285 self._colset.remove(column) 

2286 self._collection[:] = [ 

2287 (k, c, metrics) 

2288 for (k, c, metrics) in self._collection 

2289 if c is not column 

2290 ] 

2291 for metrics in self._proxy_index.get(column, ()): 

2292 metrics.dispose(self) 

2293 

2294 self._index.update( 

2295 {idx: (k, col) for idx, (k, col, _) in enumerate(self._collection)} 

2296 ) 

2297 # delete higher index 

2298 del self._index[len(self._collection)] 

2299 

2300 def replace( 

2301 self, 

2302 column: _NAMEDCOL, 

2303 *, 

2304 extra_remove: Optional[Iterable[_NAMEDCOL]] = None, 

2305 index: Optional[int] = None, 

2306 ) -> None: 

2307 """add the given column to this collection, removing unaliased 

2308 versions of this column as well as existing columns with the 

2309 same key. 

2310 

2311 e.g.:: 

2312 

2313 t = Table("sometable", metadata, Column("col1", Integer)) 

2314 t.columns.replace(Column("col1", Integer, key="columnone")) 

2315 

2316 will remove the original 'col1' from the collection, and add 

2317 the new column under the name 'columnname'. 

2318 

2319 Used by schema.Column to override columns during table reflection. 

2320 

2321 """ 

2322 

2323 if extra_remove: 

2324 remove_col = set(extra_remove) 

2325 else: 

2326 remove_col = set() 

2327 # remove up to two columns based on matches of name as well as key 

2328 if column.name in self._index and column.key != column.name: 

2329 other = self._index[column.name][1] 

2330 if other.name == other.key: 

2331 remove_col.add(other) 

2332 

2333 if column.key in self._index: 

2334 remove_col.add(self._index[column.key][1]) 

2335 

2336 if not remove_col: 

2337 self._append_new_column(column.key, column, index=index) 

2338 return 

2339 new_cols: List[Tuple[str, _NAMEDCOL, _ColumnMetrics[_NAMEDCOL]]] = [] 

2340 replace_index = None 

2341 

2342 for idx, (k, col, metrics) in enumerate(self._collection): 

2343 if col in remove_col: 

2344 if replace_index is None: 

2345 replace_index = idx 

2346 new_cols.append( 

2347 (column.key, column, _ColumnMetrics(self, column)) 

2348 ) 

2349 else: 

2350 new_cols.append((k, col, metrics)) 

2351 

2352 if remove_col: 

2353 self._colset.difference_update(remove_col) 

2354 

2355 for rc in remove_col: 

2356 for metrics in self._proxy_index.get(rc, ()): 

2357 metrics.dispose(self) 

2358 

2359 if replace_index is None: 

2360 if index is not None: 

2361 new_cols.insert( 

2362 index, (column.key, column, _ColumnMetrics(self, column)) 

2363 ) 

2364 

2365 else: 

2366 new_cols.append( 

2367 (column.key, column, _ColumnMetrics(self, column)) 

2368 ) 

2369 elif index is not None: 

2370 to_move = new_cols[replace_index] 

2371 effective_positive_index = ( 

2372 index if index >= 0 else max(0, len(new_cols) + index) 

2373 ) 

2374 new_cols.insert(index, to_move) 

2375 if replace_index > effective_positive_index: 

2376 del new_cols[replace_index + 1] 

2377 else: 

2378 del new_cols[replace_index] 

2379 

2380 self._colset.add(column._deannotate()) 

2381 self._collection[:] = new_cols 

2382 

2383 self._index.clear() 

2384 

2385 self._index.update( 

2386 {idx: (k, col) for idx, (k, col, _) in enumerate(self._collection)} 

2387 ) 

2388 self._index.update({k: (k, col) for (k, col, _) in self._collection}) 

2389 

2390 

2391class ReadOnlyColumnCollection( 

2392 util.ReadOnlyContainer, ColumnCollection[_COLKEY, _COL_co] 

2393): 

2394 __slots__ = ("_parent",) 

2395 

2396 def __init__(self, collection: ColumnCollection[_COLKEY, _COL_co]): 

2397 object.__setattr__(self, "_parent", collection) 

2398 object.__setattr__(self, "_colset", collection._colset) 

2399 object.__setattr__(self, "_index", collection._index) 

2400 object.__setattr__(self, "_collection", collection._collection) 

2401 object.__setattr__(self, "_proxy_index", collection._proxy_index) 

2402 

2403 def __getstate__(self) -> Dict[str, _COL_co]: 

2404 return {"_parent": self._parent} 

2405 

2406 def __setstate__(self, state: Dict[str, Any]) -> None: 

2407 parent = state["_parent"] 

2408 self.__init__(parent) # type: ignore 

2409 

2410 def add(self, column: Any, key: Any = ...) -> Any: 

2411 self._readonly() 

2412 

2413 def extend(self, elements: Any) -> NoReturn: 

2414 self._readonly() 

2415 

2416 def remove(self, item: Any) -> NoReturn: 

2417 self._readonly() 

2418 

2419 

2420class ColumnSet(util.OrderedSet["ColumnClause[Any]"]): 

2421 def contains_column(self, col: ColumnClause[Any]) -> bool: 

2422 return col in self 

2423 

2424 def extend(self, cols: Iterable[Any]) -> None: 

2425 for col in cols: 

2426 self.add(col) 

2427 

2428 def __eq__(self, other): 

2429 l = [] 

2430 for c in other: 

2431 for local in self: 

2432 if c.shares_lineage(local): 

2433 l.append(c == local) 

2434 return elements.and_(*l) 

2435 

2436 def __hash__(self) -> int: # type: ignore[override] 

2437 return hash(tuple(x for x in self)) 

2438 

2439 

2440def _entity_namespace( 

2441 entity: Union[_HasEntityNamespace, ExternallyTraversible], 

2442) -> _EntityNamespace: 

2443 """Return the nearest .entity_namespace for the given entity. 

2444 

2445 If not immediately available, does an iterate to find a sub-element 

2446 that has one, if any. 

2447 

2448 """ 

2449 try: 

2450 return cast(_HasEntityNamespace, entity).entity_namespace 

2451 except AttributeError: 

2452 for elem in visitors.iterate(cast(ExternallyTraversible, entity)): 

2453 if _is_has_entity_namespace(elem): 

2454 return elem.entity_namespace 

2455 else: 

2456 raise 

2457 

2458 

2459@overload 

2460def _entity_namespace_key( 

2461 entity: Union[_HasEntityNamespace, ExternallyTraversible], 

2462 key: str, 

2463) -> SQLCoreOperations[Any]: ... 

2464 

2465 

2466@overload 

2467def _entity_namespace_key( 

2468 entity: Union[_HasEntityNamespace, ExternallyTraversible], 

2469 key: str, 

2470 default: _NoArg, 

2471) -> SQLCoreOperations[Any]: ... 

2472 

2473 

2474@overload 

2475def _entity_namespace_key( 

2476 entity: Union[_HasEntityNamespace, ExternallyTraversible], 

2477 key: str, 

2478 default: _T, 

2479) -> Union[SQLCoreOperations[Any], _T]: ... 

2480 

2481 

2482def _entity_namespace_key( 

2483 entity: Union[_HasEntityNamespace, ExternallyTraversible], 

2484 key: str, 

2485 default: Union[SQLCoreOperations[Any], _T, _NoArg] = NO_ARG, 

2486) -> Union[SQLCoreOperations[Any], _T]: 

2487 """Return an entry from an entity_namespace. 

2488 

2489 

2490 Raises :class:`_exc.InvalidRequestError` rather than attribute error 

2491 on not found. 

2492 

2493 """ 

2494 

2495 try: 

2496 ns = _entity_namespace(entity) 

2497 if default is not NO_ARG: 

2498 return getattr(ns, key, default) 

2499 else: 

2500 return getattr(ns, key) # type: ignore 

2501 except AttributeError as err: 

2502 raise exc.InvalidRequestError( 

2503 'Entity namespace for "%s" has no property "%s"' % (entity, key) 

2504 ) from err