Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/sql/base.py: 48%


954 statements  

1# sql/base.py 

2# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7# mypy: allow-untyped-defs, allow-untyped-calls 

8 

9"""Foundational utilities common to many sql modules.""" 

10 

11 

12from __future__ import annotations 

13 

14import collections 

15from enum import Enum 

16import itertools 

17from itertools import zip_longest 

18import operator 

19import re 

20from typing import Any 

21from typing import Callable 

22from typing import cast 

23from typing import Collection 

24from typing import Dict 

25from typing import Final 

26from typing import FrozenSet 

27from typing import Generator 

28from typing import Generic 

29from typing import Iterable 

30from typing import Iterator 

31from typing import List 

32from typing import Mapping 

33from typing import MutableMapping 

34from typing import NamedTuple 

35from typing import NoReturn 

36from typing import Optional 

37from typing import overload 

38from typing import Protocol 

39from typing import Sequence 

40from typing import Set 

41from typing import Tuple 

42from typing import Type 

43from typing import TYPE_CHECKING 

44from typing import TypeGuard 

45from typing import TypeVar 

46from typing import Union 

47 

48from . import roles 

49from . import visitors 

50from .cache_key import HasCacheKey # noqa 

51from .cache_key import MemoizedHasCacheKey # noqa 

52from .traversals import HasCopyInternals # noqa 

53from .visitors import ClauseVisitor 

54from .visitors import ExtendedInternalTraversal 

55from .visitors import ExternallyTraversible 

56from .visitors import InternalTraversal 

57from .. import event 

58from .. import exc 

59from .. import util 

60from ..util import EMPTY_DICT 

61from ..util import HasMemoized as HasMemoized 

62from ..util import hybridmethod 

63from ..util.typing import Self 

64from ..util.typing import TypeVarTuple 

65from ..util.typing import Unpack 

66 

67if TYPE_CHECKING: 

68 from . import coercions 

69 from . import elements 

70 from . import type_api 

71 from ._orm_types import DMLStrategyArgument 

72 from ._orm_types import SynchronizeSessionArgument 

73 from ._typing import _CLE 

74 from .cache_key import CacheKey 

75 from .compiler import SQLCompiler 

76 from .dml import Delete 

77 from .dml import Insert 

78 from .dml import Update 

79 from .elements import BindParameter 

80 from .elements import ClauseElement 

81 from .elements import ClauseList 

82 from .elements import ColumnClause # noqa 

83 from .elements import ColumnElement 

84 from .elements import NamedColumn 

85 from .elements import SQLCoreOperations 

86 from .elements import TextClause 

87 from .schema import Column 

88 from .schema import DefaultGenerator 

89 from .selectable import _JoinTargetElement 

90 from .selectable import _SelectIterable 

91 from .selectable import FromClause 

92 from .selectable import Select 

93 from .visitors import anon_map 

94 from ..engine import Connection 

95 from ..engine import CursorResult 

96 from ..engine.interfaces import _CoreMultiExecuteParams 

97 from ..engine.interfaces import _CoreSingleExecuteParams 

98 from ..engine.interfaces import _ExecuteOptions 

99 from ..engine.interfaces import _ImmutableExecuteOptions 

100 from ..engine.interfaces import CacheStats 

101 from ..engine.interfaces import Compiled 

102 from ..engine.interfaces import CompiledCacheType 

103 from ..engine.interfaces import CoreExecuteOptionsParameter 

104 from ..engine.interfaces import Dialect 

105 from ..engine.interfaces import IsolationLevel 

106 from ..engine.interfaces import SchemaTranslateMapType 

107 from ..event import dispatcher 

108 

109if not TYPE_CHECKING: 

110 coercions = None # noqa 

111 elements = None # noqa 

112 type_api = None # noqa 

113 

114 

115_Ts = TypeVarTuple("_Ts") 

116 

117 

118class _NoArg(Enum): 

119 NO_ARG = 0 

120 

121 def __repr__(self): 

122 return f"_NoArg.{self.name}" 

123 

124 

125NO_ARG: Final = _NoArg.NO_ARG 

126 

127 

128class _NoneName(Enum): 

129 NONE_NAME = 0 

130 """indicate a 'deferred' name that was ultimately the value None.""" 

131 

132 

133_NONE_NAME: Final = _NoneName.NONE_NAME 

134 

135_T = TypeVar("_T", bound=Any) 

136 

137_Fn = TypeVar("_Fn", bound=Callable[..., Any]) 

138 

139_AmbiguousTableNameMap = MutableMapping[str, str] 

140 

141 

142class _DefaultDescriptionTuple(NamedTuple): 

143 arg: Any 

144 is_scalar: Optional[bool] 

145 is_callable: Optional[bool] 

146 is_sentinel: Optional[bool] 

147 

148 @classmethod 

149 def _from_column_default( 

150 cls, default: Optional[DefaultGenerator] 

151 ) -> _DefaultDescriptionTuple: 

152 return ( 

153 _DefaultDescriptionTuple( 

154 default.arg, # type: ignore 

155 default.is_scalar, 

156 default.is_callable, 

157 default.is_sentinel, 

158 ) 

159 if default 

160 and ( 

161 default.has_arg 

162 or (not default.for_update and default.is_sentinel) 

163 ) 

164 else _DefaultDescriptionTuple(None, None, None, None) 

165 ) 

166 

167 

168_never_select_column: operator.attrgetter[Any] = operator.attrgetter( 

169 "_omit_from_statements" 

170) 

171 

172 

173class _EntityNamespace(Protocol): 

174 def __getattr__(self, key: str) -> SQLCoreOperations[Any]: ... 

175 

176 

177class _HasEntityNamespace(Protocol): 

178 @util.ro_non_memoized_property 

179 def entity_namespace(self) -> _EntityNamespace: ... 

180 

181 

182def _is_has_entity_namespace(element: Any) -> TypeGuard[_HasEntityNamespace]: 

183 return hasattr(element, "entity_namespace") 

184 

185 

186# Remove when https://github.com/python/mypy/issues/14640 will be fixed 

187_Self = TypeVar("_Self", bound=Any) 

188 

189 

190class Immutable: 

191 """mark a ClauseElement as 'immutable' when expressions are cloned. 

192 

193 "immutable" objects refers to the "mutability" of an object in the 

194 context of SQL DQL and DML generation. Such as, in DQL, one can 

195 compose a SELECT or subquery of varied forms, but one cannot modify 

196 the structure of a specific table or column within DQL. 

197 :class:`.Immutable` is mostly intended to follow this concept, and as 

198 such the primary "immutable" objects are :class:`.ColumnClause`, 

199 :class:`.Column`, :class:`.TableClause`, :class:`.Table`. 

200 

201 """ 

202 

203 __slots__ = () 

204 

205 _is_immutable: bool = True 

206 

207 def unique_params(self, *optionaldict: Any, **kwargs: Any) -> NoReturn: 

208 raise NotImplementedError("Immutable objects do not support copying") 

209 

210 def params(self, *optionaldict: Any, **kwargs: Any) -> NoReturn: 

211 raise NotImplementedError("Immutable objects do not support copying") 

212 

213 def _clone(self: _Self, **kw: Any) -> _Self: 

214 return self 

215 

216 def _copy_internals( 

217 self, *, omit_attrs: Iterable[str] = (), **kw: Any 

218 ) -> None: 

219 pass 

220 

221 

222class SingletonConstant(Immutable): 

223 """Represent SQL constants like NULL, TRUE, FALSE""" 

224 

225 _is_singleton_constant: bool = True 

226 

227 _singleton: SingletonConstant 

228 

229 def __new__(cls: _T, *arg: Any, **kw: Any) -> _T: 

230 return cast(_T, cls._singleton) 

231 

232 @util.non_memoized_property 

233 def proxy_set(self) -> FrozenSet[ColumnElement[Any]]: 

234 raise NotImplementedError() 

235 

236 @classmethod 

237 def _create_singleton(cls) -> None: 

238 obj = object.__new__(cls) 

239 obj.__init__() # type: ignore 

240 

241 # for a long time this was an empty frozenset, meaning 

242 # a SingletonConstant would never be a "corresponding column" in 

243 # a statement. This referred to #6259. However, in #7154 we see 

244 # that we do in fact need "correspondence" to work when matching cols 

245 # in result sets, so the non-correspondence was moved to a more 

246 # specific level when we are actually adapting expressions for SQL 

247 # render only. 

248 obj.proxy_set = frozenset([obj]) 

249 cls._singleton = obj 

250 
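# Editor's note: illustrative sketch, not part of this module.  Concrete
# subclasses (e.g. the Null / True_ / False_ elements, per the docstring's
# "NULL, TRUE, FALSE") call _create_singleton() once at class setup time;
# after that, construction always hands back the same instance:
#
#     from sqlalchemy import null
#     assert null() is null()   # __new__ above returns cls._singleton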

251 

252def _from_objects( 

253 *elements: Union[ 

254 ColumnElement[Any], FromClause, TextClause, _JoinTargetElement 

255 ] 

256) -> Iterator[FromClause]: 

257 return itertools.chain.from_iterable( 

258 [element._from_objects for element in elements] 

259 ) 

260 

261 

262def _select_iterables( 

263 elements: Iterable[roles.ColumnsClauseRole], 

264) -> _SelectIterable: 

265 """expand tables into individual columns in the 

266 given list of column expressions. 

267 

268 """ 

269 return itertools.chain.from_iterable( 

270 [c._select_iterable for c in elements] 

271 ) 

272 
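# Editor's note: illustrative sketch, not part of this module, of the
# expansion performed by _select_iterables(); ``user_table`` is hypothetical.
# A Table expands into its columns, while plain column expressions pass
# through unchanged:
#
#     list(_select_iterables([user_table, literal_column("1")]))
#     # -> [user_table.c.id, user_table.c.name, ..., literal_column("1")]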

273 

274_SelfGenerativeType = TypeVar("_SelfGenerativeType", bound="_GenerativeType") 

275 

276 

277class _GenerativeType(Protocol): 

278 def _generate(self) -> Self: ... 

279 

280 

281def _generative(fn: _Fn) -> _Fn: 

282 """non-caching _generative() decorator. 

283 

284 This is basically the legacy decorator that copies the object and 

285 runs a method on the new copy. 

286 

287 """ 

288 

289 @util.decorator 

290 def _generative( 

291 fn: _Fn, self: _SelfGenerativeType, *args: Any, **kw: Any 

292 ) -> _SelfGenerativeType: 

293 """Mark a method as generative.""" 

294 

295 self = self._generate() 

296 x = fn(self, *args, **kw) 

297 assert x is self, "generative methods must return self" 

298 return self 

299 

300 decorated = _generative(fn) 

301 decorated.non_generative = fn # type: ignore 

302 return decorated 

303 
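# Editor's note: illustrative sketch, not part of this module, of how the
# @_generative decorator above is typically paired with the Generative base
# class defined later in this file; the class and attribute names are
# hypothetical:
#
#     class MyStatement(Generative):
#         _criteria: tuple = ()
#
#         @_generative
#         def where(self, criterion):
#             # runs on the copy produced by self._generate();
#             # must return self, per the assertion above
#             self._criteria += (criterion,)
#             return self
#
#     s1 = MyStatement()
#     s2 = s1.where("x > 5")   # s2 is a new copy; s1 is unchanged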

304 

305def _exclusive_against(*names: str, **kw: Any) -> Callable[[_Fn], _Fn]: 

306 msgs: Dict[str, str] = kw.pop("msgs", {}) 

307 

308 defaults: Dict[str, str] = kw.pop("defaults", {}) 

309 

310 getters: List[Tuple[str, operator.attrgetter[Any], Optional[str]]] = [ 

311 (name, operator.attrgetter(name), defaults.get(name, None)) 

312 for name in names 

313 ] 

314 

315 @util.decorator 

316 def check(fn: _Fn, *args: Any, **kw: Any) -> Any: 

317 # make pylance happy by not including "self" in the argument 

318 # list 

319 self = args[0] 

320 args = args[1:] 

321 for name, getter, default_ in getters: 

322 if getter(self) is not default_: 

323 msg = msgs.get( 

324 name, 

325 "Method %s() has already been invoked on this %s construct" 

326 % (fn.__name__, self.__class__), 

327 ) 

328 raise exc.InvalidRequestError(msg) 

329 return fn(self, *args, **kw) 

330 

331 return check 

332 

333 

334def _clone(element, **kw): 

335 return element._clone(**kw) 

336 

337 

338def _expand_cloned( 

339 elements: Iterable[_CLE], 

340) -> Iterable[_CLE]: 

341 """expand the given set of ClauseElements to be the set of all 'cloned' 

342 predecessors. 

343 

344 """ 

345 # TODO: cython candidate 

346 return itertools.chain(*[x._cloned_set for x in elements]) 

347 

348 

349def _de_clone( 

350 elements: Iterable[_CLE], 

351) -> Iterable[_CLE]: 

352 for x in elements: 

353 while x._is_clone_of is not None: 

354 x = x._is_clone_of 

355 yield x 

356 

357 

358def _cloned_intersection(a: Iterable[_CLE], b: Iterable[_CLE]) -> Set[_CLE]: 

359 """return the intersection of sets a and b, counting 

360 any overlap between 'cloned' predecessors. 

361 

362 The returned set is in terms of the entities present within 'a'. 

363 

364 """ 

365 all_overlap: Set[_CLE] = set(_expand_cloned(a)).intersection( 

366 _expand_cloned(b) 

367 ) 

368 return {elem for elem in a if all_overlap.intersection(elem._cloned_set)} 

369 

370 

371def _cloned_difference(a: Iterable[_CLE], b: Iterable[_CLE]) -> Set[_CLE]: 

372 all_overlap: Set[_CLE] = set(_expand_cloned(a)).intersection( 

373 _expand_cloned(b) 

374 ) 

375 return { 

376 elem for elem in a if not all_overlap.intersection(elem._cloned_set) 

377 } 

378 

379 

380class _DialectArgView(MutableMapping[str, Any]): 

381 """A dictionary view of dialect-level arguments in the form 

382 <dialectname>_<argument_name>. 

383 

384 """ 

385 

386 __slots__ = ("obj",) 

387 

388 def __init__(self, obj: DialectKWArgs) -> None: 

389 self.obj = obj 

390 

391 def _key(self, key: str) -> Tuple[str, str]: 

392 try: 

393 dialect, value_key = key.split("_", 1) 

394 except ValueError as err: 

395 raise KeyError(key) from err 

396 else: 

397 return dialect, value_key 

398 

399 def __getitem__(self, key: str) -> Any: 

400 dialect, value_key = self._key(key) 

401 

402 try: 

403 opt = self.obj.dialect_options[dialect] 

404 except exc.NoSuchModuleError as err: 

405 raise KeyError(key) from err 

406 else: 

407 return opt[value_key] 

408 

409 def __setitem__(self, key: str, value: Any) -> None: 

410 try: 

411 dialect, value_key = self._key(key) 

412 except KeyError as err: 

413 raise exc.ArgumentError( 

414 "Keys must be of the form <dialectname>_<argname>" 

415 ) from err 

416 else: 

417 self.obj.dialect_options[dialect][value_key] = value 

418 

419 def __delitem__(self, key: str) -> None: 

420 dialect, value_key = self._key(key) 

421 del self.obj.dialect_options[dialect][value_key] 

422 

423 def __len__(self) -> int: 

424 return sum( 

425 len(args._non_defaults) 

426 for args in self.obj.dialect_options.values() 

427 ) 

428 

429 def __iter__(self) -> Generator[str, None, None]: 

430 return ( 

431 "%s_%s" % (dialect_name, value_name) 

432 for dialect_name in self.obj.dialect_options 

433 for value_name in self.obj.dialect_options[ 

434 dialect_name 

435 ]._non_defaults 

436 ) 

437 

438 

439class _DialectArgDict(MutableMapping[str, Any]): 

440 """A dictionary view of dialect-level arguments for a specific 

441 dialect. 

442 

443 Maintains a separate collection of user-specified arguments 

444 and dialect-specified default arguments. 

445 

446 """ 

447 

448 def __init__(self) -> None: 

449 self._non_defaults: Dict[str, Any] = {} 

450 self._defaults: Dict[str, Any] = {} 

451 

452 def __len__(self) -> int: 

453 return len(set(self._non_defaults).union(self._defaults)) 

454 

455 def __iter__(self) -> Iterator[str]: 

456 return iter(set(self._non_defaults).union(self._defaults)) 

457 

458 def __getitem__(self, key: str) -> Any: 

459 if key in self._non_defaults: 

460 return self._non_defaults[key] 

461 else: 

462 return self._defaults[key] 

463 

464 def __setitem__(self, key: str, value: Any) -> None: 

465 self._non_defaults[key] = value 

466 

467 def __delitem__(self, key: str) -> None: 

468 del self._non_defaults[key] 

469 

470 

471@util.preload_module("sqlalchemy.dialects") 

472def _kw_reg_for_dialect(dialect_name: str) -> Optional[Dict[Any, Any]]: 

473 dialect_cls = util.preloaded.dialects.registry.load(dialect_name) 

474 if dialect_cls.construct_arguments is None: 

475 return None 

476 return dict(dialect_cls.construct_arguments) 

477 

478 

479class DialectKWArgs: 

480 """Establish the ability for a class to have dialect-specific arguments 

481 with defaults and constructor validation. 

482 

483 The :class:`.DialectKWArgs` interacts with the 

484 :attr:`.DefaultDialect.construct_arguments` present on a dialect. 

485 

486 .. seealso:: 

487 

488 :attr:`.DefaultDialect.construct_arguments` 

489 

490 """ 

491 

492 __slots__ = () 

493 

494 _dialect_kwargs_traverse_internals: List[Tuple[str, Any]] = [ 

495 ("dialect_options", InternalTraversal.dp_dialect_options) 

496 ] 

497 

498 @classmethod 

499 def argument_for( 

500 cls, dialect_name: str, argument_name: str, default: Any 

501 ) -> None: 

502 """Add a new kind of dialect-specific keyword argument for this class. 

503 

504 E.g.:: 

505 

506 Index.argument_for("mydialect", "length", None) 

507 

508 some_index = Index("a", "b", mydialect_length=5) 

509 

510 The :meth:`.DialectKWArgs.argument_for` method is a per-argument 

511 way of adding extra arguments to the

512 :attr:`.DefaultDialect.construct_arguments` dictionary. This 

513 dictionary provides a list of argument names accepted by various 

514 schema-level constructs on behalf of a dialect. 

515 

516 New dialects should typically specify this dictionary all at once as a 

517 data member of the dialect class. The use case for ad-hoc addition of 

518 argument names is typically for end-user code that is also using 

519 a custom compilation scheme which consumes the additional arguments. 

520 

521 :param dialect_name: name of a dialect. The dialect must be 

522 locatable, else a :class:`.NoSuchModuleError` is raised. The 

523 dialect must also include an existing 

524 :attr:`.DefaultDialect.construct_arguments` collection, indicating 

525 that it participates in the keyword-argument validation and default 

526 system, else :class:`.ArgumentError` is raised. If the dialect does 

527 not include this collection, then any keyword argument can be 

528 specified on behalf of this dialect already. All dialects packaged 

529 within SQLAlchemy include this collection, however for third party 

530 dialects, support may vary. 

531 

532 :param argument_name: name of the parameter. 

533 

534 :param default: default value of the parameter. 

535 

536 """ 

537 

538 construct_arg_dictionary: Optional[Dict[Any, Any]] = ( 

539 DialectKWArgs._kw_registry[dialect_name] 

540 ) 

541 if construct_arg_dictionary is None: 

542 raise exc.ArgumentError( 

543 "Dialect '%s' does have keyword-argument " 

544 "validation and defaults enabled configured" % dialect_name 

545 ) 

546 if cls not in construct_arg_dictionary: 

547 construct_arg_dictionary[cls] = {} 

548 construct_arg_dictionary[cls][argument_name] = default 

549 

550 @property 

551 def dialect_kwargs(self) -> _DialectArgView: 

552 """A collection of keyword arguments specified as dialect-specific 

553 options to this construct. 

554 

555 The arguments are present here in their original ``<dialect>_<kwarg>`` 

556 format. Only arguments that were actually passed are included,

557 unlike the :attr:`.DialectKWArgs.dialect_options` collection, which

558 contains all options known by this dialect, including defaults.

559 

560 The collection is also writable; keys are accepted of the 

561 form ``<dialect>_<kwarg>`` where the value will be assembled 

562 into the list of options. 

563 

564 .. seealso:: 

565 

566 :attr:`.DialectKWArgs.dialect_options` - nested dictionary form 

567 

568 """ 

569 return _DialectArgView(self) 

570 
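# Editor's note: illustrative sketch, not part of this module, contrasting
# the two collections documented above, assuming an Index created with a
# dialect-specific option such as ``postgresql_where``:
#
#     idx = Index("ix_a", tbl.c.x, postgresql_where=tbl.c.x > 5)
#     idx.dialect_kwargs["postgresql_where"]       # only what was passed
#     idx.dialect_options["postgresql"]["where"]   # defaults included too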

571 @property 

572 def kwargs(self) -> _DialectArgView: 

573 """A synonym for :attr:`.DialectKWArgs.dialect_kwargs`.""" 

574 return self.dialect_kwargs 

575 

576 _kw_registry: util.PopulateDict[str, Optional[Dict[Any, Any]]] = ( 

577 util.PopulateDict(_kw_reg_for_dialect) 

578 ) 

579 

580 @classmethod 

581 def _kw_reg_for_dialect_cls(cls, dialect_name: str) -> _DialectArgDict: 

582 construct_arg_dictionary = DialectKWArgs._kw_registry[dialect_name] 

583 d = _DialectArgDict() 

584 

585 if construct_arg_dictionary is None: 

586 d._defaults.update({"*": None}) 

587 else: 

588 for cls in reversed(cls.__mro__): 

589 if cls in construct_arg_dictionary: 

590 d._defaults.update(construct_arg_dictionary[cls]) 

591 return d 

592 

593 @util.memoized_property 

594 def dialect_options(self) -> util.PopulateDict[str, _DialectArgDict]: 

595 """A collection of keyword arguments specified as dialect-specific 

596 options to this construct. 

597 

598 This is a two-level nested registry, keyed to ``<dialect_name>`` 

599 and ``<argument_name>``. For example, the ``postgresql_where`` 

600 argument would be locatable as:: 

601 

602 arg = my_object.dialect_options["postgresql"]["where"] 

603 

604 .. versionadded:: 0.9.2 

605 

606 .. seealso:: 

607 

608 :attr:`.DialectKWArgs.dialect_kwargs` - flat dictionary form 

609 

610 """ 

611 

612 return util.PopulateDict(self._kw_reg_for_dialect_cls) 

613 

614 def _validate_dialect_kwargs(self, kwargs: Dict[str, Any]) -> None: 

615 # validate remaining kwargs that they all specify DB prefixes 

616 

617 if not kwargs: 

618 return 

619 

620 for k in kwargs: 

621 m = re.match("^(.+?)_(.+)$", k) 

622 if not m: 

623 raise TypeError( 

624 "Additional arguments should be " 

625 "named <dialectname>_<argument>, got '%s'" % k 

626 ) 

627 dialect_name, arg_name = m.group(1, 2) 

628 

629 try: 

630 construct_arg_dictionary = self.dialect_options[dialect_name] 

631 except exc.NoSuchModuleError: 

632 util.warn( 

633 "Can't validate argument %r; can't " 

634 "locate any SQLAlchemy dialect named %r" 

635 % (k, dialect_name) 

636 ) 

637 self.dialect_options[dialect_name] = d = _DialectArgDict() 

638 d._defaults.update({"*": None}) 

639 d._non_defaults[arg_name] = kwargs[k] 

640 else: 

641 if ( 

642 "*" not in construct_arg_dictionary 

643 and arg_name not in construct_arg_dictionary 

644 ): 

645 raise exc.ArgumentError( 

646 "Argument %r is not accepted by " 

647 "dialect %r on behalf of %r" 

648 % (k, dialect_name, self.__class__) 

649 ) 

650 else: 

651 construct_arg_dictionary[arg_name] = kwargs[k] 

652 
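# Editor's note: summary sketch, not part of this module, of the validation
# above, for some DialectKWArgs construct ``obj`` (hypothetical):
#
#     obj._validate_dialect_kwargs({"postgresql_where": text("x > 5")})
#     # accepted if the postgresql dialect lists "where" for obj's class
#     obj._validate_dialect_kwargs({"nounderscore": 5})
#     # TypeError: keys must be named <dialectname>_<argument>
#     obj._validate_dialect_kwargs({"unknowndialect_opt": 5})
#     # warning only: dialect cannot be located, value stored as-is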

653 

654class CompileState: 

655 """Produces additional object state necessary for a statement to be 

656 compiled. 

657 

658 The :class:`.CompileState` class is at the base of classes that assemble

659 state for a particular statement object that is then used by the

660 compiler. This process is essentially an extension of the process that

661 the SQLCompiler.visit_XYZ() method takes; however, there is an emphasis

662 on converting raw user intent into more organized structures rather than 

663 producing string output. The top-level :class:`.CompileState` for the 

664 statement being executed is also accessible when the execution context 

665 works with invoking the statement and collecting results. 

666 

667 The production of :class:`.CompileState` is specific to the compiler, such 

668 as within the :meth:`.SQLCompiler.visit_insert`, 

669 :meth:`.SQLCompiler.visit_select` etc. methods. These methods are also 

670 responsible for associating the :class:`.CompileState` with the 

671 :class:`.SQLCompiler` itself, if the statement is the "toplevel" statement, 

672 i.e. the outermost SQL statement that's actually being executed. 

673 There can be other :class:`.CompileState` objects that are not the 

674 toplevel, such as when a SELECT subquery or CTE-nested 

675 INSERT/UPDATE/DELETE is generated. 

676 

677 .. versionadded:: 1.4 

678 

679 """ 

680 

681 __slots__ = ("statement", "_ambiguous_table_name_map") 

682 

683 plugins: Dict[Tuple[str, str], Type[CompileState]] = {} 

684 

685 _ambiguous_table_name_map: Optional[_AmbiguousTableNameMap] 

686 

687 @classmethod 

688 def create_for_statement( 

689 cls, statement: Executable, compiler: SQLCompiler, **kw: Any 

690 ) -> CompileState: 

691 # factory construction. 

692 

693 if statement._propagate_attrs: 

694 plugin_name = statement._propagate_attrs.get( 

695 "compile_state_plugin", "default" 

696 ) 

697 klass = cls.plugins.get( 

698 (plugin_name, statement._effective_plugin_target), None 

699 ) 

700 if klass is None: 

701 klass = cls.plugins[ 

702 ("default", statement._effective_plugin_target) 

703 ] 

704 

705 else: 

706 klass = cls.plugins[ 

707 ("default", statement._effective_plugin_target) 

708 ] 

709 

710 if klass is cls: 

711 return cls(statement, compiler, **kw) 

712 else: 

713 return klass.create_for_statement(statement, compiler, **kw) 

714 

715 def __init__(self, statement, compiler, **kw): 

716 self.statement = statement 

717 

718 @classmethod 

719 def get_plugin_class( 

720 cls, statement: Executable 

721 ) -> Optional[Type[CompileState]]: 

722 plugin_name = statement._propagate_attrs.get( 

723 "compile_state_plugin", None 

724 ) 

725 

726 if plugin_name: 

727 key = (plugin_name, statement._effective_plugin_target) 

728 if key in cls.plugins: 

729 return cls.plugins[key] 

730 

731 # there's no case where we call upon get_plugin_class() and want 

732 # to get None back, there should always be a default. return that 

733 # if there was no plugin-specific class (e.g. Insert with "orm" 

734 # plugin) 

735 try: 

736 return cls.plugins[("default", statement._effective_plugin_target)] 

737 except KeyError: 

738 return None 

739 

740 @classmethod 

741 def _get_plugin_class_for_plugin( 

742 cls, statement: Executable, plugin_name: str 

743 ) -> Optional[Type[CompileState]]: 

744 try: 

745 return cls.plugins[ 

746 (plugin_name, statement._effective_plugin_target) 

747 ] 

748 except KeyError: 

749 return None 

750 

751 @classmethod 

752 def plugin_for( 

753 cls, plugin_name: str, visit_name: str 

754 ) -> Callable[[_Fn], _Fn]: 

755 def decorate(cls_to_decorate): 

756 cls.plugins[(plugin_name, visit_name)] = cls_to_decorate 

757 return cls_to_decorate 

758 

759 return decorate 

760 
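# Editor's note: illustrative sketch, not part of this module, of registering
# a CompileState implementation via the plugin_for() decorator above; the
# plugin name and visit name shown are hypothetical:
#
#     @CompileState.plugin_for("default", "my_statement")
#     class MyStatementCompileState(CompileState):
#         def __init__(self, statement, compiler, **kw):
#             super().__init__(statement, compiler, **kw)
#
# create_for_statement() then locates this class through the
# ("default", statement._effective_plugin_target) key.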

761 

762class Generative(HasMemoized): 

763 """Provide a method-chaining pattern in conjunction with the 

764 @_generative decorator.""" 

765 

766 def _generate(self) -> Self: 

767 skip = self._memoized_keys 

768 cls = self.__class__ 

769 s = cls.__new__(cls) 

770 if skip: 

771 # ensure this iteration remains atomic 

772 s.__dict__ = { 

773 k: v for k, v in self.__dict__.copy().items() if k not in skip 

774 } 

775 else: 

776 s.__dict__ = self.__dict__.copy() 

777 return s 

778 

779 

780class InPlaceGenerative(HasMemoized): 

781 """Provide a method-chaining pattern in conjunction with the 

782 @_generative decorator that mutates in place.""" 

783 

784 __slots__ = () 

785 

786 def _generate(self) -> Self: 

787 skip = self._memoized_keys 

788 # note __dict__ needs to be in __slots__ if this is used 

789 for k in skip: 

790 self.__dict__.pop(k, None) 

791 return self 

792 

793 

794class HasCompileState(Generative): 

795 """A class that has a :class:`.CompileState` associated with it.""" 

796 

797 _compile_state_plugin: Optional[Type[CompileState]] = None 

798 

799 _attributes: util.immutabledict[str, Any] = util.EMPTY_DICT 

800 

801 _compile_state_factory = CompileState.create_for_statement 

802 

803 

804class _MetaOptions(type): 

805 """metaclass for the Options class. 

806 

807 This metaclass is actually necessary despite the availability of the 

808 ``__init_subclass__()`` hook as this type also provides custom class-level 

809 behavior for the ``__add__()`` method. 

810 

811 """ 

812 

813 _cache_attrs: Tuple[str, ...] 

814 

815 def __add__(self, other): 

816 o1 = self() 

817 

818 if set(other).difference(self._cache_attrs): 

819 raise TypeError( 

820 "dictionary contains attributes not covered by " 

821 "Options class %s: %r" 

822 % (self, set(other).difference(self._cache_attrs)) 

823 ) 

824 

825 o1.__dict__.update(other) 

826 return o1 

827 

828 if TYPE_CHECKING: 

829 

830 def __getattr__(self, key: str) -> Any: ... 

831 

832 def __setattr__(self, key: str, value: Any) -> None: ... 

833 

834 def __delattr__(self, key: str) -> None: ... 

835 

836 

837class Options(metaclass=_MetaOptions): 

838 """A cacheable option dictionary with defaults.""" 

839 

840 __slots__ = () 

841 

842 _cache_attrs: Tuple[str, ...] 

843 

844 def __init_subclass__(cls) -> None: 

845 dict_ = cls.__dict__ 

846 cls._cache_attrs = tuple( 

847 sorted( 

848 d 

849 for d in dict_ 

850 if not d.startswith("__") 

851 and d not in ("_cache_key_traversal",) 

852 ) 

853 ) 

854 super().__init_subclass__() 

855 

856 def __init__(self, **kw: Any) -> None: 

857 self.__dict__.update(kw) 

858 

859 def __add__(self, other): 

860 o1 = self.__class__.__new__(self.__class__) 

861 o1.__dict__.update(self.__dict__) 

862 

863 if set(other).difference(self._cache_attrs): 

864 raise TypeError( 

865 "dictionary contains attributes not covered by " 

866 "Options class %s: %r" 

867 % (self, set(other).difference(self._cache_attrs)) 

868 ) 

869 

870 o1.__dict__.update(other) 

871 return o1 

872 
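# Editor's note: illustrative sketch, not part of this module, of how an
# Options subclass collects defaults via __init_subclass__ and merges
# dictionaries via __add__; the subclass and attribute names are hypothetical:
#
#     class MyLoadOptions(Options):
#         _eager = False
#         _limit = None
#
#     opts = MyLoadOptions() + {"_limit": 10}
#     opts._limit                        # -> 10; _eager keeps its default
#     MyLoadOptions() + {"_bogus": 1}    # TypeError: not in _cache_attrs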

873 def __eq__(self, other): 

874 # TODO: very inefficient. This is used only in test suites 

875 # right now. 

876 for a, b in zip_longest(self._cache_attrs, other._cache_attrs): 

877 if getattr(self, a) != getattr(other, b): 

878 return False 

879 return True 

880 

881 def __repr__(self) -> str: 

882 # TODO: fairly inefficient, used only in debugging right now. 

883 

884 return "%s(%s)" % ( 

885 self.__class__.__name__, 

886 ", ".join( 

887 "%s=%r" % (k, self.__dict__[k]) 

888 for k in self._cache_attrs 

889 if k in self.__dict__ 

890 ), 

891 ) 

892 

893 @classmethod 

894 def isinstance(cls, klass: Type[Any]) -> bool: 

895 return issubclass(cls, klass) 

896 

897 @hybridmethod 

898 def add_to_element(self, name: str, value: str) -> Any: 

899 return self + {name: getattr(self, name) + value} 

900 

901 @hybridmethod 

902 def _state_dict_inst(self) -> Mapping[str, Any]: 

903 return self.__dict__ 

904 

905 _state_dict_const: util.immutabledict[str, Any] = util.EMPTY_DICT 

906 

907 @_state_dict_inst.classlevel 

908 def _state_dict(cls) -> Mapping[str, Any]: 

909 return cls._state_dict_const 

910 

911 @classmethod 

912 def safe_merge(cls, other: "Options") -> Any: 

913 d = other._state_dict() 

914 

915 # only support a merge with another object of our class 

916 # and which does not have attrs that we don't. otherwise 

917 # we risk having state that might not be part of our cache 

918 # key strategy 

919 

920 if ( 

921 cls is not other.__class__ 

922 and other._cache_attrs 

923 and set(other._cache_attrs).difference(cls._cache_attrs) 

924 ): 

925 raise TypeError( 

926 "other element %r is not empty, is not of type %s, " 

927 "and contains attributes not covered here %r" 

928 % ( 

929 other, 

930 cls, 

931 set(other._cache_attrs).difference(cls._cache_attrs), 

932 ) 

933 ) 

934 return cls + d 

935 

936 @classmethod 

937 def from_execution_options( 

938 cls, 

939 key: str, 

940 attrs: set[str], 

941 exec_options: Mapping[str, Any], 

942 statement_exec_options: Mapping[str, Any], 

943 ) -> Tuple["Options", Mapping[str, Any]]: 

944 """process Options argument in terms of execution options. 

945 

946 

947 e.g.:: 

948 

949 ( 

950 load_options, 

951 execution_options, 

952 ) = QueryContext.default_load_options.from_execution_options( 

953 "_sa_orm_load_options", 

954 {"populate_existing", "autoflush", "yield_per"}, 

955 execution_options, 

956 statement._execution_options, 

957 ) 

958 

959 Get back the Options and refresh "_sa_orm_load_options" in the

960 exec options dict with the Options as well.

961 

962 """ 

963 

964 # common case is that no options we are looking for are 

965 # in either dictionary, so cancel for that first 

966 check_argnames = attrs.intersection( 

967 set(exec_options).union(statement_exec_options) 

968 ) 

969 

970 existing_options = exec_options.get(key, cls) 

971 

972 if check_argnames: 

973 result = {} 

974 for argname in check_argnames: 

975 local = "_" + argname 

976 if argname in exec_options: 

977 result[local] = exec_options[argname] 

978 elif argname in statement_exec_options: 

979 result[local] = statement_exec_options[argname] 

980 

981 new_options = existing_options + result 

982 exec_options = util.EMPTY_DICT.merge_with( 

983 exec_options, {key: new_options} 

984 ) 

985 return new_options, exec_options 

986 

987 else: 

988 return existing_options, exec_options 

989 

990 if TYPE_CHECKING: 

991 

992 def __getattr__(self, key: str) -> Any: ... 

993 

994 def __setattr__(self, key: str, value: Any) -> None: ... 

995 

996 def __delattr__(self, key: str) -> None: ... 

997 

998 

999class CacheableOptions(Options, HasCacheKey): 

1000 __slots__ = () 

1001 

1002 @hybridmethod 

1003 def _gen_cache_key_inst( 

1004 self, anon_map: Any, bindparams: List[BindParameter[Any]] 

1005 ) -> Optional[Tuple[Any]]: 

1006 return HasCacheKey._gen_cache_key(self, anon_map, bindparams) 

1007 

1008 @_gen_cache_key_inst.classlevel 

1009 def _gen_cache_key( 

1010 cls, anon_map: "anon_map", bindparams: List[BindParameter[Any]] 

1011 ) -> Tuple[CacheableOptions, Any]: 

1012 return (cls, ()) 

1013 

1014 @hybridmethod 

1015 def _generate_cache_key(self) -> Optional[CacheKey]: 

1016 return HasCacheKey._generate_cache_key(self) 

1017 

1018 

1019class ExecutableOption(HasCopyInternals): 

1020 __slots__ = () 

1021 

1022 _annotations: _ImmutableExecuteOptions = util.EMPTY_DICT 

1023 

1024 __visit_name__: str = "executable_option" 

1025 

1026 _is_has_cache_key: bool = False 

1027 

1028 _is_core: bool = True 

1029 

1030 def _clone(self, **kw): 

1031 """Create a shallow copy of this ExecutableOption.""" 

1032 c = self.__class__.__new__(self.__class__) 

1033 c.__dict__ = dict(self.__dict__) # type: ignore 

1034 return c 

1035 

1036 

1037_L = TypeVar("_L", bound=str) 

1038 

1039 

1040class HasSyntaxExtensions(Generic[_L]): 

1041 

1042 _position_map: Mapping[_L, str] 

1043 

1044 @_generative 

1045 def ext(self, extension: SyntaxExtension) -> Self: 

1046 """Applies a SQL syntax extension to this statement. 

1047 

1048 SQL syntax extensions are :class:`.ClauseElement` objects that define

1049 some vendor-specific syntactical construct that takes place in specific

1050 parts of a SQL statement. Examples include vendor extensions like

1051 PostgreSQL / SQLite's "ON CONFLICT", PostgreSQL's

1052 "DISTINCT ON", and MySQL's "LIMIT" as applied to UPDATE

1053 and DELETE statements.

1054 

1055 .. seealso:: 

1056 

1057 :ref:`examples_syntax_extensions` 

1058 

1059 :func:`_mysql.limit` - DML LIMIT for MySQL 

1060 

1061 :func:`_postgresql.distinct_on` - DISTINCT ON for PostgreSQL 

1062 

1063 .. versionadded:: 2.1 

1064 

1065 """ 

1066 extension = coercions.expect( 

1067 roles.SyntaxExtensionRole, extension, apply_propagate_attrs=self 

1068 ) 

1069 self._apply_syntax_extension_to_self(extension) 

1070 return self 

1071 

1072 @util.preload_module("sqlalchemy.sql.elements") 

1073 def apply_syntax_extension_point( 

1074 self, 

1075 apply_fn: Callable[[Sequence[ClauseElement]], Sequence[ClauseElement]], 

1076 position: _L, 

1077 ) -> None: 

1078 """Apply a :class:`.SyntaxExtension` to a known extension point. 

1079 

1080 Should be used only internally by :class:`.SyntaxExtension`. 

1081 

1082 E.g.:: 

1083 

1084 class Qualify(SyntaxExtension, ClauseElement): 

1085 

1086 # ... 

1087 

1088 def apply_to_select(self, select_stmt: Select) -> None: 

1089 # append self to existing 

1090 select_stmt.apply_extension_point( 

1091 lambda existing: [*existing, self], "post_criteria" 

1092 ) 

1093 

1094 

1095 class ReplaceExt(SyntaxExtension, ClauseElement): 

1096 

1097 # ... 

1098 

1099 def apply_to_select(self, select_stmt: Select) -> None: 

1100 # replace any existing elements regardless of type 

1101 select_stmt.apply_extension_point( 

1102 lambda existing: [self], "post_criteria" 

1103 ) 

1104 

1105 

1106 class ReplaceOfTypeExt(SyntaxExtension, ClauseElement): 

1107 

1108 # ... 

1109 

1110 def apply_to_select(self, select_stmt: Select) -> None: 

1111 # replace any existing elements of the same type 

1112 select_stmt.apply_extension_point( 

1113 self.append_replacing_same_type, "post_criteria" 

1114 ) 

1115 

1116 :param apply_fn: callable function that will receive a sequence of 

1117 :class:`.ClauseElement` that is already populating the extension 

1118 point (the sequence is empty if there isn't one), and should return 

1119 a new sequence of :class:`.ClauseElement` that will newly populate 

1120 that point. The function typically can choose to concatenate the 

1121 existing values with the new one, or to replace the values that are 

1122 there with a new one by returning a list of a single element, or 

1123 to perform more complex operations like removing only the same 

1124 type element from the input list, or merging already existing elements

1125 of the same type, as shown in the examples above.

1126 :param position: string name of the position to apply to. This 

1127 varies per statement type. IDEs should show the possible values 

1128 for each statement type as it's typed with a ``typing.Literal`` per 

1129 statement. 

1130 

1131 .. seealso:: 

1132 

1133 :ref:`examples_syntax_extensions` 

1134 

1135 :meth:`.ext` 

1136 

1137 

1138 """ # noqa: E501 

1139 

1140 try: 

1141 attrname = self._position_map[position] 

1142 except KeyError as ke: 

1143 raise ValueError( 

1144 f"Unknown position {position!r} for {self.__class__} " 

1145 f"construct; known positions: " 

1146 f"{', '.join(repr(k) for k in self._position_map)}" 

1147 ) from ke 

1148 else: 

1149 ElementList = util.preloaded.sql_elements.ElementList 

1150 existing: Optional[ClauseElement] = getattr(self, attrname, None) 

1151 if existing is None: 

1152 input_seq: Tuple[ClauseElement, ...] = () 

1153 elif isinstance(existing, ElementList): 

1154 input_seq = existing.clauses 

1155 else: 

1156 input_seq = (existing,) 

1157 

1158 new_seq = apply_fn(input_seq) 

1159 assert new_seq, "cannot return empty sequence" 

1160 new = new_seq[0] if len(new_seq) == 1 else ElementList(new_seq) 

1161 setattr(self, attrname, new) 

1162 

1163 def _apply_syntax_extension_to_self( 

1164 self, extension: SyntaxExtension 

1165 ) -> None: 

1166 raise NotImplementedError() 

1167 

1168 def _get_syntax_extensions_as_dict(self) -> Mapping[_L, SyntaxExtension]: 

1169 res: Dict[_L, SyntaxExtension] = {} 

1170 for name, attr in self._position_map.items(): 

1171 value = getattr(self, attr) 

1172 if value is not None: 

1173 res[name] = value 

1174 return res 

1175 

1176 def _set_syntax_extensions(self, **extensions: SyntaxExtension) -> None: 

1177 for name, value in extensions.items(): 

1178 setattr(self, self._position_map[name], value) # type: ignore[index] # noqa: E501 

1179 

1180 

1181class SyntaxExtension(roles.SyntaxExtensionRole): 

1182 """Defines a unit that when also extending from :class:`.ClauseElement` 

1183 can be applied to SQLAlchemy statements :class:`.Select`, 

1184 :class:`_sql.Insert`, :class:`.Update` and :class:`.Delete` making use of 

1185 pre-established SQL insertion points within these constructs. 

1186 

1187 .. versionadded:: 2.1 

1188 

1189 .. seealso:: 

1190 

1191 :ref:`examples_syntax_extensions` 

1192 

1193 """ 

1194 

1195 def append_replacing_same_type( 

1196 self, existing: Sequence[ClauseElement] 

1197 ) -> Sequence[ClauseElement]: 

1198 """Utility function that can be used as 

1199 :paramref:`_sql.Select.apply_syntax_extension_point.apply_fn` 

1200 to remove any other element of the same type from ``existing`` and append

1201 ``self`` to the list.

1202 

1203 This is equivalent to:: 

1204 

1205 stmt.apply_syntax_extension_point( 

1206 lambda existing: [ 

1207 *(e for e in existing if not isinstance(e, ReplaceOfTypeExt)), 

1208 self, 

1209 ], 

1210 "post_criteria", 

1211 ) 

1212 

1213 .. seealso:: 

1214 

1215 :ref:`examples_syntax_extensions` 

1216 

1217 :meth:`_sql.Select.apply_syntax_extension_point` and equivalents 

1218 in :class:`_dml.Insert`, :class:`_dml.Delete`, :class:`_dml.Update` 

1219 

1220 """ # noqa: E501 

1221 cls = type(self) 

1222 return [*(e for e in existing if not isinstance(e, cls)), self] # type: ignore[list-item] # noqa: E501 

1223 

1224 def apply_to_select(self, select_stmt: Select[Unpack[_Ts]]) -> None: 

1225 """Apply this :class:`.SyntaxExtension` to a :class:`.Select`""" 

1226 raise NotImplementedError( 

1227 f"Extension {type(self).__name__} cannot be applied to select" 

1228 ) 

1229 

1230 def apply_to_update(self, update_stmt: Update) -> None: 

1231 """Apply this :class:`.SyntaxExtension` to an :class:`.Update`""" 

1232 raise NotImplementedError( 

1233 f"Extension {type(self).__name__} cannot be applied to update" 

1234 ) 

1235 

1236 def apply_to_delete(self, delete_stmt: Delete) -> None: 

1237 """Apply this :class:`.SyntaxExtension` to a :class:`.Delete`""" 

1238 raise NotImplementedError( 

1239 f"Extension {type(self).__name__} cannot be applied to delete" 

1240 ) 

1241 

1242 def apply_to_insert(self, insert_stmt: Insert) -> None: 

1243 """Apply this :class:`.SyntaxExtension` to an :class:`_sql.Insert`""" 

1244 raise NotImplementedError( 

1245 f"Extension {type(self).__name__} cannot be applied to insert" 

1246 ) 

1247 

1248 

1249class Executable(roles.StatementRole): 

1250 """Mark a :class:`_expression.ClauseElement` as supporting execution. 

1251 

1252 :class:`.Executable` is a superclass for all "statement" types 

1253 of objects, including :func:`select`, :func:`delete`, :func:`update`, 

1254 :func:`insert`, :func:`text`. 

1255 

1256 """ 

1257 

1258 supports_execution: bool = True 

1259 _execution_options: _ImmutableExecuteOptions = util.EMPTY_DICT 

1260 _is_default_generator: bool = False 

1261 _with_options: Tuple[ExecutableOption, ...] = () 

1262 _compile_state_funcs: Tuple[ 

1263 Tuple[Callable[[CompileState], None], Any], ... 

1264 ] = () 

1265 _compile_options: Optional[Union[Type[CacheableOptions], CacheableOptions]] 

1266 

1267 _executable_traverse_internals = [ 

1268 ("_with_options", InternalTraversal.dp_executable_options), 

1269 ( 

1270 "_compile_state_funcs", 

1271 ExtendedInternalTraversal.dp_compile_state_funcs, 

1272 ), 

1273 ("_propagate_attrs", ExtendedInternalTraversal.dp_propagate_attrs), 

1274 ] 

1275 

1276 is_select: bool = False 

1277 is_from_statement: bool = False 

1278 is_update: bool = False 

1279 is_insert: bool = False 

1280 is_text: bool = False 

1281 is_delete: bool = False 

1282 is_dml: bool = False 

1283 

1284 if TYPE_CHECKING: 

1285 __visit_name__: str 

1286 

1287 def _compile_w_cache( 

1288 self, 

1289 dialect: Dialect, 

1290 *, 

1291 compiled_cache: Optional[CompiledCacheType], 

1292 column_keys: List[str], 

1293 for_executemany: bool = False, 

1294 schema_translate_map: Optional[SchemaTranslateMapType] = None, 

1295 **kw: Any, 

1296 ) -> tuple[ 

1297 Compiled, 

1298 Sequence[BindParameter[Any]] | None, 

1299 _CoreSingleExecuteParams | None, 

1300 CacheStats, 

1301 ]: ... 

1302 

1303 def _execute_on_connection( 

1304 self, 

1305 connection: Connection, 

1306 distilled_params: _CoreMultiExecuteParams, 

1307 execution_options: CoreExecuteOptionsParameter, 

1308 ) -> CursorResult[Any]: ... 

1309 

1310 def _execute_on_scalar( 

1311 self, 

1312 connection: Connection, 

1313 distilled_params: _CoreMultiExecuteParams, 

1314 execution_options: CoreExecuteOptionsParameter, 

1315 ) -> Any: ... 

1316 

1317 @util.ro_non_memoized_property 

1318 def _all_selected_columns(self) -> _SelectIterable: 

1319 raise NotImplementedError() 

1320 

1321 @property 

1322 def _effective_plugin_target(self) -> str: 

1323 return self.__visit_name__ 

1324 

1325 @_generative 

1326 def options(self, *options: ExecutableOption) -> Self: 

1327 """Apply options to this statement. 

1328 

1329 In the general sense, options are any kind of Python object 

1330 that can be interpreted by systems that consume the statement outside 

1331 of the regular SQL compiler chain. Specifically, these options are 

1332 the ORM level options that apply "eager load" and other loading 

1333 behaviors to an ORM query. 

1334 

1335 For background on specific kinds of options for specific kinds of 

1336 statements, refer to the documentation for those option objects. 

1337 

1338 .. versionchanged:: 1.4 - added :meth:`.Executable.options` to 

1339 Core statement objects towards the goal of allowing unified 

1340 Core / ORM querying capabilities. 

1341 

1342 .. seealso:: 

1343 

1344 :ref:`loading_columns` - refers to options specific to the usage 

1345 of ORM queries 

1346 

1347 :ref:`relationship_loader_options` - refers to options specific 

1348 to the usage of ORM queries 

1349 

1350 """ 

1351 self._with_options += tuple( 

1352 coercions.expect(roles.ExecutableOptionRole, opt) 

1353 for opt in options 

1354 ) 

1355 return self 

1356 

1357 @_generative 

1358 def _set_compile_options(self, compile_options: CacheableOptions) -> Self: 

1359 """Assign the compile options to a new value. 

1360 

1361 :param compile_options: appropriate CacheableOptions structure 

1362 

1363 """ 

1364 

1365 self._compile_options = compile_options 

1366 return self 

1367 

1368 @_generative 

1369 def _update_compile_options(self, options: CacheableOptions) -> Self: 

1370 """update the _compile_options with new keys.""" 

1371 

1372 assert self._compile_options is not None 

1373 self._compile_options += options 

1374 return self 

1375 

1376 @_generative 

1377 def _add_compile_state_func( 

1378 self, 

1379 callable_: Callable[[CompileState], None], 

1380 cache_args: Any, 

1381 ) -> Self: 

1382 """Add a compile state function to this statement. 

1383 

1384 When using the ORM only, these are callable functions that will 

1385 be given the CompileState object upon compilation. 

1386 

1387 A second argument cache_args is required, which will be combined with 

1388 the ``__code__`` identity of the function itself in order to produce a 

1389 cache key. 

1390 

1391 """ 

1392 self._compile_state_funcs += ((callable_, cache_args),) 

1393 return self 

1394 

1395 @overload 

1396 def execution_options( 

1397 self, 

1398 *, 

1399 compiled_cache: Optional[CompiledCacheType] = ..., 

1400 logging_token: str = ..., 

1401 isolation_level: IsolationLevel = ..., 

1402 no_parameters: bool = False, 

1403 stream_results: bool = False, 

1404 max_row_buffer: int = ..., 

1405 yield_per: int = ..., 

1406 driver_column_names: bool = ..., 

1407 insertmanyvalues_page_size: int = ..., 

1408 schema_translate_map: Optional[SchemaTranslateMapType] = ..., 

1409 populate_existing: bool = False, 

1410 autoflush: bool = False, 

1411 synchronize_session: SynchronizeSessionArgument = ..., 

1412 dml_strategy: DMLStrategyArgument = ..., 

1413 render_nulls: bool = ..., 

1414 is_delete_using: bool = ..., 

1415 is_update_from: bool = ..., 

1416 preserve_rowcount: bool = False, 

1417 **opt: Any, 

1418 ) -> Self: ... 

1419 

1420 @overload 

1421 def execution_options(self, **opt: Any) -> Self: ... 

1422 

1423 @_generative 

1424 def execution_options(self, **kw: Any) -> Self: 

1425 """Set non-SQL options for the statement which take effect during 

1426 execution. 

1427 

1428 Execution options can be set at many scopes, including per-statement, 

1429 per-connection, or per execution, using methods such as 

1430 :meth:`_engine.Connection.execution_options` and parameters which 

1431 accept a dictionary of options such as 

1432 :paramref:`_engine.Connection.execute.execution_options` and 

1433 :paramref:`_orm.Session.execute.execution_options`. 

1434 

1435 The primary characteristic of an execution option, as opposed to 

1436 other kinds of options such as ORM loader options, is that 

1437 **execution options never affect the compiled SQL of a query, only 

1438 things that affect how the SQL statement itself is invoked or how 

1439 results are fetched**. That is, execution options are not part of 

1440 what's accommodated by SQL compilation nor are they considered part of 

1441 the cached state of a statement. 

1442 

1443 The :meth:`_sql.Executable.execution_options` method is 

1444 :term:`generative`, as 

1445 is the case for the method as applied to the :class:`_engine.Engine` 

1446 and :class:`_orm.Query` objects, which means when the method is called, 

1447 a copy of the object is returned, which applies the given parameters to 

1448 that new copy, but leaves the original unchanged:: 

1449 

1450 statement = select(table.c.x, table.c.y) 

1451 new_statement = statement.execution_options(my_option=True) 

1452 

1453 An exception to this behavior is the :class:`_engine.Connection` 

1454 object, where the :meth:`_engine.Connection.execution_options` method 

1455 is explicitly **not** generative. 

1456 

1457 The kinds of options that may be passed to 

1458 :meth:`_sql.Executable.execution_options` and other related methods and 

1459 parameter dictionaries include parameters that are explicitly consumed 

1460 by SQLAlchemy Core or ORM, as well as arbitrary keyword arguments not 

1461 defined by SQLAlchemy, which means the methods and/or parameter 

1462 dictionaries may be used for user-defined parameters that interact with 

1463 custom code, which may access the parameters using methods such as 

1464 :meth:`_sql.Executable.get_execution_options` and 

1465 :meth:`_engine.Connection.get_execution_options`, or within selected 

1466 event hooks using a dedicated ``execution_options`` event parameter 

1467 such as 

1468 :paramref:`_events.ConnectionEvents.before_execute.execution_options` 

1469 or :attr:`_orm.ORMExecuteState.execution_options`, e.g.:: 

1470 

1471 from sqlalchemy import event 

1472 

1473 

1474 @event.listens_for(some_engine, "before_execute") 

1475 def _process_opt(conn, statement, multiparams, params, execution_options): 

1476 "run a SQL function before invoking a statement" 

1477 

1478 if execution_options.get("do_special_thing", False): 

1479 conn.exec_driver_sql("run_special_function()") 

1480 

1481 Within the scope of options that are explicitly recognized by 

1482 SQLAlchemy, most apply to specific classes of objects and not others. 

1483 The most common execution options include: 

1484 

1485 * :paramref:`_engine.Connection.execution_options.isolation_level` - 

1486 sets the isolation level for a connection or a class of connections 

1487 via an :class:`_engine.Engine`. This option is accepted only 

1488 by :class:`_engine.Connection` or :class:`_engine.Engine`. 

1489 

1490 * :paramref:`_engine.Connection.execution_options.stream_results` - 

1491 indicates results should be fetched using a server side cursor; 

1492 this option is accepted by :class:`_engine.Connection`, by the 

1493 :paramref:`_engine.Connection.execute.execution_options` parameter 

1494 on :meth:`_engine.Connection.execute`, and additionally by 

1495 :meth:`_sql.Executable.execution_options` on a SQL statement object, 

1496 as well as by ORM constructs like :meth:`_orm.Session.execute`. 

1497 

1498 * :paramref:`_engine.Connection.execution_options.compiled_cache` - 

1499 indicates a dictionary that will serve as the 

1500 :ref:`SQL compilation cache <sql_caching>` 

1501 for a :class:`_engine.Connection` or :class:`_engine.Engine`, as 

1502 well as for ORM methods like :meth:`_orm.Session.execute`. 

1503 Can be passed as ``None`` to disable caching for statements. 

1504 This option is not accepted by 

1505 :meth:`_sql.Executable.execution_options` as it is inadvisable to 

1506 carry along a compilation cache within a statement object. 

1507 

1508 * :paramref:`_engine.Connection.execution_options.schema_translate_map` 

1509 - a mapping of schema names used by the 

1510 :ref:`Schema Translate Map <schema_translating>` feature, accepted 

1511 by :class:`_engine.Connection`, :class:`_engine.Engine`, 

1512 :class:`_sql.Executable`, as well as by ORM constructs 

1513 like :meth:`_orm.Session.execute`. 

1514 

1515 .. seealso:: 

1516 

1517 :meth:`_engine.Connection.execution_options` 

1518 

1519 :paramref:`_engine.Connection.execute.execution_options` 

1520 

1521 :paramref:`_orm.Session.execute.execution_options` 

1522 

1523 :ref:`orm_queryguide_execution_options` - documentation on all 

1524 ORM-specific execution options 

1525 

1526 """ # noqa: E501 

1527 if "isolation_level" in kw: 

1528 raise exc.ArgumentError( 

1529 "'isolation_level' execution option may only be specified " 

1530 "on Connection.execution_options(), or " 

1531 "per-engine using the isolation_level " 

1532 "argument to create_engine()." 

1533 ) 

1534 if "compiled_cache" in kw: 

1535 raise exc.ArgumentError( 

1536 "'compiled_cache' execution option may only be specified " 

1537 "on Connection.execution_options(), not per statement." 

1538 ) 

1539 self._execution_options = self._execution_options.union(kw) 

1540 return self 

1541 

1542 def get_execution_options(self) -> _ExecuteOptions: 

1543 """Get the non-SQL options which will take effect during execution. 

1544 

1545 .. seealso:: 

1546 

1547 :meth:`.Executable.execution_options` 

1548 """ 

1549 return self._execution_options 

1550 
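# Editor's note: illustrative sketch, not part of this module, tying the two
# methods above together; ``table`` and the option name are hypothetical and
# user-defined:
#
#     stmt = select(table.c.x).execution_options(my_option=True)
#     stmt.get_execution_options()   # -> {'my_option': True} (immutabledict)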

1551 

1552class ExecutableStatement(Executable): 

1553 """Executable subclass that implements a lightweight version of ``params`` 

1554 that avoids a full cloned traverse. 

1555 

1556 .. versionadded:: 2.1 

1557 

1558 """ 

1559 

1560 _params: util.immutabledict[str, Any] = EMPTY_DICT 

1561 

1562 _executable_traverse_internals = ( 

1563 Executable._executable_traverse_internals 

1564 + [("_params", InternalTraversal.dp_params)] 

1565 ) 

1566 

1567 @_generative 

1568 def params( 

1569 self, 

1570 __optionaldict: _CoreSingleExecuteParams | None = None, 

1571 /, 

1572 **kwargs: Any, 

1573 ) -> Self: 

1574 """Return a copy with the provided bindparam values. 

1575 

1576 Returns a copy of this Executable with bindparam values set 

1577 to the given dictionary:: 

1578 

1579 >>> clause = column("x") + bindparam("foo") 

1580 >>> print(clause.compile().params) 

1581 {'foo': None} 

1582 >>> print(clause.params({"foo": 7}).compile().params) 

1583 {'foo': 7} 

1584 

1585 """ 

1586 if __optionaldict: 

1587 kwargs.update(__optionaldict) 

1588 self._params = ( 

1589 util.immutabledict(kwargs) 

1590 if not self._params 

1591 else self._params | kwargs 

1592 ) 

1593 return self 

1594 

1595 

1596class SchemaEventTarget(event.EventTarget): 

1597 """Base class for elements that are the targets of :class:`.DDLEvents` 

1598 events. 

1599 

1600 This includes :class:`.SchemaItem` as well as :class:`.SchemaType`. 

1601 

1602 """ 

1603 

1604 dispatch: dispatcher[SchemaEventTarget] 

1605 

1606 def _set_parent(self, parent: SchemaEventTarget, **kw: Any) -> None: 

1607 """Associate with this SchemaEvent's parent object.""" 

1608 

1609 def _set_parent_with_dispatch( 

1610 self, parent: SchemaEventTarget, **kw: Any 

1611 ) -> None: 

1612 self.dispatch.before_parent_attach(self, parent) 

1613 self._set_parent(parent, **kw) 

1614 self.dispatch.after_parent_attach(self, parent) 

1615 

1616 

1617class SchemaVisitable(SchemaEventTarget, visitors.Visitable): 

1618 """Base class for elements that are targets of a :class:`.SchemaVisitor`. 

1619 

1620 .. versionadded:: 2.0.41 

1621 

1622 """ 

1623 

1624 

1625class SchemaVisitor(ClauseVisitor): 

1626 """Define the visiting for ``SchemaItem`` and more 

1627 generally ``SchemaVisitable`` objects. 

1628 

1629 """ 

1630 

1631 __traverse_options__: Dict[str, Any] = {"schema_visitor": True} 

1632 

1633 

1634class _SentinelDefaultCharacterization(Enum): 

1635 NONE = "none" 

1636 UNKNOWN = "unknown" 

1637 CLIENTSIDE = "clientside" 

1638 SENTINEL_DEFAULT = "sentinel_default" 

1639 SERVERSIDE = "serverside" 

1640 IDENTITY = "identity" 

1641 SEQUENCE = "sequence" 

1642 MONOTONIC_FUNCTION = "monotonic" 

1643 

1644 

1645class _SentinelColumnCharacterization(NamedTuple): 

1646 columns: Optional[Sequence[Column[Any]]] = None 

1647 is_explicit: bool = False 

1648 is_autoinc: bool = False 

1649 default_characterization: _SentinelDefaultCharacterization = ( 

1650 _SentinelDefaultCharacterization.NONE 

1651 ) 

1652 

1653 

1654_COLKEY = TypeVar("_COLKEY", Union[None, str], str) 

1655 

1656_COL_co = TypeVar("_COL_co", bound="ColumnElement[Any]", covariant=True) 

1657_COL = TypeVar("_COL", bound="ColumnElement[Any]") 

1658 

1659 

1660class _ColumnMetrics(Generic[_COL_co]): 

1661 __slots__ = ("column",) 

1662 

1663 column: _COL_co 

1664 

1665 def __init__( 

1666 self, collection: ColumnCollection[Any, _COL_co], col: _COL_co 

1667 ) -> None: 

1668 self.column = col 

1669 

1670 # proxy_index being non-empty means it was initialized. 

1671 # so we need to update it 

1672 pi = collection._proxy_index 

1673 if pi: 

1674 for eps_col in col._expanded_proxy_set: 

1675 pi[eps_col].add(self) 

1676 

1677 def get_expanded_proxy_set(self) -> FrozenSet[ColumnElement[Any]]: 

1678 return self.column._expanded_proxy_set 

1679 

1680 def dispose(self, collection: ColumnCollection[_COLKEY, _COL_co]) -> None: 

1681 pi = collection._proxy_index 

1682 if not pi: 

1683 return 

1684 for col in self.column._expanded_proxy_set: 

1685 colset = pi.get(col, None) 

1686 if colset: 

1687 colset.discard(self) 

1688 if colset is not None and not colset: 

1689 del pi[col] 

1690 
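# embedded() returns True when every column in target_set corresponds to
# this column's expanded proxy set, either directly or via cloned copies.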

1691 def embedded( 

1692 self, 

1693 target_set: Union[ 

1694 Set[ColumnElement[Any]], FrozenSet[ColumnElement[Any]] 

1695 ], 

1696 ) -> bool: 

1697 expanded_proxy_set = self.column._expanded_proxy_set 

1698 for t in target_set.difference(expanded_proxy_set): 

1699 if not expanded_proxy_set.intersection(_expand_cloned([t])): 

1700 return False 

1701 return True 

1702 

1703 

1704class ColumnCollection(Generic[_COLKEY, _COL_co]): 

1705 """Base class for collection of :class:`_expression.ColumnElement` 

1706 instances, typically for :class:`_sql.FromClause` objects. 

1707 

1708 The :class:`_sql.ColumnCollection` object is most commonly available 

1709 as the :attr:`_schema.Table.c` or :attr:`_schema.Table.columns` collection 

1710 on the :class:`_schema.Table` object, introduced at 

1711 :ref:`metadata_tables_and_columns`. 

1712 

1713 The :class:`_expression.ColumnCollection` has both mapping- and sequence- 

1714 like behaviors. A :class:`_expression.ColumnCollection` usually stores 

1715 :class:`_schema.Column` objects, which are then accessible both via mapping 

1716 style access as well as attribute access style. 

1717 

1718 To access :class:`_schema.Column` objects using ordinary attribute-style 

1719 access, specify the name like any other object attribute, as below, 

1720 where a column named ``employee_name`` is accessed:: 

1721 

1722 >>> employee_table.c.employee_name 

1723 

1724 To access columns that have names with special characters or spaces, 

1725 index-style access is used, as below, where a column named 

1726 ``employee ' payment`` is accessed:: 

1727 

1728 >>> employee_table.c["employee ' payment"] 

1729 

1730 As the :class:`_sql.ColumnCollection` object provides a Python dictionary 

1731 interface, common dictionary method names like 

1732 :meth:`_sql.ColumnCollection.keys`, :meth:`_sql.ColumnCollection.values`, 

1733 and :meth:`_sql.ColumnCollection.items` are available, which means that 

1734 database columns that are keyed under these names also need to use indexed 

1735 access:: 

1736 

1737 >>> employee_table.c["values"] 

1738 

1739 

1740 The name under which a :class:`_schema.Column` is present is normally 

1741 that of the :paramref:`_schema.Column.key` parameter. In some contexts, 

1742 such as a :class:`_sql.Select` object that uses a label style set 

1743 using the :meth:`_sql.Select.set_label_style` method, a column of a certain 

1744 key may instead be represented under a particular label name such 

1745 as ``tablename_columnname``:: 

1746 

1747 >>> from sqlalchemy import select, column, table 

1748 >>> from sqlalchemy import LABEL_STYLE_TABLENAME_PLUS_COL 

1749 >>> t = table("t", column("c")) 

1750 >>> stmt = select(t).set_label_style(LABEL_STYLE_TABLENAME_PLUS_COL) 

1751 >>> subq = stmt.subquery() 

1752 >>> subq.c.t_c 

1753 <sqlalchemy.sql.elements.ColumnClause at 0x7f59dcf04fa0; t_c> 

1754 

1755 :class:`.ColumnCollection` also indexes the columns in order and allows 

1756 them to be accessible by their integer position:: 

1757 

1758 >>> cc[0] 

1759 Column('x', Integer(), table=None) 

1760 >>> cc[1] 

1761 Column('y', Integer(), table=None) 

1762 

1763 .. versionadded:: 1.4 :class:`_expression.ColumnCollection` 

1764 allows integer-based 

1765 index access to the collection. 

1766 

1767 Iterating the collection yields the column expressions in order:: 

1768 

1769 >>> list(cc) 

1770 [Column('x', Integer(), table=None), 

1771 Column('y', Integer(), table=None)] 

1772 

1773 The :class:`_expression.ColumnCollection` base class is read-only. 

1774 For mutation operations, the :class:`.WriteableColumnCollection` subclass 

1775 provides methods such as :meth:`.WriteableColumnCollection.add`. 

1776 A special subclass :class:`.DedupeColumnCollection` exists which 

1777 maintains SQLAlchemy's older behavior of not allowing duplicates; this 

1778 collection is used for schema level objects like :class:`_schema.Table` 

1779 and :class:`.PrimaryKeyConstraint` where this deduping is helpful. 

1780 The :class:`.DedupeColumnCollection` class also has additional mutation 

1781 methods as the schema constructs have more use cases that require removal 

1782 and replacement of columns. 

1783 

1784 .. versionchanged:: 1.4 :class:`_expression.ColumnCollection` 

1785 now stores duplicate 

1786 column keys as well as the same column in multiple positions. The 

1787 :class:`.DedupeColumnCollection` class is added to maintain the 

1788 former behavior in those cases where deduplication as well as 

1789 additional replace/remove operations are needed. 

1790 

1791 .. versionchanged:: 2.1 :class:`_expression.ColumnCollection` is now 

1792 a read-only base class. Mutation operations are available through 

1793 :class:`.WriteableColumnCollection` and :class:`.DedupeColumnCollection` 

1794 subclasses. 

1795 

1796 

1797 """ 
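# Illustrative sketch (not part of this module's source), summarizing the
# access styles described above against a hypothetical two-column table::
#
#     from sqlalchemy import Column, Integer, MetaData, Table
#
#     t = Table("t", MetaData(), Column("x", Integer), Column("y", Integer))
#
#     t.c.x               # attribute-style access
#     t.c["x"]            # mapping-style access
#     t.c[0]              # integer position access
#     t.c["x", "y"]       # tuple access returns a sub-collection
#     t.c.keys()          # ['x', 'y']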

1798 

1799 __slots__ = ("_collection", "_index", "_colset", "_proxy_index") 

1800 

1801 _collection: List[Tuple[_COLKEY, _COL_co, _ColumnMetrics[_COL_co]]] 

1802 _index: Dict[Union[None, str, int], Tuple[_COLKEY, _COL_co]] 

1803 _colset: Set[_COL_co] 

1804 _proxy_index: Dict[ColumnElement[Any], Set[_ColumnMetrics[_COL_co]]] 

1805 

1806 def __init__(self) -> None: 

1807 raise TypeError( 

1808 "ColumnCollection is an abstract base class and cannot be " 

1809 "instantiated directly. Use WriteableColumnCollection or " 

1810 "DedupeColumnCollection instead." 

1811 ) 

1812 

1813 @util.preload_module("sqlalchemy.sql.elements") 

1814 def __clause_element__(self) -> ClauseList: 

1815 elements = util.preloaded.sql_elements 

1816 

1817 return elements.ClauseList( 

1818 _literal_as_text_role=roles.ColumnsClauseRole, 

1819 group=False, 

1820 *self._all_columns, 

1821 ) 

1822 

1823 @property 

1824 def _all_columns(self) -> List[_COL_co]: 

1825 return [col for (_, col, _) in self._collection] 

1826 

1827 def keys(self) -> List[_COLKEY]: 

1828 """Return a sequence of string key names for all columns in this 

1829 collection.""" 

1830 return [k for (k, _, _) in self._collection] 

1831 

1832 def values(self) -> List[_COL_co]: 

1833 """Return a sequence of :class:`_sql.ColumnClause` or 

1834 :class:`_schema.Column` objects for all columns in this 

1835 collection.""" 

1836 return [col for (_, col, _) in self._collection] 

1837 

1838 def items(self) -> List[Tuple[_COLKEY, _COL_co]]: 

1839 """Return a sequence of (key, column) tuples for all columns in this 

1840 collection each consisting of a string key name and a 

1841 :class:`_sql.ColumnClause` or 

1842 :class:`_schema.Column` object. 

1843 """ 

1844 

1845 return [(k, col) for (k, col, _) in self._collection] 

1846 

1847 def __bool__(self) -> bool: 

1848 return bool(self._collection) 

1849 

1850 def __len__(self) -> int: 

1851 return len(self._collection) 

1852 

1853 def __iter__(self) -> Iterator[_COL_co]: 

1854 # materialize to a list first so iteration remains stable if the collection changes 

1855 return iter([col for _, col, _ in self._collection]) 

1856 

1857 @overload 

1858 def __getitem__(self, key: Union[str, int]) -> _COL_co: ... 

1859 

1860 @overload 

1861 def __getitem__( 

1862 self, key: Tuple[Union[str, int], ...] 

1863 ) -> ReadOnlyColumnCollection[_COLKEY, _COL_co]: ... 

1864 

1865 @overload 

1866 def __getitem__( 

1867 self, key: slice 

1868 ) -> ReadOnlyColumnCollection[_COLKEY, _COL_co]: ... 

1869 

1870 def __getitem__( 

1871 self, key: Union[str, int, slice, Tuple[Union[str, int], ...]] 

1872 ) -> Union[ReadOnlyColumnCollection[_COLKEY, _COL_co], _COL_co]: 

1873 try: 

1874 if isinstance(key, (tuple, slice)): 

1875 if isinstance(key, slice): 

1876 cols = ( 

1877 (sub_key, col) 

1878 for (sub_key, col, _) in self._collection[key] 

1879 ) 

1880 else: 

1881 cols = (self._index[sub_key] for sub_key in key) 

1882 

1883 return WriteableColumnCollection(cols).as_readonly() 

1884 else: 

1885 return self._index[key][1] 

1886 except KeyError as err: 

1887 if isinstance(err.args[0], int): 

1888 raise IndexError(err.args[0]) from err 

1889 else: 

1890 raise 

1891 

1892 def __getattr__(self, key: str) -> _COL_co: 

1893 try: 

1894 return self._index[key][1] 

1895 except KeyError as err: 

1896 raise AttributeError(key) from err 

1897 

1898 def __contains__(self, key: str) -> bool: 

1899 if key not in self._index: 

1900 if not isinstance(key, str): 

1901 raise exc.ArgumentError( 

1902 "__contains__ requires a string argument" 

1903 ) 

1904 return False 

1905 else: 

1906 return True 

1907 

1908 def compare(self, other: ColumnCollection[_COLKEY, _COL_co]) -> bool: 

1909 """Compare this :class:`_expression.ColumnCollection` to another 

1910 based on the identity of each member column, in positional order""" 

1911 

1912 for l, r in zip_longest(self, other): 

1913 if l is not r: 

1914 return False 

1915 else: 

1916 return True 

1917 

1918 def __eq__(self, other: Any) -> bool: 

1919 return self.compare(other) 

1920 

1921 @overload 

1922 def get(self, key: str, default: None = None) -> Optional[_COL_co]: ... 

1923 

1924 @overload 

1925 def get(self, key: str, default: _COL) -> Union[_COL_co, _COL]: ... 

1926 

1927 def get( 

1928 self, key: str, default: Optional[_COL] = None 

1929 ) -> Optional[Union[_COL_co, _COL]]: 

1930 """Get a :class:`_sql.ColumnClause` or :class:`_schema.Column` object 

1931 based on a string key name from this 

1932 :class:`_expression.ColumnCollection`.""" 

1933 

1934 if key in self._index: 

1935 return self._index[key][1] 

1936 else: 

1937 return default 

1938 

1939 def __str__(self) -> str: 

1940 return "%s(%s)" % ( 

1941 self.__class__.__name__, 

1942 ", ".join(str(c) for c in self), 

1943 ) 

1944 

1945 # https://github.com/python/mypy/issues/4266 

1946 __hash__: Optional[int] = None # type: ignore 

1947 

1948 def contains_column(self, col: ColumnElement[Any]) -> bool: 

1949 """Checks if a column object exists in this collection""" 

1950 if col not in self._colset: 

1951 if isinstance(col, str): 

1952 raise exc.ArgumentError( 

1953 "contains_column cannot be used with string arguments. " 

1954 "Use ``col_name in table.c`` instead." 

1955 ) 

1956 return False 

1957 else: 

1958 return True 

1959 

1960 def _as_readonly(self) -> ReadOnlyColumnCollection[_COLKEY, _COL_co]: 

1961 raise NotImplementedError() 

1962 

1963 def corresponding_column( 

1964 self, column: _COL, require_embedded: bool = False 

1965 ) -> Optional[Union[_COL, _COL_co]]: 

1966 """Given a :class:`_expression.ColumnElement`, return the exported 

1967 :class:`_expression.ColumnElement` object from this 

1968 :class:`_expression.ColumnCollection` 

1969 which corresponds to that original :class:`_expression.ColumnElement` 

1970 via a common 

1971 ancestor column. 

1972 

1973 :param column: the target :class:`_expression.ColumnElement` 

1974 to be matched. 

1975 

1976 :param require_embedded: only return corresponding columns for 

1977 the given :class:`_expression.ColumnElement`, if the given 

1978 :class:`_expression.ColumnElement` 

1979 is actually present within a sub-element 

1980 of this :class:`_expression.Selectable`. 

1981 Normally the column will match if 

1982 it merely shares a common ancestor with one of the exported 

1983 columns of this :class:`_expression.Selectable`. 

1984 

1985 .. seealso:: 

1986 

1987 :meth:`_expression.Selectable.corresponding_column` 

1988 - invokes this method 

1989 against the collection returned by 

1990 :attr:`_expression.Selectable.exported_columns`. 

1991 

1992 .. versionchanged:: 1.4 the implementation for ``corresponding_column`` 

1993 was moved onto the :class:`_expression.ColumnCollection` itself. 

1994 

1995 """ 

1996 raise NotImplementedError() 
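# Illustrative sketch (not part of this module's source): the typical use of
# corresponding_column() is matching a column from an original selectable to
# its proxy within a derived selectable::
#
#     from sqlalchemy import column, select, table
#
#     t = table("t", column("x"))
#     subq = select(t).subquery()
#     subq.c.corresponding_column(t.c.x) is subq.c.x  # True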

1997 

1998 

1999class WriteableColumnCollection(ColumnCollection[_COLKEY, _COL_co]): 

2000 """A :class:`_sql.ColumnCollection` that allows mutation operations. 

2001 

2002 This is the writable form of :class:`_sql.ColumnCollection` that 

2003 implements methods such as :meth:`.add`, :meth:`.remove`, :meth:`.update`, 

2004 and :meth:`.clear`. 

2005 

2006 This class is used internally for building column collections during 

2007 construction of SQL constructs. For schema-level objects that require 

2008 deduplication behavior, use :class:`.DedupeColumnCollection`. 

2009 

2010 .. versionadded:: 2.1 

2011 

2012 """ 

2013 
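# Illustrative sketch (not part of this module's source): direct use of this
# collection is internal, but the basic add / readonly flow looks like::
#
#     from sqlalchemy import column
#
#     cc = WriteableColumnCollection()
#     cc.add(column("x"))
#     cc.add(column("y"), key="why")
#     readonly = cc.as_readonly()  # shares state with cc, but is immutable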

2014 __slots__ = () 

2015 

2016 def __init__( 

2017 self, columns: Optional[Iterable[Tuple[_COLKEY, _COL_co]]] = None 

2018 ): 

2019 object.__setattr__(self, "_colset", set()) 

2020 object.__setattr__(self, "_index", {}) 

2021 object.__setattr__( 

2022 self, "_proxy_index", collections.defaultdict(util.OrderedSet) 

2023 ) 

2024 object.__setattr__(self, "_collection", []) 

2025 if columns: 

2026 self._initial_populate(columns) 

2027 

2028 def _initial_populate( 

2029 self, iter_: Iterable[Tuple[_COLKEY, _COL_co]] 

2030 ) -> None: 

2031 self._populate_separate_keys(iter_) 

2032 

2033 def _populate_separate_keys( 

2034 self, iter_: Iterable[Tuple[_COLKEY, _COL_co]] 

2035 ) -> None: 

2036 """populate from an iterator of (key, column)""" 

2037 

2038 self._collection[:] = collection = [ 

2039 (k, c, _ColumnMetrics(self, c)) for k, c in iter_ 

2040 ] 

2041 self._colset.update(c._deannotate() for _, c, _ in collection) 

2042 self._index.update( 

2043 {idx: (k, c) for idx, (k, c, _) in enumerate(collection)} 

2044 ) 
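# writing string keys from the reversed collection means that when a key
# appears more than once, the first (earliest) occurrence wins in the
# by-key index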

2045 self._index.update({k: (k, col) for k, col, _ in reversed(collection)}) 

2046 

2047 def __getstate__(self) -> Dict[str, Any]: 

2048 return { 

2049 "_collection": [(k, c) for k, c, _ in self._collection], 

2050 "_index": self._index, 

2051 } 

2052 

2053 def __setstate__(self, state: Dict[str, Any]) -> None: 

2054 object.__setattr__(self, "_index", state["_index"]) 

2055 object.__setattr__( 

2056 self, "_proxy_index", collections.defaultdict(util.OrderedSet) 

2057 ) 

2058 object.__setattr__( 

2059 self, 

2060 "_collection", 

2061 [ 

2062 (k, c, _ColumnMetrics(self, c)) 

2063 for (k, c) in state["_collection"] 

2064 ], 

2065 ) 

2066 object.__setattr__( 

2067 self, "_colset", {col for k, col, _ in self._collection} 

2068 ) 

2069 

2070 def add( 

2071 self, 

2072 column: ColumnElement[Any], 

2073 key: Optional[_COLKEY] = None, 

2074 ) -> None: 

2075 """Add a column to this :class:`_sql.WriteableColumnCollection`. 

2076 

2077 .. note:: 

2078 

2079 This method is **not normally used by user-facing code**, as the 

2080 :class:`_sql.WriteableColumnCollection` is usually part of an 

2081 existing object such as a :class:`_schema.Table`. To add a 

2082 :class:`_schema.Column` to an existing :class:`_schema.Table` 

2083 object, use the :meth:`_schema.Table.append_column` method. 

2084 

2085 """ 

2086 colkey: _COLKEY 

2087 

2088 if key is None: 

2089 colkey = column.key # type: ignore 

2090 else: 

2091 colkey = key 

2092 

2093 l = len(self._collection) 

2094 

2095 # don't really know how this part is supposed to work w/ the 

2096 # covariant thing 

2097 

2098 _column = cast(_COL_co, column) 

2099 

2100 self._collection.append( 

2101 (colkey, _column, _ColumnMetrics(self, _column)) 

2102 ) 

2103 self._colset.add(_column._deannotate()) 

2104 

2105 self._index[l] = (colkey, _column) 

2106 if colkey not in self._index: 

2107 self._index[colkey] = (colkey, _column) 

2108 
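# Illustrative sketch (not part of this module's source): the public route
# that ultimately adds to a Table's collection is Table.append_column::
#
#     from sqlalchemy import Column, Integer, MetaData, Table
#
#     t = Table("t", MetaData(), Column("x", Integer))
#     t.append_column(Column("y", Integer))
#     "y" in t.c  # True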

2109 def _as_readonly(self) -> ReadOnlyColumnCollection[_COLKEY, _COL_co]: 

2110 return ReadOnlyColumnCollection(self) 

2111 

2112 def as_readonly(self) -> ReadOnlyColumnCollection[_COLKEY, _COL_co]: 

2113 """Return a "read only" form of this 

2114 :class:`_sql.WriteableColumnCollection`.""" 

2115 

2116 return self._as_readonly() 

2117 

2118 def _init_proxy_index(self) -> None: 

2119 """populate the "proxy index", if empty. 

2120 

2121 proxy index is added in 2.0 to provide more efficient operation 

2122 for the corresponding_column() method. 

2123 

2124 For reasons of both time to construct new .c collections as well as 

2125 memory conservation for large numbers of large .c collections, the 

2126 proxy_index is only filled if corresponding_column() is called. Once 

2127 filled it stays that way, and new _ColumnMetrics objects created after 

2128 that point will populate it with new data. Note this case would be 

2129 unusual, if not nonexistent, as it means a .c collection is being 

2130 mutated after corresponding_column() was used; however, it is tested in 

2131 test/base/test_utils.py. 

2132 

2133 """ 

2134 pi = self._proxy_index 

2135 if pi: 

2136 return 

2137 

2138 for _, _, metrics in self._collection: 

2139 eps = metrics.column._expanded_proxy_set 

2140 

2141 for eps_col in eps: 

2142 pi[eps_col].add(metrics) 

2143 

2144 def corresponding_column( 

2145 self, column: _COL, require_embedded: bool = False 

2146 ) -> Optional[Union[_COL, _COL_co]]: 

2147 """Given a :class:`_expression.ColumnElement`, return the exported 

2148 :class:`_expression.ColumnElement` object from this 

2149 :class:`_expression.ColumnCollection` 

2150 which corresponds to that original :class:`_expression.ColumnElement` 

2151 via a common 

2152 ancestor column. 

2153 

2154 See :meth:`.ColumnCollection.corresponding_column` for parameter 

2155 information. 

2156 

2157 """ 

2158 # TODO: cython candidate 

2159 

2160 # don't dig around if the column is locally present 

2161 if column in self._colset: 

2162 return column 

2163 

2164 selected_intersection, selected_metrics = None, None 

2165 target_set = column.proxy_set 

2166 

2167 pi = self._proxy_index 

2168 if not pi: 

2169 self._init_proxy_index() 

2170 

2171 for current_metrics in ( 

2172 mm for ts in target_set if ts in pi for mm in pi[ts] 

2173 ): 

2174 if not require_embedded or current_metrics.embedded(target_set): 

2175 if selected_metrics is None: 

2176 # no corresponding column yet, pick this one. 

2177 selected_metrics = current_metrics 

2178 continue 

2179 

2180 current_intersection = target_set.intersection( 

2181 current_metrics.column._expanded_proxy_set 

2182 ) 

2183 if selected_intersection is None: 

2184 selected_intersection = target_set.intersection( 

2185 selected_metrics.column._expanded_proxy_set 

2186 ) 

2187 

2188 if len(current_intersection) > len(selected_intersection): 

2189 # 'current' has a larger field of correspondence than 

2190 # 'selected'. i.e. selectable.c.a1_x->a1.c.x->table.c.x 

2191 # matches a1.c.x->table.c.x better than 

2192 # selectable.c.x->table.c.x does. 

2193 

2194 selected_metrics = current_metrics 

2195 selected_intersection = current_intersection 

2196 elif current_intersection == selected_intersection: 

2197 # they have the same field of correspondence. see 

2198 # which proxy_set has fewer columns in it, which 

2199 # indicates a closer relationship with the root 

2200 # column. Also take into account the "weight" 

2201 # attribute which CompoundSelect() uses to give 

2202 # higher precedence to columns based on vertical 

2203 # position in the compound statement, and discard 

2204 # columns that have no reference to the target 

2205 # column (also occurs with CompoundSelect) 

2206 

2207 selected_col_distance = sum( 

2208 [ 

2209 sc._annotations.get("weight", 1) 

2210 for sc in ( 

2211 selected_metrics.column._uncached_proxy_list() 

2212 ) 

2213 if sc.shares_lineage(column) 

2214 ], 

2215 ) 

2216 current_col_distance = sum( 

2217 [ 

2218 sc._annotations.get("weight", 1) 

2219 for sc in ( 

2220 current_metrics.column._uncached_proxy_list() 

2221 ) 

2222 if sc.shares_lineage(column) 

2223 ], 

2224 ) 

2225 if current_col_distance < selected_col_distance: 

2226 selected_metrics = current_metrics 

2227 selected_intersection = current_intersection 

2228 

2229 return selected_metrics.column if selected_metrics else None 

2230 

2231 

2232_NAMEDCOL = TypeVar("_NAMEDCOL", bound="NamedColumn[Any]") 

2233 

2234 

2235class DedupeColumnCollection(WriteableColumnCollection[str, _NAMEDCOL]): 

2236 """A :class:`_expression.ColumnCollection` 

2237 that maintains deduplicating behavior. 

2238 

2239 This is useful by schema level objects such as :class:`_schema.Table` and 

2240 :class:`.PrimaryKeyConstraint`. The collection includes more 

2241 sophisticated mutator methods as well to suit schema objects which 

2242 require mutable column collections. 

2243 

2244 .. versionadded:: 1.4 

2245 

2246 """ 

2247 
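# Illustrative sketch (not part of this module's source): unlike the base
# collection, adding a second column under an existing key replaces the
# original rather than storing a duplicate::
#
#     from sqlalchemy import Column, Integer
#
#     dc = DedupeColumnCollection()
#     dc.add(Column("x", Integer))
#     dc.add(Column("x", Integer))  # replaces the first "x"
#     len(dc)  # 1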

2248 def add( # type: ignore[override] 

2249 self, 

2250 column: _NAMEDCOL, 

2251 key: Optional[str] = None, 

2252 *, 

2253 index: Optional[int] = None, 

2254 ) -> None: 

2255 if key is not None and column.key != key: 

2256 raise exc.ArgumentError( 

2257 "DedupeColumnCollection requires columns be under " 

2258 "the same key as their .key" 

2259 ) 

2260 key = column.key 

2261 

2262 if key is None: 

2263 raise exc.ArgumentError( 

2264 "Can't add unnamed column to column collection" 

2265 ) 

2266 

2267 if key in self._index: 

2268 existing = self._index[key][1] 

2269 

2270 if existing is column: 

2271 return 

2272 

2273 self.replace(column, index=index) 

2274 

2275 # pop out memoized proxy_set as this 

2276 # operation may very well be occurring 

2277 # in a _make_proxy operation 

2278 util.memoized_property.reset(column, "proxy_set") 

2279 else: 

2280 self._append_new_column(key, column, index=index) 

2281 

2282 def _append_new_column( 

2283 self, key: str, named_column: _NAMEDCOL, *, index: Optional[int] = None 

2284 ) -> None: 

2285 collection_length = len(self._collection) 

2286 

2287 if index is None: 

2288 l = collection_length 

2289 else: 

2290 if index < 0: 

2291 index = max(0, collection_length + index) 

2292 l = index 

2293 

2294 if index is None: 

2295 self._collection.append( 

2296 (key, named_column, _ColumnMetrics(self, named_column)) 

2297 ) 

2298 else: 

2299 self._collection.insert( 

2300 index, (key, named_column, _ColumnMetrics(self, named_column)) 

2301 ) 

2302 

2303 self._colset.add(named_column._deannotate()) 

2304 

2305 if index is not None: 

2306 for idx in reversed(range(index, collection_length)): 

2307 self._index[idx + 1] = self._index[idx] 

2308 

2309 self._index[l] = (key, named_column) 

2310 self._index[key] = (key, named_column) 

2311 

2312 def _populate_separate_keys( 

2313 self, iter_: Iterable[Tuple[str, _NAMEDCOL]] 

2314 ) -> None: 

2315 """populate from an iterator of (key, column)""" 

2316 cols = list(iter_) 

2317 

2318 replace_col = [] 

2319 for k, col in cols: 

2320 if col.key != k: 

2321 raise exc.ArgumentError( 

2322 "DedupeColumnCollection requires columns be under " 

2323 "the same key as their .key" 

2324 ) 

2325 if col.name in self._index and col.key != col.name: 

2326 replace_col.append(col) 

2327 elif col.key in self._index: 

2328 replace_col.append(col) 

2329 else: 

2330 self._index[k] = (k, col) 

2331 self._collection.append((k, col, _ColumnMetrics(self, col))) 

2332 self._colset.update(c._deannotate() for (k, c, _) in self._collection) 

2333 

2334 self._index.update( 

2335 (idx, (k, c)) for idx, (k, c, _) in enumerate(self._collection) 

2336 ) 

2337 for col in replace_col: 

2338 self.replace(col) 

2339 

2340 def extend(self, iter_: Iterable[_NAMEDCOL]) -> None: 

2341 self._populate_separate_keys((col.key, col) for col in iter_) 

2342 

2343 def remove(self, column: _NAMEDCOL) -> None: 

2344 if column not in self._colset: 

2345 raise ValueError( 

2346 "Can't remove column %r; column is not in this collection" 

2347 % column 

2348 ) 

2349 del self._index[column.key] 

2350 self._colset.remove(column) 

2351 self._collection[:] = [ 

2352 (k, c, metrics) 

2353 for (k, c, metrics) in self._collection 

2354 if c is not column 

2355 ] 

2356 for metrics in self._proxy_index.get(column, ()): 

2357 metrics.dispose(self) 

2358 

2359 self._index.update( 

2360 {idx: (k, col) for idx, (k, col, _) in enumerate(self._collection)} 

2361 ) 

2362 # remove the now-stale integer index entry at the old highest position 

2363 del self._index[len(self._collection)] 

2364 

2365 def replace( 

2366 self, 

2367 column: _NAMEDCOL, 

2368 *, 

2369 extra_remove: Optional[Iterable[_NAMEDCOL]] = None, 

2370 index: Optional[int] = None, 

2371 ) -> None: 

2372 """add the given column to this collection, removing unaliased 

2373 versions of this column as well as existing columns with the 

2374 same key. 

2375 

2376 e.g.:: 

2377 

2378 t = Table("sometable", metadata, Column("col1", Integer)) 

2379 t.columns.replace(Column("col1", Integer, key="columnone")) 

2380 

2381 will remove the original 'col1' from the collection, and add 

2382 the new column under the key 'columnone'. 

2383 

2384 Used by schema.Column to override columns during table reflection. 

2385 

2386 """ 

2387 

2388 if extra_remove: 

2389 remove_col = set(extra_remove) 

2390 else: 

2391 remove_col = set() 

2392 # remove up to two columns based on matches of name as well as key 

2393 if column.name in self._index and column.key != column.name: 

2394 other = self._index[column.name][1] 

2395 if other.name == other.key: 

2396 remove_col.add(other) 

2397 

2398 if column.key in self._index: 

2399 remove_col.add(self._index[column.key][1]) 

2400 

2401 if not remove_col: 

2402 self._append_new_column(column.key, column, index=index) 

2403 return 

2404 new_cols: List[Tuple[str, _NAMEDCOL, _ColumnMetrics[_NAMEDCOL]]] = [] 

2405 replace_index = None 

2406 

2407 for idx, (k, col, metrics) in enumerate(self._collection): 

2408 if col in remove_col: 

2409 if replace_index is None: 

2410 replace_index = idx 

2411 new_cols.append( 

2412 (column.key, column, _ColumnMetrics(self, column)) 

2413 ) 

2414 else: 

2415 new_cols.append((k, col, metrics)) 

2416 

2417 if remove_col: 

2418 self._colset.difference_update(remove_col) 

2419 

2420 for rc in remove_col: 

2421 for metrics in self._proxy_index.get(rc, ()): 

2422 metrics.dispose(self) 

2423 

2424 if replace_index is None: 

2425 if index is not None: 

2426 new_cols.insert( 

2427 index, (column.key, column, _ColumnMetrics(self, column)) 

2428 ) 

2429 

2430 else: 

2431 new_cols.append( 

2432 (column.key, column, _ColumnMetrics(self, column)) 

2433 ) 

2434 elif index is not None: 

2435 to_move = new_cols[replace_index] 

2436 effective_positive_index = ( 

2437 index if index >= 0 else max(0, len(new_cols) + index) 

2438 ) 

2439 new_cols.insert(index, to_move) 

2440 if replace_index > effective_positive_index: 

2441 del new_cols[replace_index + 1] 

2442 else: 

2443 del new_cols[replace_index] 

2444 

2445 self._colset.add(column._deannotate()) 

2446 self._collection[:] = new_cols 

2447 

2448 self._index.clear() 

2449 

2450 self._index.update( 

2451 {idx: (k, col) for idx, (k, col, _) in enumerate(self._collection)} 

2452 ) 

2453 self._index.update({k: (k, col) for (k, col, _) in self._collection}) 

2454 

2455 

2456class ReadOnlyColumnCollection( 

2457 util.ReadOnlyContainer, ColumnCollection[_COLKEY, _COL_co] 

2458): 

2459 __slots__ = ("_parent",) 

2460 

2461 _parent: WriteableColumnCollection[_COLKEY, _COL_co] 

2462 

2463 def __init__( 

2464 self, collection: WriteableColumnCollection[_COLKEY, _COL_co] 

2465 ): 

2466 object.__setattr__(self, "_parent", collection) 

2467 object.__setattr__(self, "_index", collection._index) 

2468 object.__setattr__(self, "_collection", collection._collection) 

2469 object.__setattr__(self, "_colset", collection._colset) 

2470 object.__setattr__(self, "_proxy_index", collection._proxy_index) 

2471 

2472 def _as_readonly(self) -> ReadOnlyColumnCollection[_COLKEY, _COL_co]: 

2473 return self 

2474 

2475 def __getstate__(self) -> Dict[str, ColumnCollection[_COLKEY, _COL_co]]: 

2476 return {"_parent": self._parent} 

2477 

2478 def __setstate__(self, state: Dict[str, Any]) -> None: 

2479 parent = state["_parent"] 

2480 self.__init__(parent) # type: ignore 

2481 

2482 def corresponding_column( 

2483 self, column: _COL, require_embedded: bool = False 

2484 ) -> Optional[Union[_COL, _COL_co]]: 

2485 """Given a :class:`_expression.ColumnElement`, return the exported 

2486 :class:`_expression.ColumnElement` object from this 

2487 :class:`_expression.ColumnCollection` 

2488 which corresponds to that original :class:`_expression.ColumnElement` 

2489 via a common 

2490 ancestor column. 

2491 

2492 See :meth:`.ColumnCollection.corresponding_column` for parameter 

2493 information. 

2494 

2495 """ 

2496 return self._parent.corresponding_column(column, require_embedded) 

2497 

2498 

2499class ColumnSet(util.OrderedSet["ColumnClause[Any]"]): 

2500 def contains_column(self, col: ColumnClause[Any]) -> bool: 

2501 return col in self 

2502 

2503 def extend(self, cols: Iterable[Any]) -> None: 

2504 for col in cols: 

2505 self.add(col) 

2506 

2507 def __eq__(self, other): 

2508 l = [] 

2509 for c in other: 

2510 for local in self: 

2511 if c.shares_lineage(local): 

2512 l.append(c == local) 

2513 return elements.and_(*l) 

2514 

2515 def __hash__(self) -> int: # type: ignore[override] 

2516 return hash(tuple(x for x in self)) 

2517 

2518 

2519def _entity_namespace( 

2520 entity: Union[_HasEntityNamespace, ExternallyTraversible], 

2521) -> _EntityNamespace: 

2522 """Return the nearest .entity_namespace for the given entity. 

2523 

2524 If not immediately available, iterates the element to find a sub-element 

2525 that has one, if any. 

2526 

2527 """ 

2528 try: 

2529 return cast(_HasEntityNamespace, entity).entity_namespace 

2530 except AttributeError: 

2531 for elem in visitors.iterate(cast(ExternallyTraversible, entity)): 

2532 if _is_has_entity_namespace(elem): 

2533 return elem.entity_namespace 

2534 else: 

2535 raise 

2536 

2537 

2538@overload 

2539def _entity_namespace_key( 

2540 entity: Union[_HasEntityNamespace, ExternallyTraversible], 

2541 key: str, 

2542) -> SQLCoreOperations[Any]: ... 

2543 

2544 

2545@overload 

2546def _entity_namespace_key( 

2547 entity: Union[_HasEntityNamespace, ExternallyTraversible], 

2548 key: str, 

2549 default: _NoArg, 

2550) -> SQLCoreOperations[Any]: ... 

2551 

2552 

2553@overload 

2554def _entity_namespace_key( 

2555 entity: Union[_HasEntityNamespace, ExternallyTraversible], 

2556 key: str, 

2557 default: _T, 

2558) -> Union[SQLCoreOperations[Any], _T]: ... 

2559 

2560 

2561def _entity_namespace_key( 

2562 entity: Union[_HasEntityNamespace, ExternallyTraversible], 

2563 key: str, 

2564 default: Union[SQLCoreOperations[Any], _T, _NoArg] = NO_ARG, 

2565) -> Union[SQLCoreOperations[Any], _T]: 

2566 """Return an entry from an entity_namespace. 

2567 

2568 

2569 Raises :class:`_exc.InvalidRequestError` rather than ``AttributeError`` 

2570 when the key is not found. 

2571 

2572 """ 

2573 

2574 try: 

2575 ns = _entity_namespace(entity) 

2576 if default is not NO_ARG: 

2577 return getattr(ns, key, default) 

2578 else: 

2579 return getattr(ns, key) # type: ignore 

2580 except AttributeError as err: 

2581 raise exc.InvalidRequestError( 

2582 'Entity namespace for "%s" has no property "%s"' % (entity, key) 

2583 ) from err 

2584 

2585 

2586def _entity_namespace_key_search_all( 

2587 entities: Collection[Any], 

2588 key: str, 

2589) -> SQLCoreOperations[Any]: 

2590 """Search multiple entities for a key, raise if ambiguous or not found. 

2591 

2592 This is used by filter_by() to search across all FROM clause entities 

2593 when a single entity doesn't have the requested attribute. 

2594 

2595 .. versionadded:: 2.1 

2596 

2597 Raises: 

2598 AmbiguousColumnError: If key exists in multiple entities 

2599 InvalidRequestError: If key doesn't exist in any entity 

2600 """ 
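# Illustrative sketch (hypothetical, based only on the docstring above):
# given two mapped classes User and Address that both define a "name"
# attribute,
#
#     select(User, Address).filter_by(name="x")
#
# would raise AmbiguousColumnError, while filtering by an attribute present
# on neither entity raises InvalidRequestError.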

2601 

2602 match_: SQLCoreOperations[Any] | None = None 

2603 

2604 for entity in entities: 

2605 ns = _entity_namespace(entity) 

2606 # Check if the attribute exists 

2607 if hasattr(ns, key): 

2608 if match_ is not None: 

2609 entity_desc = ", ".join(str(e) for e in list(entities)[:3]) 

2610 if len(entities) > 3: 

2611 entity_desc += f", ... ({len(entities)} total)" 

2612 raise exc.AmbiguousColumnError( 

2613 f'Attribute name "{key}" is ambiguous; it exists in ' 

2614 f"multiple FROM clause entities ({entity_desc}). " 

2615 f"Use filter() with explicit column references instead " 

2616 f"of filter_by()." 

2617 ) 

2618 match_ = getattr(ns, key) 

2619 

2620 if match_ is None: 

2621 # No entity has this attribute 

2622 entity_desc = ", ".join(str(e) for e in list(entities)[:3]) 

2623 if len(entities) > 3: 

2624 entity_desc += f", ... ({len(entities)} total)" 

2625 raise exc.InvalidRequestError( 

2626 f'None of the FROM clause entities have a property "{key}". ' 

2627 f"Searched entities: {entity_desc}" 

2628 ) 

2629 

2630 return match_