Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/sql/compiler.py: 30%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

3087 statements  

1# sql/compiler.py 

2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7# mypy: allow-untyped-defs, allow-untyped-calls 

8 

9"""Base SQL and DDL compiler implementations. 

10 

11Classes provided include: 

12 

13:class:`.compiler.SQLCompiler` - renders SQL 

14strings 

15 

16:class:`.compiler.DDLCompiler` - renders DDL 

17(data definition language) strings 

18 

19:class:`.compiler.GenericTypeCompiler` - renders 

20type specification strings. 

21 

22To generate user-defined SQL strings, see 

23:doc:`/ext/compiler`. 

24 

25""" 

26from __future__ import annotations 

27 

28import collections 

29import collections.abc as collections_abc 

30import contextlib 

31from enum import IntEnum 

32import functools 

33import itertools 

34import operator 

35import re 

36from time import perf_counter 

37import typing 

38from typing import Any 

39from typing import Callable 

40from typing import cast 

41from typing import ClassVar 

42from typing import Dict 

43from typing import FrozenSet 

44from typing import Iterable 

45from typing import Iterator 

46from typing import List 

47from typing import Mapping 

48from typing import MutableMapping 

49from typing import NamedTuple 

50from typing import NoReturn 

51from typing import Optional 

52from typing import Pattern 

53from typing import Protocol 

54from typing import Sequence 

55from typing import Set 

56from typing import Tuple 

57from typing import Type 

58from typing import TYPE_CHECKING 

59from typing import TypedDict 

60from typing import Union 

61 

62from . import base 

63from . import coercions 

64from . import crud 

65from . import elements 

66from . import functions 

67from . import operators 

68from . import roles 

69from . import schema 

70from . import selectable 

71from . import sqltypes 

72from . import util as sql_util 

73from ._typing import is_column_element 

74from ._typing import is_dml 

75from .base import _de_clone 

76from .base import _from_objects 

77from .base import _NONE_NAME 

78from .base import _SentinelDefaultCharacterization 

79from .base import NO_ARG 

80from .elements import quoted_name 

81from .sqltypes import TupleType 

82from .visitors import prefix_anon_map 

83from .. import exc 

84from .. import util 

85from ..util import FastIntFlag 

86from ..util.typing import Literal 

87from ..util.typing import Self 

88from ..util.typing import TupleAny 

89from ..util.typing import Unpack 

90 

91if typing.TYPE_CHECKING: 

92 from .annotation import _AnnotationDict 

93 from .base import _AmbiguousTableNameMap 

94 from .base import CompileState 

95 from .base import Executable 

96 from .cache_key import CacheKey 

97 from .ddl import ExecutableDDLElement 

98 from .dml import Delete 

99 from .dml import Insert 

100 from .dml import Update 

101 from .dml import UpdateBase 

102 from .dml import UpdateDMLState 

103 from .dml import ValuesBase 

104 from .elements import _truncated_label 

105 from .elements import BinaryExpression 

106 from .elements import BindParameter 

107 from .elements import ClauseElement 

108 from .elements import ColumnClause 

109 from .elements import ColumnElement 

110 from .elements import False_ 

111 from .elements import Label 

112 from .elements import Null 

113 from .elements import True_ 

114 from .functions import Function 

115 from .schema import Column 

116 from .schema import Constraint 

117 from .schema import ForeignKeyConstraint 

118 from .schema import Index 

119 from .schema import PrimaryKeyConstraint 

120 from .schema import Table 

121 from .schema import UniqueConstraint 

122 from .selectable import _ColumnsClauseElement 

123 from .selectable import AliasedReturnsRows 

124 from .selectable import CompoundSelectState 

125 from .selectable import CTE 

126 from .selectable import FromClause 

127 from .selectable import NamedFromClause 

128 from .selectable import ReturnsRows 

129 from .selectable import Select 

130 from .selectable import SelectState 

131 from .type_api import _BindProcessorType 

132 from .type_api import TypeDecorator 

133 from .type_api import TypeEngine 

134 from .type_api import UserDefinedType 

135 from .visitors import Visitable 

136 from ..engine.cursor import CursorResultMetaData 

137 from ..engine.interfaces import _CoreSingleExecuteParams 

138 from ..engine.interfaces import _DBAPIAnyExecuteParams 

139 from ..engine.interfaces import _DBAPIMultiExecuteParams 

140 from ..engine.interfaces import _DBAPISingleExecuteParams 

141 from ..engine.interfaces import _ExecuteOptions 

142 from ..engine.interfaces import _GenericSetInputSizesType 

143 from ..engine.interfaces import _MutableCoreSingleExecuteParams 

144 from ..engine.interfaces import Dialect 

145 from ..engine.interfaces import SchemaTranslateMapType 

146 

147 

# Type alias for the per-FROM "hints" dictionaries passed through compilation.
_FromHintsType = Dict["FromClause", str]

# Words that cannot be used as bare identifiers; identifiers colliding with
# these require quoting.  NOTE(review): the exact consumer (presumably the
# IdentifierPreparer) is not visible in this chunk — confirm usage site.
RESERVED_WORDS = {
    "all", "analyse", "analyze", "and", "any", "array", "as", "asc",
    "asymmetric", "authorization", "between", "binary", "both", "case",
    "cast", "check", "collate", "column", "constraint", "create", "cross",
    "current_date", "current_role", "current_time", "current_timestamp",
    "current_user", "default", "deferrable", "desc", "distinct", "do",
    "else", "end", "except", "false", "for", "foreign", "freeze", "from",
    "full", "grant", "group", "having", "ilike", "in", "initially", "inner",
    "intersect", "into", "is", "isnull", "join", "leading", "left", "like",
    "limit", "localtime", "localtimestamp", "natural", "new", "not",
    "notnull", "null", "off", "offset", "old", "on", "only", "or", "order",
    "outer", "overlaps", "placing", "primary", "references", "right",
    "select", "session_user", "set", "similar", "some", "symmetric",
    "table", "then", "to", "trailing", "true", "union", "unique", "user",
    "using", "verbose", "when", "where",
}

# An identifier made up entirely of these characters needs no quoting.
LEGAL_CHARACTERS = re.compile(r"^[A-Z0-9_$]+$", re.I)
LEGAL_CHARACTERS_PLUS_SPACE = re.compile(r"^[A-Z0-9_ $]+$", re.I)
# An identifier beginning with a digit or "$" must be quoted.
ILLEGAL_INITIAL_CHARACTERS = {str(x) for x in range(0, 10)}.union(["$"])

# Accepted phrases for foreign-key ON DELETE / ON UPDATE / INITIALLY clauses.
FK_ON_DELETE = re.compile(
    r"^(?:RESTRICT|CASCADE|SET NULL|NO ACTION|SET DEFAULT)$", re.I
)
FK_ON_UPDATE = re.compile(
    r"^(?:RESTRICT|CASCADE|SET NULL|NO ACTION|SET DEFAULT)$", re.I
)
FK_INITIALLY = re.compile(r"^(?:DEFERRED|IMMEDIATE)$", re.I)
# Locates ":name" style bind parameters in textual SQL.  \x5c is a literal
# backslash; a backslash-prefixed colon is an escape, handled by the
# pattern below.
BIND_PARAMS = re.compile(r"(?<![:\w\$\x5c]):([\w\$]+)(?![:\w\$])", re.UNICODE)
BIND_PARAMS_ESC = re.compile(r"\x5c(:[\w\$]*)(?![:\w\$])", re.UNICODE)

# Render templates for bound parameters, keyed by DBAPI paramstyle name.
# "%%" doubles the percent sign so the template survives later %-formatting.
_pyformat_template = "%%(%(name)s)s"
BIND_TEMPLATES = {
    "pyformat": _pyformat_template,
    "qmark": "?",
    "format": "%%s",
    "numeric": ":[_POSITION]",
    "numeric_dollar": "$[_POSITION]",
    "named": ":%(name)s",
}

270 

271 

# Default textual renderings of operator functions.  Binary operator strings
# include surrounding spaces; unary prefixes trail a space; modifiers lead
# with one.
OPERATORS = {
    # binary
    operators.and_: " AND ",
    operators.or_: " OR ",
    operators.add: " + ",
    operators.mul: " * ",
    operators.sub: " - ",
    operators.mod: " % ",
    operators.neg: "-",
    operators.lt: " < ",
    operators.le: " <= ",
    operators.ne: " != ",
    operators.gt: " > ",
    operators.ge: " >= ",
    operators.eq: " = ",
    operators.is_distinct_from: " IS DISTINCT FROM ",
    operators.is_not_distinct_from: " IS NOT DISTINCT FROM ",
    operators.concat_op: " || ",
    operators.match_op: " MATCH ",
    operators.not_match_op: " NOT MATCH ",
    operators.in_op: " IN ",
    operators.not_in_op: " NOT IN ",
    operators.comma_op: ", ",
    operators.from_: " FROM ",
    operators.as_: " AS ",
    operators.is_: " IS ",
    operators.is_not: " IS NOT ",
    operators.collate: " COLLATE ",
    # unary
    operators.exists: "EXISTS ",
    operators.distinct_op: "DISTINCT ",
    operators.inv: "NOT ",
    operators.any_op: "ANY ",
    operators.all_op: "ALL ",
    # modifiers
    operators.desc_op: " DESC",
    operators.asc_op: " ASC",
    operators.nulls_first_op: " NULLS FIRST",
    operators.nulls_last_op: " NULLS LAST",
    # bitwise
    operators.bitwise_xor_op: " ^ ",
    operators.bitwise_or_op: " | ",
    operators.bitwise_and_op: " & ",
    operators.bitwise_not_op: "~",
    operators.bitwise_lshift_op: " << ",
    operators.bitwise_rshift_op: " >> ",
}

# Default SQL names for generic function constructs; lower-case entries
# render as ordinary function calls, upper-case entries are keyword-style.
FUNCTIONS: Dict[Type[Function[Any]], str] = {
    functions.coalesce: "coalesce",
    functions.current_date: "CURRENT_DATE",
    functions.current_time: "CURRENT_TIME",
    functions.current_timestamp: "CURRENT_TIMESTAMP",
    functions.current_user: "CURRENT_USER",
    functions.localtime: "LOCALTIME",
    functions.localtimestamp: "LOCALTIMESTAMP",
    functions.random: "random",
    functions.sysdate: "sysdate",
    functions.session_user: "SESSION_USER",
    functions.user: "USER",
    functions.cube: "CUBE",
    functions.rollup: "ROLLUP",
    functions.grouping_sets: "GROUPING SETS",
}

# Field names accepted by the EXTRACT construct mapped to their rendered
# form; an identity map here, which dialects may override.
EXTRACT_MAP = {
    "month": "month",
    "day": "day",
    "year": "year",
    "second": "second",
    "hour": "hour",
    "doy": "doy",
    "minute": "minute",
    "quarter": "quarter",
    "dow": "dow",
    "week": "week",
    "epoch": "epoch",
    "milliseconds": "milliseconds",
    "microseconds": "microseconds",
    "timezone_hour": "timezone_hour",
    "timezone_minute": "timezone_minute",
}

# Keywords rendered between the SELECTs of a compound select statement.
COMPOUND_KEYWORDS = {
    selectable._CompoundSelectKeyword.UNION: "UNION",
    selectable._CompoundSelectKeyword.UNION_ALL: "UNION ALL",
    selectable._CompoundSelectKeyword.EXCEPT: "EXCEPT",
    selectable._CompoundSelectKeyword.EXCEPT_ALL: "EXCEPT ALL",
    selectable._CompoundSelectKeyword.INTERSECT: "INTERSECT",
    selectable._CompoundSelectKeyword.INTERSECT_ALL: "INTERSECT ALL",
}

364 

365 

class ResultColumnsEntry(NamedTuple):
    """Tracks a column expression that is expected to be represented
    in the result rows for this statement.

    This normally refers to the columns clause of a SELECT statement
    but may also refer to a RETURNING clause, as well as for dialect-specific
    emulations.

    Note: cursor.py accesses these fields by integer position via the
    RM_RENDERED_NAME / RM_NAME / RM_OBJECTS / RM_TYPE constants defined
    in this module, so field order is significant.

    """

    keyname: str
    """string name that's expected in cursor.description"""

    name: str
    """column name, may be labeled"""

    objects: Tuple[Any, ...]
    """sequence of objects that should be able to locate this column
    in a RowMapping.  This is typically string names and aliases
    as well as Column objects.

    """

    type: TypeEngine[Any]
    """Datatype to be associated with this column.  This is where
    the "result processing" logic directly links the compiled statement
    to the rows that come back from the cursor.

    """

396 

class _ResultMapAppender(Protocol):
    """Callable protocol for appending to a compiler's result map.

    The arguments mirror the fields of :class:`.ResultColumnsEntry`.

    """

    def __call__(
        self,
        keyname: str,
        name: str,
        objects: Sequence[Any],
        type_: TypeEngine[Any],
    ) -> None: ...

405 

406 

# integer indexes into ResultColumnsEntry used by cursor.py.
# some profiling showed integer access faster than named tuple
# (these must stay in sync with the field order of ResultColumnsEntry)
RM_RENDERED_NAME: Literal[0] = 0
RM_NAME: Literal[1] = 1
RM_OBJECTS: Literal[2] = 2
RM_TYPE: Literal[3] = 3

413 

414 

class _BaseCompilerStackEntry(TypedDict):
    # required keys present in every entry on the compiler's "stack" of
    # nested selectable contexts
    asfrom_froms: Set[FromClause]
    correlate_froms: Set[FromClause]
    selectable: ReturnsRows

419 

420 

class _CompilerStackEntry(_BaseCompilerStackEntry, total=False):
    # optional keys (total=False) added to a stack entry depending on the
    # kind of statement being compiled
    compile_state: CompileState
    need_result_map_for_nested: bool
    need_result_map_for_compound: bool
    select_0: ReturnsRows
    insert_from_select: Select[Unpack[TupleAny]]

427 

428 

class ExpandedState(NamedTuple):
    """State produced when rendering "expanded" and "post compile"
    bound parameters for a statement.

    An "expanded" parameter is one generated at execution time based on
    how many values were actually passed, the canonical case being the
    individual elements inside of an IN expression.

    A "post compile" parameter is one whose SQL literal value is rendered
    directly into the statement at execution time instead of being handed
    to the driver separately.

    Instances are created via the
    :meth:`.SQLCompiler.construct_expanded_state` method on any
    :class:`.SQLCompiler` instance.

    """

    statement: str
    """String SQL statement with parameters fully expanded"""

    parameters: _CoreSingleExecuteParams
    """Parameter dictionary with parameters fully expanded.

    For a statement that uses named parameters, this dictionary will map
    exactly to the names in the statement.   For a statement that uses
    positional parameters, the :attr:`.ExpandedState.positional_parameters`
    will yield a tuple with the positional parameter set.

    """

    processors: Mapping[str, _BindProcessorType[Any]]
    """mapping of bound value processors"""

    positiontup: Optional[Sequence[str]]
    """Sequence of string names indicating the order of positional
    parameters"""

    parameter_expansion: Mapping[str, List[str]]
    """Mapping representing the intermediary link from original parameter
    name to list of "expanded" parameter names, for those parameters that
    were expanded."""

    @property
    def positional_parameters(self) -> Tuple[Any, ...]:
        """Tuple of positional parameters, for statements that were compiled
        using a positional paramstyle.

        """
        positiontup = self.positiontup
        if positiontup is None:
            raise exc.InvalidRequestError(
                "statement does not use a positional paramstyle"
            )
        parameters = self.parameters
        return tuple(parameters[name] for name in positiontup)

    @property
    def additional_parameters(self) -> _CoreSingleExecuteParams:
        """synonym for :attr:`.ExpandedState.parameters`."""
        return self.parameters

488 

489 

class _InsertManyValues(NamedTuple):
    """represents state to use for executing an "insertmanyvalues" statement.

    The primary consumers of this object are the
    :meth:`.SQLCompiler._deliver_insertmanyvalues_batches` and
    :meth:`.DefaultDialect._deliver_insertmanyvalues_batches` methods.

    .. versionadded:: 2.0

    """

    is_default_expr: bool
    """if True, the statement is of the form
    ``INSERT INTO TABLE DEFAULT VALUES``, and can't be rewritten as a "batch"

    """

    single_values_expr: str
    """The rendered "values" clause of the INSERT statement.

    This is typically the parenthesized section e.g. "(?, ?, ?)" or similar.
    The insertmanyvalues logic uses this string as a search and replace
    target.

    """

    insert_crud_params: List[crud._CrudParamElementStr]
    """List of Column / bind names etc. used while rewriting the statement"""

    num_positional_params_counted: int
    """the number of bound parameters in a single-row statement.

    This count may be larger or smaller than the actual number of columns
    targeted in the INSERT, as it accommodates for SQL expressions
    in the values list that may have zero or more parameters embedded
    within them.

    This count is part of what's used to organize rewritten parameter lists
    when batching.

    """

    sort_by_parameter_order: bool = False
    """if the deterministic_returned_order parameter were used on the
    insert.

    All of the attributes following this will only be used if this is True.

    """

    includes_upsert_behaviors: bool = False
    """if True, we have to accommodate for upsert behaviors.

    This will in some cases downgrade "insertmanyvalues" that requests
    deterministic ordering.

    """

    sentinel_columns: Optional[Sequence[Column[Any]]] = None
    """List of sentinel columns that were located.

    This list is only here if the INSERT asked for
    sort_by_parameter_order=True,
    and dialect-appropriate sentinel columns were located.

    .. versionadded:: 2.0.10

    """

    num_sentinel_columns: int = 0
    """how many sentinel columns are in the above list, if any.

    This is the same as
    ``len(sentinel_columns) if sentinel_columns is not None else 0``

    """

    sentinel_param_keys: Optional[Sequence[str]] = None
    """parameter str keys in each param dictionary / tuple
    that would link to the client side "sentinel" values for that row, which
    we can use to match up parameter sets to result rows.

    This is only present if sentinel_columns is present and the INSERT
    statement actually refers to client side values for these sentinel
    columns.

    .. versionadded:: 2.0.10

    .. versionchanged:: 2.0.29 - the sequence is now string dictionary keys
       only, used against the "compiled parameters" collection before
       the parameters were converted by bound parameter processors

    """

    implicit_sentinel: bool = False
    """if True, we have exactly one sentinel column and it uses a server side
    value, currently has to generate an incrementing integer value.

    The dialect in question would have asserted that it supports receiving
    these values back and sorting on that value as a means of guaranteeing
    correlation with the incoming parameter list.

    .. versionadded:: 2.0.10

    """

    embed_values_counter: bool = False
    """Whether to embed an incrementing integer counter in each parameter
    set within the VALUES clause as parameters are batched over.

    This is only used for a specific INSERT..SELECT..VALUES..RETURNING syntax
    where a subquery is used to produce value tuples.  Current support
    includes PostgreSQL, Microsoft SQL Server.

    .. versionadded:: 2.0.10

    """

608 

class _InsertManyValuesBatch(NamedTuple):
    """represents an individual batch SQL statement for insertmanyvalues.

    This is passed through the
    :meth:`.SQLCompiler._deliver_insertmanyvalues_batches` and
    :meth:`.DefaultDialect._deliver_insertmanyvalues_batches` methods out
    to the :class:`.Connection` within the
    :meth:`.Connection._exec_insertmany_context` method.

    .. versionadded:: 2.0.10

    """

    # the rewritten statement and parameters for this batch
    replaced_statement: str
    replaced_parameters: _DBAPIAnyExecuteParams
    processed_setinputsizes: Optional[_GenericSetInputSizesType]
    # the slice of original parameter sets covered by this batch
    batch: Sequence[_DBAPISingleExecuteParams]
    sentinel_values: Sequence[Tuple[Any, ...]]
    current_batch_size: int
    # 1-based position of this batch and the total number of batches
    batchnum: int
    total_batches: int
    rows_sorted: bool
    # True if deterministic ordering had to be abandoned for this batch
    is_downgraded: bool

632 

633 

class InsertmanyvaluesSentinelOpts(FastIntFlag):
    """bitflag enum indicating styles of PK defaults
    which can work as implicit sentinel columns

    """

    NOT_SUPPORTED = 1
    AUTOINCREMENT = 2
    IDENTITY = 4
    SEQUENCE = 8

    # composite masks over the individual flags above
    ANY_AUTOINCREMENT = AUTOINCREMENT | IDENTITY | SEQUENCE
    _SUPPORTED_OR_NOT = NOT_SUPPORTED | ANY_AUTOINCREMENT

    USE_INSERT_FROM_SELECT = 16
    RENDER_SELECT_COL_CASTS = 64

650 

651 

class CompilerState(IntEnum):
    """Enumerates the lifecycle phases of a :class:`.Compiled` object."""

    COMPILING = 0
    """statement is present, compilation phase in progress"""

    STRING_APPLIED = 1
    """statement is present, string form of the statement has been applied.

    Additional processors by subclasses may still be pending.

    """

    NO_STATEMENT = 2
    """compiler does not have a statement to compile, is used
    for method access"""

666 

667 

class Linting(IntEnum):
    """represent preferences for the 'SQL linting' feature.

    this feature currently includes support for flagging cartesian products
    in SQL statements.

    """

    NO_LINTING = 0
    "Disable all linting."

    COLLECT_CARTESIAN_PRODUCTS = 1
    """Collect data on FROMs and cartesian products and gather into
    'self.from_linter'"""

    WARN_LINTING = 2
    "Emit warnings for linters that find problems"

    # NOTE: member values behave as a bitmask even though this is an
    # IntEnum; FROM_LINTING is the combination of the two flags above.
    FROM_LINTING = COLLECT_CARTESIAN_PRODUCTS | WARN_LINTING
    """Warn for cartesian products; combines COLLECT_CARTESIAN_PRODUCTS
    and WARN_LINTING"""

689 

690 

# module-level aliases for the Linting members, bound in definition order
NO_LINTING, COLLECT_CARTESIAN_PRODUCTS, WARN_LINTING, FROM_LINTING = tuple(
    Linting
)

694 

695 

class FromLinter(collections.namedtuple("FromLinter", ["froms", "edges"])):
    """Per-statement state for the "cartesian product" detection feature.

    ``froms`` maps each FROM element to a display string; ``edges`` is a
    set of two-element tuples, each linking a pair of FROM elements that
    are joined by some condition.

    """

    def lint(self, start=None):
        """Search the FROM graph for elements unreachable from ``start``.

        Returns ``(unreachable_froms, origin)`` when a cartesian product
        is present, else ``(None, None)``.

        """
        froms = self.froms
        if not froms:
            return None, None

        pending_edges = set(self.edges)
        unreached = set(froms)

        if start is None:
            origin = unreached.pop()
        else:
            origin = start
            unreached.remove(origin)

        queue = collections.deque([origin])

        while queue and unreached:
            current = queue.popleft()
            unreached.discard(current)

            # comparison of nodes in edges here is based on hash equality, as
            # there are "annotated" elements that match the non-annotated
            # ones.  to remove the need for in-python hash() calls, use
            # native containment routines (e.g. "node in edge",
            # "edge.index(node)")
            matched = {edge for edge in pending_edges if current in edge}

            # traverse to the opposite endpoint of every matched edge
            queue.extendleft(
                edge[1 - edge.index(current)] for edge in matched
            )
            pending_edges.difference_update(matched)

        # FROMS left over? boom
        if unreached:
            return unreached, origin
        return None, None

    def warn(self, stmt_type="SELECT"):
        """Run :meth:`.lint` and emit a warning if a cartesian product
        was detected."""
        the_rest, start_with = self.lint()

        # FROMS left over? boom
        if the_rest:
            froms = the_rest
            if froms:
                template = (
                    "{stmt_type} statement has a cartesian product between "
                    "FROM element(s) {froms} and "
                    'FROM element "{start}".  Apply join condition(s) '
                    "between each element to resolve."
                )
                froms_str = ", ".join(
                    f'"{self.froms[from_]}"' for from_ in froms
                )
                util.warn(
                    template.format(
                        stmt_type=stmt_type,
                        froms=froms_str,
                        start=self.froms[start_with],
                    )
                )

760 

761 

class Compiled:
    """Represent a compiled SQL or DDL expression.

    The ``__str__`` method of the ``Compiled`` object should produce
    the actual text of the statement.  ``Compiled`` objects are
    specific to their underlying database dialect, and also may
    or may not be specific to the columns referenced within a
    particular set of bind parameters.  In no case should the
    ``Compiled`` object be dependent on the actual values of those
    bind parameters, even though it may reference those values as
    defaults.
    """

    statement: Optional[ClauseElement] = None
    "The statement to compile."
    string: str = ""
    "The string representation of the ``statement``"

    state: CompilerState
    """description of the compiler's state"""

    # discriminator flags; presumably overridden by SQL / DDL compiler
    # subclasses (not visible in this chunk) — TODO confirm
    is_sql = False
    is_ddl = False

    _cached_metadata: Optional[CursorResultMetaData] = None

    _result_columns: Optional[List[ResultColumnsEntry]] = None

    schema_translate_map: Optional[SchemaTranslateMapType] = None

    execution_options: _ExecuteOptions = util.EMPTY_DICT
    """
    Execution options propagated from the statement.   In some cases,
    sub-elements of the statement can modify these.
    """

    preparer: IdentifierPreparer

    _annotations: _AnnotationDict = util.EMPTY_DICT

    compile_state: Optional[CompileState] = None
    """Optional :class:`.CompileState` object that maintains additional
    state used by the compiler.

    Major executable objects such as :class:`_expression.Insert`,
    :class:`_expression.Update`, :class:`_expression.Delete`,
    :class:`_expression.Select` will generate this
    state when compiled in order to calculate additional information about the
    object.   For the top level object that is to be executed, the state can be
    stored here where it can also have applicability towards result set
    processing.

    .. versionadded:: 1.4

    """

    dml_compile_state: Optional[CompileState] = None
    """Optional :class:`.CompileState` assigned at the same point that
    .isinsert, .isupdate, or .isdelete is assigned.

    This will normally be the same object as .compile_state, with the
    exception of cases like the :class:`.ORMFromStatementCompileState`
    object.

    .. versionadded:: 1.4.40

    """

    cache_key: Optional[CacheKey] = None
    """The :class:`.CacheKey` that was generated ahead of creating this
    :class:`.Compiled` object.

    This is used for routines that need access to the original
    :class:`.CacheKey` instance generated when the :class:`.Compiled`
    instance was first cached, typically in order to reconcile
    the original list of :class:`.BindParameter` objects with a
    per-statement list that's generated on each call.

    """

    _gen_time: float
    """Generation time of this :class:`.Compiled`, used for reporting
    cache stats."""

    def __init__(
        self,
        dialect: Dialect,
        statement: Optional[ClauseElement],
        schema_translate_map: Optional[SchemaTranslateMapType] = None,
        render_schema_translate: bool = False,
        compile_kwargs: Mapping[str, Any] = util.immutabledict(),
    ):
        """Construct a new :class:`.Compiled` object.

        :param dialect: :class:`.Dialect` to compile against.

        :param statement: :class:`_expression.ClauseElement` to be compiled.

        :param schema_translate_map: dictionary of schema names to be
         translated when forming the resultant SQL

         .. seealso::

            :ref:`schema_translating`

        :param compile_kwargs: additional kwargs that will be
         passed to the initial call to :meth:`.Compiled.process`.


        """
        self.dialect = dialect
        self.preparer = self.dialect.identifier_preparer
        if schema_translate_map:
            # wrap the preparer so identifier rendering applies the
            # schema translations
            self.schema_translate_map = schema_translate_map
            self.preparer = self.preparer._with_schema_translate(
                schema_translate_map
            )

        if statement is not None:
            self.state = CompilerState.COMPILING
            self.statement = statement
            self.can_execute = statement.supports_execution
            self._annotations = statement._annotations
            if self.can_execute:
                if TYPE_CHECKING:
                    assert isinstance(statement, Executable)
                self.execution_options = statement._execution_options
            # the main compilation pass; produces the SQL/DDL string
            self.string = self.process(self.statement, **compile_kwargs)

            if render_schema_translate:
                # render schema translations directly into the string
                assert schema_translate_map is not None
                self.string = self.preparer._render_schema_translates(
                    self.string, schema_translate_map
                )

            self.state = CompilerState.STRING_APPLIED
        else:
            self.state = CompilerState.NO_STATEMENT

        self._gen_time = perf_counter()

    def __init_subclass__(cls) -> None:
        # give each subclass a chance to run one-time class-level setup
        cls._init_compiler_cls()
        return super().__init_subclass__()

    @classmethod
    def _init_compiler_cls(cls):
        # hook for subclasses; no-op by default
        pass

    def _execute_on_connection(
        self, connection, distilled_params, execution_options
    ):
        # entry point used when this Compiled is passed directly to
        # Connection.execute()
        if self.can_execute:
            return connection._execute_compiled(
                self, distilled_params, execution_options
            )
        else:
            raise exc.ObjectNotExecutableError(self.statement)

    def visit_unsupported_compilation(self, element, err, **kw):
        # dispatch target for elements this compiler cannot render;
        # chains the original dispatch error as the cause
        raise exc.UnsupportedCompilationError(self, type(element)) from err

    @property
    def sql_compiler(self) -> SQLCompiler:
        """Return a Compiled that is capable of processing SQL expressions.

        If this compiler is one, it would likely just return 'self'.

        """

        raise NotImplementedError()

    def process(self, obj: Visitable, **kwargs: Any) -> str:
        # render an individual element via the visitor dispatch
        return obj._compiler_dispatch(self, **kwargs)

    def __str__(self) -> str:
        """Return the string text of the generated SQL or DDL."""

        if self.state is CompilerState.STRING_APPLIED:
            return self.string
        else:
            return ""

    def construct_params(
        self,
        params: Optional[_CoreSingleExecuteParams] = None,
        extracted_parameters: Optional[Sequence[BindParameter[Any]]] = None,
        escape_names: bool = True,
    ) -> Optional[_MutableCoreSingleExecuteParams]:
        """Return the bind params for this compiled object.

        :param params: a dict of string/object pairs whose values will
         override bind values compiled in to the
         statement.
        """

        raise NotImplementedError()

    @property
    def params(self):
        """Return the bind params for this compiled object."""
        return self.construct_params()

965 

class TypeCompiler(util.EnsureKWArg):
    """Render DDL type specification strings for TypeEngine objects."""

    ensure_kwarg = r"visit_\w+"

    def __init__(self, dialect: Dialect):
        self.dialect = dialect

    def process(self, type_: TypeEngine[Any], **kw: Any) -> str:
        # if a variant of this type is registered for the current
        # dialect, render that variant instead
        variants = type_._variant_mapping
        if variants and self.dialect.name in variants:
            type_ = variants[self.dialect.name]
        return type_._compiler_dispatch(self, **kw)

    def visit_unsupported_compilation(
        self, element: Any, err: Exception, **kw: Any
    ) -> NoReturn:
        raise exc.UnsupportedCompilationError(self, element) from err

986 

987 

988# this was a Visitable, but to allow accurate detection of 

989# column elements this is actually a column element 

class _CompileLabel(
    roles.BinaryElementRole[Any], elements.CompilerColumnElement
):
    """A minimal label construct acting like an expression.Label during
    compilation."""

    __visit_name__ = "label"
    __slots__ = "element", "name", "_alt_names"

    def __init__(self, col, name, alt_names=()):
        self.name = name
        self.element = col
        self._alt_names = (col, *alt_names)

    @property
    def proxy_set(self):
        return self.element.proxy_set

    @property
    def type(self):
        return self.element.type

    def self_group(self, **kw):
        # a label never needs additional grouping
        return self

1013 

1014 

class ilike_case_insensitive(
    roles.BinaryElementRole[Any], elements.CompilerColumnElement
):
    """wrapping element marking the case-insensitive portion of an
    ILIKE construct.

    Most backends render this as the ``lower()`` function; PostgreSQL
    passes it through unchanged on the assumption that a native
    "ILIKE" operator is in use.

    .. versionadded:: 2.0

    """

    __visit_name__ = "ilike_case_insensitive_operand"
    __slots__ = "element", "comparator"

    def __init__(self, element):
        self.element = element
        self.comparator = element.comparator

    @property
    def proxy_set(self):
        # delegate to the wrapped element
        return self.element.proxy_set

    @property
    def type(self):
        # delegate to the wrapped element
        return self.element.type

    def self_group(self, **kw):
        # never requires parenthesization
        return self

    def _with_binary_element_type(self, type_):
        # re-wrap a copy of the element coerced to the given type
        rewrapped = self.element._with_binary_element_type(type_)
        return ilike_case_insensitive(rewrapped)

1051 

1052 

1053class SQLCompiler(Compiled): 

1054 """Default implementation of :class:`.Compiled`. 

1055 

1056 Compiles :class:`_expression.ClauseElement` objects into SQL strings. 

1057 

1058 """ 

1059 

1060 extract_map = EXTRACT_MAP 

1061 

1062 bindname_escape_characters: ClassVar[Mapping[str, str]] = ( 

1063 util.immutabledict( 

1064 { 

1065 "%": "P", 

1066 "(": "A", 

1067 ")": "Z", 

1068 ":": "C", 

1069 ".": "_", 

1070 "[": "_", 

1071 "]": "_", 

1072 " ": "_", 

1073 } 

1074 ) 

1075 ) 

1076 """A mapping (e.g. dict or similar) containing a lookup of 

1077 characters keyed to replacement characters which will be applied to all 

1078 'bind names' used in SQL statements as a form of 'escaping'; the given 

1079 characters are replaced entirely with the 'replacement' character when 

1080 rendered in the SQL statement, and a similar translation is performed 

1081 on the incoming names used in parameter dictionaries passed to methods 

1082 like :meth:`_engine.Connection.execute`. 

1083 

1084 This allows bound parameter names used in :func:`_sql.bindparam` and 

1085 other constructs to have any arbitrary characters present without any 

1086 concern for characters that aren't allowed at all on the target database. 

1087 

1088 Third party dialects can establish their own dictionary here to replace the 

1089 default mapping, which will ensure that the particular characters in the 

1090 mapping will never appear in a bound parameter name. 

1091 

1092 The dictionary is evaluated at **class creation time**, so cannot be 

1093 modified at runtime; it must be present on the class when the class 

1094 is first declared. 

1095 

1096 Note that for dialects that have additional bound parameter rules such 

1097 as additional restrictions on leading characters, the 

1098 :meth:`_sql.SQLCompiler.bindparam_string` method may need to be augmented. 

1099 See the cx_Oracle compiler for an example of this. 

1100 

1101 .. versionadded:: 2.0.0rc1 

1102 

1103 """ 

1104 

1105 _bind_translate_re: ClassVar[Pattern[str]] 

1106 _bind_translate_chars: ClassVar[Mapping[str, str]] 

1107 

1108 is_sql = True 

1109 

1110 compound_keywords = COMPOUND_KEYWORDS 

1111 

1112 isdelete: bool = False 

1113 isinsert: bool = False 

1114 isupdate: bool = False 

1115 """class-level defaults which can be set at the instance 

1116 level to define if this Compiled instance represents 

1117 INSERT/UPDATE/DELETE 

1118 """ 

1119 

1120 postfetch: Optional[List[Column[Any]]] 

1121 """list of columns that can be post-fetched after INSERT or UPDATE to 

1122 receive server-updated values""" 

1123 

1124 insert_prefetch: Sequence[Column[Any]] = () 

1125 """list of columns for which default values should be evaluated before 

1126 an INSERT takes place""" 

1127 

1128 update_prefetch: Sequence[Column[Any]] = () 

1129 """list of columns for which onupdate default values should be evaluated 

1130 before an UPDATE takes place""" 

1131 

1132 implicit_returning: Optional[Sequence[ColumnElement[Any]]] = None 

1133 """list of "implicit" returning columns for a toplevel INSERT or UPDATE 

1134 statement, used to receive newly generated values of columns. 

1135 

1136 .. versionadded:: 2.0 ``implicit_returning`` replaces the previous 

1137 ``returning`` collection, which was not a generalized RETURNING 

1138 collection and instead was in fact specific to the "implicit returning" 

1139 feature. 

1140 

1141 """ 

1142 

1143 isplaintext: bool = False 

1144 

1145 binds: Dict[str, BindParameter[Any]] 

1146 """a dictionary of bind parameter keys to BindParameter instances.""" 

1147 

1148 bind_names: Dict[BindParameter[Any], str] 

1149 """a dictionary of BindParameter instances to "compiled" names 

1150 that are actually present in the generated SQL""" 

1151 

1152 stack: List[_CompilerStackEntry] 

1153 """major statements such as SELECT, INSERT, UPDATE, DELETE are 

1154 tracked in this stack using an entry format.""" 

1155 

1156 returning_precedes_values: bool = False 

1157 """set to True classwide to generate RETURNING 

1158 clauses before the VALUES or WHERE clause (i.e. MSSQL) 

1159 """ 

1160 

1161 render_table_with_column_in_update_from: bool = False 

1162 """set to True classwide to indicate the SET clause 

1163 in a multi-table UPDATE statement should qualify 

1164 columns with the table name (i.e. MySQL only) 

1165 """ 

1166 

1167 ansi_bind_rules: bool = False 

1168 """SQL 92 doesn't allow bind parameters to be used 

1169 in the columns clause of a SELECT, nor does it allow 

1170 ambiguous expressions like "? = ?". A compiler 

1171 subclass can set this flag to False if the target 

1172 driver/DB enforces this 

1173 """ 

1174 

1175 bindtemplate: str 

1176 """template to render bound parameters based on paramstyle.""" 

1177 

1178 compilation_bindtemplate: str 

1179 """template used by compiler to render parameters before positional 

1180 paramstyle application""" 

1181 

1182 _numeric_binds_identifier_char: str 

1183 """Character that's used to as the identifier of a numerical bind param. 

1184 For example if this char is set to ``$``, numerical binds will be rendered 

1185 in the form ``$1, $2, $3``. 

1186 """ 

1187 

1188 _result_columns: List[ResultColumnsEntry] 

1189 """relates label names in the final SQL to a tuple of local 

1190 column/label name, ColumnElement object (if any) and 

1191 TypeEngine. CursorResult uses this for type processing and 

1192 column targeting""" 

1193 

1194 _textual_ordered_columns: bool = False 

1195 """tell the result object that the column names as rendered are important, 

1196 but they are also "ordered" vs. what is in the compiled object here. 

1197 

1198 As of 1.4.42 this condition is only present when the statement is a 

1199 TextualSelect, e.g. text("....").columns(...), where it is required 

1200 that the columns are considered positionally and not by name. 

1201 

1202 """ 

1203 

1204 _ad_hoc_textual: bool = False 

1205 """tell the result that we encountered text() or '*' constructs in the 

1206 middle of the result columns, but we also have compiled columns, so 

1207 if the number of columns in cursor.description does not match how many 

1208 expressions we have, that means we can't rely on positional at all and 

1209 should match on name. 

1210 

1211 """ 

1212 

1213 _ordered_columns: bool = True 

1214 """ 

1215 if False, means we can't be sure the list of entries 

1216 in _result_columns is actually the rendered order. Usually 

1217 True unless using an unordered TextualSelect. 

1218 """ 

1219 

1220 _loose_column_name_matching: bool = False 

1221 """tell the result object that the SQL statement is textual, wants to match 

1222 up to Column objects, and may be using the ._tq_label in the SELECT rather 

1223 than the base name. 

1224 

1225 """ 

1226 

1227 _numeric_binds: bool = False 

1228 """ 

1229 True if paramstyle is "numeric". This paramstyle is trickier than 

1230 all the others. 

1231 

1232 """ 

1233 

1234 _render_postcompile: bool = False 

1235 """ 

1236 whether to render out POSTCOMPILE params during the compile phase. 

1237 

1238 This attribute is used only for end-user invocation of stmt.compile(); 

1239 it's never used for actual statement execution, where instead the 

1240 dialect internals access and render the internal postcompile structure 

1241 directly. 

1242 

1243 """ 

1244 

1245 _post_compile_expanded_state: Optional[ExpandedState] = None 

1246 """When render_postcompile is used, the ``ExpandedState`` used to create 

1247 the "expanded" SQL is assigned here, and then used by the ``.params`` 

1248 accessor and ``.construct_params()`` methods for their return values. 

1249 

1250 .. versionadded:: 2.0.0rc1 

1251 

1252 """ 

1253 

1254 _pre_expanded_string: Optional[str] = None 

1255 """Stores the original string SQL before 'post_compile' is applied, 

1256 for cases where 'post_compile' were used. 

1257 

1258 """ 

1259 

1260 _pre_expanded_positiontup: Optional[List[str]] = None 

1261 

1262 _insertmanyvalues: Optional[_InsertManyValues] = None 

1263 

1264 _insert_crud_params: Optional[crud._CrudParamSequence] = None 

1265 

1266 literal_execute_params: FrozenSet[BindParameter[Any]] = frozenset() 

1267 """bindparameter objects that are rendered as literal values at statement 

1268 execution time. 

1269 

1270 """ 

1271 

1272 post_compile_params: FrozenSet[BindParameter[Any]] = frozenset() 

1273 """bindparameter objects that are rendered as bound parameter placeholders 

1274 at statement execution time. 

1275 

1276 """ 

1277 

1278 escaped_bind_names: util.immutabledict[str, str] = util.EMPTY_DICT 

1279 """Late escaping of bound parameter names that has to be converted 

1280 to the original name when looking in the parameter dictionary. 

1281 

1282 """ 

1283 

1284 has_out_parameters = False 

1285 """if True, there are bindparam() objects that have the isoutparam 

1286 flag set.""" 

1287 

1288 postfetch_lastrowid = False 

1289 """if True, and this in insert, use cursor.lastrowid to populate 

1290 result.inserted_primary_key. """ 

1291 

1292 _cache_key_bind_match: Optional[ 

1293 Tuple[ 

1294 Dict[ 

1295 BindParameter[Any], 

1296 List[BindParameter[Any]], 

1297 ], 

1298 Dict[ 

1299 str, 

1300 BindParameter[Any], 

1301 ], 

1302 ] 

1303 ] = None 

1304 """a mapping that will relate the BindParameter object we compile 

1305 to those that are part of the extracted collection of parameters 

1306 in the cache key, if we were given a cache key. 

1307 

1308 """ 

1309 

1310 positiontup: Optional[List[str]] = None 

1311 """for a compiled construct that uses a positional paramstyle, will be 

1312 a sequence of strings, indicating the names of bound parameters in order. 

1313 

1314 This is used in order to render bound parameters in their correct order, 

1315 and is combined with the :attr:`_sql.Compiled.params` dictionary to 

1316 render parameters. 

1317 

1318 This sequence always contains the unescaped name of the parameters. 

1319 

1320 .. seealso:: 

1321 

1322 :ref:`faq_sql_expression_string` - includes a usage example for 

1323 debugging use cases. 

1324 

1325 """ 

1326 _values_bindparam: Optional[List[str]] = None 

1327 

1328 _visited_bindparam: Optional[List[str]] = None 

1329 

1330 inline: bool = False 

1331 

1332 ctes: Optional[MutableMapping[CTE, str]] 

1333 

1334 # Detect same CTE references - Dict[(level, name), cte] 

1335 # Level is required for supporting nesting 

1336 ctes_by_level_name: Dict[Tuple[int, str], CTE] 

1337 

1338 # To retrieve key/level in ctes_by_level_name - 

1339 # Dict[cte_reference, (level, cte_name, cte_opts)] 

1340 level_name_by_cte: Dict[CTE, Tuple[int, str, selectable._CTEOpts]] 

1341 

1342 ctes_recursive: bool 

1343 

1344 _post_compile_pattern = re.compile(r"__\[POSTCOMPILE_(\S+?)(~~.+?~~)?\]") 

1345 _pyformat_pattern = re.compile(r"%\(([^)]+?)\)s") 

1346 _positional_pattern = re.compile( 

1347 f"{_pyformat_pattern.pattern}|{_post_compile_pattern.pattern}" 

1348 ) 

1349 

    @classmethod
    def _init_compiler_cls(cls):
        # class-creation hook: build the bind-name translation tables
        # derived from ``bindname_escape_characters``
        cls._init_bind_translate()

1354 @classmethod 

1355 def _init_bind_translate(cls): 

1356 reg = re.escape("".join(cls.bindname_escape_characters)) 

1357 cls._bind_translate_re = re.compile(f"[{reg}]") 

1358 cls._bind_translate_chars = cls.bindname_escape_characters 

1359 

    def __init__(
        self,
        dialect: Dialect,
        statement: Optional[ClauseElement],
        cache_key: Optional[CacheKey] = None,
        column_keys: Optional[Sequence[str]] = None,
        for_executemany: bool = False,
        linting: Linting = NO_LINTING,
        _supporting_against: Optional[SQLCompiler] = None,
        **kwargs: Any,
    ):
        """Construct a new :class:`.SQLCompiler` object.

        :param dialect: :class:`.Dialect` to be used

        :param statement: :class:`_expression.ClauseElement` to be compiled

        :param column_keys: a list of column names to be compiled into an
         INSERT or UPDATE statement.

        :param for_executemany: whether INSERT / UPDATE statements should
         expect that they are to be invoked in an "executemany" style,
         which may impact how the statement will be expected to return the
         values of defaults and autoincrement / sequences and similar.
         Depending on the backend and driver in use, support for retrieving
         these values may be disabled which means SQL expressions may
         be rendered inline, RETURNING may not be rendered, etc.

        :param kwargs: additional keyword arguments to be consumed by the
         superclass.

        """
        self.column_keys = column_keys

        self.cache_key = cache_key

        if cache_key:
            # index the cache key's bound parameters both by .key and by
            # identity, so compiled binds can later be related back to them
            cksm = {b.key: b for b in cache_key[1]}
            ckbm = {b: [b] for b in cache_key[1]}
            self._cache_key_bind_match = (ckbm, cksm)

        # compile INSERT/UPDATE defaults/sequences to expect executemany
        # style execution, which may mean no pre-execute of defaults,
        # or no RETURNING
        self.for_executemany = for_executemany

        self.linting = linting

        # a dictionary of bind parameter keys to BindParameter
        # instances.
        self.binds = {}

        # a dictionary of BindParameter instances to "compiled" names
        # that are actually present in the generated SQL
        self.bind_names = util.column_dict()

        # stack which keeps track of nested SELECT statements
        self.stack = []

        self._result_columns = []

        # true if the paramstyle is positional
        self.positional = dialect.positional
        if self.positional:
            self._numeric_binds = nb = dialect.paramstyle.startswith("numeric")
            if nb:
                # "numeric_dollar" renders $1, $2...; plain "numeric"
                # renders :1, :2...
                self._numeric_binds_identifier_char = (
                    "$" if dialect.paramstyle == "numeric_dollar" else ":"
                )

            # positional styles first render pyformat placeholders, which
            # are converted in a second pass (_process_positional /
            # _process_numeric)
            self.compilation_bindtemplate = _pyformat_template
        else:
            self.compilation_bindtemplate = BIND_TEMPLATES[dialect.paramstyle]

        self.ctes = None

        self.label_length = (
            dialect.label_length or dialect.max_identifier_length
        )

        # a map which tracks "anonymous" identifiers that are created on
        # the fly here
        self.anon_map = prefix_anon_map()

        # a map which tracks "truncated" names based on
        # dialect.label_length or dialect.max_identifier_length
        self.truncated_names: Dict[Tuple[str, str], str] = {}
        self._truncated_counters: Dict[str, int] = {}

        # superclass __init__ performs the actual string compilation
        Compiled.__init__(self, dialect, statement, **kwargs)

        if self.isinsert or self.isupdate or self.isdelete:
            if TYPE_CHECKING:
                assert isinstance(statement, UpdateBase)

            if self.isinsert or self.isupdate:
                if TYPE_CHECKING:
                    assert isinstance(statement, ValuesBase)
                if statement._inline:
                    self.inline = True
                elif self.for_executemany and (
                    not self.isinsert
                    or (
                        self.dialect.insert_executemany_returning
                        and statement._return_defaults
                    )
                ):
                    # executemany can't pre-execute defaults; render them
                    # inline instead
                    self.inline = True

        self.bindtemplate = BIND_TEMPLATES[dialect.paramstyle]

        if _supporting_against:
            # copy state from a "supporting" compiler (e.g. one used to
            # render a sub-portion of the statement), excluding keys that
            # are specific to this compiler's own paramstyle setup
            self.__dict__.update(
                {
                    k: v
                    for k, v in _supporting_against.__dict__.items()
                    if k
                    not in {
                        "state",
                        "dialect",
                        "preparer",
                        "positional",
                        "_numeric_binds",
                        "compilation_bindtemplate",
                        "bindtemplate",
                    }
                }
            )

        if self.state is CompilerState.STRING_APPLIED:
            if self.positional:
                # second pass: rewrite the intermediate pyformat string
                # into the dialect's actual positional paramstyle
                if self._numeric_binds:
                    self._process_numeric()
                else:
                    self._process_positional()

            if self._render_postcompile:
                parameters = self.construct_params(
                    escape_names=False,
                    _no_postcompile=True,
                )

                self._process_parameters_for_postcompile(
                    parameters, _populate_self=True
                )

1506 @property 

1507 def insert_single_values_expr(self) -> Optional[str]: 

1508 """When an INSERT is compiled with a single set of parameters inside 

1509 a VALUES expression, the string is assigned here, where it can be 

1510 used for insert batching schemes to rewrite the VALUES expression. 

1511 

1512 .. versionchanged:: 2.0 This collection is no longer used by 

1513 SQLAlchemy's built-in dialects, in favor of the currently 

1514 internal ``_insertmanyvalues`` collection that is used only by 

1515 :class:`.SQLCompiler`. 

1516 

1517 """ 

1518 if self._insertmanyvalues is None: 

1519 return None 

1520 else: 

1521 return self._insertmanyvalues.single_values_expr 

1522 

1523 @util.ro_memoized_property 

1524 def effective_returning(self) -> Optional[Sequence[ColumnElement[Any]]]: 

1525 """The effective "returning" columns for INSERT, UPDATE or DELETE. 

1526 

1527 This is either the so-called "implicit returning" columns which are 

1528 calculated by the compiler on the fly, or those present based on what's 

1529 present in ``self.statement._returning`` (expanded into individual 

1530 columns using the ``._all_selected_columns`` attribute) i.e. those set 

1531 explicitly using the :meth:`.UpdateBase.returning` method. 

1532 

1533 .. versionadded:: 2.0 

1534 

1535 """ 

1536 if self.implicit_returning: 

1537 return self.implicit_returning 

1538 elif self.statement is not None and is_dml(self.statement): 

1539 return [ 

1540 c 

1541 for c in self.statement._all_selected_columns 

1542 if is_column_element(c) 

1543 ] 

1544 

1545 else: 

1546 return None 

1547 

1548 @property 

1549 def returning(self): 

1550 """backwards compatibility; returns the 

1551 effective_returning collection. 

1552 

1553 """ 

1554 return self.effective_returning 

1555 

1556 @property 

1557 def current_executable(self): 

1558 """Return the current 'executable' that is being compiled. 

1559 

1560 This is currently the :class:`_sql.Select`, :class:`_sql.Insert`, 

1561 :class:`_sql.Update`, :class:`_sql.Delete`, 

1562 :class:`_sql.CompoundSelect` object that is being compiled. 

1563 Specifically it's assigned to the ``self.stack`` list of elements. 

1564 

1565 When a statement like the above is being compiled, it normally 

1566 is also assigned to the ``.statement`` attribute of the 

1567 :class:`_sql.Compiler` object. However, all SQL constructs are 

1568 ultimately nestable, and this attribute should never be consulted 

1569 by a ``visit_`` method, as it is not guaranteed to be assigned 

1570 nor guaranteed to correspond to the current statement being compiled. 

1571 

1572 """ 

1573 try: 

1574 return self.stack[-1]["selectable"] 

1575 except IndexError as ie: 

1576 raise IndexError("Compiler does not have a stack entry") from ie 

1577 

1578 @property 

1579 def prefetch(self): 

1580 return list(self.insert_prefetch) + list(self.update_prefetch) 

1581 

    @util.memoized_property
    def _global_attributes(self) -> Dict[Any, Any]:
        # lazily-created scratch dictionary for this compiler; memoized so
        # repeated access always yields the same dict instance
        return {}

1586 @util.memoized_instancemethod 

1587 def _init_cte_state(self) -> MutableMapping[CTE, str]: 

1588 """Initialize collections related to CTEs only if 

1589 a CTE is located, to save on the overhead of 

1590 these collections otherwise. 

1591 

1592 """ 

1593 # collect CTEs to tack on top of a SELECT 

1594 # To store the query to print - Dict[cte, text_query] 

1595 ctes: MutableMapping[CTE, str] = util.OrderedDict() 

1596 self.ctes = ctes 

1597 

1598 # Detect same CTE references - Dict[(level, name), cte] 

1599 # Level is required for supporting nesting 

1600 self.ctes_by_level_name = {} 

1601 

1602 # To retrieve key/level in ctes_by_level_name - 

1603 # Dict[cte_reference, (level, cte_name, cte_opts)] 

1604 self.level_name_by_cte = {} 

1605 

1606 self.ctes_recursive = False 

1607 

1608 return ctes 

1609 

    @contextlib.contextmanager
    def _nested_result(self):
        """special API to support the use case of 'nested result sets'"""
        # save the outer compilation's result-column state so it can be
        # restored once the nested compilation completes
        result_columns, ordered_columns = (
            self._result_columns,
            self._ordered_columns,
        )
        self._result_columns, self._ordered_columns = [], False

        try:
            if self.stack:
                # flag the innermost stack entry so nested SELECTs
                # populate a result map
                entry = self.stack[-1]
                entry["need_result_map_for_nested"] = True
            else:
                entry = None
            yield self._result_columns, self._ordered_columns
        finally:
            if entry:
                entry.pop("need_result_map_for_nested")
            # always restore the outer state, even on error
            self._result_columns, self._ordered_columns = (
                result_columns,
                ordered_columns,
            )

    def _process_positional(self):
        # second compilation pass: rewrite the intermediate
        # pyformat-rendered SQL string into the dialect's positional
        # paramstyle ("format" or "qmark"), recording parameter order
        # into self.positiontup
        assert not self.positiontup
        assert self.state is CompilerState.STRING_APPLIED
        assert not self._numeric_binds

        if self.dialect.paramstyle == "format":
            placeholder = "%s"
        else:
            assert self.dialect.paramstyle == "qmark"
            placeholder = "?"

        positions = []

        def find_position(m: re.Match[str]) -> str:
            # regex callback: record each bind name in encounter order;
            # ordinary binds become the positional placeholder, while
            # POSTCOMPILE tokens are left in place for later expansion
            normal_bind = m.group(1)
            if normal_bind:
                positions.append(normal_bind)
                return placeholder
            else:
                # this is a post-compile bind
                positions.append(m.group(2))
                return m.group(0)

        self.string = re.sub(
            self._positional_pattern, find_position, self.string
        )

        if self.escaped_bind_names:
            # positiontup always carries the *unescaped* names
            reverse_escape = {v: k for k, v in self.escaped_bind_names.items()}
            assert len(self.escaped_bind_names) == len(reverse_escape)
            self.positiontup = [
                reverse_escape.get(name, name) for name in positions
            ]
        else:
            self.positiontup = positions

        if self._insertmanyvalues:
            # re-run the same substitution over the insertmanyvalues
            # fragments so they use matching positional placeholders;
            # reset the shared list so find_position appends afresh
            positions = []

            single_values_expr = re.sub(
                self._positional_pattern,
                find_position,
                self._insertmanyvalues.single_values_expr,
            )
            insert_crud_params = [
                (
                    v[0],
                    v[1],
                    re.sub(self._positional_pattern, find_position, v[2]),
                    v[3],
                )
                for v in self._insertmanyvalues.insert_crud_params
            ]

            self._insertmanyvalues = self._insertmanyvalues._replace(
                single_values_expr=single_values_expr,
                insert_crud_params=insert_crud_params,
            )

    def _process_numeric(self):
        # second compilation pass: rewrite the intermediate
        # pyformat-rendered SQL string into the "numeric" /
        # "numeric_dollar" paramstyle (:1 / $1 ...)
        assert self._numeric_binds
        assert self.state is CompilerState.STRING_APPLIED

        num = 1
        param_pos: Dict[str, str] = {}
        order: Iterable[str]
        if self._insertmanyvalues and self._values_bindparam is not None:
            # bindparams that are not in values are always placed first.
            # this avoids the need of changing them when using executemany
            # values () ()
            order = itertools.chain(
                (
                    name
                    for name in self.bind_names.values()
                    if name not in self._values_bindparam
                ),
                self.bind_names.values(),
            )
        else:
            order = self.bind_names.values()

        for bind_name in order:
            # order may contain duplicates (see the chain above); only
            # the first occurrence assigns a position
            if bind_name in param_pos:
                continue
            bind = self.binds[bind_name]
            if (
                bind in self.post_compile_params
                or bind in self.literal_execute_params
            ):
                # set to None just to mark it in positiontup; it will not
                # be replaced below.
                param_pos[bind_name] = None  # type: ignore
            else:
                ph = f"{self._numeric_binds_identifier_char}{num}"
                num += 1
                param_pos[bind_name] = ph

        # remember where numbering left off, for late-rendered params
        self.next_numeric_pos = num

        self.positiontup = list(param_pos)
        if self.escaped_bind_names:
            # the SQL string contains escaped names, so re-key the
            # replacement map accordingly (positiontup keeps the
            # unescaped names)
            len_before = len(param_pos)
            param_pos = {
                self.escaped_bind_names.get(name, name): pos
                for name, pos in param_pos.items()
            }
            assert len(param_pos) == len_before

        # Can't use format here since % chars are not escaped.
        self.string = self._pyformat_pattern.sub(
            lambda m: param_pos[m.group(1)], self.string
        )

        if self._insertmanyvalues:
            single_values_expr = (
                # format is ok here since single_values_expr includes only
                # place-holders
                self._insertmanyvalues.single_values_expr
                % param_pos
            )
            insert_crud_params = [
                (v[0], v[1], "%s", v[3])
                for v in self._insertmanyvalues.insert_crud_params
            ]

            self._insertmanyvalues = self._insertmanyvalues._replace(
                # This has the numbers (:1, :2)
                single_values_expr=single_values_expr,
                # The single binds are instead %s so they can be formatted
                insert_crud_params=insert_crud_params,
            )

1766 @util.memoized_property 

1767 def _bind_processors( 

1768 self, 

1769 ) -> MutableMapping[ 

1770 str, Union[_BindProcessorType[Any], Sequence[_BindProcessorType[Any]]] 

1771 ]: 

1772 # mypy is not able to see the two value types as the above Union, 

1773 # it just sees "object". don't know how to resolve 

1774 return { 

1775 key: value # type: ignore 

1776 for key, value in ( 

1777 ( 

1778 self.bind_names[bindparam], 

1779 ( 

1780 bindparam.type._cached_bind_processor(self.dialect) 

1781 if not bindparam.type._is_tuple_type 

1782 else tuple( 

1783 elem_type._cached_bind_processor(self.dialect) 

1784 for elem_type in cast( 

1785 TupleType, bindparam.type 

1786 ).types 

1787 ) 

1788 ), 

1789 ) 

1790 for bindparam in self.bind_names 

1791 ) 

1792 if value is not None 

1793 } 

1794 

1795 def is_subquery(self): 

1796 return len(self.stack) > 1 

1797 

1798 @property 

1799 def sql_compiler(self) -> Self: 

1800 return self 

1801 

1802 def construct_expanded_state( 

1803 self, 

1804 params: Optional[_CoreSingleExecuteParams] = None, 

1805 escape_names: bool = True, 

1806 ) -> ExpandedState: 

1807 """Return a new :class:`.ExpandedState` for a given parameter set. 

1808 

1809 For queries that use "expanding" or other late-rendered parameters, 

1810 this method will provide for both the finalized SQL string as well 

1811 as the parameters that would be used for a particular parameter set. 

1812 

1813 .. versionadded:: 2.0.0rc1 

1814 

1815 """ 

1816 parameters = self.construct_params( 

1817 params, 

1818 escape_names=escape_names, 

1819 _no_postcompile=True, 

1820 ) 

1821 return self._process_parameters_for_postcompile( 

1822 parameters, 

1823 ) 

1824 

    def construct_params(
        self,
        params: Optional[_CoreSingleExecuteParams] = None,
        extracted_parameters: Optional[Sequence[BindParameter[Any]]] = None,
        escape_names: bool = True,
        _group_number: Optional[int] = None,
        _check: bool = True,
        _no_postcompile: bool = False,
    ) -> _MutableCoreSingleExecuteParams:
        """return a dictionary of bind parameter keys and values"""

        if self._render_postcompile and not _no_postcompile:
            # with render_postcompile, the SQL string is hard-linked to
            # the parameters captured at compile time; only those may be
            # returned
            assert self._post_compile_expanded_state is not None
            if not params:
                return dict(self._post_compile_expanded_state.parameters)
            else:
                raise exc.InvalidRequestError(
                    "can't construct new parameters when render_postcompile "
                    "is used; the statement is hard-linked to the original "
                    "parameters. Use construct_expanded_state to generate a "
                    "new statement and parameters."
                )

        has_escaped_names = escape_names and bool(self.escaped_bind_names)

        if extracted_parameters:
            # related the bound parameters collected in the original cache key
            # to those collected in the incoming cache key. They will not have
            # matching names but they will line up positionally in the same
            # way. The parameters present in self.bind_names may be clones of
            # these original cache key params in the case of DML but the .key
            # will be guaranteed to match.
            if self.cache_key is None:
                raise exc.CompileError(
                    "This compiled object has no original cache key; "
                    "can't pass extracted_parameters to construct_params"
                )
            else:
                orig_extracted = self.cache_key[1]

            ckbm_tuple = self._cache_key_bind_match
            assert ckbm_tuple is not None
            ckbm, _ = ckbm_tuple
            resolved_extracted = {
                bind: extracted
                for b, extracted in zip(orig_extracted, extracted_parameters)
                for bind in ckbm[b]
            }
        else:
            resolved_extracted = None

        if params:
            # user-supplied params override compiled-in values; look up
            # by bindparam .key first, then by compiled name
            pd = {}
            for bindparam, name in self.bind_names.items():
                escaped_name = (
                    self.escaped_bind_names.get(name, name)
                    if has_escaped_names
                    else name
                )

                if bindparam.key in params:
                    pd[escaped_name] = params[bindparam.key]
                elif name in params:
                    pd[escaped_name] = params[name]

                elif _check and bindparam.required:
                    if _group_number:
                        raise exc.InvalidRequestError(
                            "A value is required for bind parameter %r, "
                            "in parameter group %d"
                            % (bindparam.key, _group_number),
                            code="cd3x",
                        )
                    else:
                        raise exc.InvalidRequestError(
                            "A value is required for bind parameter %r"
                            % bindparam.key,
                            code="cd3x",
                        )
                else:
                    # fall back to the compiled-in value, possibly
                    # substituting the matching param from the cache key
                    if resolved_extracted:
                        value_param = resolved_extracted.get(
                            bindparam, bindparam
                        )
                    else:
                        value_param = bindparam

                    if bindparam.callable:
                        pd[escaped_name] = value_param.effective_value
                    else:
                        pd[escaped_name] = value_param.value
            return pd
        else:
            # no user params: every value comes from the compiled-in
            # bindparams (or their cache-key substitutes)
            pd = {}
            for bindparam, name in self.bind_names.items():
                escaped_name = (
                    self.escaped_bind_names.get(name, name)
                    if has_escaped_names
                    else name
                )

                if _check and bindparam.required:
                    if _group_number:
                        raise exc.InvalidRequestError(
                            "A value is required for bind parameter %r, "
                            "in parameter group %d"
                            % (bindparam.key, _group_number),
                            code="cd3x",
                        )
                    else:
                        raise exc.InvalidRequestError(
                            "A value is required for bind parameter %r"
                            % bindparam.key,
                            code="cd3x",
                        )

                if resolved_extracted:
                    value_param = resolved_extracted.get(bindparam, bindparam)
                else:
                    value_param = bindparam

                if bindparam.callable:
                    pd[escaped_name] = value_param.effective_value
                else:
                    pd[escaped_name] = value_param.value

            return pd

    @util.memoized_instancemethod
    def _get_set_input_sizes_lookup(self):
        # build a mapping of BindParameter -> DBAPI type object, for
        # dialects that call cursor.setinputsizes(); memoized per
        # compiled object
        dialect = self.dialect

        include_types = dialect.include_set_input_sizes
        exclude_types = dialect.exclude_set_input_sizes

        dbapi = dialect.dbapi

        def lookup_type(typ):
            # resolve the DBAPI-level type, honoring the dialect's
            # include/exclude filters; None means "set no size"
            dbtype = typ._unwrapped_dialect_impl(dialect).get_dbapi_type(dbapi)

            if (
                dbtype is not None
                and (exclude_types is None or dbtype not in exclude_types)
                and (include_types is None or dbtype in include_types)
            ):
                return dbtype
            else:
                return None

        inputsizes = {}

        literal_execute_params = self.literal_execute_params

        for bindparam in self.bind_names:
            # literal-execute params are rendered inline, not bound
            if bindparam in literal_execute_params:
                continue

            if bindparam.type._is_tuple_type:
                # tuple types expand to one DBAPI type per element
                inputsizes[bindparam] = [
                    lookup_type(typ)
                    for typ in cast(TupleType, bindparam.type).types
                ]
            else:
                inputsizes[bindparam] = lookup_type(bindparam.type)

        return inputsizes

    @property
    def params(self):
        """Return the bind param dictionary embedded into this
        compiled object, for those values that are present.

        ``_check=False`` disables the error normally raised for
        ``required`` bind parameters that have no value.

        .. seealso::

            :ref:`faq_sql_expression_string` - includes a usage example for
            debugging use cases.

        """
        return self.construct_params(_check=False)

2004 

    def _process_parameters_for_postcompile(
        self,
        parameters: _MutableCoreSingleExecuteParams,
        _populate_self: bool = False,
    ) -> ExpandedState:
        """handle special post compile parameters.

        These include:

        * "expanding" parameters -typically IN tuples that are rendered
          on a per-parameter basis for an otherwise fixed SQL statement string.

        * literal_binds compiled with the literal_execute flag.  Used for
          things like SQL Server "TOP N" where the driver does not accommodate
          N as a bound parameter.

        Returns an :class:`.ExpandedState` with the rewritten statement
        string, the (mutated in place) parameter dictionary, new bind
        processors and positional information.  When ``_populate_self`` is
        set, the expansion is also written back onto this compiled object
        (used by the "render_postcompile" debugging flag).
        """

        expanded_parameters = {}
        new_positiontup: Optional[List[str]]

        # keep the original, un-expanded template so repeated calls can
        # re-expand from scratch
        pre_expanded_string = self._pre_expanded_string
        if pre_expanded_string is None:
            pre_expanded_string = self.string

        if self.positional:
            new_positiontup = []

            pre_expanded_positiontup = self._pre_expanded_positiontup
            if pre_expanded_positiontup is None:
                pre_expanded_positiontup = self.positiontup

        else:
            new_positiontup = pre_expanded_positiontup = None

        processors = self._bind_processors
        single_processors = cast(
            "Mapping[str, _BindProcessorType[Any]]", processors
        )
        tuple_processors = cast(
            "Mapping[str, Sequence[_BindProcessorType[Any]]]", processors
        )

        new_processors: Dict[str, _BindProcessorType[Any]] = {}

        replacement_expressions: Dict[str, Any] = {}
        to_update_sets: Dict[str, Any] = {}

        # notes:
        # *unescaped* parameter names in:
        # self.bind_names, self.binds, self._bind_processors, self.positiontup
        #
        # *escaped* parameter names in:
        # construct_params(), replacement_expressions

        numeric_positiontup: Optional[List[str]] = None

        if self.positional and pre_expanded_positiontup is not None:
            names: Iterable[str] = pre_expanded_positiontup
            if self._numeric_binds:
                numeric_positiontup = []
        else:
            names = self.bind_names.values()

        ebn = self.escaped_bind_names
        for name in names:
            escaped_name = ebn.get(name, name) if ebn else name
            parameter = self.binds[name]

            if parameter in self.literal_execute_params:
                # literal-execute: value is rendered inline into the SQL and
                # removed from the parameter dictionary
                if escaped_name not in replacement_expressions:
                    replacement_expressions[escaped_name] = (
                        self.render_literal_bindparam(
                            parameter,
                            render_literal_value=parameters.pop(escaped_name),
                        )
                    )
                continue

            if parameter in self.post_compile_params:
                if escaped_name in replacement_expressions:
                    to_update = to_update_sets[escaped_name]
                    values = None
                else:
                    # we are removing the parameter from parameters
                    # because it is a list value, which is not expected by
                    # TypeEngine objects that would otherwise be asked to
                    # process it. the single name is being replaced with
                    # individual numbered parameters for each value in the
                    # param.
                    #
                    # note we are also inserting *escaped* parameter names
                    # into the given dictionary.   default dialect will
                    # use these param names directly as they will not be
                    # in the escaped_bind_names dictionary.
                    values = parameters.pop(name)

                    leep_res = self._literal_execute_expanding_parameter(
                        escaped_name, parameter, values
                    )
                    (to_update, replacement_expr) = leep_res

                    to_update_sets[escaped_name] = to_update
                    replacement_expressions[escaped_name] = replacement_expr

                if not parameter.literal_execute:
                    parameters.update(to_update)
                    if parameter.type._is_tuple_type:
                        assert values is not None
                        # one processor per tuple element, keyed
                        # "<name>_<row>_<elem>" to match the expanded names
                        new_processors.update(
                            (
                                "%s_%s_%s" % (name, i, j),
                                tuple_processors[name][j - 1],
                            )
                            for i, tuple_element in enumerate(values, 1)
                            for j, _ in enumerate(tuple_element, 1)
                            if name in tuple_processors
                            and tuple_processors[name][j - 1] is not None
                        )
                    else:
                        new_processors.update(
                            (key, single_processors[name])
                            for key, _ in to_update
                            if name in single_processors
                        )
                    if numeric_positiontup is not None:
                        numeric_positiontup.extend(
                            name for name, _ in to_update
                        )
                    elif new_positiontup is not None:
                        # to_update has escaped names, but that's ok since
                        # these are new names, that aren't in the
                        # escaped_bind_names dict.
                        new_positiontup.extend(name for name, _ in to_update)
                    expanded_parameters[name] = [
                        expand_key for expand_key, _ in to_update
                    ]
            elif new_positiontup is not None:
                new_positiontup.append(name)

        def process_expanding(m):
            # regex callback: swap a POSTCOMPILE token for its rendered
            # replacement expression
            key = m.group(1)
            expr = replacement_expressions[key]

            # if POSTCOMPILE included a bind_expression, render that
            # around each element
            if m.group(2):
                tok = m.group(2).split("~~")
                be_left, be_right = tok[1], tok[3]
                expr = ", ".join(
                    "%s%s%s" % (be_left, exp, be_right)
                    for exp in expr.split(", ")
                )
            return expr

        statement = re.sub(
            self._post_compile_pattern, process_expanding, pre_expanded_string
        )

        if numeric_positiontup is not None:
            assert new_positiontup is not None
            param_pos = {
                key: f"{self._numeric_binds_identifier_char}{num}"
                for num, key in enumerate(
                    numeric_positiontup, self.next_numeric_pos
                )
            }
            # Can't use format here since % chars are not escaped.
            statement = self._pyformat_pattern.sub(
                lambda m: param_pos[m.group(1)], statement
            )
            new_positiontup.extend(numeric_positiontup)

        expanded_state = ExpandedState(
            statement,
            parameters,
            new_processors,
            new_positiontup,
            expanded_parameters,
        )

        if _populate_self:
            # this is for the "render_postcompile" flag, which is not
            # otherwise used internally and is for end-user debugging and
            # special use cases.
            self._pre_expanded_string = pre_expanded_string
            self._pre_expanded_positiontup = pre_expanded_positiontup
            self.string = expanded_state.statement
            self.positiontup = (
                list(expanded_state.positiontup or ())
                if self.positional
                else None
            )
            self._post_compile_expanded_state = expanded_state

        return expanded_state

2201 

    @util.preload_module("sqlalchemy.engine.cursor")
    def _create_result_map(self):
        """utility method used for unit tests only.

        Builds the cursor-level description match map from this compiled
        statement's ``_result_columns``.
        """
        cursor = util.preloaded.engine_cursor
        return cursor.CursorResultMetaData._create_description_match_map(
            self._result_columns
        )

2209 

2210 # assigned by crud.py for insert/update statements 

2211 _get_bind_name_for_col: _BindNameForColProtocol 

2212 

2213 @util.memoized_property 

2214 def _within_exec_param_key_getter(self) -> Callable[[Any], str]: 

2215 getter = self._get_bind_name_for_col 

2216 return getter 

2217 

    @util.memoized_property
    @util.preload_module("sqlalchemy.engine.result")
    def _inserted_primary_key_from_lastrowid_getter(self):
        """Build a callable producing the inserted primary key row from
        ``cursor.lastrowid`` plus the INSERT parameter dictionary."""
        result = util.preloaded.engine_result

        param_key_getter = self._within_exec_param_key_getter

        assert self.compile_state is not None
        statement = self.compile_state.statement

        if TYPE_CHECKING:
            assert isinstance(statement, Insert)

        table = statement.table

        # per-PK-column getters that look the value up in the parameters
        getters = [
            (operator.methodcaller("get", param_key_getter(col), None), col)
            for col in table.primary_key
        ]

        autoinc_getter = None
        autoinc_col = table._autoincrement_column
        if autoinc_col is not None:
            # apply type post processors to the lastrowid
            lastrowid_processor = autoinc_col.type._cached_result_processor(
                self.dialect, None
            )
            autoinc_key = param_key_getter(autoinc_col)

            # if a bind value is present for the autoincrement column
            # in the parameters, we need to do the logic dictated by
            # #7998; honor a non-None user-passed parameter over lastrowid.
            # previously in the 1.4 series we weren't fetching lastrowid
            # at all if the key were present in the parameters
            if autoinc_key in self.binds:

                def _autoinc_getter(lastrowid, parameters):
                    param_value = parameters.get(autoinc_key, lastrowid)
                    if param_value is not None:
                        # they supplied non-None parameter, use that.
                        # SQLite at least is observed to return the wrong
                        # cursor.lastrowid for INSERT..ON CONFLICT so it
                        # can't be used in all cases
                        return param_value
                    else:
                        # use lastrowid
                        return lastrowid

                # work around mypy https://github.com/python/mypy/issues/14027
                autoinc_getter = _autoinc_getter

        else:
            lastrowid_processor = None

        row_fn = result.result_tuple([col.key for col in table.primary_key])

        def get(lastrowid, parameters):
            """given cursor.lastrowid value and the parameters used for INSERT,
            return a "row" that represents the primary key, either by
            using the "lastrowid" or by extracting values from the parameters
            that were sent along with the INSERT.

            """
            if lastrowid_processor is not None:
                lastrowid = lastrowid_processor(lastrowid)

            if lastrowid is None:
                return row_fn(getter(parameters) for getter, col in getters)
            else:
                return row_fn(
                    (
                        (
                            autoinc_getter(lastrowid, parameters)
                            if autoinc_getter is not None
                            else lastrowid
                        )
                        if col is autoinc_col
                        else getter(parameters)
                    )
                    for getter, col in getters
                )

        return get

2301 

    @util.memoized_property
    @util.preload_module("sqlalchemy.engine.result")
    def _inserted_primary_key_from_returning_getter(self):
        """Build a callable producing the inserted primary key row from a
        RETURNING row, falling back to the INSERT parameters for primary
        key columns not present in the RETURNING list."""
        result = util.preloaded.engine_result

        assert self.compile_state is not None
        statement = self.compile_state.statement

        if TYPE_CHECKING:
            assert isinstance(statement, Insert)

        param_key_getter = self._within_exec_param_key_getter
        table = statement.table

        returning = self.implicit_returning
        assert returning is not None
        ret = {col: idx for idx, col in enumerate(returning)}

        # (getter, use_row) pairs: use_row=True pulls from the RETURNING
        # row by position; use_row=False looks up the INSERT parameters
        getters = cast(
            "List[Tuple[Callable[[Any], Any], bool]]",
            [
                (
                    (operator.itemgetter(ret[col]), True)
                    if col in ret
                    else (
                        operator.methodcaller(
                            "get", param_key_getter(col), None
                        ),
                        False,
                    )
                )
                for col in table.primary_key
            ],
        )

        row_fn = result.result_tuple([col.key for col in table.primary_key])

        def get(row, parameters):
            return row_fn(
                getter(row) if use_row else getter(parameters)
                for getter, use_row in getters
            )

        return get

2346 

2347 def default_from(self) -> str: 

2348 """Called when a SELECT statement has no froms, and no FROM clause is 

2349 to be appended. 

2350 

2351 Gives Oracle Database a chance to tack on a ``FROM DUAL`` to the string 

2352 output. 

2353 

2354 """ 

2355 return "" 

2356 

    def visit_override_binds(self, override_binds, **kw):
        """SQL compile the nested element of an _OverrideBinds with
        bindparams swapped out.

        The _OverrideBinds is not normally expected to be compiled; it
        is meant to be used when an already cached statement is to be used,
        the compilation was already performed, and only the bound params should
        be swapped in at execution time.

        However, there are test cases that exercise this object, and
        additionally the ORM subquery loader is known to feed in expressions
        which include this construct into new queries (discovered in #11173),
        so it has to do the right thing at compile time as well.

        """

        # get SQL text first
        sqltext = override_binds.element._compiler_dispatch(self, **kw)

        # for a test compile that is not for caching, change binds after the
        # fact.  note that we don't try to
        # swap the bindparam as we compile, because our element may be
        # elsewhere in the statement already (e.g. a subquery or perhaps a
        # CTE) and was already visited / compiled. See
        # test_relationship_criteria.py ->
        # test_selectinload_local_criteria_subquery
        for k in override_binds.translate:
            if k not in self.binds:
                continue
            bp = self.binds[k]

            # so this would work, just change the value of bp in place.
            # but we dont want to mutate things outside.
            # bp.value = override_binds.translate[bp.key]
            # continue

            # instead, need to replace bp with new_bp or otherwise accommodate
            # in all internal collections
            new_bp = bp._with_value(
                override_binds.translate[bp.key],
                maintain_key=True,
                required=False,
            )

            # re-point every internal collection from bp to new_bp
            name = self.bind_names[bp]
            self.binds[k] = self.binds[name] = new_bp
            self.bind_names[new_bp] = name
            self.bind_names.pop(bp, None)

            if bp in self.post_compile_params:
                self.post_compile_params |= {new_bp}
            if bp in self.literal_execute_params:
                self.literal_execute_params |= {new_bp}

            # keep the cache-key-to-bindparam match structure in sync so
            # execution time can locate the replacement parameter
            ckbm_tuple = self._cache_key_bind_match
            if ckbm_tuple:
                ckbm, cksm = ckbm_tuple
                for bp in bp._cloned_set:
                    if bp.key in cksm:
                        cb = cksm[bp.key]
                        ckbm[cb].append(new_bp)

        return sqltext

2420 

2421 def visit_grouping(self, grouping, asfrom=False, **kwargs): 

2422 return "(" + grouping.element._compiler_dispatch(self, **kwargs) + ")" 

2423 

2424 def visit_select_statement_grouping(self, grouping, **kwargs): 

2425 return "(" + grouping.element._compiler_dispatch(self, **kwargs) + ")" 

2426 

    def visit_label_reference(
        self, element, within_columns_clause=False, **kwargs
    ):
        """Compile a reference to a Label used in ORDER BY / GROUP BY;
        when the dialect supports simple label references and the label
        resolves within the enclosing SELECT, render just the label name."""
        if self.stack and self.dialect.supports_simple_order_by_label:
            try:
                compile_state = cast(
                    "Union[SelectState, CompoundSelectState]",
                    self.stack[-1]["compile_state"],
                )
            except KeyError as ke:
                raise exc.CompileError(
                    "Can't resolve label reference for ORDER BY / "
                    "GROUP BY / DISTINCT etc."
                ) from ke

            (
                with_cols,
                only_froms,
                only_cols,
            ) = compile_state._label_resolve_dict
            if within_columns_clause:
                resolve_dict = only_froms
            else:
                resolve_dict = only_cols

            # this can be None in the case that a _label_reference()
            # were subject to a replacement operation, in which case
            # the replacement of the Label element may have changed
            # to something else like a ColumnClause expression.
            order_by_elem = element.element._order_by_label_element

            if (
                order_by_elem is not None
                and order_by_elem.name in resolve_dict
                and order_by_elem.shares_lineage(
                    resolve_dict[order_by_elem.name]
                )
            ):
                kwargs["render_label_as_label"] = (
                    element.element._order_by_label_element
                )
        return self.process(
            element.element,
            within_columns_clause=within_columns_clause,
            **kwargs,
        )

2473 

    def visit_textual_label_reference(
        self, element, within_columns_clause=False, **kwargs
    ):
        """Resolve a string label reference (e.g. ``order_by("name")``)
        against the enclosing SELECT's label resolution dictionaries,
        raising CompileError (via _no_text_coercion) when it can't be."""
        if not self.stack:
            # compiling the element outside of the context of a SELECT
            return self.process(element._text_clause)

        try:
            compile_state = cast(
                "Union[SelectState, CompoundSelectState]",
                self.stack[-1]["compile_state"],
            )
        except KeyError as ke:
            coercions._no_text_coercion(
                element.element,
                extra=(
                    "Can't resolve label reference for ORDER BY / "
                    "GROUP BY / DISTINCT etc."
                ),
                exc_cls=exc.CompileError,
                err=ke,
            )

        with_cols, only_froms, only_cols = compile_state._label_resolve_dict
        try:
            if within_columns_clause:
                col = only_froms[element.element]
            else:
                col = with_cols[element.element]
        except KeyError as err:
            coercions._no_text_coercion(
                element.element,
                extra=(
                    "Can't resolve label reference for ORDER BY / "
                    "GROUP BY / DISTINCT etc."
                ),
                exc_cls=exc.CompileError,
                err=err,
            )
        else:
            kwargs["render_label_as_label"] = col
            return self.process(
                col, within_columns_clause=within_columns_clause, **kwargs
            )

2518 

    def visit_label(
        self,
        label,
        add_to_result_map=None,
        within_label_clause=False,
        within_columns_clause=False,
        render_label_as_label=None,
        result_map_targets=(),
        **kw,
    ):
        """Render a Label, either as "<expr> AS <name>", just "<name>",
        or the bare inner expression, depending on clause context."""
        # only render labels within the columns clause
        # or ORDER BY clause of a select.  dialect-specific compilers
        # can modify this behavior.
        render_label_with_as = (
            within_columns_clause and not within_label_clause
        )
        render_label_only = render_label_as_label is label

        if render_label_only or render_label_with_as:
            # truncate auto-generated label names to the dialect's limits
            if isinstance(label.name, elements._truncated_label):
                labelname = self._truncated_identifier("colident", label.name)
            else:
                labelname = label.name

        if render_label_with_as:
            if add_to_result_map is not None:
                add_to_result_map(
                    labelname,
                    label.name,
                    (label, labelname) + label._alt_names + result_map_targets,
                    label.type,
                )
            return (
                label.element._compiler_dispatch(
                    self,
                    within_columns_clause=True,
                    within_label_clause=True,
                    **kw,
                )
                + OPERATORS[operators.as_]
                + self.preparer.format_label(label, labelname)
            )
        elif render_label_only:
            return self.preparer.format_label(label, labelname)
        else:
            return label.element._compiler_dispatch(
                self, within_columns_clause=False, **kw
            )

2567 

    def _fallback_column_name(self, column):
        # called by visit_column() when a Column's name is None; the base
        # compiler cannot render an unnamed column (DDL compilers override).
        raise exc.CompileError(
            "Cannot compile Column object until its 'name' is assigned."
        )

2572 

2573 def visit_lambda_element(self, element, **kw): 

2574 sql_element = element._resolved 

2575 return self.process(sql_element, **kw) 

2576 

    def visit_column(
        self,
        column: ColumnClause[Any],
        add_to_result_map: Optional[_ResultMapAppender] = None,
        include_table: bool = True,
        result_map_targets: Tuple[Any, ...] = (),
        ambiguous_table_name_map: Optional[_AmbiguousTableNameMap] = None,
        **kwargs: Any,
    ) -> str:
        """Render a column reference, optionally qualified by its table
        (and the table's schema), with identifier quoting applied."""
        name = orig_name = column.name
        if name is None:
            name = self._fallback_column_name(column)

        is_literal = column.is_literal
        if not is_literal and isinstance(name, elements._truncated_label):
            name = self._truncated_identifier("colident", name)

        if add_to_result_map is not None:
            targets = (column, name, column.key) + result_map_targets
            if column._tq_label:
                targets += (column._tq_label,)

            add_to_result_map(name, orig_name, targets, column.type)

        if is_literal:
            # note we are not currently accommodating for
            # literal_column(quoted_name('ident', True)) here
            name = self.escape_literal_column(name)
        else:
            name = self.preparer.quote(name)
        table = column.table
        if table is None or not include_table or not table.named_with_column:
            return name
        else:
            effective_schema = self.preparer.schema_for_object(table)

            if effective_schema:
                schema_prefix = (
                    self.preparer.quote_schema(effective_schema) + "."
                )
            else:
                schema_prefix = ""

            if TYPE_CHECKING:
                assert isinstance(table, NamedFromClause)
            tablename = table.name

            # swap in a disambiguated table name when the same unqualified
            # name appears more than once in the enclosing statement
            if (
                not effective_schema
                and ambiguous_table_name_map
                and tablename in ambiguous_table_name_map
            ):
                tablename = ambiguous_table_name_map[tablename]

            if isinstance(tablename, elements._truncated_label):
                tablename = self._truncated_identifier("alias", tablename)

            return schema_prefix + self.preparer.quote(tablename) + "." + name

2635 

2636 def visit_collation(self, element, **kw): 

2637 return self.preparer.format_collation(element.collation) 

2638 

2639 def visit_fromclause(self, fromclause, **kwargs): 

2640 return fromclause.name 

2641 

2642 def visit_index(self, index, **kwargs): 

2643 return index.name 

2644 

2645 def visit_typeclause(self, typeclause, **kw): 

2646 kw["type_expression"] = typeclause 

2647 kw["identifier_preparer"] = self.preparer 

2648 return self.dialect.type_compiler_instance.process( 

2649 typeclause.type, **kw 

2650 ) 

2651 

2652 def post_process_text(self, text): 

2653 if self.preparer._double_percents: 

2654 text = text.replace("%", "%%") 

2655 return text 

2656 

2657 def escape_literal_column(self, text): 

2658 if self.preparer._double_percents: 

2659 text = text.replace("%", "%%") 

2660 return text 

2661 

    def visit_textclause(self, textclause, add_to_result_map=None, **kw):
        """Render a text() construct, substituting each ``:name`` token
        with either its declared bindparam or a plain bind placeholder."""

        def do_bindparam(m):
            # regex callback for each ":name" token in the raw text
            name = m.group(1)
            if name in textclause._bindparams:
                return self.process(textclause._bindparams[name], **kw)
            else:
                return self.bindparam_string(name, **kw)

        if not self.stack:
            self.isplaintext = True

        if add_to_result_map:
            # text() object is present in the columns clause of a
            # select().   Add a no-name entry to the result map so that
            # row[text()] produces a result
            add_to_result_map(None, None, (textclause,), sqltypes.NULLTYPE)

        # un-escape any \:params
        return BIND_PARAMS_ESC.sub(
            lambda m: m.group(1),
            BIND_PARAMS.sub(
                do_bindparam, self.post_process_text(textclause.text)
            ),
        )

2686 

    def visit_textual_select(
        self, taf, compound_index=None, asfrom=False, **kw
    ):
        """Render a TextualSelect (text() with column information),
        maintaining the compiler stack and, when this is the outermost
        or result-producing element, populating the result map."""
        toplevel = not self.stack
        entry = self._default_stack_entry if toplevel else self.stack[-1]

        new_entry: _CompilerStackEntry = {
            "correlate_froms": set(),
            "asfrom_froms": set(),
            "selectable": taf,
        }
        self.stack.append(new_entry)

        if taf._independent_ctes:
            self._dispatch_independent_ctes(taf, kw)

        populate_result_map = (
            toplevel
            or (
                compound_index == 0
                and entry.get("need_result_map_for_compound", False)
            )
            or entry.get("need_result_map_for_nested", False)
        )

        if populate_result_map:
            self._ordered_columns = self._textual_ordered_columns = (
                taf.positional
            )

            # enable looser result column matching when the SQL text links to
            # Column objects by name only
            self._loose_column_name_matching = not taf.positional and bool(
                taf.column_args
            )

            for c in taf.column_args:
                self.process(
                    c,
                    within_columns_clause=True,
                    add_to_result_map=self._add_to_result_map,
                )

        text = self.process(taf.element, **kw)
        if self.ctes:
            # any CTEs accumulated during compilation render ahead of the text
            nesting_level = len(self.stack) if not toplevel else None
            text = self._render_cte_clause(nesting_level=nesting_level) + text

        self.stack.pop(-1)

        return text

2738 

2739 def visit_null(self, expr: Null, **kw: Any) -> str: 

2740 return "NULL" 

2741 

2742 def visit_true(self, expr: True_, **kw: Any) -> str: 

2743 if self.dialect.supports_native_boolean: 

2744 return "true" 

2745 else: 

2746 return "1" 

2747 

2748 def visit_false(self, expr: False_, **kw: Any) -> str: 

2749 if self.dialect.supports_native_boolean: 

2750 return "false" 

2751 else: 

2752 return "0" 

2753 

2754 def _generate_delimited_list(self, elements, separator, **kw): 

2755 return separator.join( 

2756 s 

2757 for s in (c._compiler_dispatch(self, **kw) for c in elements) 

2758 if s 

2759 ) 

2760 

2761 def _generate_delimited_and_list(self, clauses, **kw): 

2762 lcc, clauses = elements.BooleanClauseList._process_clauses_for_boolean( 

2763 operators.and_, 

2764 elements.True_._singleton, 

2765 elements.False_._singleton, 

2766 clauses, 

2767 ) 

2768 if lcc == 1: 

2769 return clauses[0]._compiler_dispatch(self, **kw) 

2770 else: 

2771 separator = OPERATORS[operators.and_] 

2772 return separator.join( 

2773 s 

2774 for s in (c._compiler_dispatch(self, **kw) for c in clauses) 

2775 if s 

2776 ) 

2777 

2778 def visit_tuple(self, clauselist, **kw): 

2779 return "(%s)" % self.visit_clauselist(clauselist, **kw) 

2780 

2781 def visit_element_list(self, element, **kw): 

2782 return self._generate_delimited_list(element.clauses, " ", **kw) 

2783 

2784 def visit_clauselist(self, clauselist, **kw): 

2785 sep = clauselist.operator 

2786 if sep is None: 

2787 sep = " " 

2788 else: 

2789 sep = OPERATORS[clauselist.operator] 

2790 

2791 return self._generate_delimited_list(clauselist.clauses, sep, **kw) 

2792 

    def visit_expression_clauselist(self, clauselist, **kw):
        """Render an operator-joined expression list, honoring any
        dialect-specific dispatch registered for the operator."""
        operator_ = clauselist.operator

        disp = self._get_operator_dispatch(
            operator_, "expression_clauselist", None
        )
        if disp:
            return disp(clauselist, operator_, **kw)

        try:
            opstring = OPERATORS[operator_]
        except KeyError as err:
            # operator has no generic string form and no dialect dispatch
            raise exc.UnsupportedCompilationError(self, operator_) from err
        else:
            kw["_in_operator_expression"] = True
            return self._generate_delimited_list(
                clauselist.clauses, opstring, **kw
            )

2811 

2812 def visit_case(self, clause, **kwargs): 

2813 x = "CASE " 

2814 if clause.value is not None: 

2815 x += clause.value._compiler_dispatch(self, **kwargs) + " " 

2816 for cond, result in clause.whens: 

2817 x += ( 

2818 "WHEN " 

2819 + cond._compiler_dispatch(self, **kwargs) 

2820 + " THEN " 

2821 + result._compiler_dispatch(self, **kwargs) 

2822 + " " 

2823 ) 

2824 if clause.else_ is not None: 

2825 x += ( 

2826 "ELSE " + clause.else_._compiler_dispatch(self, **kwargs) + " " 

2827 ) 

2828 x += "END" 

2829 return x 

2830 

2831 def visit_type_coerce(self, type_coerce, **kw): 

2832 return type_coerce.typed_expression._compiler_dispatch(self, **kw) 

2833 

2834 def visit_cast(self, cast, **kwargs): 

2835 type_clause = cast.typeclause._compiler_dispatch(self, **kwargs) 

2836 match = re.match("(.*)( COLLATE .*)", type_clause) 

2837 return "CAST(%s AS %s)%s" % ( 

2838 cast.clause._compiler_dispatch(self, **kwargs), 

2839 match.group(1) if match else type_clause, 

2840 match.group(2) if match else "", 

2841 ) 

2842 

    def visit_frame_clause(self, frameclause, **kw):
        """Render the "<lower> AND <upper>" portion of a window frame
        specification (the part after RANGE/ROWS/GROUPS BETWEEN)."""

        if frameclause.lower_type is elements._FrameClauseType.RANGE_UNBOUNDED:
            left = "UNBOUNDED PRECEDING"
        elif frameclause.lower_type is elements._FrameClauseType.RANGE_CURRENT:
            left = "CURRENT ROW"
        else:
            # bounded lower edge: render the integer bind parameter
            val = self.process(frameclause.lower_integer_bind, **kw)
            if (
                frameclause.lower_type
                is elements._FrameClauseType.RANGE_PRECEDING
            ):
                left = f"{val} PRECEDING"
            else:
                left = f"{val} FOLLOWING"

        if frameclause.upper_type is elements._FrameClauseType.RANGE_UNBOUNDED:
            right = "UNBOUNDED FOLLOWING"
        elif frameclause.upper_type is elements._FrameClauseType.RANGE_CURRENT:
            right = "CURRENT ROW"
        else:
            # bounded upper edge: render the integer bind parameter
            val = self.process(frameclause.upper_integer_bind, **kw)
            if (
                frameclause.upper_type
                is elements._FrameClauseType.RANGE_PRECEDING
            ):
                right = f"{val} PRECEDING"
            else:
                right = f"{val} FOLLOWING"

        return f"{left} AND {right}"

2874 

2875 def visit_over(self, over, **kwargs): 

2876 text = over.element._compiler_dispatch(self, **kwargs) 

2877 if over.range_ is not None: 

2878 range_ = f"RANGE BETWEEN {self.process(over.range_, **kwargs)}" 

2879 elif over.rows is not None: 

2880 range_ = f"ROWS BETWEEN {self.process(over.rows, **kwargs)}" 

2881 elif over.groups is not None: 

2882 range_ = f"GROUPS BETWEEN {self.process(over.groups, **kwargs)}" 

2883 else: 

2884 range_ = None 

2885 

2886 return "%s OVER (%s)" % ( 

2887 text, 

2888 " ".join( 

2889 [ 

2890 "%s BY %s" 

2891 % (word, clause._compiler_dispatch(self, **kwargs)) 

2892 for word, clause in ( 

2893 ("PARTITION", over.partition_by), 

2894 ("ORDER", over.order_by), 

2895 ) 

2896 if clause is not None and len(clause) 

2897 ] 

2898 + ([range_] if range_ else []) 

2899 ), 

2900 ) 

2901 

2902 def visit_withingroup(self, withingroup, **kwargs): 

2903 return "%s WITHIN GROUP (ORDER BY %s)" % ( 

2904 withingroup.element._compiler_dispatch(self, **kwargs), 

2905 withingroup.order_by._compiler_dispatch(self, **kwargs), 

2906 ) 

2907 

2908 def visit_funcfilter(self, funcfilter, **kwargs): 

2909 return "%s FILTER (WHERE %s)" % ( 

2910 funcfilter.func._compiler_dispatch(self, **kwargs), 

2911 funcfilter.criterion._compiler_dispatch(self, **kwargs), 

2912 ) 

2913 

2914 def visit_extract(self, extract, **kwargs): 

2915 field = self.extract_map.get(extract.field, extract.field) 

2916 return "EXTRACT(%s FROM %s)" % ( 

2917 field, 

2918 extract.expr._compiler_dispatch(self, **kwargs), 

2919 ) 

2920 

2921 def visit_scalar_function_column(self, element, **kw): 

2922 compiled_fn = self.visit_function(element.fn, **kw) 

2923 compiled_col = self.visit_column(element, **kw) 

2924 return "(%s).%s" % (compiled_fn, compiled_col) 

2925 

    def visit_function(
        self,
        func: Function[Any],
        add_to_result_map: Optional[_ResultMapAppender] = None,
        **kwargs: Any,
    ) -> str:
        """Render a SQL function call: first try a dialect-specific
        ``visit_<name>_func`` method, then the FUNCTIONS registry, then a
        generically rendered (quoted if necessary) name with arguments."""
        if add_to_result_map is not None:
            add_to_result_map(func.name, func.name, (func.name,), func.type)

        disp = getattr(self, "visit_%s_func" % func.name.lower(), None)

        text: str

        if disp:
            text = disp(func, **kwargs)
        else:
            name = FUNCTIONS.get(func._deannotate().__class__, None)
            if name:
                if func._has_args:
                    name += "%(expr)s"
            else:
                name = func.name
                name = (
                    self.preparer.quote(name)
                    if self.preparer._requires_quotes_illegal_chars(name)
                    or isinstance(name, elements.quoted_name)
                    else name
                )
                name = name + "%(expr)s"
            text = ".".join(
                [
                    (
                        self.preparer.quote(tok)
                        # NOTE(review): this checks `name`, not `tok`, for
                        # quoted_name -- presumably so an explicitly quoted
                        # function name quotes its package tokens too; verify
                        if self.preparer._requires_quotes_illegal_chars(tok)
                        or isinstance(name, elements.quoted_name)
                        else tok
                    )
                    for tok in func.packagenames
                ]
                + [name]
            ) % {"expr": self.function_argspec(func, **kwargs)}

        if func._with_ordinality:
            text += " WITH ORDINALITY"
        return text

2971 

def visit_next_value_func(self, next_value, **kw):
    """Render ``next_value()`` by delegating to the sequence visitor."""
    return self.visit_sequence(next_value.sequence)

2974 

def visit_sequence(self, sequence, **kw):
    """Always raise; overridden by dialects that support sequences."""
    raise NotImplementedError(
        "Dialect '%s' does not support sequence increments."
        % self.dialect.name
    )

2980 

def function_argspec(self, func: Function[Any], **kwargs: Any) -> str:
    """Render the parenthesized argument list of a function call."""
    return func.clause_expr._compiler_dispatch(self, **kwargs)

2983 

def visit_compound_select(
    self, cs, asfrom=False, compound_index=None, **kwargs
):
    """Render a compound SELECT (UNION / INTERSECT / EXCEPT ...).

    Pushes a new entry onto the compiler stack, dispatches each
    component select with its ``compound_index``, then appends
    GROUP BY / ORDER BY / row-limiting clauses that apply to the
    compound as a whole and prepends any accumulated CTEs.
    """
    toplevel = not self.stack

    compile_state = cs._compile_state_factory(cs, self, **kwargs)

    # the outermost statement's compile state becomes the compiler's
    if toplevel and not self.compile_state:
        self.compile_state = compile_state

    compound_stmt = compile_state.statement

    entry = self._default_stack_entry if toplevel else self.stack[-1]
    # result-map generation is needed at top level, or for the first
    # element of an enclosing compound that asked for it
    need_result_map = toplevel or (
        not compound_index
        and entry.get("need_result_map_for_compound", False)
    )

    # indicates there is already a CompoundSelect in play
    if compound_index == 0:
        entry["select_0"] = cs

    self.stack.append(
        {
            "correlate_froms": entry["correlate_froms"],
            "asfrom_froms": entry["asfrom_froms"],
            "selectable": cs,
            "compile_state": compile_state,
            "need_result_map_for_compound": need_result_map,
        }
    )

    if compound_stmt._independent_ctes:
        self._dispatch_independent_ctes(compound_stmt, kwargs)

    keyword = self.compound_keywords[cs.keyword]

    text = (" " + keyword + " ").join(
        (
            c._compiler_dispatch(
                self, asfrom=asfrom, compound_index=i, **kwargs
            )
            for i, c in enumerate(cs.selects)
        )
    )

    kwargs["include_table"] = False
    text += self.group_by_clause(cs, **dict(asfrom=asfrom, **kwargs))
    text += self.order_by_clause(cs, **kwargs)
    if cs._has_row_limiting_clause:
        text += self._row_limit_clause(cs, **kwargs)

    if self.ctes:
        # CTEs gathered while compiling the components render in front
        nesting_level = len(self.stack) if not toplevel else None
        text = (
            self._render_cte_clause(
                nesting_level=nesting_level,
                include_following_stack=True,
            )
            + text
        )

    self.stack.pop(-1)
    return text

3048 

def _row_limit_clause(self, cs, **kwargs):
    """Render the row-limiting clause, preferring FETCH over LIMIT."""
    if cs._fetch_clause is None:
        return self.limit_clause(cs, **kwargs)
    return self.fetch_clause(cs, **kwargs)

3054 

def _get_operator_dispatch(self, operator_, qualifier1, qualifier2):
    """Look up a per-operator visit method, e.g. ``visit_in_op_binary``.

    Returns the bound method, or None when the compiler has no
    specialized handler for this operator/qualifier combination.
    """
    suffix = f"_{qualifier2}" if qualifier2 else ""
    attrname = f"visit_{operator_.__name__}_{qualifier1}{suffix}"
    return getattr(self, attrname, None)

3062 

def visit_unary(
    self, unary, add_to_result_map=None, result_map_targets=(), **kw
):
    """Render a unary expression via its operator or modifier.

    Exactly one of ``unary.operator`` (prefix) or ``unary.modifier``
    (postfix) must be set; a specialized visit method is preferred,
    falling back to generic rendering from the OPERATORS table.
    """
    if add_to_result_map is not None:
        # thread result-map context through to the inner element
        result_map_targets += (unary,)
        kw["add_to_result_map"] = add_to_result_map
        kw["result_map_targets"] = result_map_targets

    if unary.operator and unary.modifier:
        raise exc.CompileError(
            "Unary expression does not support operator "
            "and modifier simultaneously"
        )

    if unary.operator:
        disp = self._get_operator_dispatch(
            unary.operator, "unary", "operator"
        )
        if disp:
            return disp(unary, unary.operator, **kw)
        return self._generate_generic_unary_operator(
            unary, OPERATORS[unary.operator], **kw
        )

    if unary.modifier:
        disp = self._get_operator_dispatch(
            unary.modifier, "unary", "modifier"
        )
        if disp:
            return disp(unary, unary.modifier, **kw)
        return self._generate_generic_unary_modifier(
            unary, OPERATORS[unary.modifier], **kw
        )

    raise exc.CompileError("Unary expression has no operator or modifier")

3100 

def visit_truediv_binary(self, binary, operator, **kw):
    """Render true division (``/``).

    On dialects where ``/`` is integer floor division, the right side
    is CAST to a numeric type so the division yields a true quotient.
    """
    if self.dialect.div_is_floordiv:
        return (
            self.process(binary.left, **kw)
            + " / "
            # TODO: would need a fast cast again here,
            # unless we want to use an implicit cast like "+ 0.0"
            + self.process(
                elements.Cast(
                    binary.right,
                    (
                        # keep an already-numeric type; otherwise force
                        # a generic Numeric cast
                        binary.right.type
                        if binary.right.type._type_affinity
                        in (sqltypes.Numeric, sqltypes.Float)
                        else sqltypes.Numeric()
                    ),
                ),
                **kw,
            )
        )
    else:
        return (
            self.process(binary.left, **kw)
            + " / "
            + self.process(binary.right, **kw)
        )

3127 

def visit_floordiv_binary(self, binary, operator, **kw):
    """Render floor division, adding FLOOR() only when "/" won't truncate."""
    quotient = (
        self.process(binary.left, **kw)
        + " / "
        + self.process(binary.right, **kw)
    )
    if (
        self.dialect.div_is_floordiv
        and binary.right.type._type_affinity is sqltypes.Integer
    ):
        # integer "/" already truncates on this dialect
        return quotient
    return "FLOOR(%s)" % quotient

3144 

def visit_is_true_unary_operator(self, element, operator, **kw):
    """Render IS TRUE; emulate with "= 1" when booleans aren't native."""
    inner = self.process(element.element, **kw)
    if element._is_implicitly_boolean or self.dialect.supports_native_boolean:
        return inner
    return "%s = 1" % inner

3153 

def visit_is_false_unary_operator(self, element, operator, **kw):
    """Render IS FALSE; emulate with NOT / "= 0" when booleans aren't native."""
    inner = self.process(element.element, **kw)
    if element._is_implicitly_boolean or self.dialect.supports_native_boolean:
        return "NOT %s" % inner
    return "%s = 0" % inner

3162 

def visit_not_match_op_binary(self, binary, operator, **kw):
    # render as the plain MATCH expression wrapped in NOT.
    # NOTE(review): **kw is not forwarded into visit_binary here --
    # confirm whether kw propagation was intentionally dropped.
    return "NOT %s" % self.visit_binary(
        binary, override_operator=operators.match_op
    )

3167 

def visit_not_in_op_binary(self, binary, operator, **kw):
    # The brackets are required in the NOT IN operation because the empty
    # case is handled using the form "(col NOT IN (null) OR 1 = 1)".
    # The presence of the OR makes the brackets required.
    inner = self._generate_generic_binary(binary, OPERATORS[operator], **kw)
    return "(%s)" % inner

3175 

def visit_empty_set_op_expr(self, type_, expand_op, **kw):
    """Render the expression used for IN / NOT IN against an empty list.

    "IN ()" is not portable SQL, so a NULL comparison that is always
    false (IN) or always true (NOT IN) is spliced into the parens the
    surrounding expression already emits -- hence the deliberately
    unbalanced parentheses in the returned fragments.
    """
    if expand_op not in (operators.in_op, operators.not_in_op):
        return self.visit_empty_set_expr(type_)

    if len(type_) > 1:
        lhs = "(%s)" % ", ".join("NULL" for element in type_)
    else:
        lhs = "NULL"

    if expand_op is operators.not_in_op:
        return "%s) OR (1 = 1" % lhs
    else:
        return "%s) AND (1 != 1" % lhs

3193 

def visit_empty_set_expr(self, element_types, **kw):
    """Always raise; overridden by dialects that can render an empty set."""
    raise NotImplementedError(
        "Dialect '%s' does not support empty set expression."
        % self.dialect.name
    )

3199 

def _literal_execute_expanding_parameter_literal_binds(
    self, parameter, values, bind_expression_template=None
):
    """Render an expanding (IN) parameter's values as inline literals.

    Returns ``(to_update, replacement_expression)`` where ``to_update``
    is always empty here (no bound parameters remain) and the
    replacement expression is the literal-rendered value list.
    """
    typ_dialect_impl = parameter.type._unwrapped_dialect_impl(self.dialect)

    if not values:
        # empty IN expression. note we don't need to use
        # bind_expression_template here because there are no
        # expressions to render.

        if typ_dialect_impl._is_tuple_type:
            replacement_expression = (
                "VALUES " if self.dialect.tuple_in_values else ""
            ) + self.visit_empty_set_op_expr(
                parameter.type.types, parameter.expand_op
            )

        else:
            replacement_expression = self.visit_empty_set_op_expr(
                [parameter.type], parameter.expand_op
            )

    elif typ_dialect_impl._is_tuple_type or (
        # an untyped parameter holding sequences (but not strings/bytes)
        # is treated as a tuple-IN as well
        typ_dialect_impl._isnull
        and isinstance(values[0], collections_abc.Sequence)
        and not isinstance(values[0], (str, bytes))
    ):
        if typ_dialect_impl._has_bind_expression:
            raise NotImplementedError(
                "bind_expression() on TupleType not supported with "
                "literal_binds"
            )

        # render each tuple element-wise with its corresponding type
        replacement_expression = (
            "VALUES " if self.dialect.tuple_in_values else ""
        ) + ", ".join(
            "(%s)"
            % (
                ", ".join(
                    self.render_literal_value(value, param_type)
                    for value, param_type in zip(
                        tuple_element, parameter.type.types
                    )
                )
            )
            for i, tuple_element in enumerate(values)
        )
    else:
        if bind_expression_template:
            # the template wraps each value in SQL produced by the
            # type's bind_expression(); split out the left/right parts
            # around the "~~REPL~~" marker
            post_compile_pattern = self._post_compile_pattern
            m = post_compile_pattern.search(bind_expression_template)
            assert m and m.group(
                2
            ), "unexpected format for expanding parameter"

            tok = m.group(2).split("~~")
            be_left, be_right = tok[1], tok[3]
            replacement_expression = ", ".join(
                "%s%s%s"
                % (
                    be_left,
                    self.render_literal_value(value, parameter.type),
                    be_right,
                )
                for value in values
            )
        else:
            replacement_expression = ", ".join(
                self.render_literal_value(value, parameter.type)
                for value in values
            )

    return (), replacement_expression

3273 

def _literal_execute_expanding_parameter(self, name, parameter, values):
    """Expand an IN parameter into individual bound parameters.

    Returns ``(to_update, replacement_expression)``: a list of
    ``(param_name, value)`` pairs to add to the parameter set, and the
    SQL fragment of placeholders that replaces the expanding marker.
    Falls back to literal rendering when the parameter is marked
    ``literal_execute``.
    """
    if parameter.literal_execute:
        return self._literal_execute_expanding_parameter_literal_binds(
            parameter, values
        )

    dialect = self.dialect
    typ_dialect_impl = parameter.type._unwrapped_dialect_impl(dialect)

    if self._numeric_binds:
        bind_template = self.compilation_bindtemplate
    else:
        bind_template = self.bindtemplate

    if (
        self.dialect._bind_typing_render_casts
        and typ_dialect_impl.render_bind_cast
    ):
        # e.g. "%(name)s::INTEGER" style casts on each placeholder

        def _render_bindtemplate(name):
            return self.render_bind_cast(
                parameter.type,
                typ_dialect_impl,
                bind_template % {"name": name},
            )

    else:

        def _render_bindtemplate(name):
            return bind_template % {"name": name}

    if not values:
        # empty IN: no parameters, render the always-true/false form
        to_update = []
        if typ_dialect_impl._is_tuple_type:
            replacement_expression = self.visit_empty_set_op_expr(
                parameter.type.types, parameter.expand_op
            )
        else:
            replacement_expression = self.visit_empty_set_op_expr(
                [parameter.type], parameter.expand_op
            )

    elif typ_dialect_impl._is_tuple_type or (
        typ_dialect_impl._isnull
        and isinstance(values[0], collections_abc.Sequence)
        and not isinstance(values[0], (str, bytes))
    ):
        assert not typ_dialect_impl._is_array
        # tuple IN: one parameter per tuple element, named name_i_j
        to_update = [
            ("%s_%s_%s" % (name, i, j), value)
            for i, tuple_element in enumerate(values, 1)
            for j, value in enumerate(tuple_element, 1)
        ]

        replacement_expression = (
            "VALUES " if dialect.tuple_in_values else ""
        ) + ", ".join(
            "(%s)"
            % (
                ", ".join(
                    _render_bindtemplate(
                        to_update[i * len(tuple_element) + j][0]
                    )
                    for j, value in enumerate(tuple_element)
                )
            )
            for i, tuple_element in enumerate(values)
        )
    else:
        # scalar IN: one parameter per value, named name_1, name_2, ...
        to_update = [
            ("%s_%s" % (name, i), value)
            for i, value in enumerate(values, 1)
        ]
        replacement_expression = ", ".join(
            _render_bindtemplate(key) for key, value in to_update
        )

    return to_update, replacement_expression

3352 

def visit_binary(
    self,
    binary,
    override_operator=None,
    eager_grouping=False,
    from_linter=None,
    lateral_from_linter=None,
    **kw,
):
    """Render a binary expression.

    Dispatches to a per-operator ``visit_<op>_binary`` method when one
    exists, otherwise renders generically from the OPERATORS table.
    Comparison operators also feed the cartesian-product from-linter.
    """
    if from_linter and operators.is_comparison(binary.operator):
        # record which FROM objects this comparison joins, so the
        # linter can warn about unlinked (cartesian) FROMs
        if lateral_from_linter is not None:
            enclosing_lateral = kw["enclosing_lateral"]
            lateral_from_linter.edges.update(
                itertools.product(
                    _de_clone(
                        binary.left._from_objects + [enclosing_lateral]
                    ),
                    _de_clone(
                        binary.right._from_objects + [enclosing_lateral]
                    ),
                )
            )
        else:
            from_linter.edges.update(
                itertools.product(
                    _de_clone(binary.left._from_objects),
                    _de_clone(binary.right._from_objects),
                )
            )

    # don't allow "? = ?" to render
    if (
        self.ansi_bind_rules
        and isinstance(binary.left, elements.BindParameter)
        and isinstance(binary.right, elements.BindParameter)
    ):
        kw["literal_execute"] = True

    operator_ = override_operator or binary.operator
    disp = self._get_operator_dispatch(operator_, "binary", None)
    if disp:
        return disp(binary, operator_, **kw)
    else:
        try:
            opstring = OPERATORS[operator_]
        except KeyError as err:
            raise exc.UnsupportedCompilationError(self, operator_) from err
        else:
            return self._generate_generic_binary(
                binary,
                opstring,
                from_linter=from_linter,
                lateral_from_linter=lateral_from_linter,
                **kw,
            )

3408 

def visit_function_as_comparison_op_binary(self, element, operator, **kw):
    """Render a FunctionAsBinary by emitting its underlying function."""
    return self.process(element.sql_function, **kw)

3411 

def visit_mod_binary(self, binary, operator, **kw):
    """Render modulo; "%" is doubled for %-style paramstyles."""
    opstring = " %% " if self.preparer._double_percents else " % "
    return (
        self.process(binary.left, **kw)
        + opstring
        + self.process(binary.right, **kw)
    )

3425 

def visit_custom_op_binary(self, element, operator, **kw):
    """Render a binary expression using a custom_op's opstring."""
    kw["eager_grouping"] = operator.eager_grouping
    opstring = " " + self.escape_literal_column(operator.opstring) + " "
    return self._generate_generic_binary(element, opstring, **kw)

3433 

def visit_custom_op_unary_operator(self, element, operator, **kw):
    """Render a custom prefix operator: "<op> <element>"."""
    opstring = self.escape_literal_column(operator.opstring) + " "
    return self._generate_generic_unary_operator(element, opstring, **kw)

3438 

def visit_custom_op_unary_modifier(self, element, operator, **kw):
    """Render a custom postfix operator: "<element> <op>"."""
    opstring = " " + self.escape_literal_column(operator.opstring)
    return self._generate_generic_unary_modifier(element, opstring, **kw)

3443 

def _generate_generic_binary(
    self,
    binary: BinaryExpression[Any],
    opstring: str,
    eager_grouping: bool = False,
    **kw: Any,
) -> str:
    """Render ``<left><opstring><right>`` generically.

    When this expression is nested inside another operator expression
    and eager grouping is requested, the result is parenthesized.
    """
    _in_operator_expression = kw.get("_in_operator_expression", False)

    # flag nested operands so they know to parenthesize themselves
    kw["_in_operator_expression"] = True
    kw["_binary_op"] = binary.operator
    text = (
        binary.left._compiler_dispatch(
            self, eager_grouping=eager_grouping, **kw
        )
        + opstring
        + binary.right._compiler_dispatch(
            self, eager_grouping=eager_grouping, **kw
        )
    )

    if _in_operator_expression and eager_grouping:
        text = "(%s)" % text
    return text

3468 

def _generate_generic_unary_operator(self, unary, opstring, **kw):
    # prefix form: "<opstring><element>"
    return opstring + unary.element._compiler_dispatch(self, **kw)

3471 

def _generate_generic_unary_modifier(self, unary, opstring, **kw):
    # postfix form: "<element><opstring>"
    return unary.element._compiler_dispatch(self, **kw) + opstring

3474 

@util.memoized_property
def _like_percent_literal(self):
    # shared "'%'" literal used to build LIKE patterns for
    # contains()/startswith()/endswith(); memoized per compiler
    return elements.literal_column("'%'", type_=sqltypes.STRINGTYPE)

3478 

def visit_ilike_case_insensitive_operand(self, element, **kw):
    """Wrap an ILIKE operand in lower() for dialects without native ILIKE."""
    inner = element.element._compiler_dispatch(self, **kw)
    return "lower(%s)" % inner

3481 

def visit_contains_op_binary(self, binary, operator, **kw):
    # col LIKE '%' || <right> || '%'
    cloned = binary._clone()
    pct = self._like_percent_literal
    cloned.right = pct.concat(cloned.right).concat(pct)
    return self.visit_like_op_binary(cloned, operator, **kw)

3487 

def visit_not_contains_op_binary(self, binary, operator, **kw):
    # col NOT LIKE '%' || <right> || '%'
    cloned = binary._clone()
    pct = self._like_percent_literal
    cloned.right = pct.concat(cloned.right).concat(pct)
    return self.visit_not_like_op_binary(cloned, operator, **kw)

3493 

def visit_icontains_op_binary(self, binary, operator, **kw):
    # lower(col) LIKE '%' || lower(<right>) || '%'
    cloned = binary._clone()
    pct = self._like_percent_literal
    cloned.left = ilike_case_insensitive(cloned.left)
    cloned.right = pct.concat(
        ilike_case_insensitive(cloned.right)
    ).concat(pct)
    return self.visit_ilike_op_binary(cloned, operator, **kw)

3502 

def visit_not_icontains_op_binary(self, binary, operator, **kw):
    # lower(col) NOT LIKE '%' || lower(<right>) || '%'
    cloned = binary._clone()
    pct = self._like_percent_literal
    cloned.left = ilike_case_insensitive(cloned.left)
    cloned.right = pct.concat(
        ilike_case_insensitive(cloned.right)
    ).concat(pct)
    return self.visit_not_ilike_op_binary(cloned, operator, **kw)

3511 

def visit_startswith_op_binary(self, binary, operator, **kw):
    # col LIKE <right> || '%'
    cloned = binary._clone()
    pct = self._like_percent_literal
    cloned.right = pct._rconcat(cloned.right)
    return self.visit_like_op_binary(cloned, operator, **kw)

3517 

def visit_not_startswith_op_binary(self, binary, operator, **kw):
    # col NOT LIKE <right> || '%'
    cloned = binary._clone()
    pct = self._like_percent_literal
    cloned.right = pct._rconcat(cloned.right)
    return self.visit_not_like_op_binary(cloned, operator, **kw)

3523 

def visit_istartswith_op_binary(self, binary, operator, **kw):
    # lower(col) LIKE lower(<right>) || '%'
    cloned = binary._clone()
    pct = self._like_percent_literal
    cloned.left = ilike_case_insensitive(cloned.left)
    cloned.right = pct._rconcat(ilike_case_insensitive(cloned.right))
    return self.visit_ilike_op_binary(cloned, operator, **kw)

3530 

def visit_not_istartswith_op_binary(self, binary, operator, **kw):
    # lower(col) NOT LIKE lower(<right>) || '%'
    cloned = binary._clone()
    pct = self._like_percent_literal
    cloned.left = ilike_case_insensitive(cloned.left)
    cloned.right = pct._rconcat(ilike_case_insensitive(cloned.right))
    return self.visit_not_ilike_op_binary(cloned, operator, **kw)

3537 

def visit_endswith_op_binary(self, binary, operator, **kw):
    # col LIKE '%' || <right>
    cloned = binary._clone()
    pct = self._like_percent_literal
    cloned.right = pct.concat(cloned.right)
    return self.visit_like_op_binary(cloned, operator, **kw)

3543 

def visit_not_endswith_op_binary(self, binary, operator, **kw):
    # col NOT LIKE '%' || <right>
    cloned = binary._clone()
    pct = self._like_percent_literal
    cloned.right = pct.concat(cloned.right)
    return self.visit_not_like_op_binary(cloned, operator, **kw)

3549 

def visit_iendswith_op_binary(self, binary, operator, **kw):
    # lower(col) LIKE '%' || lower(<right>)
    cloned = binary._clone()
    pct = self._like_percent_literal
    cloned.left = ilike_case_insensitive(cloned.left)
    cloned.right = pct.concat(ilike_case_insensitive(cloned.right))
    return self.visit_ilike_op_binary(cloned, operator, **kw)

3556 

def visit_not_iendswith_op_binary(self, binary, operator, **kw):
    # lower(col) NOT LIKE '%' || lower(<right>)
    cloned = binary._clone()
    pct = self._like_percent_literal
    cloned.left = ilike_case_insensitive(cloned.left)
    cloned.right = pct.concat(ilike_case_insensitive(cloned.right))
    return self.visit_not_ilike_op_binary(cloned, operator, **kw)

3563 

def visit_like_op_binary(self, binary, operator, **kw):
    """Render "<left> LIKE <right>" with an optional ESCAPE clause."""
    lhs = binary.left._compiler_dispatch(self, **kw)
    rhs = binary.right._compiler_dispatch(self, **kw)
    text = "%s LIKE %s" % (lhs, rhs)

    escape = binary.modifiers.get("escape", None)
    if escape is not None:
        text += " ESCAPE " + self.render_literal_value(
            escape, sqltypes.STRINGTYPE
        )
    return text

3575 

def visit_not_like_op_binary(self, binary, operator, **kw):
    """Render "<left> NOT LIKE <right>" with an optional ESCAPE clause."""
    lhs = binary.left._compiler_dispatch(self, **kw)
    rhs = binary.right._compiler_dispatch(self, **kw)
    text = "%s NOT LIKE %s" % (lhs, rhs)

    escape = binary.modifiers.get("escape", None)
    if escape is not None:
        text += " ESCAPE " + self.render_literal_value(
            escape, sqltypes.STRINGTYPE
        )
    return text

3586 

def visit_ilike_op_binary(self, binary, operator, **kw):
    """Render ILIKE as lower()-wrapped LIKE for generic dialects."""
    if operator is operators.ilike_op:
        cloned = binary._clone()
        cloned.left = ilike_case_insensitive(cloned.left)
        cloned.right = ilike_case_insensitive(cloned.right)
        binary = cloned
    # else we assume ilower() has been applied

    return self.visit_like_op_binary(binary, operator, **kw)

3595 

def visit_not_ilike_op_binary(self, binary, operator, **kw):
    """Render NOT ILIKE as lower()-wrapped NOT LIKE for generic dialects."""
    if operator is operators.not_ilike_op:
        cloned = binary._clone()
        cloned.left = ilike_case_insensitive(cloned.left)
        cloned.right = ilike_case_insensitive(cloned.right)
        binary = cloned
    # else we assume ilower() has been applied

    return self.visit_not_like_op_binary(binary, operator, **kw)

3604 

def visit_between_op_binary(self, binary, operator, **kw):
    """Render BETWEEN, honoring the "symmetric" modifier."""
    if binary.modifiers.get("symmetric", False):
        opstring = " BETWEEN SYMMETRIC "
    else:
        opstring = " BETWEEN "
    return self._generate_generic_binary(binary, opstring, **kw)

3610 

def visit_not_between_op_binary(self, binary, operator, **kw):
    """Render NOT BETWEEN, honoring the "symmetric" modifier."""
    if binary.modifiers.get("symmetric", False):
        opstring = " NOT BETWEEN SYMMETRIC "
    else:
        opstring = " NOT BETWEEN "
    return self._generate_generic_binary(binary, opstring, **kw)

3618 

def visit_regexp_match_op_binary(
    self, binary: BinaryExpression[Any], operator: Any, **kw: Any
) -> str:
    """Always raise; overridden by dialects with regex match support."""
    raise exc.CompileError(
        "%s dialect does not support regular expressions"
        % self.dialect.name
    )

3626 

def visit_not_regexp_match_op_binary(
    self, binary: BinaryExpression[Any], operator: Any, **kw: Any
) -> str:
    """Always raise; overridden by dialects with regex match support."""
    raise exc.CompileError(
        "%s dialect does not support regular expressions"
        % self.dialect.name
    )

3634 

def visit_regexp_replace_op_binary(
    self, binary: BinaryExpression[Any], operator: Any, **kw: Any
) -> str:
    """Always raise; overridden by dialects with regexp_replace support."""
    raise exc.CompileError(
        "%s dialect does not support regular expression replacements"
        % self.dialect.name
    )

3642 

def visit_bindparam(
    self,
    bindparam,
    within_columns_clause=False,
    literal_binds=False,
    skip_bind_expression=False,
    literal_execute=False,
    render_postcompile=False,
    **kwargs,
):
    """Render a :class:`.BindParameter`.

    Handles bind_expression() wrapping, literal rendering, name
    truncation/conflict detection, registration in ``self.binds``, and
    post-compile ("expanding") parameter bookkeeping.  Returns the
    placeholder (or literal) SQL text for the parameter.
    """

    if not skip_bind_expression:
        impl = bindparam.type.dialect_impl(self.dialect)
        if impl._has_bind_expression:
            # re-process with the type's SQL wrapper applied; the
            # recursive call sets skip_bind_expression to avoid looping
            bind_expression = impl.bind_expression(bindparam)
            wrapped = self.process(
                bind_expression,
                skip_bind_expression=True,
                within_columns_clause=within_columns_clause,
                literal_binds=literal_binds and not bindparam.expanding,
                literal_execute=literal_execute,
                render_postcompile=render_postcompile,
                **kwargs,
            )
            if bindparam.expanding:
                # for postcompile w/ expanding, move the "wrapped" part
                # of this into the inside

                m = re.match(
                    r"^(.*)\(__\[POSTCOMPILE_(\S+?)\]\)(.*)$", wrapped
                )
                assert m, "unexpected format for expanding parameter"
                wrapped = "(__[POSTCOMPILE_%s~~%s~~REPL~~%s~~])" % (
                    m.group(2),
                    m.group(1),
                    m.group(3),
                )

                if literal_binds:
                    ret = self.render_literal_bindparam(
                        bindparam,
                        within_columns_clause=True,
                        bind_expression_template=wrapped,
                        **kwargs,
                    )
                    return f"({ret})"

            return wrapped

    if not literal_binds:
        # column-clause binds under ANSI rules must be rendered as
        # literals at execution time ("? = ?" is not allowed)
        literal_execute = (
            literal_execute
            or bindparam.literal_execute
            or (within_columns_clause and self.ansi_bind_rules)
        )
        post_compile = literal_execute or bindparam.expanding
    else:
        post_compile = False

    if literal_binds:
        ret = self.render_literal_bindparam(
            bindparam, within_columns_clause=True, **kwargs
        )
        if bindparam.expanding:
            ret = f"({ret})"
        return ret

    name = self._truncate_bindparam(bindparam)

    if name in self.binds:
        # detect conflicts with a previously-seen parameter of the
        # same (truncated) name
        existing = self.binds[name]
        if existing is not bindparam:
            if (
                (existing.unique or bindparam.unique)
                and not existing.proxy_set.intersection(
                    bindparam.proxy_set
                )
                and not existing._cloned_set.intersection(
                    bindparam._cloned_set
                )
            ):
                raise exc.CompileError(
                    "Bind parameter '%s' conflicts with "
                    "unique bind parameter of the same name" % name
                )
            elif existing.expanding != bindparam.expanding:
                raise exc.CompileError(
                    "Can't reuse bound parameter name '%s' in both "
                    "'expanding' (e.g. within an IN expression) and "
                    "non-expanding contexts. If this parameter is to "
                    "receive a list/array value, set 'expanding=True' on "
                    "it for expressions that aren't IN, otherwise use "
                    "a different parameter name." % (name,)
                )
            elif existing._is_crud or bindparam._is_crud:
                if existing._is_crud and bindparam._is_crud:
                    # TODO: this condition is not well understood.
                    # see tests in test/sql/test_update.py
                    raise exc.CompileError(
                        "Encountered unsupported case when compiling an "
                        "INSERT or UPDATE statement. If this is a "
                        "multi-table "
                        "UPDATE statement, please provide string-named "
                        "arguments to the "
                        "values() method with distinct names; support for "
                        "multi-table UPDATE statements that "
                        "target multiple tables for UPDATE is very "
                        "limited",
                    )
                else:
                    raise exc.CompileError(
                        f"bindparam() name '{bindparam.key}' is reserved "
                        "for automatic usage in the VALUES or SET "
                        "clause of this "
                        "insert/update statement. Please use a "
                        "name other than column name when using "
                        "bindparam() "
                        "with insert() or update() (for example, "
                        f"'b_{bindparam.key}')."
                    )

    self.binds[bindparam.key] = self.binds[name] = bindparam

    # if we are given a cache key that we're going to match against,
    # relate the bindparam here to one that is most likely present
    # in the "extracted params" portion of the cache key. this is used
    # to set up a positional mapping that is used to determine the
    # correct parameters for a subsequent use of this compiled with
    # a different set of parameter values. here, we accommodate for
    # parameters that may have been cloned both before and after the cache
    # key was been generated.
    ckbm_tuple = self._cache_key_bind_match

    if ckbm_tuple:
        ckbm, cksm = ckbm_tuple
        for bp in bindparam._cloned_set:
            if bp.key in cksm:
                cb = cksm[bp.key]
                ckbm[cb].append(bindparam)

    if bindparam.isoutparam:
        self.has_out_parameters = True

    if post_compile:
        if render_postcompile:
            self._render_postcompile = True

        if literal_execute:
            self.literal_execute_params |= {bindparam}
        else:
            self.post_compile_params |= {bindparam}

    ret = self.bindparam_string(
        name,
        post_compile=post_compile,
        expanding=bindparam.expanding,
        bindparam_type=bindparam.type,
        **kwargs,
    )

    if bindparam.expanding:
        # expanding placeholders are always parenthesized
        ret = f"({ret})"

    return ret

3807 

def render_bind_cast(self, type_, dbapi_type, sqltext):
    """Always raise; dialects that render bind casts override this."""
    raise NotImplementedError()

3810 

def render_literal_bindparam(
    self,
    bindparam,
    render_literal_value=NO_ARG,
    bind_expression_template=None,
    **kw,
):
    """Render a bind parameter's value inline as a SQL literal.

    ``render_literal_value`` overrides the parameter's own value when
    given; expanding (IN) parameters render their whole value list.
    """
    if render_literal_value is not NO_ARG:
        value = render_literal_value
    else:
        if bindparam.value is None and bindparam.callable is None:
            op = kw.get("_binary_op", None)
            if op and op not in (operators.is_, operators.is_not):
                # e.g. "col = NULL" is almost certainly a mistake
                util.warn_limited(
                    "Bound parameter '%s' rendering literal NULL in a SQL "
                    "expression; comparisons to NULL should not use "
                    "operators outside of 'is' or 'is not'",
                    (bindparam.key,),
                )
            return self.process(sqltypes.NULLTYPE, **kw)
        value = bindparam.effective_value

    if bindparam.expanding:
        leep = self._literal_execute_expanding_parameter_literal_binds
        to_update, replacement_expr = leep(
            bindparam,
            value,
            bind_expression_template=bind_expression_template,
        )
        return replacement_expr
    else:
        return self.render_literal_value(value, bindparam.type)

3843 

def render_literal_value(
    self, value: Any, type_: sqltypes.TypeEngine[Any]
) -> str:
    """Render the value of a bind parameter as a quoted literal.

    This is used for statement sections that do not accept bind parameters
    on the target driver/database.

    This should be implemented by subclasses using the quoting services
    of the DBAPI.

    :raises exc.CompileError: if the type has no literal processor, or
     if its processor raises while rendering the value.
    """

    if value is None and not type_.should_evaluate_none:
        # issue #10535 - handle NULL in the compiler without placing
        # this onto each type, except for "evaluate None" types
        # (e.g. JSON)
        return self.process(elements.Null._instance())

    processor = type_._cached_literal_processor(self.dialect)
    if processor:
        try:
            return processor(value)
        except Exception as e:
            # wrap so the failing value and type are visible to the user
            raise exc.CompileError(
                f"Could not render literal value "
                f'"{sql_util._repr_single_value(value)}" '
                f"with datatype "
                f"{type_}; see parent stack trace for "
                "more detail."
            ) from e

    else:
        raise exc.CompileError(
            f"No literal value renderer is available for literal value "
            f'"{sql_util._repr_single_value(value)}" '
            f"with datatype {type_}"
        )

3882 

def _truncate_bindparam(self, bindparam):
    """Return (and memoize) the rendered name for a bind parameter,
    truncating anonymous/truncatable labels to the dialect limit."""
    if bindparam in self.bind_names:
        return self.bind_names[bindparam]

    bind_name = bindparam.key
    if isinstance(bind_name, elements._truncated_label):
        bind_name = self._truncated_identifier("bindparam", bind_name)

    # memoize for translation on subsequent encounters
    self.bind_names[bindparam] = bind_name
    return bind_name

3895 

def _truncated_identifier(
    self, ident_class: str, name: _truncated_label
) -> str:
    """Return a dialect-length-safe identifier for *name*, memoized
    per (ident_class, name) so repeats render identically."""
    if (ident_class, name) in self.truncated_names:
        return self.truncated_names[(ident_class, name)]

    # resolve %(...)s anonymous tokens to stable per-compile names
    anonname = name.apply_map(self.anon_map)

    if len(anonname) > self.label_length - 6:
        # too long: truncate and append a per-class hex counter to
        # keep the result unique within this compile
        counter = self._truncated_counters.get(ident_class, 1)
        truncname = (
            anonname[0 : max(self.label_length - 6, 0)]
            + "_"
            + hex(counter)[2:]
        )
        self._truncated_counters[ident_class] = counter + 1
    else:
        truncname = anonname
    self.truncated_names[(ident_class, name)] = truncname
    return truncname

3916 

3917 def _anonymize(self, name: str) -> str: 

3918 return name % self.anon_map 

3919 

    def bindparam_string(
        self,
        name: str,
        post_compile: bool = False,
        expanding: bool = False,
        escaped_from: Optional[str] = None,
        bindparam_type: Optional[TypeEngine[Any]] = None,
        accumulate_bind_names: Optional[Set[str]] = None,
        visited_bindparam: Optional[List[str]] = None,
        **kw: Any,
    ) -> str:
        """Render the placeholder text for a bind parameter named *name*.

        Unusual characters in the name are escaped (recording the mapping in
        ``self.escaped_bind_names``); ``post_compile`` parameters render as a
        ``__[POSTCOMPILE_...]`` marker to be resolved later, and bind casts
        are appended when the dialect/type requests them.
        """
        # TODO: accumulate_bind_names is passed by crud.py to gather
        # names on a per-value basis, visited_bindparam is passed by
        # visit_insert() to collect all parameters in the statement.
        # see if this gathering can be simplified somehow
        if accumulate_bind_names is not None:
            accumulate_bind_names.add(name)
        if visited_bindparam is not None:
            visited_bindparam.append(name)

        if not escaped_from:
            if self._bind_translate_re.search(name):
                # not quite the translate use case as we want to
                # also get a quick boolean if we even found
                # unusual characters in the name
                new_name = self._bind_translate_re.sub(
                    lambda m: self._bind_translate_chars[m.group(0)],
                    name,
                )
                escaped_from = name
                name = new_name

        if escaped_from:
            # record original -> escaped so execution can translate back
            self.escaped_bind_names = self.escaped_bind_names.union(
                {escaped_from: name}
            )
        if post_compile:
            ret = "__[POSTCOMPILE_%s]" % name
            if expanding:
                # for expanding, bound parameters or literal values will be
                # rendered per item
                return ret

            # otherwise, for non-expanding "literal execute", apply
            # bind casts as determined by the datatype
            if bindparam_type is not None:
                type_impl = bindparam_type._unwrapped_dialect_impl(
                    self.dialect
                )
                if type_impl.render_literal_cast:
                    ret = self.render_bind_cast(bindparam_type, type_impl, ret)
            return ret
        elif self.state is CompilerState.COMPILING:
            # intermediate template used while still compiling; swapped for
            # the dialect's real paramstyle template afterwards
            ret = self.compilation_bindtemplate % {"name": name}
        else:
            ret = self.bindtemplate % {"name": name}

        if (
            bindparam_type is not None
            and self.dialect._bind_typing_render_casts
        ):
            type_impl = bindparam_type._unwrapped_dialect_impl(self.dialect)
            if type_impl.render_bind_cast:
                ret = self.render_bind_cast(bindparam_type, type_impl, ret)

        return ret

3986 

3987 def _dispatch_independent_ctes(self, stmt, kw): 

3988 local_kw = kw.copy() 

3989 local_kw.pop("cte_opts", None) 

3990 for cte, opt in zip( 

3991 stmt._independent_ctes, stmt._independent_ctes_opts 

3992 ): 

3993 cte._compiler_dispatch(self, cte_opts=opt, **local_kw) 

3994 

    def visit_cte(
        self,
        cte: CTE,
        asfrom: bool = False,
        ashint: bool = False,
        fromhints: Optional[_FromHintsType] = None,
        visiting_cte: Optional[CTE] = None,
        from_linter: Optional[FromLinter] = None,
        cte_opts: selectable._CTEOpts = selectable._CTEOpts(False),
        **kwargs: Any,
    ) -> Optional[str]:
        """Register *cte* in the compiler's per-statement CTE collections,
        rendering its body text, and return the text by which the CTE is
        referenced in the enclosing FROM (or ``None`` when nothing is to be
        rendered at the point of call).

        Deduplicates same-named CTEs across nesting levels, raising
        ``CompileError`` for unrelated CTEs that share a name or for a
        CTE marked "nest_here" in more than one place.
        """
        self_ctes = self._init_cte_state()
        assert self_ctes is self.ctes

        kwargs["visiting_cte"] = cte

        cte_name = cte.name

        if isinstance(cte_name, elements._truncated_label):
            cte_name = self._truncated_identifier("alias", cte_name)

        is_new_cte = True
        embedded_in_current_named_cte = False

        _reference_cte = cte._get_reference_cte()

        # nesting may come from the CTE itself or from the options
        # passed at this particular point of use
        nesting = cte.nesting or cte_opts.nesting

        # check for CTE already encountered
        if _reference_cte in self.level_name_by_cte:
            cte_level, _, existing_cte_opts = self.level_name_by_cte[
                _reference_cte
            ]
            assert _ == cte_name

            cte_level_name = (cte_level, cte_name)
            existing_cte = self.ctes_by_level_name[cte_level_name]

            # check if we are receiving it here with a specific
            # "nest_here" location; if so, move it to this location

            if cte_opts.nesting:
                if existing_cte_opts.nesting:
                    raise exc.CompileError(
                        "CTE is stated as 'nest_here' in "
                        "more than one location"
                    )

                old_level_name = (cte_level, cte_name)
                cte_level = len(self.stack) if nesting else 1
                cte_level_name = new_level_name = (cte_level, cte_name)

                del self.ctes_by_level_name[old_level_name]
                self.ctes_by_level_name[new_level_name] = existing_cte
                self.level_name_by_cte[_reference_cte] = new_level_name + (
                    cte_opts,
                )

        else:
            # first encounter: nested CTEs live at the current stack
            # depth, otherwise everything collects at level 1
            cte_level = len(self.stack) if nesting else 1
            cte_level_name = (cte_level, cte_name)

            if cte_level_name in self.ctes_by_level_name:
                existing_cte = self.ctes_by_level_name[cte_level_name]
            else:
                existing_cte = None

        if existing_cte is not None:
            embedded_in_current_named_cte = visiting_cte is existing_cte

            # we've generated a same-named CTE that we are enclosed in,
            # or this is the same CTE. just return the name.
            if cte is existing_cte._restates or cte is existing_cte:
                is_new_cte = False
            elif existing_cte is cte._restates:
                # we've generated a same-named CTE that is
                # enclosed in us - we take precedence, so
                # discard the text for the "inner".
                del self_ctes[existing_cte]

                existing_cte_reference_cte = existing_cte._get_reference_cte()

                assert existing_cte_reference_cte is _reference_cte
                assert existing_cte_reference_cte is existing_cte

                del self.level_name_by_cte[existing_cte_reference_cte]
            else:
                if (
                    # if the two CTEs have the same hash, which we expect
                    # here means that one/both is an annotated of the other
                    (hash(cte) == hash(existing_cte))
                    # or...
                    or (
                        (
                            # if they are clones, i.e. they came from the ORM
                            # or some other visit method
                            cte._is_clone_of is not None
                            or existing_cte._is_clone_of is not None
                        )
                        # and are deep-copy identical
                        and cte.compare(existing_cte)
                    )
                ):
                    # then consider these two CTEs the same
                    is_new_cte = False
                else:
                    # otherwise these are two CTEs that either will render
                    # differently, or were indicated separately by the user,
                    # with the same name
                    raise exc.CompileError(
                        "Multiple, unrelated CTEs found with "
                        "the same name: %r" % cte_name
                    )

        if not asfrom and not is_new_cte:
            return None

        if cte._cte_alias is not None:
            pre_alias_cte = cte._cte_alias
            cte_pre_alias_name = cte._cte_alias.name
            if isinstance(cte_pre_alias_name, elements._truncated_label):
                cte_pre_alias_name = self._truncated_identifier(
                    "alias", cte_pre_alias_name
                )
        else:
            pre_alias_cte = cte
            cte_pre_alias_name = None

        if is_new_cte:
            self.ctes_by_level_name[cte_level_name] = cte
            self.level_name_by_cte[_reference_cte] = cte_level_name + (
                cte_opts,
            )

            if pre_alias_cte not in self.ctes:
                self.visit_cte(pre_alias_cte, **kwargs)

            if not cte_pre_alias_name and cte not in self_ctes:
                if cte.recursive:
                    self.ctes_recursive = True
                text = self.preparer.format_alias(cte, cte_name)
                if cte.recursive or cte.element.name_cte_columns:
                    col_source = cte.element

                    # TODO: can we get at the .columns_plus_names collection
                    # that is already (or will be?) generated for the SELECT
                    # rather than calling twice?
                    recur_cols = [
                        # TODO: proxy_name is not technically safe,
                        # see test_cte->
                        # test_with_recursive_no_name_currently_buggy. not
                        # clear what should be done with such a case
                        fallback_label_name or proxy_name
                        for (
                            _,
                            proxy_name,
                            fallback_label_name,
                            c,
                            repeated,
                        ) in (col_source._generate_columns_plus_names(True))
                        if not repeated
                    ]

                    text += "(%s)" % (
                        ", ".join(
                            self.preparer.format_label_name(
                                ident, anon_map=self.anon_map
                            )
                            for ident in recur_cols
                        )
                    )

                assert kwargs.get("subquery", False) is False

                if not self.stack:
                    # toplevel, this is a stringify of the
                    # cte directly. just compile the inner
                    # the way alias() does.
                    return cte.element._compiler_dispatch(
                        self, asfrom=asfrom, **kwargs
                    )
                else:
                    prefixes = self._generate_prefixes(
                        cte, cte._prefixes, **kwargs
                    )
                    inner = cte.element._compiler_dispatch(
                        self, asfrom=True, **kwargs
                    )

                    text += " AS %s\n(%s)" % (prefixes, inner)

                    if cte._suffixes:
                        text += " " + self._generate_prefixes(
                            cte, cte._suffixes, **kwargs
                        )

                    # stash the finished "name AS (...)" text; the WITH
                    # clause is assembled from self.ctes later
                    self_ctes[cte] = text

        if asfrom:
            if from_linter:
                from_linter.froms[cte._de_clone()] = cte_name

            if not is_new_cte and embedded_in_current_named_cte:
                return self.preparer.format_alias(cte, cte_name)

            if cte_pre_alias_name:
                text = self.preparer.format_alias(cte, cte_pre_alias_name)
                if self.preparer._requires_quotes(cte_name):
                    cte_name = self.preparer.quote(cte_name)
                text += self.get_render_as_alias_suffix(cte_name)
                return text  # type: ignore[no-any-return]
            else:
                return self.preparer.format_alias(cte, cte_name)

        return None

4210 

4211 def visit_table_valued_alias(self, element, **kw): 

4212 if element.joins_implicitly: 

4213 kw["from_linter"] = None 

4214 if element._is_lateral: 

4215 return self.visit_lateral(element, **kw) 

4216 else: 

4217 return self.visit_alias(element, **kw) 

4218 

4219 def visit_table_valued_column(self, element, **kw): 

4220 return self.visit_column(element, **kw) 

4221 

    def visit_alias(
        self,
        alias,
        asfrom=False,
        ashint=False,
        iscrud=False,
        fromhints=None,
        subquery=False,
        lateral=False,
        enclosing_alias=None,
        from_linter=None,
        **kwargs,
    ):
        """Render an :class:`.Alias`.

        In a FROM context this produces ``<inner> AS <name>`` (with optional
        parenthesization, derived-column list, and FROM hints); in a hint
        context just the alias name; otherwise the inner element is compiled
        directly.
        """
        if lateral:
            if "enclosing_lateral" not in kwargs:
                # if lateral is set and enclosing_lateral is not
                # present, we assume we are being called directly
                # from visit_lateral() and we need to set enclosing_lateral.
                assert alias._is_lateral
                kwargs["enclosing_lateral"] = alias

            # for lateral objects, we track a second from_linter that is...
            # lateral! to the level above us.
            if (
                from_linter
                and "lateral_from_linter" not in kwargs
                and "enclosing_lateral" in kwargs
            ):
                kwargs["lateral_from_linter"] = from_linter

        if enclosing_alias is not None and enclosing_alias.element is alias:
            # the enclosing alias directly wraps this one; render the
            # inner element without emitting an additional AS clause
            inner = alias.element._compiler_dispatch(
                self,
                asfrom=asfrom,
                ashint=ashint,
                iscrud=iscrud,
                fromhints=fromhints,
                lateral=lateral,
                enclosing_alias=alias,
                **kwargs,
            )
            if subquery and (asfrom or lateral):
                inner = "(%s)" % (inner,)
            return inner
        else:
            kwargs["enclosing_alias"] = alias

        if asfrom or ashint:
            if isinstance(alias.name, elements._truncated_label):
                alias_name = self._truncated_identifier("alias", alias.name)
            else:
                alias_name = alias.name

            if ashint:
                return self.preparer.format_alias(alias, alias_name)
            elif asfrom:
                if from_linter:
                    from_linter.froms[alias._de_clone()] = alias_name

                inner = alias.element._compiler_dispatch(
                    self, asfrom=True, lateral=lateral, **kwargs
                )
                if subquery:
                    inner = "(%s)" % (inner,)

                ret = inner + self.get_render_as_alias_suffix(
                    self.preparer.format_alias(alias, alias_name)
                )

                if alias._supports_derived_columns and alias._render_derived:
                    # render "alias(col1 [type], col2 [type], ...)" for
                    # derived-column constructs
                    ret += "(%s)" % (
                        ", ".join(
                            "%s%s"
                            % (
                                self.preparer.quote(col.name),
                                (
                                    " %s"
                                    % self.dialect.type_compiler_instance.process(
                                        col.type, **kwargs
                                    )
                                    if alias._render_derived_w_types
                                    else ""
                                ),
                            )
                            for col in alias.c
                        )
                    )

                if fromhints and alias in fromhints:
                    ret = self.format_from_hint_text(
                        ret, alias, fromhints[alias], iscrud
                    )

                return ret
        else:
            # note we cancel the "subquery" flag here as well
            return alias.element._compiler_dispatch(
                self, lateral=lateral, **kwargs
            )

4321 

4322 def visit_subquery(self, subquery, **kw): 

4323 kw["subquery"] = True 

4324 return self.visit_alias(subquery, **kw) 

4325 

4326 def visit_lateral(self, lateral_, **kw): 

4327 kw["lateral"] = True 

4328 return "LATERAL %s" % self.visit_alias(lateral_, **kw) 

4329 

4330 def visit_tablesample(self, tablesample, asfrom=False, **kw): 

4331 text = "%s TABLESAMPLE %s" % ( 

4332 self.visit_alias(tablesample, asfrom=True, **kw), 

4333 tablesample._get_method()._compiler_dispatch(self, **kw), 

4334 ) 

4335 

4336 if tablesample.seed is not None: 

4337 text += " REPEATABLE (%s)" % ( 

4338 tablesample.seed._compiler_dispatch(self, **kw) 

4339 ) 

4340 

4341 return text 

4342 

4343 def _render_values(self, element, **kw): 

4344 kw.setdefault("literal_binds", element.literal_binds) 

4345 tuples = ", ".join( 

4346 self.process( 

4347 elements.Tuple( 

4348 types=element._column_types, *elem 

4349 ).self_group(), 

4350 **kw, 

4351 ) 

4352 for chunk in element._data 

4353 for elem in chunk 

4354 ) 

4355 return f"VALUES {tuples}" 

4356 

    def visit_values(
        self, element, asfrom=False, from_linter=None, visiting_cte=None, **kw
    ):
        """Render a VALUES construct, optionally as a named FROM element
        with a derived column list, a LATERAL prefix, or as the body of a
        CTE (in which case LATERAL is rejected)."""

        if element._independent_ctes:
            self._dispatch_independent_ctes(element, kw)

        v = self._render_values(element, **kw)

        if element._unnamed:
            name = None
        elif isinstance(element.name, elements._truncated_label):
            name = self._truncated_identifier("values", element.name)
        else:
            name = element.name

        if element._is_lateral:
            lateral = "LATERAL "
        else:
            lateral = ""

        if asfrom:
            if from_linter:
                from_linter.froms[element._de_clone()] = (
                    name if name is not None else "(unnamed VALUES element)"
                )

            if visiting_cte is not None and visiting_cte.element is element:
                # the VALUES is itself the body of a CTE; the CTE machinery
                # supplies the name, so no alias is rendered here
                if element._is_lateral:
                    raise exc.CompileError(
                        "Can't use a LATERAL VALUES expression inside of a CTE"
                    )
            elif name:
                # named FROM element: "(VALUES ...) AS name (col, col, ...)"
                kw["include_table"] = False
                v = "%s(%s)%s (%s)" % (
                    lateral,
                    v,
                    self.get_render_as_alias_suffix(self.preparer.quote(name)),
                    (
                        ", ".join(
                            c._compiler_dispatch(self, **kw)
                            for c in element.columns
                        )
                    ),
                )
            else:
                v = "%s(%s)" % (lateral, v)
        return v

4405 

4406 def visit_scalar_values(self, element, **kw): 

4407 return f"({self._render_values(element, **kw)})" 

4408 

4409 def get_render_as_alias_suffix(self, alias_name_text): 

4410 return " AS " + alias_name_text 

4411 

4412 def _add_to_result_map( 

4413 self, 

4414 keyname: str, 

4415 name: str, 

4416 objects: Tuple[Any, ...], 

4417 type_: TypeEngine[Any], 

4418 ) -> None: 

4419 

4420 # note objects must be non-empty for cursor.py to handle the 

4421 # collection properly 

4422 assert objects 

4423 

4424 if keyname is None or keyname == "*": 

4425 self._ordered_columns = False 

4426 self._ad_hoc_textual = True 

4427 if type_._is_tuple_type: 

4428 raise exc.CompileError( 

4429 "Most backends don't support SELECTing " 

4430 "from a tuple() object. If this is an ORM query, " 

4431 "consider using the Bundle object." 

4432 ) 

4433 self._result_columns.append( 

4434 ResultColumnsEntry(keyname, name, objects, type_) 

4435 ) 

4436 

4437 def _label_returning_column( 

4438 self, stmt, column, populate_result_map, column_clause_args=None, **kw 

4439 ): 

4440 """Render a column with necessary labels inside of a RETURNING clause. 

4441 

4442 This method is provided for individual dialects in place of calling 

4443 the _label_select_column method directly, so that the two use cases 

4444 of RETURNING vs. SELECT can be disambiguated going forward. 

4445 

4446 .. versionadded:: 1.4.21 

4447 

4448 """ 

4449 return self._label_select_column( 

4450 None, 

4451 column, 

4452 populate_result_map, 

4453 False, 

4454 {} if column_clause_args is None else column_clause_args, 

4455 **kw, 

4456 ) 

4457 

    def _label_select_column(
        self,
        select,
        column,
        populate_result_map,
        asfrom,
        column_clause_args,
        name=None,
        proxy_name=None,
        fallback_label_name=None,
        within_columns_clause=True,
        column_is_repeated=False,
        need_column_expressions=False,
        include_table=True,
    ):
        """produce labeled columns present in a select().

        Decides, per column, whether to wrap the expression in a
        ``_CompileLabel`` (rendering "expr AS label") and wires up the
        ``add_to_result_map`` callable used to populate the statement's
        result-column map when ``populate_result_map`` is set.
        """
        impl = column.type.dialect_impl(self.dialect)

        if impl._has_column_expression and (
            need_column_expressions or populate_result_map
        ):
            # type-level column_expression() wraps the column, e.g. for
            # special result processing
            col_expr = impl.column_expression(column)
        else:
            col_expr = column

        if populate_result_map:
            # pass an "add_to_result_map" callable into the compilation
            # of embedded columns. this collects information about the
            # column as it will be fetched in the result and is coordinated
            # with cursor.description when the query is executed.
            add_to_result_map = self._add_to_result_map

            # if the SELECT statement told us this column is a repeat,
            # wrap the callable with one that prevents the addition of the
            # targets
            if column_is_repeated:
                _add_to_result_map = add_to_result_map

                def add_to_result_map(keyname, name, objects, type_):
                    _add_to_result_map(keyname, name, (keyname,), type_)

            # if we redefined col_expr for type expressions, wrap the
            # callable with one that adds the original column to the targets
            elif col_expr is not column:
                _add_to_result_map = add_to_result_map

                def add_to_result_map(keyname, name, objects, type_):
                    _add_to_result_map(
                        keyname, name, (column,) + objects, type_
                    )

        else:
            add_to_result_map = None

        # this method is used by some of the dialects for RETURNING,
        # which has different inputs. _label_returning_column was added
        # as the better target for this now however for 1.4 we will keep
        # _label_select_column directly compatible with this use case.
        # these assertions right now set up the current expected inputs
        assert within_columns_clause, (
            "_label_select_column is only relevant within "
            "the columns clause of a SELECT or RETURNING"
        )
        if isinstance(column, elements.Label):
            if col_expr is not column:
                result_expr = _CompileLabel(
                    col_expr, column.name, alt_names=(column.element,)
                )
            else:
                result_expr = col_expr

        elif name:
            # here, _columns_plus_names has determined there's an explicit
            # label name we need to use. this is the default for
            # tablenames_plus_columnnames as well as when columns are being
            # deduplicated on name

            assert (
                proxy_name is not None
            ), "proxy_name is required if 'name' is passed"

            result_expr = _CompileLabel(
                col_expr,
                name,
                alt_names=(
                    proxy_name,
                    # this is a hack to allow legacy result column lookups
                    # to work as they did before; this goes away in 2.0.
                    # TODO: this only seems to be tested indirectly
                    # via test/orm/test_deprecations.py. should be a
                    # resultset test for this
                    column._tq_label,
                ),
            )
        else:
            # determine here whether this column should be rendered in
            # a labelled context or not, as we were given no required label
            # name from the caller. Here we apply heuristics based on the kind
            # of SQL expression involved.

            if col_expr is not column:
                # type-specific expression wrapping the given column,
                # so we render a label
                render_with_label = True
            elif isinstance(column, elements.ColumnClause):
                # table-bound column, we render its name as a label if we are
                # inside of a subquery only
                render_with_label = (
                    asfrom
                    and not column.is_literal
                    and column.table is not None
                )
            elif isinstance(column, elements.TextClause):
                render_with_label = False
            elif isinstance(column, elements.UnaryExpression):
                # unary expression. notes added as of #12681
                #
                # By convention, the visit_unary() method
                # itself does not add an entry to the result map, and relies
                # upon either the inner expression creating a result map
                # entry, or if not, by creating a label here that produces
                # the result map entry. Where that happens is based on whether
                # or not the element immediately inside the unary is a
                # NamedColumn subclass or not.
                #
                # Now, this also impacts how the SELECT is written; if
                # we decide to generate a label here, we get the usual
                # "~(x+y) AS anon_1" thing in the columns clause. If we
                # don't, we don't get an AS at all, we get like
                # "~table.column".
                #
                # But here is the important thing as of modernish (like 1.4)
                # versions of SQLAlchemy - **whether or not the AS <label>
                # is present in the statement is not actually important**.
                # We target result columns **positionally** for a fully
                # compiled ``Select()`` object; before 1.4 we needed those
                # labels to match in cursor.description etc etc but now it
                # really doesn't matter.
                # So really, we could set render_with_label True in all cases.
                # Or we could just have visit_unary() populate the result map
                # in all cases.
                #
                # What we're doing here is strictly trying to not rock the
                # boat too much with when we do/don't render "AS label";
                # labels being present helps in the edge cases that we
                # "fall back" to named cursor.description matching, labels
                # not being present for columns keeps us from having awkward
                # phrases like "SELECT DISTINCT table.x AS x".
                render_with_label = (
                    (
                        # exception case to detect if we render "not boolean"
                        # as "not <col>" for native boolean or "<col> = 1"
                        # for non-native boolean. this is controlled by
                        # visit_is_<true|false>_unary_operator
                        column.operator
                        in (operators.is_false, operators.is_true)
                        and not self.dialect.supports_native_boolean
                    )
                    or column._wraps_unnamed_column()
                    or asfrom
                )
            elif (
                # general class of expressions that don't have a SQL-column
                # addressible name. includes scalar selects, bind parameters,
                # SQL functions, others
                not isinstance(column, elements.NamedColumn)
                # deeper check that indicates there's no natural "name" to
                # this element, which accommodates for custom SQL constructs
                # that might have a ".name" attribute (but aren't SQL
                # functions) but are not implementing this more recently added
                # base class. in theory the "NamedColumn" check should be
                # enough, however here we seek to maintain legacy behaviors
                # as well.
                and column._non_anon_label is None
            ):
                render_with_label = True
            else:
                render_with_label = False

            if render_with_label:
                if not fallback_label_name:
                    # used by the RETURNING case right now. we generate it
                    # here as 3rd party dialects may be referring to
                    # _label_select_column method directly instead of the
                    # just-added _label_returning_column method
                    assert not column_is_repeated
                    fallback_label_name = column._anon_name_label

                fallback_label_name = (
                    elements._truncated_label(fallback_label_name)
                    if not isinstance(
                        fallback_label_name, elements._truncated_label
                    )
                    else fallback_label_name
                )

                result_expr = _CompileLabel(
                    col_expr, fallback_label_name, alt_names=(proxy_name,)
                )
            else:
                result_expr = col_expr

        column_clause_args.update(
            within_columns_clause=within_columns_clause,
            add_to_result_map=add_to_result_map,
            include_table=include_table,
        )
        return result_expr._compiler_dispatch(self, **column_clause_args)

4666 

4667 def format_from_hint_text(self, sqltext, table, hint, iscrud): 

4668 hinttext = self.get_from_hint_text(table, hint) 

4669 if hinttext: 

4670 sqltext += " " + hinttext 

4671 return sqltext 

4672 

4673 def get_select_hint_text(self, byfroms): 

4674 return None 

4675 

4676 def get_from_hint_text( 

4677 self, table: FromClause, text: Optional[str] 

4678 ) -> Optional[str]: 

4679 return None 

4680 

4681 def get_crud_hint_text(self, table, text): 

4682 return None 

4683 

4684 def get_statement_hint_text(self, hint_texts): 

4685 return " ".join(hint_texts) 

4686 

    # baseline compiler stack entry, used when compilation begins at the
    # top level (i.e. nothing has pushed onto self.stack yet)
    _default_stack_entry: _CompilerStackEntry

    if not typing.TYPE_CHECKING:
        # built at runtime only; the annotation above satisfies typing.
        # immutabledict prevents accidental mutation of the shared default.
        _default_stack_entry = util.immutabledict(
            [("correlate_froms", frozenset()), ("asfrom_froms", frozenset())]
        )

4693 

4694 def _display_froms_for_select( 

4695 self, select_stmt, asfrom, lateral=False, **kw 

4696 ): 

4697 # utility method to help external dialects 

4698 # get the correct from list for a select. 

4699 # specifically the oracle dialect needs this feature 

4700 # right now. 

4701 toplevel = not self.stack 

4702 entry = self._default_stack_entry if toplevel else self.stack[-1] 

4703 

4704 compile_state = select_stmt._compile_state_factory(select_stmt, self) 

4705 

4706 correlate_froms = entry["correlate_froms"] 

4707 asfrom_froms = entry["asfrom_froms"] 

4708 

4709 if asfrom and not lateral: 

4710 froms = compile_state._get_display_froms( 

4711 explicit_correlate_froms=correlate_froms.difference( 

4712 asfrom_froms 

4713 ), 

4714 implicit_correlate_froms=(), 

4715 ) 

4716 else: 

4717 froms = compile_state._get_display_froms( 

4718 explicit_correlate_froms=correlate_froms, 

4719 implicit_correlate_froms=asfrom_froms, 

4720 ) 

4721 return froms 

4722 

    translate_select_structure: Any = None
    """If not ``None``, a callable accepting ``(select_stmt, **kw)`` and
    returning a select object.  This is used for structural rewrites of a
    SELECT, mostly to accommodate LIMIT/OFFSET emulation schemes.

    """

4729 

4730 def visit_select( 

4731 self, 

4732 select_stmt, 

4733 asfrom=False, 

4734 insert_into=False, 

4735 fromhints=None, 

4736 compound_index=None, 

4737 select_wraps_for=None, 

4738 lateral=False, 

4739 from_linter=None, 

4740 **kwargs, 

4741 ): 

4742 assert select_wraps_for is None, ( 

4743 "SQLAlchemy 1.4 requires use of " 

4744 "the translate_select_structure hook for structural " 

4745 "translations of SELECT objects" 

4746 ) 

4747 

4748 # initial setup of SELECT. the compile_state_factory may now 

4749 # be creating a totally different SELECT from the one that was 

4750 # passed in. for ORM use this will convert from an ORM-state 

4751 # SELECT to a regular "Core" SELECT. other composed operations 

4752 # such as computation of joins will be performed. 

4753 

4754 kwargs["within_columns_clause"] = False 

4755 

4756 compile_state = select_stmt._compile_state_factory( 

4757 select_stmt, self, **kwargs 

4758 ) 

4759 kwargs["ambiguous_table_name_map"] = ( 

4760 compile_state._ambiguous_table_name_map 

4761 ) 

4762 

4763 select_stmt = compile_state.statement 

4764 

4765 toplevel = not self.stack 

4766 

4767 if toplevel and not self.compile_state: 

4768 self.compile_state = compile_state 

4769 

4770 is_embedded_select = compound_index is not None or insert_into 

4771 

4772 # translate step for Oracle, SQL Server which often need to 

4773 # restructure the SELECT to allow for LIMIT/OFFSET and possibly 

4774 # other conditions 

4775 if self.translate_select_structure: 

4776 new_select_stmt = self.translate_select_structure( 

4777 select_stmt, asfrom=asfrom, **kwargs 

4778 ) 

4779 

4780 # if SELECT was restructured, maintain a link to the originals 

4781 # and assemble a new compile state 

4782 if new_select_stmt is not select_stmt: 

4783 compile_state_wraps_for = compile_state 

4784 select_wraps_for = select_stmt 

4785 select_stmt = new_select_stmt 

4786 

4787 compile_state = select_stmt._compile_state_factory( 

4788 select_stmt, self, **kwargs 

4789 ) 

4790 select_stmt = compile_state.statement 

4791 

4792 entry = self._default_stack_entry if toplevel else self.stack[-1] 

4793 

4794 populate_result_map = need_column_expressions = ( 

4795 toplevel 

4796 or entry.get("need_result_map_for_compound", False) 

4797 or entry.get("need_result_map_for_nested", False) 

4798 ) 

4799 

4800 # indicates there is a CompoundSelect in play and we are not the 

4801 # first select 

4802 if compound_index: 

4803 populate_result_map = False 

4804 

4805 # this was first proposed as part of #3372; however, it is not 

4806 # reached in current tests and could possibly be an assertion 

4807 # instead. 

4808 if not populate_result_map and "add_to_result_map" in kwargs: 

4809 del kwargs["add_to_result_map"] 

4810 

4811 froms = self._setup_select_stack( 

4812 select_stmt, compile_state, entry, asfrom, lateral, compound_index 

4813 ) 

4814 

4815 column_clause_args = kwargs.copy() 

4816 column_clause_args.update( 

4817 {"within_label_clause": False, "within_columns_clause": False} 

4818 ) 

4819 

4820 text = "SELECT " # we're off to a good start ! 

4821 

4822 if select_stmt._post_select_clause is not None: 

4823 psc = self.process(select_stmt._post_select_clause, **kwargs) 

4824 if psc is not None: 

4825 text += psc + " " 

4826 

4827 if select_stmt._hints: 

4828 hint_text, byfrom = self._setup_select_hints(select_stmt) 

4829 if hint_text: 

4830 text += hint_text + " " 

4831 else: 

4832 byfrom = None 

4833 

4834 if select_stmt._independent_ctes: 

4835 self._dispatch_independent_ctes(select_stmt, kwargs) 

4836 

4837 if select_stmt._prefixes: 

4838 text += self._generate_prefixes( 

4839 select_stmt, select_stmt._prefixes, **kwargs 

4840 ) 

4841 

4842 text += self.get_select_precolumns(select_stmt, **kwargs) 

4843 

4844 if select_stmt._pre_columns_clause is not None: 

4845 pcc = self.process(select_stmt._pre_columns_clause, **kwargs) 

4846 if pcc is not None: 

4847 text += pcc + " " 

4848 

4849 # the actual list of columns to print in the SELECT column list. 

4850 inner_columns = [ 

4851 c 

4852 for c in [ 

4853 self._label_select_column( 

4854 select_stmt, 

4855 column, 

4856 populate_result_map, 

4857 asfrom, 

4858 column_clause_args, 

4859 name=name, 

4860 proxy_name=proxy_name, 

4861 fallback_label_name=fallback_label_name, 

4862 column_is_repeated=repeated, 

4863 need_column_expressions=need_column_expressions, 

4864 ) 

4865 for ( 

4866 name, 

4867 proxy_name, 

4868 fallback_label_name, 

4869 column, 

4870 repeated, 

4871 ) in compile_state.columns_plus_names 

4872 ] 

4873 if c is not None 

4874 ] 

4875 

4876 if populate_result_map and select_wraps_for is not None: 

4877 # if this select was generated from translate_select, 

4878 # rewrite the targeted columns in the result map 

4879 

4880 translate = dict( 

4881 zip( 

4882 [ 

4883 name 

4884 for ( 

4885 key, 

4886 proxy_name, 

4887 fallback_label_name, 

4888 name, 

4889 repeated, 

4890 ) in compile_state.columns_plus_names 

4891 ], 

4892 [ 

4893 name 

4894 for ( 

4895 key, 

4896 proxy_name, 

4897 fallback_label_name, 

4898 name, 

4899 repeated, 

4900 ) in compile_state_wraps_for.columns_plus_names 

4901 ], 

4902 ) 

4903 ) 

4904 

4905 self._result_columns = [ 

4906 ResultColumnsEntry( 

4907 key, name, tuple(translate.get(o, o) for o in obj), type_ 

4908 ) 

4909 for key, name, obj, type_ in self._result_columns 

4910 ] 

4911 

4912 text = self._compose_select_body( 

4913 text, 

4914 select_stmt, 

4915 compile_state, 

4916 inner_columns, 

4917 froms, 

4918 byfrom, 

4919 toplevel, 

4920 kwargs, 

4921 ) 

4922 

4923 if select_stmt._post_body_clause is not None: 

4924 pbc = self.process(select_stmt._post_body_clause, **kwargs) 

4925 if pbc: 

4926 text += " " + pbc 

4927 

4928 if select_stmt._statement_hints: 

4929 per_dialect = [ 

4930 ht 

4931 for (dialect_name, ht) in select_stmt._statement_hints 

4932 if dialect_name in ("*", self.dialect.name) 

4933 ] 

4934 if per_dialect: 

4935 text += " " + self.get_statement_hint_text(per_dialect) 

4936 

4937 # In compound query, CTEs are shared at the compound level 

4938 if self.ctes and (not is_embedded_select or toplevel): 

4939 nesting_level = len(self.stack) if not toplevel else None 

4940 text = self._render_cte_clause(nesting_level=nesting_level) + text 

4941 

4942 if select_stmt._suffixes: 

4943 text += " " + self._generate_prefixes( 

4944 select_stmt, select_stmt._suffixes, **kwargs 

4945 ) 

4946 

4947 self.stack.pop(-1) 

4948 

4949 return text 

4950 

def _setup_select_hints(
    self, select: Select[Unpack[TupleAny]]
) -> Tuple[str, _FromHintsType]:
    """Collect the per-FROM hints on *select* that apply to this
    dialect and build the overall SELECT hint string.

    Returns a tuple ``(hint_text, byfrom)`` where ``byfrom`` maps each
    hinted FROM element to its rendered hint text.
    """
    byfrom: dict = {}
    for (from_, dialect), hinttext in select._hints.items():
        if dialect not in ("*", self.dialect.name):
            continue
        # interpolate the rendered FROM element into the hint template
        rendered_from = from_._compiler_dispatch(self, ashint=True)
        byfrom[from_] = hinttext % {"name": rendered_from}
    return self.get_select_hint_text(byfrom), byfrom

4962 

def _setup_select_stack(
    self, select, compile_state, entry, asfrom, lateral, compound_index
):
    """Compute the FROM list for a SELECT and push a new compiler
    stack entry for it.

    ``entry`` is the current (enclosing) stack entry; ``compound_index``
    is this SELECT's position within a CompoundSelect, or None/0 when
    not applicable.  Returns the display FROM elements for the SELECT.
    """
    correlate_froms = entry["correlate_froms"]
    asfrom_froms = entry["asfrom_froms"]

    if compound_index == 0:
        # first SELECT of a compound; remember it so later members
        # can be checked for a matching column count
        entry["select_0"] = select
    elif compound_index:
        select_0 = entry["select_0"]
        numcols = len(select_0._all_selected_columns)

        if len(compile_state.columns_plus_names) != numcols:
            raise exc.CompileError(
                "All selectables passed to "
                "CompoundSelect must have identical numbers of "
                "columns; select #%d has %d columns, select "
                "#%d has %d"
                % (
                    1,
                    numcols,
                    compound_index + 1,
                    len(select._all_selected_columns),
                )
            )

    if asfrom and not lateral:
        # rendered as a subquery in a FROM clause; only correlate
        # against enclosing FROMs that are not themselves part of the
        # enclosing FROM list
        froms = compile_state._get_display_froms(
            explicit_correlate_froms=correlate_froms.difference(
                asfrom_froms
            ),
            implicit_correlate_froms=(),
        )
    else:
        froms = compile_state._get_display_froms(
            explicit_correlate_froms=correlate_froms,
            implicit_correlate_froms=asfrom_froms,
        )

    new_correlate_froms = set(_from_objects(*froms))
    all_correlate_froms = new_correlate_froms.union(correlate_froms)

    new_entry: _CompilerStackEntry = {
        "asfrom_froms": new_correlate_froms,
        "correlate_froms": all_correlate_froms,
        "selectable": select,
        "compile_state": compile_state,
    }
    self.stack.append(new_entry)

    return froms

5014 

def _compose_select_body(
    self,
    text,
    select,
    compile_state,
    inner_columns,
    froms,
    byfrom,
    toplevel,
    kwargs,
):
    """Assemble the body of a SELECT statement onto ``text``:
    column list, FROM, WHERE, GROUP BY, HAVING, ORDER BY,
    limit/offset and FOR UPDATE, in that order.
    """
    text += ", ".join(inner_columns)

    if self.linting & COLLECT_CARTESIAN_PRODUCTS:
        from_linter = FromLinter({}, set())
        warn_linting = self.linting & WARN_LINTING
        if toplevel:
            self.from_linter = from_linter
    else:
        from_linter = None
        warn_linting = False

    # adjust the whitespace for no inner columns, part of #9440,
    # so that a no-col SELECT comes out as "SELECT WHERE..." or
    # "SELECT FROM ...".
    # while it would be better to have built the SELECT starting string
    # without trailing whitespace first, then add whitespace only if inner
    # cols were present, this breaks compatibility with various custom
    # compilation schemes that are currently being tested.
    if not inner_columns:
        text = text.rstrip()

    if froms:
        text += " \nFROM "

        if select._hints:
            # per-FROM hints present; pass the hint map through so each
            # FROM element can render its own hint text
            text += ", ".join(
                [
                    f._compiler_dispatch(
                        self,
                        asfrom=True,
                        fromhints=byfrom,
                        from_linter=from_linter,
                        **kwargs,
                    )
                    for f in froms
                ]
            )
        else:
            text += ", ".join(
                [
                    f._compiler_dispatch(
                        self,
                        asfrom=True,
                        from_linter=from_linter,
                        **kwargs,
                    )
                    for f in froms
                ]
            )
    else:
        # dialect-specific text for a SELECT with no FROM clause
        text += self.default_from()

    if select._where_criteria:
        t = self._generate_delimited_and_list(
            select._where_criteria, from_linter=from_linter, **kwargs
        )
        if t:
            text += " \nWHERE " + t

    if warn_linting:
        assert from_linter is not None
        from_linter.warn()

    if select._group_by_clauses:
        text += self.group_by_clause(select, **kwargs)

    if select._having_criteria:
        t = self._generate_delimited_and_list(
            select._having_criteria, **kwargs
        )
        if t:
            text += " \nHAVING " + t

    if select._post_criteria_clause is not None:
        pcc = self.process(select._post_criteria_clause, **kwargs)
        if pcc is not None:
            text += " \n" + pcc

    if select._order_by_clauses:
        text += self.order_by_clause(select, **kwargs)

    if select._has_row_limiting_clause:
        text += self._row_limit_clause(select, **kwargs)

    if select._for_update_arg is not None:
        text += self.for_update_clause(select, **kwargs)

    return text

5114 

def _generate_prefixes(self, stmt, prefixes, **kw):
    """Render prefix elements applicable to this dialect, joined by
    spaces, with a single trailing space when non-empty."""
    rendered = []
    for prefix, dialect_name in prefixes:
        applies = (
            dialect_name is None
            or dialect_name == "*"
            or dialect_name == self.dialect.name
        )
        if applies:
            rendered.append(prefix._compiler_dispatch(self, **kw))
    clause = " ".join(rendered)
    return clause + " " if clause else clause

5124 

def _render_cte_clause(
    self,
    nesting_level=None,
    include_following_stack=False,
):
    """Render the WITH clause for all applicable collected CTEs and
    return it (with trailing whitespace), or "" if none apply.

    nesting_level
        Stack depth of the caller; when > 1, only nested CTEs
        belonging to this level are rendered, and those entries are
        removed from the compiler's CTE collections afterward.

    include_following_stack
        Also render the nesting CTEs on the next stack. Useful for
        SQL structures like UNION or INSERT that can wrap SELECT
        statements containing nesting CTEs.
    """
    if not self.ctes:
        return ""

    ctes: MutableMapping[CTE, str]

    if nesting_level and nesting_level > 1:
        # nested context: select only the nesting CTEs rendered
        # at this level (or the next, when requested)
        ctes = util.OrderedDict()
        for cte in list(self.ctes.keys()):
            cte_level, cte_name, cte_opts = self.level_name_by_cte[
                cte._get_reference_cte()
            ]
            nesting = cte.nesting or cte_opts.nesting
            is_rendered_level = cte_level == nesting_level or (
                include_following_stack and cte_level == nesting_level + 1
            )
            if not (nesting and is_rendered_level):
                continue

            ctes[cte] = self.ctes[cte]

    else:
        ctes = self.ctes

    if not ctes:
        return ""
    ctes_recursive = any([cte.recursive for cte in ctes])

    cte_text = self.get_cte_preamble(ctes_recursive) + " "
    cte_text += ", \n".join([txt for txt in ctes.values()])
    cte_text += "\n "

    if nesting_level and nesting_level > 1:
        # the nested CTEs have been emitted; drop them from the
        # compiler state so enclosing levels don't render them again
        for cte in list(ctes.keys()):
            cte_level, cte_name, cte_opts = self.level_name_by_cte[
                cte._get_reference_cte()
            ]
            del self.ctes[cte]
            del self.ctes_by_level_name[(cte_level, cte_name)]
            del self.level_name_by_cte[cte._get_reference_cte()]

    return cte_text

5177 

def get_cte_preamble(self, recursive):
    """Return the opening keyword(s) of a WITH clause, adding
    RECURSIVE when any collected CTE is recursive."""
    return "WITH RECURSIVE" if recursive else "WITH"

5183 

def get_select_precolumns(self, select: Select[Any], **kw: Any) -> str:
    """Called when building a ``SELECT`` statement, position is just
    before column list.

    """
    if select._distinct_on:
        # generic compiler cannot render DISTINCT ON; warn rather than
        # silently producing different SQL
        util.warn_deprecated(
            "DISTINCT ON is currently supported only by the PostgreSQL "
            "dialect. Use of DISTINCT ON for other backends is currently "
            "silently ignored, however this usage is deprecated, and will "
            "raise CompileError in a future release for all backends "
            "that do not support this syntax.",
            version="1.4",
        )
    if select._distinct:
        return "DISTINCT "
    return ""

5199 

def group_by_clause(self, select, **kw):
    """allow dialects to customize how GROUP BY is rendered."""

    rendered = self._generate_delimited_list(
        select._group_by_clauses, OPERATORS[operators.comma_op], **kw
    )
    return " GROUP BY " + rendered if rendered else ""

5210 

def order_by_clause(self, select, **kw):
    """allow dialects to customize how ORDER BY is rendered."""

    rendered = self._generate_delimited_list(
        select._order_by_clauses, OPERATORS[operators.comma_op], **kw
    )
    return " ORDER BY " + rendered if rendered else ""

5222 

def for_update_clause(self, select, **kw):
    """Render the generic ``FOR UPDATE`` suffix; dialects override
    this to support additional FOR UPDATE variants."""
    return " FOR UPDATE"

5225 

def returning_clause(
    self,
    stmt: UpdateBase,
    returning_cols: Sequence[_ColumnsClauseElement],
    *,
    populate_result_map: bool,
    **kw: Any,
) -> str:
    """Render a ``RETURNING`` clause for the given DML statement.

    Each returned column is labeled via ``_label_returning_column``
    using the (name, proxy_name, fallback_label_name, column, repeated)
    tuples produced by the statement's column/name generation.
    """
    columns = [
        self._label_returning_column(
            stmt,
            column,
            populate_result_map,
            fallback_label_name=fallback_label_name,
            column_is_repeated=repeated,
            name=name,
            proxy_name=proxy_name,
            **kw,
        )
        for (
            name,
            proxy_name,
            fallback_label_name,
            column,
            repeated,
        ) in stmt._generate_columns_plus_names(
            True, cols=base._select_iterables(returning_cols)
        )
    ]

    return "RETURNING " + ", ".join(columns)

5257 

def limit_clause(self, select, **kw):
    """Render LIMIT/OFFSET.  When only an OFFSET is present, emits
    ``LIMIT -1`` first so the OFFSET remains syntactically valid."""
    parts = []
    limit_ = select._limit_clause
    offset_ = select._offset_clause
    if limit_ is not None:
        parts.append("\n LIMIT " + self.process(limit_, **kw))
    if offset_ is not None:
        if limit_ is None:
            parts.append("\n LIMIT -1")
        parts.append(" OFFSET " + self.process(offset_, **kw))
    return "".join(parts)

5267 

def fetch_clause(
    self,
    select,
    fetch_clause=None,
    require_offset=False,
    use_literal_execute_for_simple_int=False,
    **kw,
):
    """Render ANSI ``OFFSET ... ROWS`` / ``FETCH FIRST ... ROWS``
    for the given SELECT.

    When ``fetch_clause`` is passed explicitly, default options
    (no PERCENT, no WITH TIES) are assumed; otherwise the statement's
    own fetch clause and options are used.  ``require_offset`` forces
    an ``OFFSET 0 ROWS`` for dialects that need it.  When
    ``use_literal_execute_for_simple_int`` is set, simple integer
    clauses are rendered as literal-execute parameters.
    """
    if fetch_clause is None:
        fetch_clause = select._fetch_clause
        fetch_clause_options = select._fetch_clause_options
    else:
        fetch_clause_options = {"percent": False, "with_ties": False}

    text = ""

    if select._offset_clause is not None:
        offset_clause = select._offset_clause
        if (
            use_literal_execute_for_simple_int
            and select._simple_int_clause(offset_clause)
        ):
            offset_clause = offset_clause.render_literal_execute()
        offset_str = self.process(offset_clause, **kw)
        text += "\n OFFSET %s ROWS" % offset_str
    elif require_offset:
        text += "\n OFFSET 0 ROWS"

    if fetch_clause is not None:
        if (
            use_literal_execute_for_simple_int
            and select._simple_int_clause(fetch_clause)
        ):
            fetch_clause = fetch_clause.render_literal_execute()
        text += "\n FETCH FIRST %s%s ROWS %s" % (
            self.process(fetch_clause, **kw),
            " PERCENT" if fetch_clause_options["percent"] else "",
            "WITH TIES" if fetch_clause_options["with_ties"] else "ONLY",
        )
    return text

5308 

def visit_table(
    self,
    table,
    asfrom=False,
    iscrud=False,
    ashint=False,
    fromhints=None,
    use_schema=True,
    from_linter=None,
    ambiguous_table_name_map=None,
    enclosing_alias=None,
    **kwargs,
):
    """Render a Table in FROM-clause or hint context; returns ""
    in any other context (e.g. column-expression position).
    """
    if from_linter:
        from_linter.froms[table] = table.fullname

    if asfrom or ashint:
        effective_schema = self.preparer.schema_for_object(table)

        if use_schema and effective_schema:
            ret = (
                self.preparer.quote_schema(effective_schema)
                + "."
                + self.preparer.quote(table.name)
            )
        else:
            ret = self.preparer.quote(table.name)

        # a schema-less table whose name collides with another table
        # in this statement gets an anonymous alias so references are
        # unambiguous, unless it is already wrapped by an alias of
        # itself
        if (
            (
                enclosing_alias is None
                or enclosing_alias.element is not table
            )
            and not effective_schema
            and ambiguous_table_name_map
            and table.name in ambiguous_table_name_map
        ):
            anon_name = self._truncated_identifier(
                "alias", ambiguous_table_name_map[table.name]
            )

            ret = ret + self.get_render_as_alias_suffix(
                self.preparer.format_alias(None, anon_name)
            )

        if fromhints and table in fromhints:
            ret = self.format_from_hint_text(
                ret, table, fromhints[table], iscrud
            )
        return ret
    else:
        return ""

5361 

def visit_join(self, join, asfrom=False, from_linter=None, **kwargs):
    """Render a JOIN: left element, join keyword, right element and
    the ON criterion."""
    if from_linter:
        # record all left/right FROM pairs so the linter can detect
        # cartesian products
        from_linter.edges.update(
            itertools.product(
                _de_clone(join.left._from_objects),
                _de_clone(join.right._from_objects),
            )
        )

    if join.full:
        join_type = " FULL OUTER JOIN "
    elif join.isouter:
        join_type = " LEFT OUTER JOIN "
    else:
        join_type = " JOIN "

    left_text = join.left._compiler_dispatch(
        self, asfrom=True, from_linter=from_linter, **kwargs
    )
    right_text = join.right._compiler_dispatch(
        self, asfrom=True, from_linter=from_linter, **kwargs
    )
    # TODO: likely need asfrom=True here?
    on_text = join.onclause._compiler_dispatch(
        self, from_linter=from_linter, **kwargs
    )
    return left_text + join_type + right_text + " ON " + on_text

5391 

def _setup_crud_hints(self, stmt, table_text):
    """Collect hints on an INSERT/UPDATE/DELETE that apply to this
    dialect; apply the primary table's hint to ``table_text``.

    Returns ``(dialect_hints, table_text)``.
    """
    dialect_hints = {}
    for (table, dialect), hint_text in stmt._hints.items():
        if dialect in ("*", self.dialect.name):
            dialect_hints[table] = hint_text
    if stmt.table in dialect_hints:
        table_text = self.format_from_hint_text(
            table_text, stmt.table, dialect_hints[stmt.table], True
        )
    return dialect_hints, table_text

5403 

# within the realm of "insertmanyvalues sentinel columns",
# these lookups match different kinds of Column() configurations
# to specific backend capabilities. they are broken into two
# lookups, one for autoincrement columns and the other for non
# autoincrement columns
_sentinel_col_non_autoinc_lookup = util.immutabledict(
    {
        _SentinelDefaultCharacterization.CLIENTSIDE: (
            InsertmanyvaluesSentinelOpts._SUPPORTED_OR_NOT
        ),
        _SentinelDefaultCharacterization.SENTINEL_DEFAULT: (
            InsertmanyvaluesSentinelOpts._SUPPORTED_OR_NOT
        ),
        _SentinelDefaultCharacterization.NONE: (
            InsertmanyvaluesSentinelOpts._SUPPORTED_OR_NOT
        ),
        _SentinelDefaultCharacterization.IDENTITY: (
            InsertmanyvaluesSentinelOpts.IDENTITY
        ),
        _SentinelDefaultCharacterization.SEQUENCE: (
            InsertmanyvaluesSentinelOpts.SEQUENCE
        ),
    }
)
# autoincrement columns reuse the non-autoincrement lookup, except
# that a column with no default characterization maps to the
# AUTOINCREMENT capability
_sentinel_col_autoinc_lookup = _sentinel_col_non_autoinc_lookup.union(
    {
        _SentinelDefaultCharacterization.NONE: (
            InsertmanyvaluesSentinelOpts.AUTOINCREMENT
        ),
    }
)

5435 

def _get_sentinel_column_for_table(
    self, table: Table
) -> Optional[Sequence[Column[Any]]]:
    """given a :class:`.Table`, return a usable sentinel column or
    columns for this dialect if any.

    Return None if no sentinel columns could be identified, or raise an
    error if a column was marked as a sentinel explicitly but isn't
    compatible with this dialect.

    """

    sentinel_opts = self.dialect.insertmanyvalues_implicit_sentinel
    sentinel_characteristics = table._sentinel_column_characteristics

    sent_cols = sentinel_characteristics.columns

    if sent_cols is None:
        return None

    # select the capability lookup appropriate to the column kind;
    # unknown characterizations map to 0 (no capability bits)
    if sentinel_characteristics.is_autoinc:
        bitmask = self._sentinel_col_autoinc_lookup.get(
            sentinel_characteristics.default_characterization, 0
        )
    else:
        bitmask = self._sentinel_col_non_autoinc_lookup.get(
            sentinel_characteristics.default_characterization, 0
        )

    # dialect supports this kind of sentinel; use it
    if sentinel_opts & bitmask:
        return sent_cols

    if sentinel_characteristics.is_explicit:
        # a column was explicitly marked as insert_sentinel=True,
        # however it is not compatible with this dialect. they should
        # not indicate this column as a sentinel if they need to include
        # this dialect.

        # TODO: do we want non-primary key explicit sentinel cols
        # that can gracefully degrade for some backends?
        # insert_sentinel="degrade" perhaps. not for the initial release.
        # I am hoping people are generally not dealing with this sentinel
        # business at all.

        # if is_explicit is True, there will be only one sentinel column.

        raise exc.InvalidRequestError(
            f"Column {sent_cols[0]} can't be explicitly "
            "marked as a sentinel column when using the "
            f"{self.dialect.name} dialect, as the "
            "particular type of default generation on this column is "
            "not currently compatible with this dialect's specific "
            f"INSERT..RETURNING syntax which can receive the "
            "server-generated value in "
            "a deterministic way. To remove this error, remove "
            "insert_sentinel=True from primary key autoincrement "
            "columns; these columns are automatically used as "
            "sentinels for supported dialects in any case."
        )

    return None

5497 

def _deliver_insertmanyvalues_batches(
    self,
    statement: str,
    parameters: _DBAPIMultiExecuteParams,
    compiled_parameters: List[_MutableCoreSingleExecuteParams],
    generic_setinputsizes: Optional[_GenericSetInputSizesType],
    batch_size: int,
    sort_by_parameter_order: bool,
    schema_translate_map: Optional[SchemaTranslateMapType],
) -> Iterator[_InsertManyValuesBatch]:
    """Rewrite a single-row INSERT..VALUES statement into multi-row
    batches and yield one :class:`._InsertManyValuesBatch` per batch.

    The single VALUES expression in ``statement`` is replaced by a
    token and then expanded ``batch_size`` times per batch, with bound
    parameter names/positions adjusted accordingly.  Falls back to
    yielding one row per batch when the dialect or statement can't
    support multi-row expansion.
    """
    imv = self._insertmanyvalues
    assert imv is not None

    # extractor pulling the sentinel parameter values out of each
    # compiled parameter set, used to report per-batch sentinels
    if not imv.sentinel_param_keys:
        _sentinel_from_params = None
    else:
        _sentinel_from_params = operator.itemgetter(
            *imv.sentinel_param_keys
        )

    lenparams = len(parameters)
    if imv.is_default_expr and not self.dialect.supports_default_metavalue:
        # backend doesn't support
        # INSERT INTO table (pk_col) VALUES (DEFAULT), (DEFAULT), ...
        # at the moment this is basically SQL Server due to
        # not being able to use DEFAULT for identity column
        # just yield out that many single statements! still
        # faster than a whole connection.execute() call ;)
        #
        # note we still are taking advantage of the fact that we know
        # we are using RETURNING. The generalized approach of fetching
        # cursor.lastrowid etc. still goes through the more heavyweight
        # "ExecutionContext per statement" system as it isn't usable
        # as a generic "RETURNING" approach
        use_row_at_a_time = True
        downgraded = False
    elif not self.dialect.supports_multivalues_insert or (
        sort_by_parameter_order
        and self._result_columns
        and (imv.sentinel_columns is None or imv.includes_upsert_behaviors)
    ):
        # deterministic order was requested and the compiler could
        # not organize sentinel columns for this dialect/statement.
        # use row at a time
        use_row_at_a_time = True
        downgraded = True
    else:
        use_row_at_a_time = False
        downgraded = False

    if use_row_at_a_time:
        # one batch per parameter set, statement unchanged
        for batchnum, (param, compiled_param) in enumerate(
            cast(
                "Sequence[Tuple[_DBAPISingleExecuteParams, _MutableCoreSingleExecuteParams]]",  # noqa: E501
                zip(parameters, compiled_parameters),
            ),
            1,
        ):
            yield _InsertManyValuesBatch(
                statement,
                param,
                generic_setinputsizes,
                [param],
                (
                    [_sentinel_from_params(compiled_param)]
                    if _sentinel_from_params
                    else []
                ),
                1,
                batchnum,
                lenparams,
                sort_by_parameter_order,
                downgraded,
            )
        return

    if schema_translate_map:
        rst = functools.partial(
            self.preparer._render_schema_translates,
            schema_translate_map=schema_translate_map,
        )
    else:
        rst = None

    imv_single_values_expr = imv.single_values_expr
    if rst:
        imv_single_values_expr = rst(imv_single_values_expr)

    # replace the single VALUES tuple with a token to be expanded
    # per-batch below
    executemany_values = f"({imv_single_values_expr})"
    statement = statement.replace(executemany_values, "__EXECMANY_TOKEN__")

    # Use optional insertmanyvalues_max_parameters
    # to further shrink the batch size so that there are no more than
    # insertmanyvalues_max_parameters params.
    # Currently used by SQL Server, which limits statements to 2100 bound
    # parameters (actually 2099).
    max_params = self.dialect.insertmanyvalues_max_parameters
    if max_params:
        total_num_of_params = len(self.bind_names)
        num_params_per_batch = len(imv.insert_crud_params)
        num_params_outside_of_batch = (
            total_num_of_params - num_params_per_batch
        )
        batch_size = min(
            batch_size,
            (
                (max_params - num_params_outside_of_batch)
                // num_params_per_batch
            ),
        )

    batches = cast("List[Sequence[Any]]", list(parameters))
    compiled_batches = cast(
        "List[Sequence[Any]]", list(compiled_parameters)
    )

    processed_setinputsizes: Optional[_GenericSetInputSizesType] = None
    batchnum = 1
    total_batches = lenparams // batch_size + (
        1 if lenparams % batch_size else 0
    )

    insert_crud_params = imv.insert_crud_params
    assert insert_crud_params is not None

    if rst:
        insert_crud_params = [
            (col, key, rst(expr), st)
            for col, key, expr, st in insert_crud_params
        ]

    escaped_bind_names: Mapping[str, str]
    expand_pos_lower_index = expand_pos_upper_index = 0

    if not self.positional:
        # named-parameter dialect: each row's parameter names get an
        # "__<index>" suffix within a batch
        if self.escaped_bind_names:
            escaped_bind_names = self.escaped_bind_names
        else:
            escaped_bind_names = {}

        all_keys = set(parameters[0])

        def apply_placeholders(keys, formatted):
            # rewrite each bind name to include the per-row
            # "__EXECMANY_INDEX__" marker
            for key in keys:
                key = escaped_bind_names.get(key, key)
                formatted = formatted.replace(
                    self.bindtemplate % {"name": key},
                    self.bindtemplate
                    % {"name": f"{key}__EXECMANY_INDEX__"},
                )
            return formatted

        if imv.embed_values_counter:
            imv_values_counter = ", _IMV_VALUES_COUNTER"
        else:
            imv_values_counter = ""
        formatted_values_clause = f"""({', '.join(
            apply_placeholders(bind_keys, formatted)
            for _, _, formatted, bind_keys in insert_crud_params
        )}{imv_values_counter})"""

        keys_to_replace = all_keys.intersection(
            escaped_bind_names.get(key, key)
            for _, _, _, bind_keys in insert_crud_params
            for key in bind_keys
        )
        # parameters outside the VALUES clause are carried through
        # unchanged for every batch
        base_parameters = {
            key: parameters[0][key]
            for key in all_keys.difference(keys_to_replace)
        }
        executemany_values_w_comma = ""
    else:
        formatted_values_clause = ""
        keys_to_replace = set()
        base_parameters = {}

        if imv.embed_values_counter:
            executemany_values_w_comma = (
                f"({imv_single_values_expr}, _IMV_VALUES_COUNTER), "
            )
        else:
            executemany_values_w_comma = f"({imv_single_values_expr}), "

        all_names_we_will_expand: Set[str] = set()
        for elem in imv.insert_crud_params:
            all_names_we_will_expand.update(elem[3])

        # get the start and end position in a particular list
        # of parameters where we will be doing the "expanding".
        # statements can have params on either side or both sides,
        # given RETURNING and CTEs
        if all_names_we_will_expand:
            positiontup = self.positiontup
            assert positiontup is not None

            all_expand_positions = {
                idx
                for idx, name in enumerate(positiontup)
                if name in all_names_we_will_expand
            }
            expand_pos_lower_index = min(all_expand_positions)
            expand_pos_upper_index = max(all_expand_positions) + 1
            # expansion positions must be contiguous
            assert (
                len(all_expand_positions)
                == expand_pos_upper_index - expand_pos_lower_index
            )

        if self._numeric_binds:
            # normalize numeric placeholders (e.g. ":1") to "%s" so
            # they can be renumbered per batch below
            escaped = re.escape(self._numeric_binds_identifier_char)
            executemany_values_w_comma = re.sub(
                rf"{escaped}\d+", "%s", executemany_values_w_comma
            )

    while batches:
        batch = batches[0:batch_size]
        compiled_batch = compiled_batches[0:batch_size]

        batches[0:batch_size] = []
        compiled_batches[0:batch_size] = []

        if batches:
            current_batch_size = batch_size
        else:
            current_batch_size = len(batch)

        if generic_setinputsizes:
            # if setinputsizes is present, expand this collection to
            # suit the batch length as well
            # currently this will be mssql+pyodbc for internal dialects
            processed_setinputsizes = [
                (new_key, len_, typ)
                for new_key, len_, typ in (
                    (f"{key}_{index}", len_, typ)
                    for index in range(current_batch_size)
                    for key, len_, typ in generic_setinputsizes
                )
            ]

        replaced_parameters: Any
        if self.positional:
            num_ins_params = imv.num_positional_params_counted

            batch_iterator: Iterable[Sequence[Any]]
            extra_params_left: Sequence[Any]
            extra_params_right: Sequence[Any]

            if num_ins_params == len(batch[0]):
                extra_params_left = extra_params_right = ()
                batch_iterator = batch
            else:
                extra_params_left = batch[0][:expand_pos_lower_index]
                extra_params_right = batch[0][expand_pos_upper_index:]
                batch_iterator = (
                    b[expand_pos_lower_index:expand_pos_upper_index]
                    for b in batch
                )

            if imv.embed_values_counter:
                expanded_values_string = (
                    "".join(
                        executemany_values_w_comma.replace(
                            "_IMV_VALUES_COUNTER", str(i)
                        )
                        for i, _ in enumerate(batch)
                    )
                )[:-2]
            else:
                # repeat the tuple once per row; [:-2] drops the
                # trailing ", "
                expanded_values_string = (
                    (executemany_values_w_comma * current_batch_size)
                )[:-2]

            if self._numeric_binds and num_ins_params > 0:
                # numeric will always number the parameters inside of
                # VALUES (and thus order self.positiontup) to be higher
                # than non-VALUES parameters, no matter where in the
                # statement those non-VALUES parameters appear (this is
                # ensured in _process_numeric by numbering first all
                # params that are not in _values_bindparam)
                # therefore all extra params are always
                # on the left side and numbered lower than the VALUES
                # parameters
                assert not extra_params_right

                start = expand_pos_lower_index + 1
                end = num_ins_params * (current_batch_size) + start

                # need to format here, since statement may contain
                # unescaped %, while values_string contains just (%s, %s)
                positions = tuple(
                    f"{self._numeric_binds_identifier_char}{i}"
                    for i in range(start, end)
                )
                expanded_values_string = expanded_values_string % positions

            replaced_statement = statement.replace(
                "__EXECMANY_TOKEN__", expanded_values_string
            )

            replaced_parameters = tuple(
                itertools.chain.from_iterable(batch_iterator)
            )

            replaced_parameters = (
                extra_params_left
                + replaced_parameters
                + extra_params_right
            )

        else:
            replaced_values_clauses = []
            replaced_parameters = base_parameters.copy()

            for i, param in enumerate(batch):
                fmv = formatted_values_clause.replace(
                    "EXECMANY_INDEX__", str(i)
                )
                if imv.embed_values_counter:
                    fmv = fmv.replace("_IMV_VALUES_COUNTER", str(i))

                replaced_values_clauses.append(fmv)
                replaced_parameters.update(
                    {f"{key}__{i}": param[key] for key in keys_to_replace}
                )

            replaced_statement = statement.replace(
                "__EXECMANY_TOKEN__",
                ", ".join(replaced_values_clauses),
            )

        yield _InsertManyValuesBatch(
            replaced_statement,
            replaced_parameters,
            processed_setinputsizes,
            batch,
            (
                [_sentinel_from_params(cb) for cb in compiled_batch]
                if _sentinel_from_params
                else []
            ),
            current_batch_size,
            batchnum,
            total_batches,
            sort_by_parameter_order,
            False,
        )
        batchnum += 1

5844 

5845 def visit_insert( 

5846 self, insert_stmt, visited_bindparam=None, visiting_cte=None, **kw 

5847 ): 

5848 compile_state = insert_stmt._compile_state_factory( 

5849 insert_stmt, self, **kw 

5850 ) 

5851 insert_stmt = compile_state.statement 

5852 

5853 if visiting_cte is not None: 

5854 kw["visiting_cte"] = visiting_cte 

5855 toplevel = False 

5856 else: 

5857 toplevel = not self.stack 

5858 

5859 if toplevel: 

5860 self.isinsert = True 

5861 if not self.dml_compile_state: 

5862 self.dml_compile_state = compile_state 

5863 if not self.compile_state: 

5864 self.compile_state = compile_state 

5865 

5866 self.stack.append( 

5867 { 

5868 "correlate_froms": set(), 

5869 "asfrom_froms": set(), 

5870 "selectable": insert_stmt, 

5871 } 

5872 ) 

5873 

5874 counted_bindparam = 0 

5875 

5876 # reset any incoming "visited_bindparam" collection 

5877 visited_bindparam = None 

5878 

5879 # for positional, insertmanyvalues needs to know how many 

5880 # bound parameters are in the VALUES sequence; there's no simple 

5881 # rule because default expressions etc. can have zero or more 

5882 # params inside them. After multiple attempts to figure this out, 

5883 # this very simplistic "count after" works and is 

5884 # likely the least amount of callcounts, though looks clumsy 

5885 if self.positional and visiting_cte is None: 

5886 # if we are inside a CTE, don't count parameters 

5887 # here since they wont be for insertmanyvalues. keep 

5888 # visited_bindparam at None so no counting happens. 

5889 # see #9173 

5890 visited_bindparam = [] 

5891 

5892 crud_params_struct = crud._get_crud_params( 

5893 self, 

5894 insert_stmt, 

5895 compile_state, 

5896 toplevel, 

5897 visited_bindparam=visited_bindparam, 

5898 **kw, 

5899 ) 

5900 

5901 if self.positional and visited_bindparam is not None: 

5902 counted_bindparam = len(visited_bindparam) 

5903 if self._numeric_binds: 

5904 if self._values_bindparam is not None: 

5905 self._values_bindparam += visited_bindparam 

5906 else: 

5907 self._values_bindparam = visited_bindparam 

5908 

5909 crud_params_single = crud_params_struct.single_params 

5910 

5911 if ( 

5912 not crud_params_single 

5913 and not self.dialect.supports_default_values 

5914 and not self.dialect.supports_default_metavalue 

5915 and not self.dialect.supports_empty_insert 

5916 ): 

5917 raise exc.CompileError( 

5918 "The '%s' dialect with current database " 

5919 "version settings does not support empty " 

5920 "inserts." % self.dialect.name 

5921 ) 

5922 

5923 if compile_state._has_multi_parameters: 

5924 if not self.dialect.supports_multivalues_insert: 

5925 raise exc.CompileError( 

5926 "The '%s' dialect with current database " 

5927 "version settings does not support " 

5928 "in-place multirow inserts." % self.dialect.name 

5929 ) 

5930 elif ( 

5931 self.implicit_returning or insert_stmt._returning 

5932 ) and insert_stmt._sort_by_parameter_order: 

5933 raise exc.CompileError( 

5934 "RETURNING cannot be determinstically sorted when " 

5935 "using an INSERT which includes multi-row values()." 

5936 ) 

5937 crud_params_single = crud_params_struct.single_params 

5938 else: 

5939 crud_params_single = crud_params_struct.single_params 

5940 

5941 preparer = self.preparer 

5942 supports_default_values = self.dialect.supports_default_values 

5943 

5944 text = "INSERT " 

5945 

5946 if insert_stmt._prefixes: 

5947 text += self._generate_prefixes( 

5948 insert_stmt, insert_stmt._prefixes, **kw 

5949 ) 

5950 

5951 text += "INTO " 

5952 table_text = preparer.format_table(insert_stmt.table) 

5953 

5954 if insert_stmt._hints: 

5955 _, table_text = self._setup_crud_hints(insert_stmt, table_text) 

5956 

5957 if insert_stmt._independent_ctes: 

5958 self._dispatch_independent_ctes(insert_stmt, kw) 

5959 

5960 text += table_text 

5961 

5962 if crud_params_single or not supports_default_values: 

5963 text += " (%s)" % ", ".join( 

5964 [expr for _, expr, _, _ in crud_params_single] 

5965 ) 

5966 

5967 # look for insertmanyvalues attributes that would have been configured 

5968 # by crud.py as it scanned through the columns to be part of the 

5969 # INSERT 

5970 use_insertmanyvalues = crud_params_struct.use_insertmanyvalues 

5971 named_sentinel_params: Optional[Sequence[str]] = None 

5972 add_sentinel_cols = None 

5973 implicit_sentinel = False 

5974 

5975 returning_cols = self.implicit_returning or insert_stmt._returning 

5976 if returning_cols: 

5977 add_sentinel_cols = crud_params_struct.use_sentinel_columns 

5978 if add_sentinel_cols is not None: 

5979 assert use_insertmanyvalues 

5980 

5981 # search for the sentinel column explicitly present 

5982 # in the INSERT columns list, and additionally check that 

5983 # this column has a bound parameter name set up that's in the 

5984 # parameter list. If both of these cases are present, it means 

5985 # we will have a client side value for the sentinel in each 

5986 # parameter set. 

5987 

5988 _params_by_col = { 

5989 col: param_names 

5990 for col, _, _, param_names in crud_params_single 

5991 } 

5992 named_sentinel_params = [] 

5993 for _add_sentinel_col in add_sentinel_cols: 

5994 if _add_sentinel_col not in _params_by_col: 

5995 named_sentinel_params = None 

5996 break 

5997 param_name = self._within_exec_param_key_getter( 

5998 _add_sentinel_col 

5999 ) 

6000 if param_name not in _params_by_col[_add_sentinel_col]: 

6001 named_sentinel_params = None 

6002 break 

6003 named_sentinel_params.append(param_name) 

6004 

6005 if named_sentinel_params is None: 

6006 # if we are not going to have a client side value for 

6007 # the sentinel in the parameter set, that means it's 

6008 # an autoincrement, an IDENTITY, or a server-side SQL 

6009 # expression like nextval('seqname'). So this is 

6010 # an "implicit" sentinel; we will look for it in 

6011 # RETURNING 

6012 # only, and then sort on it. For this case on PG, 

6013 # SQL Server we have to use a special INSERT form 

6014 # that guarantees the server side function lines up with 

6015 # the entries in the VALUES. 

6016 if ( 

6017 self.dialect.insertmanyvalues_implicit_sentinel 

6018 & InsertmanyvaluesSentinelOpts.ANY_AUTOINCREMENT 

6019 ): 

6020 implicit_sentinel = True 

6021 else: 

6022 # here, we are not using a sentinel at all 

6023 # and we are likely the SQLite dialect. 

6024 # The first add_sentinel_col that we have should not 

6025 # be marked as "insert_sentinel=True". if it was, 

6026 # an error should have been raised in 

6027 # _get_sentinel_column_for_table. 

6028 assert not add_sentinel_cols[0]._insert_sentinel, ( 

6029 "sentinel selection rules should have prevented " 

6030 "us from getting here for this dialect" 

6031 ) 

6032 

6033 # always put the sentinel columns last. even if they are 

6034 # in the returning list already, they will be there twice 

6035 # then. 

6036 returning_cols = list(returning_cols) + list(add_sentinel_cols) 

6037 

6038 returning_clause = self.returning_clause( 

6039 insert_stmt, 

6040 returning_cols, 

6041 populate_result_map=toplevel, 

6042 ) 

6043 

6044 if self.returning_precedes_values: 

6045 text += " " + returning_clause 

6046 

6047 else: 

6048 returning_clause = None 

6049 

6050 if insert_stmt.select is not None: 

6051 # placed here by crud.py 

6052 select_text = self.process( 

6053 self.stack[-1]["insert_from_select"], insert_into=True, **kw 

6054 ) 

6055 

6056 if self.ctes and self.dialect.cte_follows_insert: 

6057 nesting_level = len(self.stack) if not toplevel else None 

6058 text += " %s%s" % ( 

6059 self._render_cte_clause( 

6060 nesting_level=nesting_level, 

6061 include_following_stack=True, 

6062 ), 

6063 select_text, 

6064 ) 

6065 else: 

6066 text += " %s" % select_text 

6067 elif not crud_params_single and supports_default_values: 

6068 text += " DEFAULT VALUES" 

6069 if use_insertmanyvalues: 

6070 self._insertmanyvalues = _InsertManyValues( 

6071 True, 

6072 self.dialect.default_metavalue_token, 

6073 cast( 

6074 "List[crud._CrudParamElementStr]", crud_params_single 

6075 ), 

6076 counted_bindparam, 

6077 sort_by_parameter_order=( 

6078 insert_stmt._sort_by_parameter_order 

6079 ), 

6080 includes_upsert_behaviors=( 

6081 insert_stmt._post_values_clause is not None 

6082 ), 

6083 sentinel_columns=add_sentinel_cols, 

6084 num_sentinel_columns=( 

6085 len(add_sentinel_cols) if add_sentinel_cols else 0 

6086 ), 

6087 implicit_sentinel=implicit_sentinel, 

6088 ) 

6089 elif compile_state._has_multi_parameters: 

6090 text += " VALUES %s" % ( 

6091 ", ".join( 

6092 "(%s)" 

6093 % (", ".join(value for _, _, value, _ in crud_param_set)) 

6094 for crud_param_set in crud_params_struct.all_multi_params 

6095 ), 

6096 ) 

6097 else: 

6098 insert_single_values_expr = ", ".join( 

6099 [ 

6100 value 

6101 for _, _, value, _ in cast( 

6102 "List[crud._CrudParamElementStr]", 

6103 crud_params_single, 

6104 ) 

6105 ] 

6106 ) 

6107 

6108 if use_insertmanyvalues: 

6109 if ( 

6110 implicit_sentinel 

6111 and ( 

6112 self.dialect.insertmanyvalues_implicit_sentinel 

6113 & InsertmanyvaluesSentinelOpts.USE_INSERT_FROM_SELECT 

6114 ) 

6115 # this is checking if we have 

6116 # INSERT INTO table (id) VALUES (DEFAULT). 

6117 and not (crud_params_struct.is_default_metavalue_only) 

6118 ): 

6119 # if we have a sentinel column that is server generated, 

6120 # then for selected backends render the VALUES list as a 

6121 # subquery. This is the orderable form supported by 

6122 # PostgreSQL and SQL Server. 

6123 embed_sentinel_value = True 

6124 

6125 render_bind_casts = ( 

6126 self.dialect.insertmanyvalues_implicit_sentinel 

6127 & InsertmanyvaluesSentinelOpts.RENDER_SELECT_COL_CASTS 

6128 ) 

6129 

6130 colnames = ", ".join( 

6131 f"p{i}" for i, _ in enumerate(crud_params_single) 

6132 ) 

6133 

6134 if render_bind_casts: 

6135 # render casts for the SELECT list. For PG, we are 

6136 # already rendering bind casts in the parameter list, 

6137 # selectively for the more "tricky" types like ARRAY. 

6138 # however, even for the "easy" types, if the parameter 

6139 # is NULL for every entry, PG gives up and says 

6140 # "it must be TEXT", which fails for other easy types 

6141 # like ints. So we cast on this side too. 

6142 colnames_w_cast = ", ".join( 

6143 self.render_bind_cast( 

6144 col.type, 

6145 col.type._unwrapped_dialect_impl(self.dialect), 

6146 f"p{i}", 

6147 ) 

6148 for i, (col, *_) in enumerate(crud_params_single) 

6149 ) 

6150 else: 

6151 colnames_w_cast = colnames 

6152 

6153 text += ( 

6154 f" SELECT {colnames_w_cast} FROM " 

6155 f"(VALUES ({insert_single_values_expr})) " 

6156 f"AS imp_sen({colnames}, sen_counter) " 

6157 "ORDER BY sen_counter" 

6158 ) 

6159 else: 

6160 # otherwise, if no sentinel or backend doesn't support 

6161 # orderable subquery form, use a plain VALUES list 

6162 embed_sentinel_value = False 

6163 text += f" VALUES ({insert_single_values_expr})" 

6164 

6165 self._insertmanyvalues = _InsertManyValues( 

6166 is_default_expr=False, 

6167 single_values_expr=insert_single_values_expr, 

6168 insert_crud_params=cast( 

6169 "List[crud._CrudParamElementStr]", 

6170 crud_params_single, 

6171 ), 

6172 num_positional_params_counted=counted_bindparam, 

6173 sort_by_parameter_order=( 

6174 insert_stmt._sort_by_parameter_order 

6175 ), 

6176 includes_upsert_behaviors=( 

6177 insert_stmt._post_values_clause is not None 

6178 ), 

6179 sentinel_columns=add_sentinel_cols, 

6180 num_sentinel_columns=( 

6181 len(add_sentinel_cols) if add_sentinel_cols else 0 

6182 ), 

6183 sentinel_param_keys=named_sentinel_params, 

6184 implicit_sentinel=implicit_sentinel, 

6185 embed_values_counter=embed_sentinel_value, 

6186 ) 

6187 

6188 else: 

6189 text += f" VALUES ({insert_single_values_expr})" 

6190 

6191 if insert_stmt._post_values_clause is not None: 

6192 post_values_clause = self.process( 

6193 insert_stmt._post_values_clause, **kw 

6194 ) 

6195 if post_values_clause: 

6196 text += " " + post_values_clause 

6197 

6198 if returning_clause and not self.returning_precedes_values: 

6199 text += " " + returning_clause 

6200 

6201 if self.ctes and not self.dialect.cte_follows_insert: 

6202 nesting_level = len(self.stack) if not toplevel else None 

6203 text = ( 

6204 self._render_cte_clause( 

6205 nesting_level=nesting_level, 

6206 include_following_stack=True, 

6207 ) 

6208 + text 

6209 ) 

6210 

6211 self.stack.pop(-1) 

6212 

6213 return text 

6214 

6215 def update_tables_clause(self, update_stmt, from_table, extra_froms, **kw): 

6216 """Provide a hook to override the initial table clause 

6217 in an UPDATE statement. 

6218 

6219 MySQL overrides this. 

6220 

6221 """ 

6222 kw["asfrom"] = True 

6223 return from_table._compiler_dispatch(self, iscrud=True, **kw) 

6224 

6225 def update_from_clause( 

6226 self, update_stmt, from_table, extra_froms, from_hints, **kw 

6227 ): 

6228 """Provide a hook to override the generation of an 

6229 UPDATE..FROM clause. 

6230 MySQL and MSSQL override this. 

6231 """ 

6232 raise NotImplementedError( 

6233 "This backend does not support multiple-table " 

6234 "criteria within UPDATE" 

6235 ) 

6236 

6237 def update_post_criteria_clause( 

6238 self, update_stmt: Update, **kw: Any 

6239 ) -> Optional[str]: 

6240 """provide a hook to override generation after the WHERE criteria 

6241 in an UPDATE statement 

6242 

6243 .. versionadded:: 2.1 

6244 

6245 """ 

6246 if update_stmt._post_criteria_clause is not None: 

6247 return self.process( 

6248 update_stmt._post_criteria_clause, 

6249 **kw, 

6250 ) 

6251 else: 

6252 return None 

6253 

6254 def delete_post_criteria_clause( 

6255 self, delete_stmt: Delete, **kw: Any 

6256 ) -> Optional[str]: 

6257 """provide a hook to override generation after the WHERE criteria 

6258 in a DELETE statement 

6259 

6260 .. versionadded:: 2.1 

6261 

6262 """ 

6263 if delete_stmt._post_criteria_clause is not None: 

6264 return self.process( 

6265 delete_stmt._post_criteria_clause, 

6266 **kw, 

6267 ) 

6268 else: 

6269 return None 

6270 

    def visit_update(
        self,
        update_stmt: Update,
        visiting_cte: Optional[CTE] = None,
        **kw: Any,
    ) -> str:
        """Render an UPDATE statement: prefixes, SET clause, optional
        UPDATE..FROM, WHERE, post-criteria hooks, RETURNING placement and
        CTE prepending.  Statement order here mirrors SQL rendering order
        and is intentionally sequential.
        """
        compile_state = update_stmt._compile_state_factory(
            update_stmt, self, **kw
        )
        if TYPE_CHECKING:
            assert isinstance(compile_state, UpdateDMLState)
        update_stmt = compile_state.statement  # type: ignore[assignment]

        # inside a CTE we are never "toplevel" even if the stack is empty
        if visiting_cte is not None:
            kw["visiting_cte"] = visiting_cte
            toplevel = False
        else:
            toplevel = not self.stack

        if toplevel:
            self.isupdate = True
            if not self.dml_compile_state:
                self.dml_compile_state = compile_state
            if not self.compile_state:
                self.compile_state = compile_state

        # optionally set up cartesian-product linting for this statement
        if self.linting & COLLECT_CARTESIAN_PRODUCTS:
            from_linter = FromLinter({}, set())
            warn_linting = self.linting & WARN_LINTING
            if toplevel:
                self.from_linter = from_linter
        else:
            from_linter = None
            warn_linting = False

        extra_froms = compile_state._extra_froms
        is_multitable = bool(extra_froms)

        if is_multitable:
            # main table might be a JOIN
            main_froms = set(_from_objects(update_stmt.table))
            # don't render FROM entries already covered by the main table
            render_extra_froms = [
                f for f in extra_froms if f not in main_froms
            ]
            correlate_froms = main_froms.union(extra_froms)
        else:
            render_extra_froms = []
            correlate_froms = {update_stmt.table}

        self.stack.append(
            {
                "correlate_froms": correlate_froms,
                "asfrom_froms": correlate_froms,
                "selectable": update_stmt,
            }
        )

        text = "UPDATE "

        if update_stmt._prefixes:
            text += self._generate_prefixes(
                update_stmt, update_stmt._prefixes, **kw
            )

        table_text = self.update_tables_clause(
            update_stmt,
            update_stmt.table,
            render_extra_froms,
            from_linter=from_linter,
            **kw,
        )
        crud_params_struct = crud._get_crud_params(
            self, update_stmt, compile_state, toplevel, **kw
        )
        crud_params = crud_params_struct.single_params

        # dialect hints may rewrite the table text (e.g. WITH (NOLOCK))
        if update_stmt._hints:
            dialect_hints, table_text = self._setup_crud_hints(
                update_stmt, table_text
            )
        else:
            dialect_hints = None

        if update_stmt._independent_ctes:
            self._dispatch_independent_ctes(update_stmt, kw)

        text += table_text

        text += " SET "
        text += ", ".join(
            expr + "=" + value
            for _, expr, value, _ in cast(
                "List[Tuple[Any, str, str, Any]]", crud_params
            )
        )

        # some dialects place RETURNING (e.g. OUTPUT) before VALUES/WHERE
        if self.implicit_returning or update_stmt._returning:
            if self.returning_precedes_values:
                text += " " + self.returning_clause(
                    update_stmt,
                    self.implicit_returning or update_stmt._returning,
                    populate_result_map=toplevel,
                )

        if extra_froms:
            extra_from_text = self.update_from_clause(
                update_stmt,
                update_stmt.table,
                render_extra_froms,
                dialect_hints,
                from_linter=from_linter,
                **kw,
            )
            if extra_from_text:
                text += " " + extra_from_text

        if update_stmt._where_criteria:
            t = self._generate_delimited_and_list(
                update_stmt._where_criteria, from_linter=from_linter, **kw
            )
            if t:
                text += " WHERE " + t

        # dialect hook for clauses following WHERE (e.g. LIMIT on MySQL)
        ulc = self.update_post_criteria_clause(
            update_stmt, from_linter=from_linter, **kw
        )
        if ulc:
            text += " " + ulc

        if (
            self.implicit_returning or update_stmt._returning
        ) and not self.returning_precedes_values:
            text += " " + self.returning_clause(
                update_stmt,
                self.implicit_returning or update_stmt._returning,
                populate_result_map=toplevel,
            )

        # prepend accumulated WITH clause last so it wraps the whole text
        if self.ctes:
            nesting_level = len(self.stack) if not toplevel else None
            text = self._render_cte_clause(nesting_level=nesting_level) + text

        if warn_linting:
            assert from_linter is not None
            from_linter.warn(stmt_type="UPDATE")

        self.stack.pop(-1)

        return text  # type: ignore[no-any-return]

6420 

6421 def delete_extra_from_clause( 

6422 self, delete_stmt, from_table, extra_froms, from_hints, **kw 

6423 ): 

6424 """Provide a hook to override the generation of an 

6425 DELETE..FROM clause. 

6426 

6427 This can be used to implement DELETE..USING for example. 

6428 

6429 MySQL and MSSQL override this. 

6430 

6431 """ 

6432 raise NotImplementedError( 

6433 "This backend does not support multiple-table " 

6434 "criteria within DELETE" 

6435 ) 

6436 

6437 def delete_table_clause(self, delete_stmt, from_table, extra_froms, **kw): 

6438 return from_table._compiler_dispatch( 

6439 self, asfrom=True, iscrud=True, **kw 

6440 ) 

6441 

    def visit_delete(self, delete_stmt, visiting_cte=None, **kw):
        """Render a DELETE statement: prefixes, table clause, optional
        extra-FROM (USING), WHERE, post-criteria hooks, RETURNING placement
        and CTE prepending.  Rendering order is sequential by design.
        """
        compile_state = delete_stmt._compile_state_factory(
            delete_stmt, self, **kw
        )
        delete_stmt = compile_state.statement

        # inside a CTE we are never "toplevel" even if the stack is empty
        if visiting_cte is not None:
            kw["visiting_cte"] = visiting_cte
            toplevel = False
        else:
            toplevel = not self.stack

        if toplevel:
            self.isdelete = True
            if not self.dml_compile_state:
                self.dml_compile_state = compile_state
            if not self.compile_state:
                self.compile_state = compile_state

        # optionally set up cartesian-product linting for this statement
        if self.linting & COLLECT_CARTESIAN_PRODUCTS:
            from_linter = FromLinter({}, set())
            warn_linting = self.linting & WARN_LINTING
            if toplevel:
                self.from_linter = from_linter
        else:
            from_linter = None
            warn_linting = False

        extra_froms = compile_state._extra_froms

        correlate_froms = {delete_stmt.table}.union(extra_froms)
        self.stack.append(
            {
                "correlate_froms": correlate_froms,
                "asfrom_froms": correlate_froms,
                "selectable": delete_stmt,
            }
        )

        text = "DELETE "

        if delete_stmt._prefixes:
            text += self._generate_prefixes(
                delete_stmt, delete_stmt._prefixes, **kw
            )

        text += "FROM "

        try:
            table_text = self.delete_table_clause(
                delete_stmt,
                delete_stmt.table,
                extra_froms,
                from_linter=from_linter,
            )
        except TypeError:
            # anticipate 3rd party dialects that don't include **kw
            # TODO: remove in 2.1
            table_text = self.delete_table_clause(
                delete_stmt, delete_stmt.table, extra_froms
            )
            if from_linter:
                # still record the table with the linter even though the
                # legacy dialect signature couldn't accept it
                _ = self.process(delete_stmt.table, from_linter=from_linter)

        crud._get_crud_params(self, delete_stmt, compile_state, toplevel, **kw)

        # dialect hints may rewrite the table text (e.g. WITH (NOLOCK))
        if delete_stmt._hints:
            dialect_hints, table_text = self._setup_crud_hints(
                delete_stmt, table_text
            )
        else:
            dialect_hints = None

        if delete_stmt._independent_ctes:
            self._dispatch_independent_ctes(delete_stmt, kw)

        text += table_text

        # some dialects place RETURNING (e.g. OUTPUT) before the criteria
        if (
            self.implicit_returning or delete_stmt._returning
        ) and self.returning_precedes_values:
            text += " " + self.returning_clause(
                delete_stmt,
                self.implicit_returning or delete_stmt._returning,
                populate_result_map=toplevel,
            )

        if extra_froms:
            extra_from_text = self.delete_extra_from_clause(
                delete_stmt,
                delete_stmt.table,
                extra_froms,
                dialect_hints,
                from_linter=from_linter,
                **kw,
            )
            if extra_from_text:
                text += " " + extra_from_text

        if delete_stmt._where_criteria:
            t = self._generate_delimited_and_list(
                delete_stmt._where_criteria, from_linter=from_linter, **kw
            )
            if t:
                text += " WHERE " + t

        # dialect hook for clauses following WHERE (e.g. LIMIT on MySQL)
        dlc = self.delete_post_criteria_clause(
            delete_stmt, from_linter=from_linter, **kw
        )
        if dlc:
            text += " " + dlc

        if (
            self.implicit_returning or delete_stmt._returning
        ) and not self.returning_precedes_values:
            text += " " + self.returning_clause(
                delete_stmt,
                self.implicit_returning or delete_stmt._returning,
                populate_result_map=toplevel,
            )

        # prepend accumulated WITH clause last so it wraps the whole text
        if self.ctes:
            nesting_level = len(self.stack) if not toplevel else None
            text = self._render_cte_clause(nesting_level=nesting_level) + text

        if warn_linting:
            assert from_linter is not None
            from_linter.warn(stmt_type="DELETE")

        self.stack.pop(-1)

        return text

6574 

6575 def visit_savepoint(self, savepoint_stmt, **kw): 

6576 return "SAVEPOINT %s" % self.preparer.format_savepoint(savepoint_stmt) 

6577 

6578 def visit_rollback_to_savepoint(self, savepoint_stmt, **kw): 

6579 return "ROLLBACK TO SAVEPOINT %s" % self.preparer.format_savepoint( 

6580 savepoint_stmt 

6581 ) 

6582 

6583 def visit_release_savepoint(self, savepoint_stmt, **kw): 

6584 return "RELEASE SAVEPOINT %s" % self.preparer.format_savepoint( 

6585 savepoint_stmt 

6586 ) 

6587 

6588 

class StrSQLCompiler(SQLCompiler):
    """A :class:`.SQLCompiler` subclass which allows a small selection
    of non-standard SQL features to render into a string value.

    The :class:`.StrSQLCompiler` is invoked whenever a Core expression
    element is directly stringified without calling upon the
    :meth:`_expression.ClauseElement.compile` method.
    It can render a limited set
    of non-standard SQL constructs to assist in basic stringification,
    however for more substantial custom or dialect-specific SQL constructs,
    it will be necessary to make use of
    :meth:`_expression.ClauseElement.compile`
    directly.

    .. seealso::

        :ref:`faq_sql_expression_string`

    """

    def _fallback_column_name(self, column):
        # placeholder used when a column's name cannot be determined,
        # rather than raising as the strict compiler would
        return "<name unknown>"

    @util.preload_module("sqlalchemy.engine.url")
    def visit_unsupported_compilation(self, element, err, **kw):
        # if the element nominates a specific dialect for stringification,
        # construct that dialect's compiler on the fly and delegate to it;
        # otherwise fall back to the default unsupported-compilation error
        if element.stringify_dialect != "default":
            url = util.preloaded.engine_url
            dialect = url.URL.create(element.stringify_dialect).get_dialect()()

            compiler = dialect.statement_compiler(
                dialect, None, _supporting_against=self
            )
            if not isinstance(compiler, StrSQLCompiler):
                return compiler.process(element, **kw)

        return super().visit_unsupported_compilation(element, err)

    def visit_getitem_binary(self, binary, operator, **kw):
        # generic bracketed indexing form: left[right]
        return "%s[%s]" % (
            self.process(binary.left, **kw),
            self.process(binary.right, **kw),
        )

    def visit_json_getitem_op_binary(self, binary, operator, **kw):
        # JSON indexing renders like plain indexing for stringification
        return self.visit_getitem_binary(binary, operator, **kw)

    def visit_json_path_getitem_op_binary(self, binary, operator, **kw):
        # JSON path indexing also renders like plain indexing
        return self.visit_getitem_binary(binary, operator, **kw)

    def visit_sequence(self, sequence, **kw):
        # sequences have no generic SQL form; render a descriptive token
        return (
            f"<next sequence value: {self.preparer.format_sequence(sequence)}>"
        )

    def returning_clause(
        self,
        stmt: UpdateBase,
        returning_cols: Sequence[_ColumnsClauseElement],
        *,
        populate_result_map: bool,
        **kw: Any,
    ) -> str:
        # render a generic RETURNING list for stringification purposes
        columns = [
            self._label_select_column(None, c, True, False, {})
            for c in base._select_iterables(returning_cols)
        ]
        return "RETURNING " + ", ".join(columns)

    def update_from_clause(
        self, update_stmt, from_table, extra_froms, from_hints, **kw
    ):
        # generic UPDATE..FROM rendering (base SQLCompiler raises here)
        kw["asfrom"] = True
        return "FROM " + ", ".join(
            t._compiler_dispatch(self, fromhints=from_hints, **kw)
            for t in extra_froms
        )

    def delete_extra_from_clause(
        self, delete_stmt, from_table, extra_froms, from_hints, **kw
    ):
        # generic multi-table DELETE rendering (base SQLCompiler raises)
        kw["asfrom"] = True
        return ", " + ", ".join(
            t._compiler_dispatch(self, fromhints=from_hints, **kw)
            for t in extra_froms
        )

    def visit_empty_set_expr(self, element_types, **kw):
        # a SELECT guaranteed to return no rows, standing in for an
        # empty IN collection
        return "SELECT 1 WHERE 1!=1"

    def get_from_hint_text(self, table, text):
        # bracketed placeholder form for dialect-specific FROM hints
        return "[%s]" % text

    def visit_regexp_match_op_binary(self, binary, operator, **kw):
        # no generic SQL regexp operator exists; use a descriptive token
        return self._generate_generic_binary(binary, " <regexp> ", **kw)

    def visit_not_regexp_match_op_binary(self, binary, operator, **kw):
        return self._generate_generic_binary(binary, " <not regexp> ", **kw)

    def visit_regexp_replace_op_binary(self, binary, operator, **kw):
        return "<regexp replace>(%s, %s)" % (
            binary.left._compiler_dispatch(self, **kw),
            binary.right._compiler_dispatch(self, **kw),
        )

    def visit_try_cast(self, cast, **kwargs):
        # render TRY_CAST in its SQL Server-style generic form
        return "TRY_CAST(%s AS %s)" % (
            cast.clause._compiler_dispatch(self, **kwargs),
            cast.typeclause._compiler_dispatch(self, **kwargs),
        )

6698 

6699 

6700class DDLCompiler(Compiled): 

6701 is_ddl = True 

6702 

6703 if TYPE_CHECKING: 

6704 

6705 def __init__( 

6706 self, 

6707 dialect: Dialect, 

6708 statement: ExecutableDDLElement, 

6709 schema_translate_map: Optional[SchemaTranslateMapType] = ..., 

6710 render_schema_translate: bool = ..., 

6711 compile_kwargs: Mapping[str, Any] = ..., 

6712 ): ... 

6713 

    @util.ro_memoized_property
    def sql_compiler(self) -> SQLCompiler:
        # memoized, statement-less SQLCompiler for this dialect, used to
        # render SQL fragments embedded in DDL (e.g. index expressions,
        # server defaults); carries the same schema_translate_map
        return self.dialect.statement_compiler(
            self.dialect, None, schema_translate_map=self.schema_translate_map
        )

6719 

    @util.memoized_property
    def type_compiler(self):
        # memoized shortcut to the dialect's shared type compiler instance,
        # used when rendering column type DDL
        return self.dialect.type_compiler_instance

6723 

6724 def construct_params( 

6725 self, 

6726 params: Optional[_CoreSingleExecuteParams] = None, 

6727 extracted_parameters: Optional[Sequence[BindParameter[Any]]] = None, 

6728 escape_names: bool = True, 

6729 ) -> Optional[_MutableCoreSingleExecuteParams]: 

6730 return None 

6731 

    def visit_ddl(self, ddl, **kwargs):
        """Render a literal DDL string, applying ``%(table)s`` /
        ``%(schema)s`` / ``%(fullname)s`` substitutions when the DDL
        targets a :class:`.Table`.
        """
        # table events can substitute table and schema name
        context = ddl.context
        if isinstance(ddl.target, schema.Table):
            # copy so that defaults added below don't leak back into the
            # event's context mapping
            context = context.copy()

            preparer = self.preparer
            path = preparer.format_table_seq(ddl.target)
            if len(path) == 1:
                table, sch = path[0], ""
            else:
                # last element is the table name, first is its schema
                table, sch = path[-1], path[0]

            # setdefault so values already placed by an event handler win
            context.setdefault("table", table)
            context.setdefault("schema", sch)
            context.setdefault("fullname", preparer.format_table(ddl.target))

        return self.sql_compiler.post_process_text(ddl.statement % context)

6750 

6751 def visit_create_schema(self, create, **kw): 

6752 text = "CREATE SCHEMA " 

6753 if create.if_not_exists: 

6754 text += "IF NOT EXISTS " 

6755 return text + self.preparer.format_schema(create.element) 

6756 

6757 def visit_drop_schema(self, drop, **kw): 

6758 text = "DROP SCHEMA " 

6759 if drop.if_exists: 

6760 text += "IF EXISTS " 

6761 text += self.preparer.format_schema(drop.element) 

6762 if drop.cascade: 

6763 text += " CASCADE" 

6764 return text 

6765 

    def visit_create_table(self, create, **kw):
        """Render a CREATE TABLE statement: prefixes, IF NOT EXISTS,
        dialect suffix, the column list, then table-level constraints.
        """
        table = create.element
        preparer = self.preparer

        text = "\nCREATE "
        if table._prefixes:
            text += " ".join(table._prefixes) + " "

        text += "TABLE "
        if create.if_not_exists:
            text += "IF NOT EXISTS "

        text += preparer.format_table(table) + " "

        # dialect hook for text between the table name and the column list
        create_table_suffix = self.create_table_suffix(table)
        if create_table_suffix:
            text += create_table_suffix + " "

        text += "("

        # the separator flips to ", \n" after the first rendered column so
        # commas only appear between entries
        separator = "\n"

        # if only one primary key, specify it along with the column
        first_pk = False
        for create_column in create.columns:
            column = create_column.element
            try:
                processed = self.process(
                    create_column, first_pk=column.primary_key and not first_pk
                )
                # process() returns None for columns omitted from DDL
                # (e.g. system columns)
                if processed is not None:
                    text += separator
                    separator = ", \n"
                    text += "\t" + processed
                if column.primary_key:
                    first_pk = True
            except exc.CompileError as ce:
                # re-raise with table/column context for diagnostics
                raise exc.CompileError(
                    "(in table '%s', column '%s'): %s"
                    % (table.description, column.name, ce.args[0])
                ) from ce

        const = self.create_table_constraints(
            table,
            _include_foreign_key_constraints=create.include_foreign_key_constraints,  # noqa
        )
        if const:
            text += separator + "\t" + const

        text += "\n)%s\n\n" % self.post_create_table(table)
        return text

6817 

6818 def visit_create_column(self, create, first_pk=False, **kw): 

6819 column = create.element 

6820 

6821 if column.system: 

6822 return None 

6823 

6824 text = self.get_column_specification(column, first_pk=first_pk) 

6825 const = " ".join( 

6826 self.process(constraint) for constraint in column.constraints 

6827 ) 

6828 if const: 

6829 text += " " + const 

6830 

6831 return text 

6832 

    def create_table_constraints(
        self, table, _include_foreign_key_constraints=None, **kw
    ):
        """Render the table-level constraint clauses for CREATE TABLE,
        joined with ", \\n\\t"; primary key first, then remaining
        constraints in sorted order, minus any that should be emitted via
        ALTER or that aren't applicable to this dialect.
        """
        # On some DB order is significant: visit PK first, then the
        # other constraints (engine.ReflectionTest.testbasic failed on FB2)
        constraints = []
        if table.primary_key:
            constraints.append(table.primary_key)

        # when an explicit include list is given, FK constraints outside
        # that list are deferred (rendered later, e.g. via ALTER)
        all_fkcs = table.foreign_key_constraints
        if _include_foreign_key_constraints is not None:
            omit_fkcs = all_fkcs.difference(_include_foreign_key_constraints)
        else:
            omit_fkcs = set()

        constraints.extend(
            [
                c
                for c in table._sorted_constraints
                if c is not table.primary_key and c not in omit_fkcs
            ]
        )

        return ", \n\t".join(
            p
            for p in (
                self.process(constraint)
                for constraint in constraints
                # skip constraints that the dialect rules out, and those
                # marked use_alter when the dialect supports ALTER
                if (constraint._should_create_for_compiler(self))
                and (
                    not self.dialect.supports_alter
                    or not getattr(constraint, "use_alter", False)
                )
            )
            # process() may return None for constraints with no DDL form
            if p is not None
        )

6869 

6870 def visit_drop_table(self, drop, **kw): 

6871 text = "\nDROP TABLE " 

6872 if drop.if_exists: 

6873 text += "IF EXISTS " 

6874 return text + self.preparer.format_table(drop.element) 

6875 

6876 def visit_drop_view(self, drop, **kw): 

6877 return "\nDROP VIEW " + self.preparer.format_table(drop.element) 

6878 

6879 def _verify_index_table(self, index: Index) -> None: 

6880 if index.table is None: 

6881 raise exc.CompileError( 

6882 "Index '%s' is not associated with any table." % index.name 

6883 ) 

6884 

    def visit_create_index(
        self, create, include_schema=False, include_table_schema=True, **kw
    ):
        """Render a CREATE [UNIQUE] INDEX statement.

        :param include_schema: qualify the index name with its schema.
        :param include_table_schema: qualify the ON <table> clause with
            the table's schema.
        :raises exc.CompileError: if the index has no table or no name.
        """
        index = create.element
        self._verify_index_table(index)
        preparer = self.preparer
        text = "CREATE "
        if index.unique:
            text += "UNIQUE "
        if index.name is None:
            raise exc.CompileError(
                "CREATE INDEX requires that the index have a name"
            )

        text += "INDEX "
        if create.if_not_exists:
            text += "IF NOT EXISTS "

        # index expressions (columns or arbitrary SQL) are rendered with
        # literal binds since DDL cannot carry bound parameters
        text += "%s ON %s (%s)" % (
            self._prepared_index_name(index, include_schema=include_schema),
            preparer.format_table(
                index.table, use_schema=include_table_schema
            ),
            ", ".join(
                self.sql_compiler.process(
                    expr, include_table=False, literal_binds=True
                )
                for expr in index.expressions
            ),
        )
        return text

6916 

6917 def visit_drop_index(self, drop, **kw): 

6918 index = drop.element 

6919 

6920 if index.name is None: 

6921 raise exc.CompileError( 

6922 "DROP INDEX requires that the index have a name" 

6923 ) 

6924 text = "\nDROP INDEX " 

6925 if drop.if_exists: 

6926 text += "IF EXISTS " 

6927 

6928 return text + self._prepared_index_name(index, include_schema=True) 

6929 

6930 def _prepared_index_name( 

6931 self, index: Index, include_schema: bool = False 

6932 ) -> str: 

6933 if index.table is not None: 

6934 effective_schema = self.preparer.schema_for_object(index.table) 

6935 else: 

6936 effective_schema = None 

6937 if include_schema and effective_schema: 

6938 schema_name = self.preparer.quote_schema(effective_schema) 

6939 else: 

6940 schema_name = None 

6941 

6942 index_name: str = self.preparer.format_index(index) 

6943 

6944 if schema_name: 

6945 index_name = schema_name + "." + index_name 

6946 return index_name 

6947 

6948 def visit_add_constraint(self, create, **kw): 

6949 return "ALTER TABLE %s ADD %s" % ( 

6950 self.preparer.format_table(create.element.table), 

6951 self.process(create.element), 

6952 ) 

6953 

6954 def visit_set_table_comment(self, create, **kw): 

6955 return "COMMENT ON TABLE %s IS %s" % ( 

6956 self.preparer.format_table(create.element), 

6957 self.sql_compiler.render_literal_value( 

6958 create.element.comment, sqltypes.String() 

6959 ), 

6960 ) 

6961 

6962 def visit_drop_table_comment(self, drop, **kw): 

6963 return "COMMENT ON TABLE %s IS NULL" % self.preparer.format_table( 

6964 drop.element 

6965 ) 

6966 

6967 def visit_set_column_comment(self, create, **kw): 

6968 return "COMMENT ON COLUMN %s IS %s" % ( 

6969 self.preparer.format_column( 

6970 create.element, use_table=True, use_schema=True 

6971 ), 

6972 self.sql_compiler.render_literal_value( 

6973 create.element.comment, sqltypes.String() 

6974 ), 

6975 ) 

6976 

6977 def visit_drop_column_comment(self, drop, **kw): 

6978 return "COMMENT ON COLUMN %s IS NULL" % self.preparer.format_column( 

6979 drop.element, use_table=True 

6980 ) 

6981 

    def visit_set_constraint_comment(self, create, **kw):
        # Constraint comments have no generic SQL syntax; dialects that
        # support them must override this hook.
        raise exc.UnsupportedCompilationError(self, type(create))

6984 

    def visit_drop_constraint_comment(self, drop, **kw):
        # Constraint comments have no generic SQL syntax; dialects that
        # support them must override this hook.
        raise exc.UnsupportedCompilationError(self, type(drop))

6987 

6988 def get_identity_options(self, identity_options): 

6989 text = [] 

6990 if identity_options.increment is not None: 

6991 text.append("INCREMENT BY %d" % identity_options.increment) 

6992 if identity_options.start is not None: 

6993 text.append("START WITH %d" % identity_options.start) 

6994 if identity_options.minvalue is not None: 

6995 text.append("MINVALUE %d" % identity_options.minvalue) 

6996 if identity_options.maxvalue is not None: 

6997 text.append("MAXVALUE %d" % identity_options.maxvalue) 

6998 if identity_options.nominvalue is not None: 

6999 text.append("NO MINVALUE") 

7000 if identity_options.nomaxvalue is not None: 

7001 text.append("NO MAXVALUE") 

7002 if identity_options.cache is not None: 

7003 text.append("CACHE %d" % identity_options.cache) 

7004 if identity_options.cycle is not None: 

7005 text.append("CYCLE" if identity_options.cycle else "NO CYCLE") 

7006 return " ".join(text) 

7007 

7008 def visit_create_sequence(self, create, prefix=None, **kw): 

7009 text = "CREATE SEQUENCE " 

7010 if create.if_not_exists: 

7011 text += "IF NOT EXISTS " 

7012 text += self.preparer.format_sequence(create.element) 

7013 

7014 if prefix: 

7015 text += prefix 

7016 options = self.get_identity_options(create.element) 

7017 if options: 

7018 text += " " + options 

7019 return text 

7020 

7021 def visit_drop_sequence(self, drop, **kw): 

7022 text = "DROP SEQUENCE " 

7023 if drop.if_exists: 

7024 text += "IF EXISTS " 

7025 return text + self.preparer.format_sequence(drop.element) 

7026 

7027 def visit_drop_constraint(self, drop, **kw): 

7028 constraint = drop.element 

7029 if constraint.name is not None: 

7030 formatted_name = self.preparer.format_constraint(constraint) 

7031 else: 

7032 formatted_name = None 

7033 

7034 if formatted_name is None: 

7035 raise exc.CompileError( 

7036 "Can't emit DROP CONSTRAINT for constraint %r; " 

7037 "it has no name" % drop.element 

7038 ) 

7039 return "ALTER TABLE %s DROP CONSTRAINT %s%s%s" % ( 

7040 self.preparer.format_table(drop.element.table), 

7041 "IF EXISTS " if drop.if_exists else "", 

7042 formatted_name, 

7043 " CASCADE" if drop.cascade else "", 

7044 ) 

7045 

    def get_column_specification(self, column, **kwargs):
        """Render the full column clause for CREATE TABLE: name, type,
        DEFAULT, computed/identity generation, and NOT NULL.

        The clause order is significant and matches generic SQL DDL.
        """
        colspec = (
            self.preparer.format_column(column)
            + " "
            + self.dialect.type_compiler_instance.process(
                column.type, type_expression=column
            )
        )
        default = self.get_column_default_string(column)
        if default is not None:
            colspec += " DEFAULT " + default

        if column.computed is not None:
            colspec += " " + self.process(column.computed)

        if (
            column.identity is not None
            and self.dialect.supports_identity_columns
        ):
            colspec += " " + self.process(column.identity)

        # identity columns rendered above are implicitly NOT NULL; only
        # add the keyword when no identity clause was emitted
        if not column.nullable and (
            not column.identity or not self.dialect.supports_identity_columns
        ):
            colspec += " NOT NULL"
        return colspec

7072 

    def create_table_suffix(self, table):
        # Dialect hook: text placed after "CREATE TABLE" and before the
        # table name; empty by default.
        return ""

7075 

    def post_create_table(self, table):
        # Dialect hook: text appended after the closing parenthesis of
        # CREATE TABLE (e.g. storage options); empty by default.
        return ""

7078 

7079 def get_column_default_string(self, column: Column[Any]) -> Optional[str]: 

7080 if isinstance(column.server_default, schema.DefaultClause): 

7081 return self.render_default_string(column.server_default.arg) 

7082 else: 

7083 return None 

7084 

7085 def render_default_string(self, default: Union[Visitable, str]) -> str: 

7086 if isinstance(default, str): 

7087 return self.sql_compiler.render_literal_value( 

7088 default, sqltypes.STRINGTYPE 

7089 ) 

7090 else: 

7091 return self.sql_compiler.process(default, literal_binds=True) 

7092 

7093 def visit_table_or_column_check_constraint(self, constraint, **kw): 

7094 if constraint.is_column_level: 

7095 return self.visit_column_check_constraint(constraint) 

7096 else: 

7097 return self.visit_check_constraint(constraint) 

7098 

7099 def visit_check_constraint(self, constraint, **kw): 

7100 text = "" 

7101 if constraint.name is not None: 

7102 formatted_name = self.preparer.format_constraint(constraint) 

7103 if formatted_name is not None: 

7104 text += "CONSTRAINT %s " % formatted_name 

7105 text += "CHECK (%s)" % self.sql_compiler.process( 

7106 constraint.sqltext, include_table=False, literal_binds=True 

7107 ) 

7108 text += self.define_constraint_deferrability(constraint) 

7109 return text 

7110 

7111 def visit_column_check_constraint(self, constraint, **kw): 

7112 text = "" 

7113 if constraint.name is not None: 

7114 formatted_name = self.preparer.format_constraint(constraint) 

7115 if formatted_name is not None: 

7116 text += "CONSTRAINT %s " % formatted_name 

7117 text += "CHECK (%s)" % self.sql_compiler.process( 

7118 constraint.sqltext, include_table=False, literal_binds=True 

7119 ) 

7120 text += self.define_constraint_deferrability(constraint) 

7121 return text 

7122 

    def visit_primary_key_constraint(
        self, constraint: PrimaryKeyConstraint, **kw: Any
    ) -> str:
        """Render a PRIMARY KEY clause; empty constraints produce no DDL."""
        if len(constraint) == 0:
            return ""
        text = ""
        if constraint.name is not None:
            formatted_name = self.preparer.format_constraint(constraint)
            if formatted_name is not None:
                text += "CONSTRAINT %s " % formatted_name
        text += "PRIMARY KEY "
        # implicitly generated PKs list the autoincrement column first
        text += "(%s)" % ", ".join(
            self.preparer.quote(c.name)
            for c in (
                constraint.columns_autoinc_first
                if constraint._implicit_generated
                else constraint.columns
            )
        )
        text += self.define_constraint_deferrability(constraint)
        return text

7144 

    def visit_foreign_key_constraint(self, constraint, **kw):
        """Render a (possibly composite) FOREIGN KEY ... REFERENCES clause
        with MATCH, ON DELETE / ON UPDATE and deferrability options."""
        preparer = self.preparer
        text = ""
        if constraint.name is not None:
            formatted_name = self.preparer.format_constraint(constraint)
            if formatted_name is not None:
                text += "CONSTRAINT %s " % formatted_name
        # all elements of a ForeignKeyConstraint point to the same table
        remote_table = list(constraint.elements)[0].column.table
        text += "FOREIGN KEY(%s) REFERENCES %s (%s)" % (
            ", ".join(
                preparer.quote(f.parent.name) for f in constraint.elements
            ),
            self.define_constraint_remote_table(
                constraint, remote_table, preparer
            ),
            ", ".join(
                preparer.quote(f.column.name) for f in constraint.elements
            ),
        )
        text += self.define_constraint_match(constraint)
        text += self.define_constraint_cascades(constraint)
        text += self.define_constraint_deferrability(constraint)
        return text

7168 

    def define_constraint_remote_table(self, constraint, table, preparer):
        """Format the remote table clause of a CREATE CONSTRAINT clause."""

        # Dialect hook; overridden e.g. where the REFERENCES target must
        # omit the schema qualifier.
        return preparer.format_table(table)

7173 

7174 def visit_unique_constraint( 

7175 self, constraint: UniqueConstraint, **kw: Any 

7176 ) -> str: 

7177 if len(constraint) == 0: 

7178 return "" 

7179 text = "" 

7180 if constraint.name is not None: 

7181 formatted_name = self.preparer.format_constraint(constraint) 

7182 if formatted_name is not None: 

7183 text += "CONSTRAINT %s " % formatted_name 

7184 text += "UNIQUE %s(%s)" % ( 

7185 self.define_unique_constraint_distinct(constraint, **kw), 

7186 ", ".join(self.preparer.quote(c.name) for c in constraint), 

7187 ) 

7188 text += self.define_constraint_deferrability(constraint) 

7189 return text 

7190 

    def define_unique_constraint_distinct(
        self, constraint: UniqueConstraint, **kw: Any
    ) -> str:
        # Dialect hook: extra keyword(s) between UNIQUE and the column
        # list (e.g. MySQL's "KEY" variants); empty by default.
        return ""

7195 

7196 def define_constraint_cascades( 

7197 self, constraint: ForeignKeyConstraint 

7198 ) -> str: 

7199 text = "" 

7200 if constraint.ondelete is not None: 

7201 text += self.define_constraint_ondelete_cascade(constraint) 

7202 

7203 if constraint.onupdate is not None: 

7204 text += self.define_constraint_onupdate_cascade(constraint) 

7205 return text 

7206 

7207 def define_constraint_ondelete_cascade( 

7208 self, constraint: ForeignKeyConstraint 

7209 ) -> str: 

7210 return " ON DELETE %s" % self.preparer.validate_sql_phrase( 

7211 constraint.ondelete, FK_ON_DELETE 

7212 ) 

7213 

7214 def define_constraint_onupdate_cascade( 

7215 self, constraint: ForeignKeyConstraint 

7216 ) -> str: 

7217 return " ON UPDATE %s" % self.preparer.validate_sql_phrase( 

7218 constraint.onupdate, FK_ON_UPDATE 

7219 ) 

7220 

7221 def define_constraint_deferrability(self, constraint: Constraint) -> str: 

7222 text = "" 

7223 if constraint.deferrable is not None: 

7224 if constraint.deferrable: 

7225 text += " DEFERRABLE" 

7226 else: 

7227 text += " NOT DEFERRABLE" 

7228 if constraint.initially is not None: 

7229 text += " INITIALLY %s" % self.preparer.validate_sql_phrase( 

7230 constraint.initially, FK_INITIALLY 

7231 ) 

7232 return text 

7233 

7234 def define_constraint_match(self, constraint): 

7235 text = "" 

7236 if constraint.match is not None: 

7237 text += " MATCH %s" % constraint.match 

7238 return text 

7239 

7240 def visit_computed_column(self, generated, **kw): 

7241 text = "GENERATED ALWAYS AS (%s)" % self.sql_compiler.process( 

7242 generated.sqltext, include_table=False, literal_binds=True 

7243 ) 

7244 if generated.persisted is True: 

7245 text += " STORED" 

7246 elif generated.persisted is False: 

7247 text += " VIRTUAL" 

7248 return text 

7249 

7250 def visit_identity_column(self, identity, **kw): 

7251 text = "GENERATED %s AS IDENTITY" % ( 

7252 "ALWAYS" if identity.always else "BY DEFAULT", 

7253 ) 

7254 options = self.get_identity_options(identity) 

7255 if options: 

7256 text += " (%s)" % options 

7257 return text 

7258 

7259 

class GenericTypeCompiler(TypeCompiler):
    """Render type objects as generic, mostly SQL-standard type strings.

    Uppercase ``visit_XYZ`` methods emit the literal DDL spelling of a
    specific SQL type; lowercase ``visit_xyz`` methods map SQLAlchemy's
    generic ("CamelCase") types onto those spellings.  Dialects override
    individual methods as needed.
    """

    def visit_FLOAT(self, type_: sqltypes.Float[Any], **kw: Any) -> str:
        return "FLOAT"

    def visit_DOUBLE(self, type_: sqltypes.Double[Any], **kw: Any) -> str:
        return "DOUBLE"

    def visit_DOUBLE_PRECISION(
        self, type_: sqltypes.DOUBLE_PRECISION[Any], **kw: Any
    ) -> str:
        return "DOUBLE PRECISION"

    def visit_REAL(self, type_: sqltypes.REAL[Any], **kw: Any) -> str:
        return "REAL"

    def visit_NUMERIC(self, type_: sqltypes.Numeric[Any], **kw: Any) -> str:
        # precision without scale is legal; scale alone is not rendered
        precision, scale = type_.precision, type_.scale
        if precision is None:
            return "NUMERIC"
        if scale is None:
            return f"NUMERIC({precision})"
        return f"NUMERIC({precision}, {scale})"

    def visit_DECIMAL(self, type_: sqltypes.DECIMAL[Any], **kw: Any) -> str:
        precision, scale = type_.precision, type_.scale
        if precision is None:
            return "DECIMAL"
        if scale is None:
            return f"DECIMAL({precision})"
        return f"DECIMAL({precision}, {scale})"

    def visit_INTEGER(self, type_: sqltypes.Integer, **kw: Any) -> str:
        return "INTEGER"

    def visit_SMALLINT(self, type_: sqltypes.SmallInteger, **kw: Any) -> str:
        return "SMALLINT"

    def visit_BIGINT(self, type_: sqltypes.BigInteger, **kw: Any) -> str:
        return "BIGINT"

    def visit_TIMESTAMP(self, type_: sqltypes.TIMESTAMP, **kw: Any) -> str:
        return "TIMESTAMP"

    def visit_DATETIME(self, type_: sqltypes.DateTime, **kw: Any) -> str:
        return "DATETIME"

    def visit_DATE(self, type_: sqltypes.Date, **kw: Any) -> str:
        return "DATE"

    def visit_TIME(self, type_: sqltypes.Time, **kw: Any) -> str:
        return "TIME"

    def visit_CLOB(self, type_: sqltypes.CLOB, **kw: Any) -> str:
        return "CLOB"

    def visit_NCLOB(self, type_: sqltypes.Text, **kw: Any) -> str:
        return "NCLOB"

    def _render_string_type(
        self, name: str, length: Optional[int], collation: Optional[str]
    ) -> str:
        """Render a string type name with optional length and COLLATE."""
        parts = [name]
        if length:
            parts.append(f"({length})")
        if collation:
            parts.append(f' COLLATE "{collation}"')
        return "".join(parts)

    def visit_CHAR(self, type_: sqltypes.CHAR, **kw: Any) -> str:
        return self._render_string_type("CHAR", type_.length, type_.collation)

    def visit_NCHAR(self, type_: sqltypes.NCHAR, **kw: Any) -> str:
        return self._render_string_type("NCHAR", type_.length, type_.collation)

    def visit_VARCHAR(self, type_: sqltypes.String, **kw: Any) -> str:
        return self._render_string_type(
            "VARCHAR", type_.length, type_.collation
        )

    def visit_NVARCHAR(self, type_: sqltypes.NVARCHAR, **kw: Any) -> str:
        return self._render_string_type(
            "NVARCHAR", type_.length, type_.collation
        )

    def visit_TEXT(self, type_: sqltypes.Text, **kw: Any) -> str:
        return self._render_string_type("TEXT", type_.length, type_.collation)

    def visit_UUID(self, type_: sqltypes.Uuid[Any], **kw: Any) -> str:
        return "UUID"

    def visit_BLOB(self, type_: sqltypes.LargeBinary, **kw: Any) -> str:
        return "BLOB"

    def visit_BINARY(self, type_: sqltypes.BINARY, **kw: Any) -> str:
        return "BINARY(%d)" % type_.length if type_.length else "BINARY"

    def visit_VARBINARY(self, type_: sqltypes.VARBINARY, **kw: Any) -> str:
        return "VARBINARY(%d)" % type_.length if type_.length else "VARBINARY"

    def visit_BOOLEAN(self, type_: sqltypes.Boolean, **kw: Any) -> str:
        return "BOOLEAN"

    def visit_uuid(self, type_: sqltypes.Uuid[Any], **kw: Any) -> str:
        # fall back to CHAR(32) storage when a native UUID type is not
        # available or not requested
        if type_.native_uuid and self.dialect.supports_native_uuid:
            return self.visit_UUID(type_, **kw)
        return self._render_string_type("CHAR", length=32, collation=None)

    def visit_large_binary(
        self, type_: sqltypes.LargeBinary, **kw: Any
    ) -> str:
        return self.visit_BLOB(type_, **kw)

    def visit_boolean(self, type_: sqltypes.Boolean, **kw: Any) -> str:
        return self.visit_BOOLEAN(type_, **kw)

    def visit_time(self, type_: sqltypes.Time, **kw: Any) -> str:
        return self.visit_TIME(type_, **kw)

    def visit_datetime(self, type_: sqltypes.DateTime, **kw: Any) -> str:
        return self.visit_DATETIME(type_, **kw)

    def visit_date(self, type_: sqltypes.Date, **kw: Any) -> str:
        return self.visit_DATE(type_, **kw)

    def visit_big_integer(self, type_: sqltypes.BigInteger, **kw: Any) -> str:
        return self.visit_BIGINT(type_, **kw)

    def visit_small_integer(
        self, type_: sqltypes.SmallInteger, **kw: Any
    ) -> str:
        return self.visit_SMALLINT(type_, **kw)

    def visit_integer(self, type_: sqltypes.Integer, **kw: Any) -> str:
        return self.visit_INTEGER(type_, **kw)

    def visit_real(self, type_: sqltypes.REAL[Any], **kw: Any) -> str:
        return self.visit_REAL(type_, **kw)

    def visit_float(self, type_: sqltypes.Float[Any], **kw: Any) -> str:
        return self.visit_FLOAT(type_, **kw)

    def visit_double(self, type_: sqltypes.Double[Any], **kw: Any) -> str:
        return self.visit_DOUBLE(type_, **kw)

    def visit_numeric(self, type_: sqltypes.Numeric[Any], **kw: Any) -> str:
        return self.visit_NUMERIC(type_, **kw)

    def visit_string(self, type_: sqltypes.String, **kw: Any) -> str:
        return self.visit_VARCHAR(type_, **kw)

    def visit_unicode(self, type_: sqltypes.Unicode, **kw: Any) -> str:
        return self.visit_VARCHAR(type_, **kw)

    def visit_text(self, type_: sqltypes.Text, **kw: Any) -> str:
        return self.visit_TEXT(type_, **kw)

    def visit_unicode_text(
        self, type_: sqltypes.UnicodeText, **kw: Any
    ) -> str:
        return self.visit_TEXT(type_, **kw)

    def visit_enum(self, type_: sqltypes.Enum, **kw: Any) -> str:
        # the generic compiler stores enums as VARCHAR; dialects with a
        # native ENUM type override this
        return self.visit_VARCHAR(type_, **kw)

    def visit_null(self, type_, **kw):
        # NullType means no type was ever assigned to the column
        raise exc.CompileError(
            "Can't generate DDL for %r; "
            "did you forget to specify a "
            "type on this Column?" % type_
        )

    def visit_type_decorator(
        self, type_: TypeDecorator[Any], **kw: Any
    ) -> str:
        # unwrap to the dialect-specific implementation type
        return self.process(type_.type_engine(self.dialect), **kw)

    def visit_user_defined(
        self, type_: UserDefinedType[Any], **kw: Any
    ) -> str:
        return type_.get_col_spec(**kw)

7447 

7448 

class StrSQLTypeCompiler(GenericTypeCompiler):
    """A never-failing type compiler used for plain string compilation:
    unknown or non-visitable types render as a best-effort string."""

    def process(self, type_, **kw):
        try:
            dispatch = type_._compiler_dispatch
        except AttributeError:
            # not a visitable type object; use the fallback rendering
            return self._visit_unknown(type_, **kw)
        return dispatch(self, **kw)

    def __getattr__(self, key):
        # any visit_* method not defined on the class resolves to the
        # fallback renderer rather than raising
        if not key.startswith("visit_"):
            raise AttributeError(key)
        return self._visit_unknown

    def _visit_unknown(self, type_, **kw):
        cls_name = type_.__class__.__name__
        # all-uppercase class names already look like SQL type names
        if cls_name.upper() == cls_name:
            return cls_name
        return repr(type_)

    def visit_null(self, type_, **kw):
        return "NULL"

    def visit_user_defined(self, type_, **kw):
        try:
            get_col_spec = type_.get_col_spec
        except AttributeError:
            return repr(type_)
        return get_col_spec(**kw)

7480 

7481 

class _SchemaForObjectCallable(Protocol):
    # Callable returning the effective schema name for a schema object;
    # see IdentifierPreparer.schema_for_object.
    def __call__(self, obj: Any, /) -> str: ...

7484 

7485 

class _BindNameForColProtocol(Protocol):
    # Callable mapping a column clause to the bind parameter name used
    # for it in the compiled statement.
    def __call__(self, col: ColumnClause[Any]) -> str: ...

7488 

7489 

7490class IdentifierPreparer: 

7491 """Handle quoting and case-folding of identifiers based on options.""" 

7492 

7493 reserved_words = RESERVED_WORDS 

7494 

7495 legal_characters = LEGAL_CHARACTERS 

7496 

7497 illegal_initial_characters = ILLEGAL_INITIAL_CHARACTERS 

7498 

7499 initial_quote: str 

7500 

7501 final_quote: str 

7502 

7503 _strings: MutableMapping[str, str] 

7504 

7505 schema_for_object: _SchemaForObjectCallable = operator.attrgetter("schema") 

7506 """Return the .schema attribute for an object. 

7507 

7508 For the default IdentifierPreparer, the schema for an object is always 

7509 the value of the ".schema" attribute. if the preparer is replaced 

7510 with one that has a non-empty schema_translate_map, the value of the 

7511 ".schema" attribute is rendered a symbol that will be converted to a 

7512 real schema name from the mapping post-compile. 

7513 

7514 """ 

7515 

7516 _includes_none_schema_translate: bool = False 

7517 

    def __init__(
        self,
        dialect: Dialect,
        initial_quote: str = '"',
        final_quote: Optional[str] = None,
        escape_quote: str = '"',
        quote_case_sensitive_collations: bool = True,
        omit_schema: bool = False,
    ):
        """Construct a new ``IdentifierPreparer`` object.

        initial_quote
          Character that begins a delimited identifier.

        final_quote
          Character that ends a delimited identifier. Defaults to
          `initial_quote`.

        escape_quote
          Character used to escape a quote character embedded within a
          delimited identifier.

        quote_case_sensitive_collations
          When True, collation names are conditionally quoted by
          ``format_collation()``.

        omit_schema
          Prevent prepending schema name. Useful for databases that do
          not support schemae.
        """

        self.dialect = dialect
        self.initial_quote = initial_quote
        self.final_quote = final_quote or self.initial_quote
        self.escape_quote = escape_quote
        # an embedded quote character is escaped by doubling it
        self.escape_to_quote = self.escape_quote * 2
        self.omit_schema = omit_schema
        self.quote_case_sensitive_collations = quote_case_sensitive_collations
        # memoization cache of identifier -> conditionally quoted form;
        # see quote()
        self._strings = {}
        # %-based paramstyles require literal percent signs inside
        # identifiers to be doubled; see _escape_identifier()
        self._double_percents = self.dialect.paramstyle in (
            "format",
            "pyformat",
        )

7553 

    def _with_schema_translate(self, schema_translate_map):
        """Return a shallow copy of this preparer whose
        ``schema_for_object`` renders schema-translate placeholder
        symbols (``__[SCHEMA_name]``) instead of real schema names.

        The placeholders are substituted with real schema names
        post-compile; see ``_render_schema_translates``.
        """
        prep = self.__class__.__new__(self.__class__)
        prep.__dict__.update(self.__dict__)

        includes_none = None in schema_translate_map

        def symbol_getter(obj):
            name = obj.schema
            if obj._use_schema_map and (name is not None or includes_none):
                # square brackets would corrupt the placeholder syntax
                # parsed later by _render_schema_translates
                if name is not None and ("[" in name or "]" in name):
                    raise exc.CompileError(
                        "Square bracket characters ([]) not supported "
                        "in schema translate name '%s'" % name
                    )
                return quoted_name(
                    "__[SCHEMA_%s]" % (name or "_none"), quote=False
                )
            else:
                return obj.schema

        prep.schema_for_object = symbol_getter
        prep._includes_none_schema_translate = includes_none
        return prep

7577 

7578 def _render_schema_translates( 

7579 self, statement: str, schema_translate_map: SchemaTranslateMapType 

7580 ) -> str: 

7581 d = schema_translate_map 

7582 if None in d: 

7583 if not self._includes_none_schema_translate: 

7584 raise exc.InvalidRequestError( 

7585 "schema translate map which previously did not have " 

7586 "`None` present as a key now has `None` present; compiled " 

7587 "statement may lack adequate placeholders. Please use " 

7588 "consistent keys in successive " 

7589 "schema_translate_map dictionaries." 

7590 ) 

7591 

7592 d["_none"] = d[None] # type: ignore[index] 

7593 

7594 def replace(m): 

7595 name = m.group(2) 

7596 if name in d: 

7597 effective_schema = d[name] 

7598 else: 

7599 if name in (None, "_none"): 

7600 raise exc.InvalidRequestError( 

7601 "schema translate map which previously had `None` " 

7602 "present as a key now no longer has it present; don't " 

7603 "know how to apply schema for compiled statement. " 

7604 "Please use consistent keys in successive " 

7605 "schema_translate_map dictionaries." 

7606 ) 

7607 effective_schema = name 

7608 

7609 if not effective_schema: 

7610 effective_schema = self.dialect.default_schema_name 

7611 if not effective_schema: 

7612 # TODO: no coverage here 

7613 raise exc.CompileError( 

7614 "Dialect has no default schema name; can't " 

7615 "use None as dynamic schema target." 

7616 ) 

7617 return self.quote_schema(effective_schema) 

7618 

7619 return re.sub(r"(__\[SCHEMA_([^\]]+)\])", replace, statement) 

7620 

7621 def _escape_identifier(self, value: str) -> str: 

7622 """Escape an identifier. 

7623 

7624 Subclasses should override this to provide database-dependent 

7625 escaping behavior. 

7626 """ 

7627 

7628 value = value.replace(self.escape_quote, self.escape_to_quote) 

7629 if self._double_percents: 

7630 value = value.replace("%", "%%") 

7631 return value 

7632 

    def _unescape_identifier(self, value: str) -> str:
        """Canonicalize an escaped identifier.

        Subclasses should override this to provide database-dependent
        unescaping behavior that reverses _escape_identifier.
        """
        # note: does not reverse percent-doubling; only quote escaping
        return value.replace(self.escape_to_quote, self.escape_quote)

7641 

7642 def validate_sql_phrase(self, element, reg): 

7643 """keyword sequence filter. 

7644 

7645 a filter for elements that are intended to represent keyword sequences, 

7646 such as "INITIALLY", "INITIALLY DEFERRED", etc. no special characters 

7647 should be present. 

7648 

7649 """ 

7650 

7651 if element is not None and not reg.match(element): 

7652 raise exc.CompileError( 

7653 "Unexpected SQL phrase: %r (matching against %r)" 

7654 % (element, reg.pattern) 

7655 ) 

7656 return element 

7657 

7658 def quote_identifier(self, value: str) -> str: 

7659 """Quote an identifier. 

7660 

7661 Subclasses should override this to provide database-dependent 

7662 quoting behavior. 

7663 """ 

7664 

7665 return ( 

7666 self.initial_quote 

7667 + self._escape_identifier(value) 

7668 + self.final_quote 

7669 ) 

7670 

7671 def _requires_quotes(self, value: str) -> bool: 

7672 """Return True if the given identifier requires quoting.""" 

7673 lc_value = value.lower() 

7674 return ( 

7675 lc_value in self.reserved_words 

7676 or value[0] in self.illegal_initial_characters 

7677 or not self.legal_characters.match(str(value)) 

7678 or (lc_value != value) 

7679 ) 

7680 

    def _requires_quotes_illegal_chars(self, value):
        """Return True if the given identifier requires quoting, but
        not taking case convention into account."""
        # only the legal-character pattern is checked; reserved words and
        # casing are deliberately ignored here
        return not self.legal_characters.match(str(value))

7685 

    def quote_schema(self, schema: str) -> str:
        """Conditionally quote a schema name.

        The name is quoted if it is a reserved word, contains quote-necessary
        characters, or is an instance of :class:`.quoted_name` which includes
        ``quote`` set to ``True``.

        Subclasses can override this to provide database-dependent
        quoting behavior for schema names.

        :param schema: string schema name
        """
        # defaults to ordinary identifier quoting rules
        return self.quote(schema)

7700 

7701 def quote(self, ident: str) -> str: 

7702 """Conditionally quote an identifier. 

7703 

7704 The identifier is quoted if it is a reserved word, contains 

7705 quote-necessary characters, or is an instance of 

7706 :class:`.quoted_name` which includes ``quote`` set to ``True``. 

7707 

7708 Subclasses can override this to provide database-dependent 

7709 quoting behavior for identifier names. 

7710 

7711 :param ident: string identifier 

7712 """ 

7713 force = getattr(ident, "quote", None) 

7714 

7715 if force is None: 

7716 if ident in self._strings: 

7717 return self._strings[ident] 

7718 else: 

7719 if self._requires_quotes(ident): 

7720 self._strings[ident] = self.quote_identifier(ident) 

7721 else: 

7722 self._strings[ident] = ident 

7723 return self._strings[ident] 

7724 elif force: 

7725 return self.quote_identifier(ident) 

7726 else: 

7727 return ident 

7728 

7729 def format_collation(self, collation_name): 

7730 if self.quote_case_sensitive_collations: 

7731 return self.quote(collation_name) 

7732 else: 

7733 return collation_name 

7734 

7735 def format_sequence( 

7736 self, sequence: schema.Sequence, use_schema: bool = True 

7737 ) -> str: 

7738 name = self.quote(sequence.name) 

7739 

7740 effective_schema = self.schema_for_object(sequence) 

7741 

7742 if ( 

7743 not self.omit_schema 

7744 and use_schema 

7745 and effective_schema is not None 

7746 ): 

7747 name = self.quote_schema(effective_schema) + "." + name 

7748 return name 

7749 

    def format_label(
        self, label: Label[Any], name: Optional[str] = None
    ) -> str:
        # an explicitly passed (non-empty) name overrides the label's own
        return self.quote(name or label.name)

7754 

7755 def format_alias( 

7756 self, alias: Optional[AliasedReturnsRows], name: Optional[str] = None 

7757 ) -> str: 

7758 if name is None: 

7759 assert alias is not None 

7760 return self.quote(alias.name) 

7761 else: 

7762 return self.quote(name) 

7763 

7764 def format_savepoint(self, savepoint, name=None): 

7765 # Running the savepoint name through quoting is unnecessary 

7766 # for all known dialects. This is here to support potential 

7767 # third party use cases 

7768 ident = name or savepoint.ident 

7769 if self._requires_quotes(ident): 

7770 ident = self.quote_identifier(ident) 

7771 return ident 

7772 

    @util.preload_module("sqlalchemy.sql.naming")
    def format_constraint(
        self, constraint: Union[Constraint, Index], _alembic_quote: bool = True
    ) -> Optional[str]:
        """Return the truncated, conditionally quoted name of a constraint
        or index, or ``None`` when no name can be derived."""
        naming = util.preloaded.sql_naming

        # _NONE_NAME indicates the name is deferred to the table's
        # naming convention, which may still yield no name
        if constraint.name is _NONE_NAME:
            name = naming._constraint_name_for_table(
                constraint, constraint.table
            )

            if name is None:
                return None
        else:
            name = constraint.name

        assert name is not None
        # indexes and constraints have independent max-length limits
        if constraint.__visit_name__ == "index":
            return self.truncate_and_render_index_name(
                name, _alembic_quote=_alembic_quote
            )
        else:
            return self.truncate_and_render_constraint_name(
                name, _alembic_quote=_alembic_quote
            )

7798 

7799 def truncate_and_render_index_name( 

7800 self, name: str, _alembic_quote: bool = True 

7801 ) -> str: 

7802 # calculate these at format time so that ad-hoc changes 

7803 # to dialect.max_identifier_length etc. can be reflected 

7804 # as IdentifierPreparer is long lived 

7805 max_ = ( 

7806 self.dialect.max_index_name_length 

7807 or self.dialect.max_identifier_length 

7808 ) 

7809 return self._truncate_and_render_maxlen_name( 

7810 name, max_, _alembic_quote 

7811 ) 

7812 

7813 def truncate_and_render_constraint_name( 

7814 self, name: str, _alembic_quote: bool = True 

7815 ) -> str: 

7816 # calculate these at format time so that ad-hoc changes 

7817 # to dialect.max_identifier_length etc. can be reflected 

7818 # as IdentifierPreparer is long lived 

7819 max_ = ( 

7820 self.dialect.max_constraint_name_length 

7821 or self.dialect.max_identifier_length 

7822 ) 

7823 return self._truncate_and_render_maxlen_name( 

7824 name, max_, _alembic_quote 

7825 ) 

7826 

7827 def _truncate_and_render_maxlen_name( 

7828 self, name: str, max_: int, _alembic_quote: bool 

7829 ) -> str: 

7830 if isinstance(name, elements._truncated_label): 

7831 if len(name) > max_: 

7832 name = name[0 : max_ - 8] + "_" + util.md5_hex(name)[-4:] 

7833 else: 

7834 self.dialect.validate_identifier(name) 

7835 

7836 if not _alembic_quote: 

7837 return name 

7838 else: 

7839 return self.quote(name) 

7840 

7841 def format_index(self, index: Index) -> str: 

7842 name = self.format_constraint(index) 

7843 assert name is not None 

7844 return name 

7845 

7846 def format_table( 

7847 self, 

7848 table: FromClause, 

7849 use_schema: bool = True, 

7850 name: Optional[str] = None, 

7851 ) -> str: 

7852 """Prepare a quoted table and schema name.""" 

7853 if name is None: 

7854 if TYPE_CHECKING: 

7855 assert isinstance(table, NamedFromClause) 

7856 name = table.name 

7857 

7858 result = self.quote(name) 

7859 

7860 effective_schema = self.schema_for_object(table) 

7861 

7862 if not self.omit_schema and use_schema and effective_schema: 

7863 result = self.quote_schema(effective_schema) + "." + result 

7864 return result 

7865 

7866 def format_schema(self, name): 

7867 """Prepare a quoted schema name.""" 

7868 

7869 return self.quote(name) 

7870 

7871 def format_label_name( 

7872 self, 

7873 name, 

7874 anon_map=None, 

7875 ): 

7876 """Prepare a quoted column name.""" 

7877 

7878 if anon_map is not None and isinstance( 

7879 name, elements._truncated_label 

7880 ): 

7881 name = name.apply_map(anon_map) 

7882 

7883 return self.quote(name) 

7884 

7885 def format_column( 

7886 self, 

7887 column: ColumnElement[Any], 

7888 use_table: bool = False, 

7889 name: Optional[str] = None, 

7890 table_name: Optional[str] = None, 

7891 use_schema: bool = False, 

7892 anon_map: Optional[Mapping[str, Any]] = None, 

7893 ) -> str: 

7894 """Prepare a quoted column name.""" 

7895 

7896 if name is None: 

7897 name = column.name 

7898 assert name is not None 

7899 

7900 if anon_map is not None and isinstance( 

7901 name, elements._truncated_label 

7902 ): 

7903 name = name.apply_map(anon_map) 

7904 

7905 if not getattr(column, "is_literal", False): 

7906 if use_table: 

7907 return ( 

7908 self.format_table( 

7909 column.table, use_schema=use_schema, name=table_name 

7910 ) 

7911 + "." 

7912 + self.quote(name) 

7913 ) 

7914 else: 

7915 return self.quote(name) 

7916 else: 

7917 # literal textual elements get stuck into ColumnClause a lot, 

7918 # which shouldn't get quoted 

7919 

7920 if use_table: 

7921 return ( 

7922 self.format_table( 

7923 column.table, use_schema=use_schema, name=table_name 

7924 ) 

7925 + "." 

7926 + name 

7927 ) 

7928 else: 

7929 return name 

7930 

7931 def format_table_seq(self, table, use_schema=True): 

7932 """Format table name and schema as a tuple.""" 

7933 

7934 # Dialects with more levels in their fully qualified references 

7935 # ('database', 'owner', etc.) could override this and return 

7936 # a longer sequence. 

7937 

7938 effective_schema = self.schema_for_object(table) 

7939 

7940 if not self.omit_schema and use_schema and effective_schema: 

7941 return ( 

7942 self.quote_schema(effective_schema), 

7943 self.format_table(table, use_schema=False), 

7944 ) 

7945 else: 

7946 return (self.format_table(table, use_schema=False),) 

7947 

7948 @util.memoized_property 

7949 def _r_identifiers(self): 

7950 initial, final, escaped_final = ( 

7951 re.escape(s) 

7952 for s in ( 

7953 self.initial_quote, 

7954 self.final_quote, 

7955 self._escape_identifier(self.final_quote), 

7956 ) 

7957 ) 

7958 r = re.compile( 

7959 r"(?:" 

7960 r"(?:%(initial)s((?:%(escaped)s|[^%(final)s])+)%(final)s" 

7961 r"|([^\.]+))(?=\.|$))+" 

7962 % {"initial": initial, "final": final, "escaped": escaped_final} 

7963 ) 

7964 return r 

7965 

7966 def unformat_identifiers(self, identifiers: str) -> Sequence[str]: 

7967 """Unpack 'schema.table.column'-like strings into components.""" 

7968 

7969 r = self._r_identifiers 

7970 return [ 

7971 self._unescape_identifier(i) 

7972 for i in [a or b for a, b in r.findall(identifiers)] 

7973 ]