Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/sql/compiler.py: 40%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

3057 statements  

1# sql/compiler.py 

2# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7# mypy: allow-untyped-defs, allow-untyped-calls 

8 

9"""Base SQL and DDL compiler implementations. 

10 

11Classes provided include: 

12 

13:class:`.compiler.SQLCompiler` - renders SQL 

14strings 

15 

16:class:`.compiler.DDLCompiler` - renders DDL 

17(data definition language) strings 

18 

19:class:`.compiler.GenericTypeCompiler` - renders 

20type specification strings. 

21 

22To generate user-defined SQL strings, see 

23:doc:`/ext/compiler`. 

24 

25""" 

26from __future__ import annotations 

27 

28import collections 

29import collections.abc as collections_abc 

30import contextlib 

31from enum import IntEnum 

32import functools 

33import itertools 

34import operator 

35import re 

36from time import perf_counter 

37import typing 

38from typing import Any 

39from typing import Callable 

40from typing import cast 

41from typing import ClassVar 

42from typing import Dict 

43from typing import FrozenSet 

44from typing import Iterable 

45from typing import Iterator 

46from typing import List 

47from typing import Mapping 

48from typing import MutableMapping 

49from typing import NamedTuple 

50from typing import NoReturn 

51from typing import Optional 

52from typing import Pattern 

53from typing import Sequence 

54from typing import Set 

55from typing import Tuple 

56from typing import Type 

57from typing import TYPE_CHECKING 

58from typing import Union 

59 

60from . import base 

61from . import coercions 

62from . import crud 

63from . import elements 

64from . import functions 

65from . import operators 

66from . import roles 

67from . import schema 

68from . import selectable 

69from . import sqltypes 

70from . import util as sql_util 

71from ._typing import is_column_element 

72from ._typing import is_dml 

73from .base import _de_clone 

74from .base import _from_objects 

75from .base import _NONE_NAME 

76from .base import _SentinelDefaultCharacterization 

77from .base import NO_ARG 

78from .elements import quoted_name 

79from .sqltypes import TupleType 

80from .visitors import prefix_anon_map 

81from .. import exc 

82from .. import util 

83from ..util import FastIntFlag 

84from ..util.typing import Literal 

85from ..util.typing import Protocol 

86from ..util.typing import Self 

87from ..util.typing import TypedDict 

88 

89if typing.TYPE_CHECKING: 

90 from .annotation import _AnnotationDict 

91 from .base import _AmbiguousTableNameMap 

92 from .base import CompileState 

93 from .base import Executable 

94 from .cache_key import CacheKey 

95 from .ddl import ExecutableDDLElement 

96 from .dml import Insert 

97 from .dml import Update 

98 from .dml import UpdateBase 

99 from .dml import UpdateDMLState 

100 from .dml import ValuesBase 

101 from .elements import _truncated_label 

102 from .elements import BinaryExpression 

103 from .elements import BindParameter 

104 from .elements import ClauseElement 

105 from .elements import ColumnClause 

106 from .elements import ColumnElement 

107 from .elements import False_ 

108 from .elements import Label 

109 from .elements import Null 

110 from .elements import True_ 

111 from .functions import Function 

112 from .schema import CheckConstraint 

113 from .schema import Column 

114 from .schema import Constraint 

115 from .schema import ForeignKeyConstraint 

116 from .schema import IdentityOptions 

117 from .schema import Index 

118 from .schema import PrimaryKeyConstraint 

119 from .schema import Table 

120 from .schema import UniqueConstraint 

121 from .selectable import _ColumnsClauseElement 

122 from .selectable import AliasedReturnsRows 

123 from .selectable import CompoundSelectState 

124 from .selectable import CTE 

125 from .selectable import FromClause 

126 from .selectable import NamedFromClause 

127 from .selectable import ReturnsRows 

128 from .selectable import Select 

129 from .selectable import SelectState 

130 from .type_api import _BindProcessorType 

131 from .type_api import TypeDecorator 

132 from .type_api import TypeEngine 

133 from .type_api import UserDefinedType 

134 from .visitors import Visitable 

135 from ..engine.cursor import CursorResultMetaData 

136 from ..engine.interfaces import _CoreSingleExecuteParams 

137 from ..engine.interfaces import _DBAPIAnyExecuteParams 

138 from ..engine.interfaces import _DBAPIMultiExecuteParams 

139 from ..engine.interfaces import _DBAPISingleExecuteParams 

140 from ..engine.interfaces import _ExecuteOptions 

141 from ..engine.interfaces import _GenericSetInputSizesType 

142 from ..engine.interfaces import _MutableCoreSingleExecuteParams 

143 from ..engine.interfaces import Dialect 

144 from ..engine.interfaces import SchemaTranslateMapType 

145 

146 

# Type alias: mapping of FROM clause objects to their per-dialect hint text.
_FromHintsType = Dict["FromClause", str]

# Baseline set of reserved words; identifiers matching one of these are
# quoted by the IdentifierPreparer.  Dialects override with their own lists.
RESERVED_WORDS = {
    "all",
    "analyse",
    "analyze",
    "and",
    "any",
    "array",
    "as",
    "asc",
    "asymmetric",
    "authorization",
    "between",
    "binary",
    "both",
    "case",
    "cast",
    "check",
    "collate",
    "column",
    "constraint",
    "create",
    "cross",
    "current_date",
    "current_role",
    "current_time",
    "current_timestamp",
    "current_user",
    "default",
    "deferrable",
    "desc",
    "distinct",
    "do",
    "else",
    "end",
    "except",
    "false",
    "for",
    "foreign",
    "freeze",
    "from",
    "full",
    "grant",
    "group",
    "having",
    "ilike",
    "in",
    "initially",
    "inner",
    "intersect",
    "into",
    "is",
    "isnull",
    "join",
    "leading",
    "left",
    "like",
    "limit",
    "localtime",
    "localtimestamp",
    "natural",
    "new",
    "not",
    "notnull",
    "null",
    "off",
    "offset",
    "old",
    "on",
    "only",
    "or",
    "order",
    "outer",
    "overlaps",
    "placing",
    "primary",
    "references",
    "right",
    "select",
    "session_user",
    "set",
    "similar",
    "some",
    "symmetric",
    "table",
    "then",
    "to",
    "trailing",
    "true",
    "union",
    "unique",
    "user",
    "using",
    "verbose",
    "when",
    "where",
}

# Identifiers composed entirely of these characters don't need quoting
# on that basis alone (reserved words and initial characters still apply).
LEGAL_CHARACTERS = re.compile(r"^[A-Z0-9_$]+$", re.I)
LEGAL_CHARACTERS_PLUS_SPACE = re.compile(r"^[A-Z0-9_ $]+$", re.I)
# An identifier beginning with a digit or "$" always requires quoting.
ILLEGAL_INITIAL_CHARACTERS = {str(x) for x in range(0, 10)}.union(["$"])

# Accepted keyword phrases for FOREIGN KEY ON DELETE / ON UPDATE and the
# INITIALLY clause; anything else is quoted when rendered.
FK_ON_DELETE = re.compile(
    r"^(?:RESTRICT|CASCADE|SET NULL|NO ACTION|SET DEFAULT)$", re.I
)
FK_ON_UPDATE = re.compile(
    r"^(?:RESTRICT|CASCADE|SET NULL|NO ACTION|SET DEFAULT)$", re.I
)
FK_INITIALLY = re.compile(r"^(?:DEFERRED|IMMEDIATE)$", re.I)
# Locates ":name" bind-parameter tokens in textual SQL; the negative
# lookbehind/lookahead exclude "::" casts and backslash-escaped colons
# (\x5c is the backslash character).
BIND_PARAMS = re.compile(r"(?<![:\w\$\x5c]):([\w\$]+)(?![:\w\$])", re.UNICODE)
# Locates backslash-escaped colons ("\:") that should render as literal text.
BIND_PARAMS_ESC = re.compile(r"\x5c(:[\w\$]*)(?![:\w\$])", re.UNICODE)

# Bound-parameter render templates keyed by DBAPI paramstyle.  "%(name)s"
# placeholders are %-interpolated with the escaped bind name; "[_POSITION]"
# is replaced with the 1-based position for numeric styles.
_pyformat_template = "%%(%(name)s)s"
BIND_TEMPLATES = {
    "pyformat": _pyformat_template,
    "qmark": "?",
    "format": "%%s",
    "numeric": ":[_POSITION]",
    "numeric_dollar": "$[_POSITION]",
    "named": ":%(name)s",
}


# Default SQL text for operator objects; dialects may override entries.
# Binary entries include surrounding spaces, unary/modifier entries don't.
OPERATORS = {
    # binary
    operators.and_: " AND ",
    operators.or_: " OR ",
    operators.add: " + ",
    operators.mul: " * ",
    operators.sub: " - ",
    operators.mod: " % ",
    operators.neg: "-",
    operators.lt: " < ",
    operators.le: " <= ",
    operators.ne: " != ",
    operators.gt: " > ",
    operators.ge: " >= ",
    operators.eq: " = ",
    operators.is_distinct_from: " IS DISTINCT FROM ",
    operators.is_not_distinct_from: " IS NOT DISTINCT FROM ",
    operators.concat_op: " || ",
    operators.match_op: " MATCH ",
    operators.not_match_op: " NOT MATCH ",
    operators.in_op: " IN ",
    operators.not_in_op: " NOT IN ",
    operators.comma_op: ", ",
    operators.from_: " FROM ",
    operators.as_: " AS ",
    operators.is_: " IS ",
    operators.is_not: " IS NOT ",
    operators.collate: " COLLATE ",
    # unary
    operators.exists: "EXISTS ",
    operators.distinct_op: "DISTINCT ",
    operators.inv: "NOT ",
    operators.any_op: "ANY ",
    operators.all_op: "ALL ",
    # modifiers
    operators.desc_op: " DESC",
    operators.asc_op: " ASC",
    operators.nulls_first_op: " NULLS FIRST",
    operators.nulls_last_op: " NULLS LAST",
    # bitwise
    operators.bitwise_xor_op: " ^ ",
    operators.bitwise_or_op: " | ",
    operators.bitwise_and_op: " & ",
    operators.bitwise_not_op: "~",
    operators.bitwise_lshift_op: " << ",
    operators.bitwise_rshift_op: " >> ",
}

# Rendered names for "generic" function classes; mixed casing mirrors
# whether the construct renders as a bare keyword (upper) or a call (lower).
FUNCTIONS: Dict[Type[Function[Any]], str] = {
    functions.coalesce: "coalesce",
    functions.current_date: "CURRENT_DATE",
    functions.current_time: "CURRENT_TIME",
    functions.current_timestamp: "CURRENT_TIMESTAMP",
    functions.current_user: "CURRENT_USER",
    functions.localtime: "LOCALTIME",
    functions.localtimestamp: "LOCALTIMESTAMP",
    functions.random: "random",
    functions.sysdate: "sysdate",
    functions.session_user: "SESSION_USER",
    functions.user: "USER",
    functions.cube: "CUBE",
    functions.rollup: "ROLLUP",
    functions.grouping_sets: "GROUPING SETS",
}


# Field names accepted by extract(); values are the strings rendered into
# the EXTRACT expression (identity mapping here; dialects may remap).
EXTRACT_MAP = {
    "month": "month",
    "day": "day",
    "year": "year",
    "second": "second",
    "hour": "hour",
    "doy": "doy",
    "minute": "minute",
    "quarter": "quarter",
    "dow": "dow",
    "week": "week",
    "epoch": "epoch",
    "milliseconds": "milliseconds",
    "microseconds": "microseconds",
    "timezone_hour": "timezone_hour",
    "timezone_minute": "timezone_minute",
}

# SQL keywords for compound (set-operation) SELECT constructs.
COMPOUND_KEYWORDS = {
    selectable._CompoundSelectKeyword.UNION: "UNION",
    selectable._CompoundSelectKeyword.UNION_ALL: "UNION ALL",
    selectable._CompoundSelectKeyword.EXCEPT: "EXCEPT",
    selectable._CompoundSelectKeyword.EXCEPT_ALL: "EXCEPT ALL",
    selectable._CompoundSelectKeyword.INTERSECT: "INTERSECT",
    selectable._CompoundSelectKeyword.INTERSECT_ALL: "INTERSECT ALL",
}

363 

364 

# NOTE: cursor-layer code reads these fields positionally via the RM_*
# integer constants defined below; keep field order in sync with them.
class ResultColumnsEntry(NamedTuple):
    """Tracks a column expression that is expected to be represented
    in the result rows for this statement.

    This normally refers to the columns clause of a SELECT statement
    but may also refer to a RETURNING clause, as well as for dialect-specific
    emulations.

    """

    keyname: str
    """string name that's expected in cursor.description"""

    name: str
    """column name, may be labeled"""

    objects: Tuple[Any, ...]
    """sequence of objects that should be able to locate this column
    in a RowMapping.  This is typically string names and aliases
    as well as Column objects.

    """

    type: TypeEngine[Any]
    """Datatype to be associated with this column.   This is where
    the "result processing" logic directly links the compiled statement
    to the rows that come back from the cursor.

    """

394 

395 

class _ResultMapAppender(Protocol):
    """Structural type for the callback used to add one
    :class:`.ResultColumnsEntry`-shaped record to a compiler's result map;
    arguments mirror that named tuple's fields."""

    def __call__(
        self,
        keyname: str,
        name: str,
        objects: Sequence[Any],
        type_: TypeEngine[Any],
    ) -> None: ...

404 

405 

# integer indexes into ResultColumnsEntry used by cursor.py.
# some profiling showed integer access faster than named tuple
RM_RENDERED_NAME: Literal[0] = 0  # ResultColumnsEntry.keyname
RM_NAME: Literal[1] = 1  # ResultColumnsEntry.name
RM_OBJECTS: Literal[2] = 2  # ResultColumnsEntry.objects
RM_TYPE: Literal[3] = 3  # ResultColumnsEntry.type

412 

413 

class _BaseCompilerStackEntry(TypedDict):
    """Required keys for one entry on the compiler's statement-nesting
    stack."""

    # FROM objects of the enclosing statement(s), for "asfrom" rendering
    asfrom_froms: Set[FromClause]
    # FROM objects against which correlation is evaluated
    correlate_froms: Set[FromClause]
    # the selectable being compiled at this level of nesting
    selectable: ReturnsRows

418 

419 

class _CompilerStackEntry(_BaseCompilerStackEntry, total=False):
    """Optional keys (``total=False``) on a compiler stack entry, present
    only for particular statement shapes."""

    compile_state: CompileState
    need_result_map_for_nested: bool
    need_result_map_for_compound: bool
    select_0: ReturnsRows
    insert_from_select: Select[Any]

426 

427 

class ExpandedState(NamedTuple):
    """State used when producing "expanded" and "post compile" bound
    parameters for a statement.

    "expanded" parameters are generated at statement execution time to
    suit the number of parameters actually passed, the most prominent
    example being the individual elements inside of an IN expression.

    "post compile" parameters are those whose SQL literal value is
    rendered into the SQL statement at execution time, rather than
    being passed as separate parameters to the driver.

    To create an :class:`.ExpandedState` instance, use the
    :meth:`.SQLCompiler.construct_expanded_state` method on any
    :class:`.SQLCompiler` instance.

    """

    statement: str
    """String SQL statement with parameters fully expanded"""

    parameters: _CoreSingleExecuteParams
    """Parameter dictionary with parameters fully expanded.

    For a statement that uses named parameters, this dictionary will map
    exactly to the names in the statement.   For a statement that uses
    positional parameters, the :attr:`.ExpandedState.positional_parameters`
    will yield a tuple with the positional parameter set.

    """

    processors: Mapping[str, _BindProcessorType[Any]]
    """mapping of bound value processors"""

    positiontup: Optional[Sequence[str]]
    """Sequence of string names indicating the order of positional
    parameters"""

    parameter_expansion: Mapping[str, List[str]]
    """Mapping representing the intermediary link from original parameter
    name to list of "expanded" parameter names, for those parameters that
    were expanded."""

    @property
    def positional_parameters(self) -> Tuple[Any, ...]:
        """Tuple of positional parameters, for statements that were compiled
        using a positional paramstyle.

        """
        positions = self.positiontup
        if positions is None:
            raise exc.InvalidRequestError(
                "statement does not use a positional paramstyle"
            )
        lookup = self.parameters
        return tuple(lookup[name] for name in positions)

    @property
    def additional_parameters(self) -> _CoreSingleExecuteParams:
        """synonym for :attr:`.ExpandedState.parameters`."""
        return self.parameters

487 

488 

489class _InsertManyValues(NamedTuple): 

490 """represents state to use for executing an "insertmanyvalues" statement. 

491 

492 The primary consumers of this object are the 

493 :meth:`.SQLCompiler._deliver_insertmanyvalues_batches` and 

494 :meth:`.DefaultDialect._deliver_insertmanyvalues_batches` methods. 

495 

496 .. versionadded:: 2.0 

497 

498 """ 

499 

500 is_default_expr: bool 

501 """if True, the statement is of the form 

502 ``INSERT INTO TABLE DEFAULT VALUES``, and can't be rewritten as a "batch" 

503 

504 """ 

505 

506 single_values_expr: str 

507 """The rendered "values" clause of the INSERT statement. 

508 

509 This is typically the parenthesized section e.g. "(?, ?, ?)" or similar. 

510 The insertmanyvalues logic uses this string as a search and replace 

511 target. 

512 

513 """ 

514 

515 insert_crud_params: List[crud._CrudParamElementStr] 

516 """List of Column / bind names etc. used while rewriting the statement""" 

517 

518 num_positional_params_counted: int 

519 """the number of bound parameters in a single-row statement. 

520 

521 This count may be larger or smaller than the actual number of columns 

522 targeted in the INSERT, as it accommodates for SQL expressions 

523 in the values list that may have zero or more parameters embedded 

524 within them. 

525 

526 This count is part of what's used to organize rewritten parameter lists 

527 when batching. 

528 

529 """ 

530 

531 sort_by_parameter_order: bool = False 

532 """if the deterministic_returnined_order parameter were used on the 

533 insert. 

534 

535 All of the attributes following this will only be used if this is True. 

536 

537 """ 

538 

539 includes_upsert_behaviors: bool = False 

540 """if True, we have to accommodate for upsert behaviors. 

541 

542 This will in some cases downgrade "insertmanyvalues" that requests 

543 deterministic ordering. 

544 

545 """ 

546 

547 sentinel_columns: Optional[Sequence[Column[Any]]] = None 

548 """List of sentinel columns that were located. 

549 

550 This list is only here if the INSERT asked for 

551 sort_by_parameter_order=True, 

552 and dialect-appropriate sentinel columns were located. 

553 

554 .. versionadded:: 2.0.10 

555 

556 """ 

557 

558 num_sentinel_columns: int = 0 

559 """how many sentinel columns are in the above list, if any. 

560 

561 This is the same as 

562 ``len(sentinel_columns) if sentinel_columns is not None else 0`` 

563 

564 """ 

565 

566 sentinel_param_keys: Optional[Sequence[str]] = None 

567 """parameter str keys in each param dictionary / tuple 

568 that would link to the client side "sentinel" values for that row, which 

569 we can use to match up parameter sets to result rows. 

570 

571 This is only present if sentinel_columns is present and the INSERT 

572 statement actually refers to client side values for these sentinel 

573 columns. 

574 

575 .. versionadded:: 2.0.10 

576 

577 .. versionchanged:: 2.0.29 - the sequence is now string dictionary keys 

578 only, used against the "compiled parameteters" collection before 

579 the parameters were converted by bound parameter processors 

580 

581 """ 

582 

583 implicit_sentinel: bool = False 

584 """if True, we have exactly one sentinel column and it uses a server side 

585 value, currently has to generate an incrementing integer value. 

586 

587 The dialect in question would have asserted that it supports receiving 

588 these values back and sorting on that value as a means of guaranteeing 

589 correlation with the incoming parameter list. 

590 

591 .. versionadded:: 2.0.10 

592 

593 """ 

594 

595 embed_values_counter: bool = False 

596 """Whether to embed an incrementing integer counter in each parameter 

597 set within the VALUES clause as parameters are batched over. 

598 

599 This is only used for a specific INSERT..SELECT..VALUES..RETURNING syntax 

600 where a subquery is used to produce value tuples. Current support 

601 includes PostgreSQL, Microsoft SQL Server. 

602 

603 .. versionadded:: 2.0.10 

604 

605 """ 

606 

607 

class _InsertManyValuesBatch(NamedTuple):
    """represents an individual batch SQL statement for insertmanyvalues.

    This is passed through the
    :meth:`.SQLCompiler._deliver_insertmanyvalues_batches` and
    :meth:`.DefaultDialect._deliver_insertmanyvalues_batches` methods out
    to the :class:`.Connection` within the
    :meth:`.Connection._exec_insertmany_context` method.

    .. versionadded:: 2.0.10

    """

    # SQL string rewritten for this batch's number of VALUES rows
    replaced_statement: str
    # parameters matching the rewritten statement
    replaced_parameters: _DBAPIAnyExecuteParams
    processed_setinputsizes: Optional[_GenericSetInputSizesType]
    # the original per-row parameter sets covered by this batch
    batch: Sequence[_DBAPISingleExecuteParams]
    sentinel_values: Sequence[Tuple[Any, ...]]
    current_batch_size: int
    # 1-based position of this batch out of total_batches
    batchnum: int
    total_batches: int
    rows_sorted: bool
    # True if this batch was downgraded from deterministic-ordering mode
    is_downgraded: bool

631 

632 

class InsertmanyvaluesSentinelOpts(FastIntFlag):
    """bitflag enum indicating styles of PK defaults
    which can work as implicit sentinel columns

    """

    # individual capability bits
    NOT_SUPPORTED = 1
    AUTOINCREMENT = 2
    IDENTITY = 4
    SEQUENCE = 8

    # composite masks over the bits above
    ANY_AUTOINCREMENT = AUTOINCREMENT | IDENTITY | SEQUENCE
    _SUPPORTED_OR_NOT = NOT_SUPPORTED | ANY_AUTOINCREMENT

    # additional behavior flags
    # NOTE(review): bit value 32 is skipped here -- presumably reserved
    # or historical; confirm before reusing it.
    USE_INSERT_FROM_SELECT = 16
    RENDER_SELECT_COL_CASTS = 64

649 

650 

class CompilerState(IntEnum):
    """Enumerates the lifecycle phases of a :class:`.Compiled` object."""

    COMPILING = 0
    """a statement is present and is currently being compiled"""

    STRING_APPLIED = 1
    """a statement is present and its string form has been applied.

    Additional processors by subclasses may still be pending.

    """

    NO_STATEMENT = 2
    """no statement was given to compile; the compiler exists only
    for method access"""

665 

666 

class Linting(IntEnum):
    """Preference flags for the "SQL linting" feature.

    The feature currently supports flagging cartesian products
    within SQL statements.

    """

    NO_LINTING = 0
    "All linting disabled."

    COLLECT_CARTESIAN_PRODUCTS = 1
    """Gather FROM / cartesian-product data into
    'self.from_linter'"""

    WARN_LINTING = 2
    "Emit a warning when a linter detects a problem"

    FROM_LINTING = COLLECT_CARTESIAN_PRODUCTS | WARN_LINTING
    """Warn for cartesian products; combines COLLECT_CARTESIAN_PRODUCTS
    and WARN_LINTING"""


# module-level aliases for the Linting members, in definition order
(
    NO_LINTING,
    COLLECT_CARTESIAN_PRODUCTS,
    WARN_LINTING,
    FROM_LINTING,
) = tuple(Linting)

693 

694 

class FromLinter(collections.namedtuple("FromLinter", ["froms", "edges"])):
    """Current state for the "cartesian product" detection feature.

    ``froms`` maps FROM objects to display names; ``edges`` is a set of
    two-element tuples of FROM objects that are joined to each other.

    """

    def lint(self, start=None):
        """Search for FROM elements unreachable from ``start`` (or an
        arbitrary element); returns ``(unreachable_set, start_element)``
        or ``(None, None)`` if everything is connected."""
        if not self.froms:
            return None, None

        remaining = set(self.froms)
        pending_edges = set(self.edges)

        if start is None:
            seed = remaining.pop()
        else:
            seed = start
            remaining.remove(seed)

        queue = collections.deque([seed])

        # breadth-ish traversal of the join graph
        while queue and remaining:
            current = queue.popleft()
            remaining.discard(current)

            # comparison of nodes in edges here is based on hash equality, as
            # there are "annotated" elements that match the non-annotated
            # ones.  to remove the need for in-python hash() calls, use native
            # containment routines (e.g. "node in edge", "edge.index(node)")
            matched = {edge for edge in pending_edges if current in edge}

            # queue up the opposite endpoint of each matched edge
            queue.extendleft(
                edge[not edge.index(current)] for edge in matched
            )
            pending_edges.difference_update(matched)

        # anything still unreached is part of a cartesian product
        if remaining:
            return remaining, seed
        else:
            return None, None

    def warn(self, stmt_type="SELECT"):
        """Run :meth:`.lint` and emit a warning if a cartesian
        product was found."""
        leftover, seed = self.lint()

        if leftover:
            template = (
                "{stmt_type} statement has a cartesian product between "
                "FROM element(s) {froms} and "
                'FROM element "{start}".  Apply join condition(s) '
                "between each element to resolve."
            )
            names = ", ".join(
                f'"{self.froms[from_]}"' for from_ in leftover
            )
            util.warn(
                template.format(
                    stmt_type=stmt_type,
                    froms=names,
                    start=self.froms[seed],
                )
            )

759 

760 

class Compiled:
    """Represent a compiled SQL or DDL expression.

    The ``__str__`` method of the ``Compiled`` object should produce
    the actual text of the statement.  ``Compiled`` objects are
    specific to their underlying database dialect, and also may
    or may not be specific to the columns referenced within a
    particular set of bind parameters.  In no case should the
    ``Compiled`` object be dependent on the actual values of those
    bind parameters, even though it may reference those values as
    defaults.
    """

    statement: Optional[ClauseElement] = None
    "The statement to compile."
    string: str = ""
    "The string representation of the ``statement``"

    state: CompilerState
    """description of the compiler's state"""

    # class-level flags distinguishing SQLCompiler vs. DDLCompiler subtrees
    is_sql = False
    is_ddl = False

    _cached_metadata: Optional[CursorResultMetaData] = None

    _result_columns: Optional[List[ResultColumnsEntry]] = None

    schema_translate_map: Optional[SchemaTranslateMapType] = None

    execution_options: _ExecuteOptions = util.EMPTY_DICT
    """
    Execution options propagated from the statement.   In some cases,
    sub-elements of the statement can modify these.
    """

    preparer: IdentifierPreparer

    _annotations: _AnnotationDict = util.EMPTY_DICT

    compile_state: Optional[CompileState] = None
    """Optional :class:`.CompileState` object that maintains additional
    state used by the compiler.

    Major executable objects such as :class:`_expression.Insert`,
    :class:`_expression.Update`, :class:`_expression.Delete`,
    :class:`_expression.Select` will generate this
    state when compiled in order to calculate additional information about the
    object.   For the top level object that is to be executed, the state can be
    stored here where it can also have applicability towards result set
    processing.

    .. versionadded:: 1.4

    """

    dml_compile_state: Optional[CompileState] = None
    """Optional :class:`.CompileState` assigned at the same point that
    .isinsert, .isupdate, or .isdelete is assigned.

    This will normally be the same object as .compile_state, with the
    exception of cases like the :class:`.ORMFromStatementCompileState`
    object.

    .. versionadded:: 1.4.40

    """

    cache_key: Optional[CacheKey] = None
    """The :class:`.CacheKey` that was generated ahead of creating this
    :class:`.Compiled` object.

    This is used for routines that need access to the original
    :class:`.CacheKey` instance generated when the :class:`.Compiled`
    instance was first cached, typically in order to reconcile
    the original list of :class:`.BindParameter` objects with a
    per-statement list that's generated on each call.

    """

    _gen_time: float
    """Generation time of this :class:`.Compiled`, used for reporting
    cache stats."""

    def __init__(
        self,
        dialect: Dialect,
        statement: Optional[ClauseElement],
        schema_translate_map: Optional[SchemaTranslateMapType] = None,
        render_schema_translate: bool = False,
        compile_kwargs: Mapping[str, Any] = util.immutabledict(),
    ):
        """Construct a new :class:`.Compiled` object.

        :param dialect: :class:`.Dialect` to compile against.

        :param statement: :class:`_expression.ClauseElement` to be compiled.

        :param schema_translate_map: dictionary of schema names to be
         translated when forming the resultant SQL

         .. seealso::

            :ref:`schema_translating`

        :param compile_kwargs: additional kwargs that will be
         passed to the initial call to :meth:`.Compiled.process`.


        """
        self.dialect = dialect
        self.preparer = self.dialect.identifier_preparer
        if schema_translate_map:
            # replace the preparer with one that applies the translate
            # map when rendering schema-qualified names
            self.schema_translate_map = schema_translate_map
            self.preparer = self.preparer._with_schema_translate(
                schema_translate_map
            )

        if statement is not None:
            # compile immediately; `state` tracks the phase so that
            # __str__ can behave sanely mid-compilation
            self.state = CompilerState.COMPILING
            self.statement = statement
            self.can_execute = statement.supports_execution
            self._annotations = statement._annotations
            if self.can_execute:
                if TYPE_CHECKING:
                    assert isinstance(statement, Executable)
                self.execution_options = statement._execution_options
            self.string = self.process(self.statement, **compile_kwargs)

            if render_schema_translate:
                # post-process the string, replacing schema tokens
                # with their translated names
                assert schema_translate_map is not None
                self.string = self.preparer._render_schema_translates(
                    self.string, schema_translate_map
                )

            self.state = CompilerState.STRING_APPLIED
        else:
            self.state = CompilerState.NO_STATEMENT

        # timestamp used by the compiled cache for stats reporting
        self._gen_time = perf_counter()

    def __init_subclass__(cls) -> None:
        # give each subclass a chance to set up class-level state
        cls._init_compiler_cls()
        return super().__init_subclass__()

    @classmethod
    def _init_compiler_cls(cls):
        # hook for subclasses; no-op by default
        pass

    def _execute_on_connection(
        self, connection, distilled_params, execution_options
    ):
        # entry point used by Connection.execute() when handed a
        # Compiled object directly
        if self.can_execute:
            return connection._execute_compiled(
                self, distilled_params, execution_options
            )
        else:
            raise exc.ObjectNotExecutableError(self.statement)

    def visit_unsupported_compilation(self, element, err, **kw):
        # fallback dispatch target when an element has no visit method
        # on this compiler
        raise exc.UnsupportedCompilationError(self, type(element)) from err

    @property
    def sql_compiler(self) -> SQLCompiler:
        """Return a Compiled that is capable of processing SQL expressions.

        If this compiler is one, it would likely just return 'self'.

        """

        raise NotImplementedError()

    def process(self, obj: Visitable, **kwargs: Any) -> str:
        # dispatch to the visit_<name> method for the given element
        return obj._compiler_dispatch(self, **kwargs)

    def __str__(self) -> str:
        """Return the string text of the generated SQL or DDL."""

        # empty string if compilation is in progress or there is
        # no statement
        if self.state is CompilerState.STRING_APPLIED:
            return self.string
        else:
            return ""

    def construct_params(
        self,
        params: Optional[_CoreSingleExecuteParams] = None,
        extracted_parameters: Optional[Sequence[BindParameter[Any]]] = None,
        escape_names: bool = True,
    ) -> Optional[_MutableCoreSingleExecuteParams]:
        """Return the bind params for this compiled object.

        :param params: a dict of string/object pairs whose values will
         override bind values compiled in to the
         statement.
        """

        raise NotImplementedError()

    @property
    def params(self):
        """Return the bind params for this compiled object."""
        return self.construct_params()

963 

964 

class TypeCompiler(util.EnsureKWArg):
    """Produces DDL specification for TypeEngine objects."""

    ensure_kwarg = r"visit_\w+"

    def __init__(self, dialect: Dialect):
        self.dialect = dialect

    def process(self, type_: TypeEngine[Any], **kw: Any) -> str:
        """Render the DDL string for the given type, selecting a
        dialect-specific variant of the type if one was configured."""
        variants = type_._variant_mapping
        dialect_name = self.dialect.name
        if variants and dialect_name in variants:
            type_ = variants[dialect_name]
        return type_._compiler_dispatch(self, **kw)

    def visit_unsupported_compilation(
        self, element: Any, err: Exception, **kw: Any
    ) -> NoReturn:
        """Fallback raised when no visit method exists for the type."""
        raise exc.UnsupportedCompilationError(self, element) from err

985 

986 

# this was a Visitable, but to allow accurate detection of
# column elements this is actually a column element
class _CompileLabel(
    roles.BinaryElementRole[Any], elements.CompilerColumnElement
):
    """Lightweight label object standing in for an expression.Label."""

    __visit_name__ = "label"
    __slots__ = "element", "name", "_alt_names"

    def __init__(self, col, name, alt_names=()):
        self.element = col
        self.name = name
        # the wrapped column itself always participates in alt-name lookup
        self._alt_names = (col, *alt_names)

    @property
    def proxy_set(self):
        # delegate to the wrapped element
        return self.element.proxy_set

    @property
    def type(self):
        # delegate to the wrapped element
        return self.element.type

    def self_group(self, **kw):
        # labels never need parenthesization
        return self

1012 

1013 

class ilike_case_insensitive(
    roles.BinaryElementRole[Any], elements.CompilerColumnElement
):
    """wrapping element marking the case-insensitive portion of an
    ILIKE construct.

    Usually renders as the ``lower()`` function; on PostgreSQL it passes
    through unchanged on the assumption that "ILIKE" itself is being used.

    .. versionadded:: 2.0

    """

    __visit_name__ = "ilike_case_insensitive_operand"
    __slots__ = "element", "comparator"

    def __init__(self, element):
        self.element = element
        self.comparator = element.comparator

    @property
    def proxy_set(self):
        # delegate to the wrapped element
        return self.element.proxy_set

    @property
    def type(self):
        return self.element.type

    def self_group(self, **kw):
        # atomic; no parenthesization needed
        return self

    def _with_binary_element_type(self, type_):
        # re-wrap after adapting the inner element's type
        adapted = self.element._with_binary_element_type(type_)
        return ilike_case_insensitive(adapted)

1050 

1051 

class SQLCompiler(Compiled):
    """Default implementation of :class:`.Compiled`.

    Compiles :class:`_expression.ClauseElement` objects into SQL strings.

    """

    extract_map = EXTRACT_MAP

    bindname_escape_characters: ClassVar[Mapping[str, str]] = (
        util.immutabledict(
            {
                "%": "P",
                "(": "A",
                ")": "Z",
                ":": "C",
                ".": "_",
                "[": "_",
                "]": "_",
                " ": "_",
            }
        )
    )
    """A mapping (e.g. dict or similar) containing a lookup of
    characters keyed to replacement characters which will be applied to all
    'bind names' used in SQL statements as a form of 'escaping'; the given
    characters are replaced entirely with the 'replacement' character when
    rendered in the SQL statement, and a similar translation is performed
    on the incoming names used in parameter dictionaries passed to methods
    like :meth:`_engine.Connection.execute`.

    This allows bound parameter names used in :func:`_sql.bindparam` and
    other constructs to have any arbitrary characters present without any
    concern for characters that aren't allowed at all on the target database.

    Third party dialects can establish their own dictionary here to replace
    the default mapping, which will ensure that the particular characters in
    the mapping will never appear in a bound parameter name.

    The dictionary is evaluated at **class creation time**, so cannot be
    modified at runtime; it must be present on the class when the class
    is first declared.

    Note that for dialects that have additional bound parameter rules such
    as additional restrictions on leading characters, the
    :meth:`_sql.SQLCompiler.bindparam_string` method may need to be augmented.
    See the cx_Oracle compiler for an example of this.

    .. versionadded:: 2.0.0rc1

    """

    # populated at class creation time by _init_bind_translate()
    _bind_translate_re: ClassVar[Pattern[str]]
    _bind_translate_chars: ClassVar[Mapping[str, str]]

    is_sql = True

    compound_keywords = COMPOUND_KEYWORDS

    isdelete: bool = False
    isinsert: bool = False
    isupdate: bool = False
    """class-level defaults which can be set at the instance
    level to define if this Compiled instance represents
    INSERT/UPDATE/DELETE
    """

    postfetch: Optional[List[Column[Any]]]
    """list of columns that can be post-fetched after INSERT or UPDATE to
    receive server-updated values"""

    insert_prefetch: Sequence[Column[Any]] = ()
    """list of columns for which default values should be evaluated before
    an INSERT takes place"""

    update_prefetch: Sequence[Column[Any]] = ()
    """list of columns for which onupdate default values should be evaluated
    before an UPDATE takes place"""

    implicit_returning: Optional[Sequence[ColumnElement[Any]]] = None
    """list of "implicit" returning columns for a toplevel INSERT or UPDATE
    statement, used to receive newly generated values of columns.

    .. versionadded:: 2.0 ``implicit_returning`` replaces the previous
       ``returning`` collection, which was not a generalized RETURNING
       collection and instead was in fact specific to the "implicit returning"
       feature.

    """

    isplaintext: bool = False

    binds: Dict[str, BindParameter[Any]]
    """a dictionary of bind parameter keys to BindParameter instances."""

    bind_names: Dict[BindParameter[Any], str]
    """a dictionary of BindParameter instances to "compiled" names
    that are actually present in the generated SQL"""

    stack: List[_CompilerStackEntry]
    """major statements such as SELECT, INSERT, UPDATE, DELETE are
    tracked in this stack using an entry format."""

    returning_precedes_values: bool = False
    """set to True classwide to generate RETURNING
    clauses before the VALUES or WHERE clause (i.e. MSSQL)
    """

    render_table_with_column_in_update_from: bool = False
    """set to True classwide to indicate the SET clause
    in a multi-table UPDATE statement should qualify
    columns with the table name (i.e. MySQL only)
    """

    ansi_bind_rules: bool = False
    """SQL 92 doesn't allow bind parameters to be used
    in the columns clause of a SELECT, nor does it allow
    ambiguous expressions like "? = ?". A compiler
    subclass can set this flag to False if the target
    driver/DB enforces this
    """

    bindtemplate: str
    """template to render bound parameters based on paramstyle."""

    compilation_bindtemplate: str
    """template used by compiler to render parameters before positional
    paramstyle application"""

    _numeric_binds_identifier_char: str
    """Character that's used to as the identifier of a numerical bind param.
    For example if this char is set to ``$``, numerical binds will be rendered
    in the form ``$1, $2, $3``.
    """

    _result_columns: List[ResultColumnsEntry]
    """relates label names in the final SQL to a tuple of local
    column/label name, ColumnElement object (if any) and
    TypeEngine. CursorResult uses this for type processing and
    column targeting"""

    _textual_ordered_columns: bool = False
    """tell the result object that the column names as rendered are important,
    but they are also "ordered" vs. what is in the compiled object here.

    As of 1.4.42 this condition is only present when the statement is a
    TextualSelect, e.g. text("....").columns(...), where it is required
    that the columns are considered positionally and not by name.

    """

    _ad_hoc_textual: bool = False
    """tell the result that we encountered text() or '*' constructs in the
    middle of the result columns, but we also have compiled columns, so
    if the number of columns in cursor.description does not match how many
    expressions we have, that means we can't rely on positional at all and
    should match on name.

    """

    _ordered_columns: bool = True
    """
    if False, means we can't be sure the list of entries
    in _result_columns is actually the rendered order. Usually
    True unless using an unordered TextualSelect.
    """

    _loose_column_name_matching: bool = False
    """tell the result object that the SQL statement is textual, wants to match
    up to Column objects, and may be using the ._tq_label in the SELECT rather
    than the base name.

    """

    _numeric_binds: bool = False
    """
    True if paramstyle is "numeric". This paramstyle is trickier than
    all the others.

    """

    _render_postcompile: bool = False
    """
    whether to render out POSTCOMPILE params during the compile phase.

    This attribute is used only for end-user invocation of stmt.compile();
    it's never used for actual statement execution, where instead the
    dialect internals access and render the internal postcompile structure
    directly.

    """

    _post_compile_expanded_state: Optional[ExpandedState] = None
    """When render_postcompile is used, the ``ExpandedState`` used to create
    the "expanded" SQL is assigned here, and then used by the ``.params``
    accessor and ``.construct_params()`` methods for their return values.

    .. versionadded:: 2.0.0rc1

    """

    _pre_expanded_string: Optional[str] = None
    """Stores the original string SQL before 'post_compile' is applied,
    for cases where 'post_compile' were used.

    """

    _pre_expanded_positiontup: Optional[List[str]] = None

    _insertmanyvalues: Optional[_InsertManyValues] = None

    _insert_crud_params: Optional[crud._CrudParamSequence] = None

    literal_execute_params: FrozenSet[BindParameter[Any]] = frozenset()
    """bindparameter objects that are rendered as literal values at statement
    execution time.

    """

    post_compile_params: FrozenSet[BindParameter[Any]] = frozenset()
    """bindparameter objects that are rendered as bound parameter placeholders
    at statement execution time.

    """

    escaped_bind_names: util.immutabledict[str, str] = util.EMPTY_DICT
    """Late escaping of bound parameter names that has to be converted
    to the original name when looking in the parameter dictionary.

    """

    has_out_parameters = False
    """if True, there are bindparam() objects that have the isoutparam
    flag set."""

    postfetch_lastrowid = False
    """if True, and this in insert, use cursor.lastrowid to populate
    result.inserted_primary_key. """

    _cache_key_bind_match: Optional[
        Tuple[
            Dict[
                BindParameter[Any],
                List[BindParameter[Any]],
            ],
            Dict[
                str,
                BindParameter[Any],
            ],
        ]
    ] = None
    """a mapping that will relate the BindParameter object we compile
    to those that are part of the extracted collection of parameters
    in the cache key, if we were given a cache key.

    """

    positiontup: Optional[List[str]] = None
    """for a compiled construct that uses a positional paramstyle, will be
    a sequence of strings, indicating the names of bound parameters in order.

    This is used in order to render bound parameters in their correct order,
    and is combined with the :attr:`_sql.Compiled.params` dictionary to
    render parameters.

    This sequence always contains the unescaped name of the parameters.

    .. seealso::

        :ref:`faq_sql_expression_string` - includes a usage example for
        debugging use cases.

    """
    _values_bindparam: Optional[List[str]] = None

    _visited_bindparam: Optional[List[str]] = None

    inline: bool = False

    ctes: Optional[MutableMapping[CTE, str]]

    # Detect same CTE references - Dict[(level, name), cte]
    # Level is required for supporting nesting
    ctes_by_level_name: Dict[Tuple[int, str], CTE]

    # To retrieve key/level in ctes_by_level_name -
    # Dict[cte_reference, (level, cte_name, cte_opts)]
    level_name_by_cte: Dict[CTE, Tuple[int, str, selectable._CTEOpts]]

    ctes_recursive: bool

    # regexes used to locate pyformat-style and POSTCOMPILE placeholders
    # when rewriting the compiled string for positional paramstyles
    # (see _process_positional / _process_numeric)
    _post_compile_pattern = re.compile(r"__\[POSTCOMPILE_(\S+?)(~~.+?~~)?\]")
    _pyformat_pattern = re.compile(r"%\(([^)]+?)\)s")
    _positional_pattern = re.compile(
        f"{_pyformat_pattern.pattern}|{_post_compile_pattern.pattern}"
    )

1348 

    @classmethod
    def _init_compiler_cls(cls):
        # class-creation hook; builds the bind-name escaping regex
        cls._init_bind_translate()

1352 

1353 @classmethod 

1354 def _init_bind_translate(cls): 

1355 reg = re.escape("".join(cls.bindname_escape_characters)) 

1356 cls._bind_translate_re = re.compile(f"[{reg}]") 

1357 cls._bind_translate_chars = cls.bindname_escape_characters 

1358 

    def __init__(
        self,
        dialect: Dialect,
        statement: Optional[ClauseElement],
        cache_key: Optional[CacheKey] = None,
        column_keys: Optional[Sequence[str]] = None,
        for_executemany: bool = False,
        linting: Linting = NO_LINTING,
        _supporting_against: Optional[SQLCompiler] = None,
        **kwargs: Any,
    ):
        """Construct a new :class:`.SQLCompiler` object.

        :param dialect: :class:`.Dialect` to be used

        :param statement: :class:`_expression.ClauseElement` to be compiled

        :param cache_key: optional :class:`.CacheKey`; when present, its
         extracted bound parameters are tracked so that incoming parameter
         sets can later be lined up with the compiled parameters.

        :param column_keys: a list of column names to be compiled into an
         INSERT or UPDATE statement.

        :param for_executemany: whether INSERT / UPDATE statements should
         expect that they are to be invoked in an "executemany" style,
         which may impact how the statement will be expected to return the
         values of defaults and autoincrement / sequences and similar.
         Depending on the backend and driver in use, support for retrieving
         these values may be disabled which means SQL expressions may
         be rendered inline, RETURNING may not be rendered, etc.

        :param linting: lint flags controlling compile-time warnings.

        :param kwargs: additional keyword arguments to be consumed by the
         superclass.

        """
        self.column_keys = column_keys

        self.cache_key = cache_key

        if cache_key:
            # index the cache key's extracted binds both by key and by
            # identity, for later use by construct_params()
            cksm = {b.key: b for b in cache_key[1]}
            ckbm = {b: [b] for b in cache_key[1]}
            self._cache_key_bind_match = (ckbm, cksm)

        # compile INSERT/UPDATE defaults/sequences to expect executemany
        # style execution, which may mean no pre-execute of defaults,
        # or no RETURNING
        self.for_executemany = for_executemany

        self.linting = linting

        # a dictionary of bind parameter keys to BindParameter
        # instances.
        self.binds = {}

        # a dictionary of BindParameter instances to "compiled" names
        # that are actually present in the generated SQL
        self.bind_names = util.column_dict()

        # stack which keeps track of nested SELECT statements
        self.stack = []

        self._result_columns = []

        # true if the paramstyle is positional
        self.positional = dialect.positional
        if self.positional:
            self._numeric_binds = nb = dialect.paramstyle.startswith("numeric")
            if nb:
                self._numeric_binds_identifier_char = (
                    "$" if dialect.paramstyle == "numeric_dollar" else ":"
                )

            # positional styles render with pyformat first, then are
            # rewritten by _process_positional / _process_numeric
            self.compilation_bindtemplate = _pyformat_template
        else:
            self.compilation_bindtemplate = BIND_TEMPLATES[dialect.paramstyle]

        self.ctes = None

        self.label_length = (
            dialect.label_length or dialect.max_identifier_length
        )

        # a map which tracks "anonymous" identifiers that are created on
        # the fly here
        self.anon_map = prefix_anon_map()

        # a map which tracks "truncated" names based on
        # dialect.label_length or dialect.max_identifier_length
        self.truncated_names: Dict[Tuple[str, str], str] = {}
        self._truncated_counters: Dict[str, int] = {}

        # superclass __init__ performs the compilation; by the time it
        # returns, isinsert/isupdate/isdelete and .state are set
        Compiled.__init__(self, dialect, statement, **kwargs)

        if self.isinsert or self.isupdate or self.isdelete:
            if TYPE_CHECKING:
                assert isinstance(statement, UpdateBase)

            if self.isinsert or self.isupdate:
                if TYPE_CHECKING:
                    assert isinstance(statement, ValuesBase)
                if statement._inline:
                    self.inline = True
                elif self.for_executemany and (
                    not self.isinsert
                    or (
                        self.dialect.insert_executemany_returning
                        and statement._return_defaults
                    )
                ):
                    self.inline = True

        self.bindtemplate = BIND_TEMPLATES[dialect.paramstyle]

        if _supporting_against:
            # adopt most state from a "supporting" compiler, keeping our
            # own dialect/paramstyle-related attributes
            self.__dict__.update(
                {
                    k: v
                    for k, v in _supporting_against.__dict__.items()
                    if k
                    not in {
                        "state",
                        "dialect",
                        "preparer",
                        "positional",
                        "_numeric_binds",
                        "compilation_bindtemplate",
                        "bindtemplate",
                    }
                }
            )

        if self.state is CompilerState.STRING_APPLIED:
            if self.positional:
                # rewrite pyformat placeholders into the dialect's actual
                # positional style
                if self._numeric_binds:
                    self._process_numeric()
                else:
                    self._process_positional()

            if self._render_postcompile:
                parameters = self.construct_params(
                    escape_names=False,
                    _no_postcompile=True,
                )

                self._process_parameters_for_postcompile(
                    parameters, _populate_self=True
                )

1504 

1505 @property 

1506 def insert_single_values_expr(self) -> Optional[str]: 

1507 """When an INSERT is compiled with a single set of parameters inside 

1508 a VALUES expression, the string is assigned here, where it can be 

1509 used for insert batching schemes to rewrite the VALUES expression. 

1510 

1511 .. versionadded:: 1.3.8 

1512 

1513 .. versionchanged:: 2.0 This collection is no longer used by 

1514 SQLAlchemy's built-in dialects, in favor of the currently 

1515 internal ``_insertmanyvalues`` collection that is used only by 

1516 :class:`.SQLCompiler`. 

1517 

1518 """ 

1519 if self._insertmanyvalues is None: 

1520 return None 

1521 else: 

1522 return self._insertmanyvalues.single_values_expr 

1523 

1524 @util.ro_memoized_property 

1525 def effective_returning(self) -> Optional[Sequence[ColumnElement[Any]]]: 

1526 """The effective "returning" columns for INSERT, UPDATE or DELETE. 

1527 

1528 This is either the so-called "implicit returning" columns which are 

1529 calculated by the compiler on the fly, or those present based on what's 

1530 present in ``self.statement._returning`` (expanded into individual 

1531 columns using the ``._all_selected_columns`` attribute) i.e. those set 

1532 explicitly using the :meth:`.UpdateBase.returning` method. 

1533 

1534 .. versionadded:: 2.0 

1535 

1536 """ 

1537 if self.implicit_returning: 

1538 return self.implicit_returning 

1539 elif self.statement is not None and is_dml(self.statement): 

1540 return [ 

1541 c 

1542 for c in self.statement._all_selected_columns 

1543 if is_column_element(c) 

1544 ] 

1545 

1546 else: 

1547 return None 

1548 

1549 @property 

1550 def returning(self): 

1551 """backwards compatibility; returns the 

1552 effective_returning collection. 

1553 

1554 """ 

1555 return self.effective_returning 

1556 

1557 @property 

1558 def current_executable(self): 

1559 """Return the current 'executable' that is being compiled. 

1560 

1561 This is currently the :class:`_sql.Select`, :class:`_sql.Insert`, 

1562 :class:`_sql.Update`, :class:`_sql.Delete`, 

1563 :class:`_sql.CompoundSelect` object that is being compiled. 

1564 Specifically it's assigned to the ``self.stack`` list of elements. 

1565 

1566 When a statement like the above is being compiled, it normally 

1567 is also assigned to the ``.statement`` attribute of the 

1568 :class:`_sql.Compiler` object. However, all SQL constructs are 

1569 ultimately nestable, and this attribute should never be consulted 

1570 by a ``visit_`` method, as it is not guaranteed to be assigned 

1571 nor guaranteed to correspond to the current statement being compiled. 

1572 

1573 .. versionadded:: 1.3.21 

1574 

1575 For compatibility with previous versions, use the following 

1576 recipe:: 

1577 

1578 statement = getattr(self, "current_executable", False) 

1579 if statement is False: 

1580 statement = self.stack[-1]["selectable"] 

1581 

1582 For versions 1.4 and above, ensure only .current_executable 

1583 is used; the format of "self.stack" may change. 

1584 

1585 

1586 """ 

1587 try: 

1588 return self.stack[-1]["selectable"] 

1589 except IndexError as ie: 

1590 raise IndexError("Compiler does not have a stack entry") from ie 

1591 

1592 @property 

1593 def prefetch(self): 

1594 return list(self.insert_prefetch) + list(self.update_prefetch) 

1595 

    @util.memoized_property
    def _global_attributes(self) -> Dict[Any, Any]:
        # lazily-created per-compiler dictionary used for arbitrary
        # state sharing during compilation
        return {}

1599 

1600 @util.memoized_instancemethod 

1601 def _init_cte_state(self) -> MutableMapping[CTE, str]: 

1602 """Initialize collections related to CTEs only if 

1603 a CTE is located, to save on the overhead of 

1604 these collections otherwise. 

1605 

1606 """ 

1607 # collect CTEs to tack on top of a SELECT 

1608 # To store the query to print - Dict[cte, text_query] 

1609 ctes: MutableMapping[CTE, str] = util.OrderedDict() 

1610 self.ctes = ctes 

1611 

1612 # Detect same CTE references - Dict[(level, name), cte] 

1613 # Level is required for supporting nesting 

1614 self.ctes_by_level_name = {} 

1615 

1616 # To retrieve key/level in ctes_by_level_name - 

1617 # Dict[cte_reference, (level, cte_name, cte_opts)] 

1618 self.level_name_by_cte = {} 

1619 

1620 self.ctes_recursive = False 

1621 

1622 return ctes 

1623 

1624 @contextlib.contextmanager 

1625 def _nested_result(self): 

1626 """special API to support the use case of 'nested result sets'""" 

1627 result_columns, ordered_columns = ( 

1628 self._result_columns, 

1629 self._ordered_columns, 

1630 ) 

1631 self._result_columns, self._ordered_columns = [], False 

1632 

1633 try: 

1634 if self.stack: 

1635 entry = self.stack[-1] 

1636 entry["need_result_map_for_nested"] = True 

1637 else: 

1638 entry = None 

1639 yield self._result_columns, self._ordered_columns 

1640 finally: 

1641 if entry: 

1642 entry.pop("need_result_map_for_nested") 

1643 self._result_columns, self._ordered_columns = ( 

1644 result_columns, 

1645 ordered_columns, 

1646 ) 

1647 

    def _process_positional(self):
        # Rewrite self.string, which was rendered using pyformat
        # placeholders, into the dialect's positional paramstyle
        # ("format" or "qmark"), recording parameter names in order
        # into self.positiontup.
        assert not self.positiontup
        assert self.state is CompilerState.STRING_APPLIED
        assert not self._numeric_binds

        if self.dialect.paramstyle == "format":
            placeholder = "%s"
        else:
            assert self.dialect.paramstyle == "qmark"
            placeholder = "?"

        positions = []

        def find_position(m: re.Match[str]) -> str:
            # group(1) matches a pyformat bind; group(2) matches a
            # POSTCOMPILE token, which is left in place for later expansion
            normal_bind = m.group(1)
            if normal_bind:
                positions.append(normal_bind)
                return placeholder
            else:
                # this a post-compile bind
                positions.append(m.group(2))
                return m.group(0)

        self.string = re.sub(
            self._positional_pattern, find_position, self.string
        )

        if self.escaped_bind_names:
            # positiontup always carries the *unescaped* names; map back
            reverse_escape = {v: k for k, v in self.escaped_bind_names.items()}
            assert len(self.escaped_bind_names) == len(reverse_escape)
            self.positiontup = [
                reverse_escape.get(name, name) for name in positions
            ]
        else:
            self.positiontup = positions

        if self._insertmanyvalues:
            # apply the same rewrite to the stored VALUES expression and
            # per-column expressions used by the insertmanyvalues feature
            positions = []

            single_values_expr = re.sub(
                self._positional_pattern,
                find_position,
                self._insertmanyvalues.single_values_expr,
            )
            insert_crud_params = [
                (
                    v[0],
                    v[1],
                    re.sub(self._positional_pattern, find_position, v[2]),
                    v[3],
                )
                for v in self._insertmanyvalues.insert_crud_params
            ]

            self._insertmanyvalues = self._insertmanyvalues._replace(
                single_values_expr=single_values_expr,
                insert_crud_params=insert_crud_params,
            )

1706 

    def _process_numeric(self):
        # Rewrite self.string, rendered with pyformat placeholders, into
        # the "numeric" / "numeric_dollar" paramstyle (:1 / $1 style),
        # assigning each distinct bind name a number.
        assert self._numeric_binds
        assert self.state is CompilerState.STRING_APPLIED

        num = 1
        param_pos: Dict[str, str] = {}
        order: Iterable[str]
        if self._insertmanyvalues and self._values_bindparam is not None:
            # bindparams that are not in values are always placed first.
            # this avoids the need of changing them when using executemany
            # values () ()
            order = itertools.chain(
                (
                    name
                    for name in self.bind_names.values()
                    if name not in self._values_bindparam
                ),
                self.bind_names.values(),
            )
        else:
            order = self.bind_names.values()

        for bind_name in order:
            if bind_name in param_pos:
                continue
            bind = self.binds[bind_name]
            if (
                bind in self.post_compile_params
                or bind in self.literal_execute_params
            ):
                # set to None to just mark the in positiontup, it will not
                # be replaced below.
                param_pos[bind_name] = None  # type: ignore
            else:
                ph = f"{self._numeric_binds_identifier_char}{num}"
                num += 1
                param_pos[bind_name] = ph

        # next available number, for binds rendered later (e.g. expansion)
        self.next_numeric_pos = num

        self.positiontup = list(param_pos)
        if self.escaped_bind_names:
            # the string contains escaped names; re-key before substitution
            len_before = len(param_pos)
            param_pos = {
                self.escaped_bind_names.get(name, name): pos
                for name, pos in param_pos.items()
            }
            assert len(param_pos) == len_before

        # Can't use format here since % chars are not escaped.
        self.string = self._pyformat_pattern.sub(
            lambda m: param_pos[m.group(1)], self.string
        )

        if self._insertmanyvalues:
            single_values_expr = (
                # format is ok here since single_values_expr includes only
                # place-holders
                self._insertmanyvalues.single_values_expr
                % param_pos
            )
            insert_crud_params = [
                (v[0], v[1], "%s", v[3])
                for v in self._insertmanyvalues.insert_crud_params
            ]

            self._insertmanyvalues = self._insertmanyvalues._replace(
                # This has the numbers (:1, :2)
                single_values_expr=single_values_expr,
                # The single binds are instead %s so they can be formatted
                insert_crud_params=insert_crud_params,
            )

1779 

    @util.memoized_property
    def _bind_processors(
        self,
    ) -> MutableMapping[
        str, Union[_BindProcessorType[Any], Sequence[_BindProcessorType[Any]]]
    ]:
        # Mapping of compiled bind name -> bind processor callable; for
        # tuple types, a tuple of per-element processors.  Entries whose
        # processor is None are omitted entirely.
        # mypy is not able to see the two value types as the above Union,
        # it just sees "object". don't know how to resolve
        return {
            key: value  # type: ignore
            for key, value in (
                (
                    self.bind_names[bindparam],
                    (
                        bindparam.type._cached_bind_processor(self.dialect)
                        if not bindparam.type._is_tuple_type
                        else tuple(
                            elem_type._cached_bind_processor(self.dialect)
                            for elem_type in cast(
                                TupleType, bindparam.type
                            ).types
                        )
                    ),
                )
                for bindparam in self.bind_names
            )
            if value is not None
        }

1808 

1809 def is_subquery(self): 

1810 return len(self.stack) > 1 

1811 

1812 @property 

1813 def sql_compiler(self) -> Self: 

1814 return self 

1815 

1816 def construct_expanded_state( 

1817 self, 

1818 params: Optional[_CoreSingleExecuteParams] = None, 

1819 escape_names: bool = True, 

1820 ) -> ExpandedState: 

1821 """Return a new :class:`.ExpandedState` for a given parameter set. 

1822 

1823 For queries that use "expanding" or other late-rendered parameters, 

1824 this method will provide for both the finalized SQL string as well 

1825 as the parameters that would be used for a particular parameter set. 

1826 

1827 .. versionadded:: 2.0.0rc1 

1828 

1829 """ 

1830 parameters = self.construct_params( 

1831 params, 

1832 escape_names=escape_names, 

1833 _no_postcompile=True, 

1834 ) 

1835 return self._process_parameters_for_postcompile( 

1836 parameters, 

1837 ) 

1838 

    def construct_params(
        self,
        params: Optional[_CoreSingleExecuteParams] = None,
        extracted_parameters: Optional[Sequence[BindParameter[Any]]] = None,
        escape_names: bool = True,
        _group_number: Optional[int] = None,
        _check: bool = True,
        _no_postcompile: bool = False,
    ) -> _MutableCoreSingleExecuteParams:
        """return a dictionary of bind parameter keys and values

        :param params: optional dict of values overriding those compiled
          into the statement.
        :param extracted_parameters: bound parameters extracted for the
          compiled cache key; matched to ours positionally, by .key.
        :param escape_names: apply ``escaped_bind_names`` translation to
          the keys of the returned dictionary.
        :param _group_number: executemany parameter-group index, used only
          to enrich error messages.
        :param _check: when True, raise for required binds with no value.
        :param _no_postcompile: bypass the render_postcompile short
          circuit (used internally when building expanded state).
        """

        if self._render_postcompile and not _no_postcompile:
            # statement was compiled against one fixed parameter set;
            # only that exact set can be returned
            assert self._post_compile_expanded_state is not None
            if not params:
                return dict(self._post_compile_expanded_state.parameters)
            else:
                raise exc.InvalidRequestError(
                    "can't construct new parameters when render_postcompile "
                    "is used; the statement is hard-linked to the original "
                    "parameters. Use construct_expanded_state to generate a "
                    "new statement and parameters."
                )

        has_escaped_names = escape_names and bool(self.escaped_bind_names)

        if extracted_parameters:
            # related the bound parameters collected in the original cache key
            # to those collected in the incoming cache key. They will not have
            # matching names but they will line up positionally in the same
            # way. The parameters present in self.bind_names may be clones of
            # these original cache key params in the case of DML but the .key
            # will be guaranteed to match.
            if self.cache_key is None:
                raise exc.CompileError(
                    "This compiled object has no original cache key; "
                    "can't pass extracted_parameters to construct_params"
                )
            else:
                orig_extracted = self.cache_key[1]

            ckbm_tuple = self._cache_key_bind_match
            assert ckbm_tuple is not None
            ckbm, _ = ckbm_tuple
            resolved_extracted = {
                bind: extracted
                for b, extracted in zip(orig_extracted, extracted_parameters)
                for bind in ckbm[b]
            }
        else:
            resolved_extracted = None

        if params:
            # values from the given params dict take precedence, matched
            # either by the bind's .key or its compiled name
            pd = {}
            for bindparam, name in self.bind_names.items():
                escaped_name = (
                    self.escaped_bind_names.get(name, name)
                    if has_escaped_names
                    else name
                )

                if bindparam.key in params:
                    pd[escaped_name] = params[bindparam.key]
                elif name in params:
                    pd[escaped_name] = params[name]

                elif _check and bindparam.required:
                    if _group_number:
                        raise exc.InvalidRequestError(
                            "A value is required for bind parameter %r, "
                            "in parameter group %d"
                            % (bindparam.key, _group_number),
                            code="cd3x",
                        )
                    else:
                        raise exc.InvalidRequestError(
                            "A value is required for bind parameter %r"
                            % bindparam.key,
                            code="cd3x",
                        )
                else:
                    # fall back to the bind's own (possibly cache-key
                    # substituted) value
                    if resolved_extracted:
                        value_param = resolved_extracted.get(
                            bindparam, bindparam
                        )
                    else:
                        value_param = bindparam

                    if bindparam.callable:
                        pd[escaped_name] = value_param.effective_value
                    else:
                        pd[escaped_name] = value_param.value
            return pd
        else:
            # no overriding params; use compiled-in values throughout
            pd = {}
            for bindparam, name in self.bind_names.items():
                escaped_name = (
                    self.escaped_bind_names.get(name, name)
                    if has_escaped_names
                    else name
                )

                if _check and bindparam.required:
                    if _group_number:
                        raise exc.InvalidRequestError(
                            "A value is required for bind parameter %r, "
                            "in parameter group %d"
                            % (bindparam.key, _group_number),
                            code="cd3x",
                        )
                    else:
                        raise exc.InvalidRequestError(
                            "A value is required for bind parameter %r"
                            % bindparam.key,
                            code="cd3x",
                        )

                if resolved_extracted:
                    value_param = resolved_extracted.get(bindparam, bindparam)
                else:
                    value_param = bindparam

                if bindparam.callable:
                    pd[escaped_name] = value_param.effective_value
                else:
                    pd[escaped_name] = value_param.value

            return pd

1966 

    @util.memoized_instancemethod
    def _get_set_input_sizes_lookup(self):
        """Return a mapping of each bound parameter to the DBAPI type
        object (or ``None``) to be used with ``cursor.setinputsizes()``.

        The DBAPI type for each parameter's type is resolved through the
        dialect's unwrapped implementation, then filtered against the
        dialect's ``include_set_input_sizes`` / ``exclude_set_input_sizes``
        collections.  Parameters that render as literals
        (``literal_execute_params``) are omitted entirely; tuple-typed
        parameters map to a list of per-element DBAPI types.
        """
        dialect = self.dialect

        include_types = dialect.include_set_input_sizes
        exclude_types = dialect.exclude_set_input_sizes

        dbapi = dialect.dbapi

        def lookup_type(typ):
            # resolve the DBAPI-level type and apply the dialect's
            # include/exclude filters; None means "don't set a size"
            dbtype = typ._unwrapped_dialect_impl(dialect).get_dbapi_type(dbapi)

            if (
                dbtype is not None
                and (exclude_types is None or dbtype not in exclude_types)
                and (include_types is None or dbtype in include_types)
            ):
                return dbtype
            else:
                return None

        inputsizes = {}

        literal_execute_params = self.literal_execute_params

        for bindparam in self.bind_names:
            if bindparam in literal_execute_params:
                continue

            if bindparam.type._is_tuple_type:
                inputsizes[bindparam] = [
                    lookup_type(typ)
                    for typ in cast(TupleType, bindparam.type).types
                ]
            else:
                inputsizes[bindparam] = lookup_type(bindparam.type)

        return inputsizes

2005 

2006 @property 

2007 def params(self): 

2008 """Return the bind param dictionary embedded into this 

2009 compiled object, for those values that are present. 

2010 

2011 .. seealso:: 

2012 

2013 :ref:`faq_sql_expression_string` - includes a usage example for 

2014 debugging use cases. 

2015 

2016 """ 

2017 return self.construct_params(_check=False) 

2018 

    def _process_parameters_for_postcompile(
        self,
        parameters: _MutableCoreSingleExecuteParams,
        _populate_self: bool = False,
    ) -> ExpandedState:
        """handle special post compile parameters.

        These include:

        * "expanding" parameters -typically IN tuples that are rendered
          on a per-parameter basis for an otherwise fixed SQL statement string.

        * literal_binds compiled with the literal_execute flag.  Used for
          things like SQL Server "TOP N" where the driver does not accommodate
          N as a bound parameter.

        Returns an :class:`.ExpandedState` with the fully rendered
        statement string and the adjusted parameters / processors /
        positional ordering.  When ``_populate_self`` is True the expanded
        state is also written back onto this compiled object (used by the
        "render_postcompile" debugging feature).
        """

        expanded_parameters = {}
        new_positiontup: Optional[List[str]]

        # keep the original pre-expansion string so repeated calls can
        # re-expand from scratch
        pre_expanded_string = self._pre_expanded_string
        if pre_expanded_string is None:
            pre_expanded_string = self.string

        if self.positional:
            new_positiontup = []

            pre_expanded_positiontup = self._pre_expanded_positiontup
            if pre_expanded_positiontup is None:
                pre_expanded_positiontup = self.positiontup

        else:
            new_positiontup = pre_expanded_positiontup = None

        processors = self._bind_processors
        single_processors = cast(
            "Mapping[str, _BindProcessorType[Any]]", processors
        )
        tuple_processors = cast(
            "Mapping[str, Sequence[_BindProcessorType[Any]]]", processors
        )

        new_processors: Dict[str, _BindProcessorType[Any]] = {}

        replacement_expressions: Dict[str, Any] = {}
        to_update_sets: Dict[str, Any] = {}

        # notes:
        # *unescaped* parameter names in:
        # self.bind_names, self.binds, self._bind_processors, self.positiontup
        #
        # *escaped* parameter names in:
        # construct_params(), replacement_expressions

        numeric_positiontup: Optional[List[str]] = None

        if self.positional and pre_expanded_positiontup is not None:
            names: Iterable[str] = pre_expanded_positiontup
            if self._numeric_binds:
                numeric_positiontup = []
        else:
            names = self.bind_names.values()

        ebn = self.escaped_bind_names
        for name in names:
            escaped_name = ebn.get(name, name) if ebn else name
            parameter = self.binds[name]

            if parameter in self.literal_execute_params:
                # literal_execute: pop the value and render it inline
                if escaped_name not in replacement_expressions:
                    replacement_expressions[escaped_name] = (
                        self.render_literal_bindparam(
                            parameter,
                            render_literal_value=parameters.pop(escaped_name),
                        )
                    )
                continue

            if parameter in self.post_compile_params:
                if escaped_name in replacement_expressions:
                    to_update = to_update_sets[escaped_name]
                    values = None
                else:
                    # we are removing the parameter from parameters
                    # because it is a list value, which is not expected by
                    # TypeEngine objects that would otherwise be asked to
                    # process it. the single name is being replaced with
                    # individual numbered parameters for each value in the
                    # param.
                    #
                    # note we are also inserting *escaped* parameter names
                    # into the given dictionary.  default dialect will
                    # use these param names directly as they will not be
                    # in the escaped_bind_names dictionary.
                    values = parameters.pop(name)

                    leep_res = self._literal_execute_expanding_parameter(
                        escaped_name, parameter, values
                    )
                    (to_update, replacement_expr) = leep_res

                    to_update_sets[escaped_name] = to_update
                    replacement_expressions[escaped_name] = replacement_expr

                if not parameter.literal_execute:
                    parameters.update(to_update)
                    if parameter.type._is_tuple_type:
                        assert values is not None
                        new_processors.update(
                            (
                                "%s_%s_%s" % (name, i, j),
                                tuple_processors[name][j - 1],
                            )
                            for i, tuple_element in enumerate(values, 1)
                            for j, _ in enumerate(tuple_element, 1)
                            if name in tuple_processors
                            and tuple_processors[name][j - 1] is not None
                        )
                    else:
                        new_processors.update(
                            (key, single_processors[name])
                            for key, _ in to_update
                            if name in single_processors
                        )
                    if numeric_positiontup is not None:
                        numeric_positiontup.extend(
                            name for name, _ in to_update
                        )
                    elif new_positiontup is not None:
                        # to_update has escaped names, but that's ok since
                        # these are new names, that aren't in the
                        # escaped_bind_names dict.
                        new_positiontup.extend(name for name, _ in to_update)
                    expanded_parameters[name] = [
                        expand_key for expand_key, _ in to_update
                    ]
            elif new_positiontup is not None:
                new_positiontup.append(name)

        def process_expanding(m):
            key = m.group(1)
            expr = replacement_expressions[key]

            # if POSTCOMPILE included a bind_expression, render that
            # around each element
            if m.group(2):
                tok = m.group(2).split("~~")
                be_left, be_right = tok[1], tok[3]
                expr = ", ".join(
                    "%s%s%s" % (be_left, exp, be_right)
                    for exp in expr.split(", ")
                )
            return expr

        statement = re.sub(
            self._post_compile_pattern, process_expanding, pre_expanded_string
        )

        if numeric_positiontup is not None:
            assert new_positiontup is not None
            param_pos = {
                key: f"{self._numeric_binds_identifier_char}{num}"
                for num, key in enumerate(
                    numeric_positiontup, self.next_numeric_pos
                )
            }
            # Can't use format here since % chars are not escaped.
            statement = self._pyformat_pattern.sub(
                lambda m: param_pos[m.group(1)], statement
            )
            new_positiontup.extend(numeric_positiontup)

        expanded_state = ExpandedState(
            statement,
            parameters,
            new_processors,
            new_positiontup,
            expanded_parameters,
        )

        if _populate_self:
            # this is for the "render_postcompile" flag, which is not
            # otherwise used internally and is for end-user debugging and
            # special use cases.
            self._pre_expanded_string = pre_expanded_string
            self._pre_expanded_positiontup = pre_expanded_positiontup
            self.string = expanded_state.statement
            self.positiontup = (
                list(expanded_state.positiontup or ())
                if self.positional
                else None
            )
            self._post_compile_expanded_state = expanded_state

        return expanded_state

2215 

2216 @util.preload_module("sqlalchemy.engine.cursor") 

2217 def _create_result_map(self): 

2218 """utility method used for unit tests only.""" 

2219 cursor = util.preloaded.engine_cursor 

2220 return cursor.CursorResultMetaData._create_description_match_map( 

2221 self._result_columns 

2222 ) 

2223 

    # callable mapping a column to its bind parameter name within the
    # current statement; assigned by crud.py for insert/update statements
    _get_bind_name_for_col: _BindNameForColProtocol

2226 

2227 @util.memoized_property 

2228 def _within_exec_param_key_getter(self) -> Callable[[Any], str]: 

2229 getter = self._get_bind_name_for_col 

2230 return getter 

2231 

    @util.memoized_property
    @util.preload_module("sqlalchemy.engine.result")
    def _inserted_primary_key_from_lastrowid_getter(self):
        """Return a callable ``(lastrowid, parameters) -> Row`` that
        assembles the inserted primary key for this INSERT, using
        ``cursor.lastrowid`` for the autoincrement column and the INSERT
        parameters for the remaining primary key columns.
        """
        result = util.preloaded.engine_result

        param_key_getter = self._within_exec_param_key_getter

        assert self.compile_state is not None
        statement = self.compile_state.statement

        if TYPE_CHECKING:
            assert isinstance(statement, Insert)

        table = statement.table

        # one (parameters-dict getter, column) pair per primary key column
        getters = [
            (operator.methodcaller("get", param_key_getter(col), None), col)
            for col in table.primary_key
        ]

        autoinc_getter = None
        autoinc_col = table._autoincrement_column
        if autoinc_col is not None:
            # apply type post processors to the lastrowid
            lastrowid_processor = autoinc_col.type._cached_result_processor(
                self.dialect, None
            )
            autoinc_key = param_key_getter(autoinc_col)

            # if a bind value is present for the autoincrement column
            # in the parameters, we need to do the logic dictated by
            # #7998; honor a non-None user-passed parameter over lastrowid.
            # previously in the 1.4 series we weren't fetching lastrowid
            # at all if the key were present in the parameters
            if autoinc_key in self.binds:

                def _autoinc_getter(lastrowid, parameters):
                    param_value = parameters.get(autoinc_key, lastrowid)
                    if param_value is not None:
                        # they supplied non-None parameter, use that.
                        # SQLite at least is observed to return the wrong
                        # cursor.lastrowid for INSERT..ON CONFLICT so it
                        # can't be used in all cases
                        return param_value
                    else:
                        # use lastrowid
                        return lastrowid

                # work around mypy https://github.com/python/mypy/issues/14027
                autoinc_getter = _autoinc_getter

        else:
            lastrowid_processor = None

        row_fn = result.result_tuple([col.key for col in table.primary_key])

        def get(lastrowid, parameters):
            """given cursor.lastrowid value and the parameters used for INSERT,
            return a "row" that represents the primary key, either by
            using the "lastrowid" or by extracting values from the parameters
            that were sent along with the INSERT.

            """
            if lastrowid_processor is not None:
                lastrowid = lastrowid_processor(lastrowid)

            if lastrowid is None:
                return row_fn(getter(parameters) for getter, col in getters)
            else:
                return row_fn(
                    (
                        (
                            autoinc_getter(lastrowid, parameters)
                            if autoinc_getter is not None
                            else lastrowid
                        )
                        if col is autoinc_col
                        else getter(parameters)
                    )
                    for getter, col in getters
                )

        return get

2315 

    @util.memoized_property
    @util.preload_module("sqlalchemy.engine.result")
    def _inserted_primary_key_from_returning_getter(self):
        """Return a callable ``(row, parameters) -> Row`` producing the
        inserted primary key from a RETURNING row, falling back to the
        INSERT parameters for primary key columns not present in the
        RETURNING collection.
        """
        result = util.preloaded.engine_result

        assert self.compile_state is not None
        statement = self.compile_state.statement

        if TYPE_CHECKING:
            assert isinstance(statement, Insert)

        param_key_getter = self._within_exec_param_key_getter
        table = statement.table

        returning = self.implicit_returning
        assert returning is not None
        ret = {col: idx for idx, col in enumerate(returning)}

        # each getter is paired with a flag: True -> read from the
        # RETURNING row by index, False -> read from the parameters dict
        getters = cast(
            "List[Tuple[Callable[[Any], Any], bool]]",
            [
                (
                    (operator.itemgetter(ret[col]), True)
                    if col in ret
                    else (
                        operator.methodcaller(
                            "get", param_key_getter(col), None
                        ),
                        False,
                    )
                )
                for col in table.primary_key
            ],
        )

        row_fn = result.result_tuple([col.key for col in table.primary_key])

        def get(row, parameters):
            return row_fn(
                getter(row) if use_row else getter(parameters)
                for getter, use_row in getters
            )

        return get

2360 

2361 def default_from(self) -> str: 

2362 """Called when a SELECT statement has no froms, and no FROM clause is 

2363 to be appended. 

2364 

2365 Gives Oracle Database a chance to tack on a ``FROM DUAL`` to the string 

2366 output. 

2367 

2368 """ 

2369 return "" 

2370 

    def visit_override_binds(self, override_binds, **kw):
        """SQL compile the nested element of an _OverrideBinds with
        bindparams swapped out.

        The _OverrideBinds is not normally expected to be compiled; it
        is meant to be used when an already cached statement is to be used,
        the compilation was already performed, and only the bound params should
        be swapped in at execution time.

        However, there are test cases that exercise this object, and
        additionally the ORM subquery loader is known to feed in expressions
        which include this construct into new queries (discovered in #11173),
        so it has to do the right thing at compile time as well.

        """

        # get SQL text first
        sqltext = override_binds.element._compiler_dispatch(self, **kw)

        # for a test compile that is not for caching, change binds after the
        # fact.  note that we don't try to
        # swap the bindparam as we compile, because our element may be
        # elsewhere in the statement already (e.g. a subquery or perhaps a
        # CTE) and was already visited / compiled. See
        # test_relationship_criteria.py ->
        # test_selectinload_local_criteria_subquery
        for k in override_binds.translate:
            if k not in self.binds:
                continue
            bp = self.binds[k]

            # so this would work, just change the value of bp in place.
            # but we dont want to mutate things outside.
            # bp.value = override_binds.translate[bp.key]
            # continue

            # instead, need to replace bp with new_bp or otherwise accommodate
            # in all internal collections
            new_bp = bp._with_value(
                override_binds.translate[bp.key],
                maintain_key=True,
                required=False,
            )

            # re-home new_bp in every compiler collection that referenced bp
            name = self.bind_names[bp]
            self.binds[k] = self.binds[name] = new_bp
            self.bind_names[new_bp] = name
            self.bind_names.pop(bp, None)

            if bp in self.post_compile_params:
                self.post_compile_params |= {new_bp}
            if bp in self.literal_execute_params:
                self.literal_execute_params |= {new_bp}

            # keep the cache-key bind match bookkeeping in sync so later
            # parameter extraction can resolve new_bp
            ckbm_tuple = self._cache_key_bind_match
            if ckbm_tuple:
                ckbm, cksm = ckbm_tuple
                for bp in bp._cloned_set:
                    if bp.key in cksm:
                        cb = cksm[bp.key]
                        ckbm[cb].append(new_bp)

        return sqltext

2434 

2435 def visit_grouping(self, grouping, asfrom=False, **kwargs): 

2436 return "(" + grouping.element._compiler_dispatch(self, **kwargs) + ")" 

2437 

2438 def visit_select_statement_grouping(self, grouping, **kwargs): 

2439 return "(" + grouping.element._compiler_dispatch(self, **kwargs) + ")" 

2440 

    def visit_label_reference(
        self, element, within_columns_clause=False, **kwargs
    ):
        """Render a reference to a label, e.g. within ORDER BY / GROUP BY,
        emitting the bare label name when the dialect supports simple
        order-by-label and the label resolves within the current SELECT's
        compile state; otherwise the referenced element renders normally.
        """
        if self.stack and self.dialect.supports_simple_order_by_label:
            try:
                compile_state = cast(
                    "Union[SelectState, CompoundSelectState]",
                    self.stack[-1]["compile_state"],
                )
            except KeyError as ke:
                raise exc.CompileError(
                    "Can't resolve label reference for ORDER BY / "
                    "GROUP BY / DISTINCT etc."
                ) from ke

            (
                with_cols,
                only_froms,
                only_cols,
            ) = compile_state._label_resolve_dict
            if within_columns_clause:
                resolve_dict = only_froms
            else:
                resolve_dict = only_cols

            # this can be None in the case that a _label_reference()
            # were subject to a replacement operation, in which case
            # the replacement of the Label element may have changed
            # to something else like a ColumnClause expression.
            order_by_elem = element.element._order_by_label_element

            if (
                order_by_elem is not None
                and order_by_elem.name in resolve_dict
                and order_by_elem.shares_lineage(
                    resolve_dict[order_by_elem.name]
                )
            ):
                kwargs["render_label_as_label"] = (
                    element.element._order_by_label_element
                )
        return self.process(
            element.element,
            within_columns_clause=within_columns_clause,
            **kwargs,
        )

2487 

    def visit_textual_label_reference(
        self, element, within_columns_clause=False, **kwargs
    ):
        """Render a textual (string-named) label reference, resolving the
        name against the current SELECT's label-resolution dictionaries;
        an unresolvable name raises CompileError via _no_text_coercion.
        """
        if not self.stack:
            # compiling the element outside of the context of a SELECT
            return self.process(element._text_clause)

        try:
            compile_state = cast(
                "Union[SelectState, CompoundSelectState]",
                self.stack[-1]["compile_state"],
            )
        except KeyError as ke:
            coercions._no_text_coercion(
                element.element,
                extra=(
                    "Can't resolve label reference for ORDER BY / "
                    "GROUP BY / DISTINCT etc."
                ),
                exc_cls=exc.CompileError,
                err=ke,
            )

        with_cols, only_froms, only_cols = compile_state._label_resolve_dict
        try:
            if within_columns_clause:
                col = only_froms[element.element]
            else:
                col = with_cols[element.element]
        except KeyError as err:
            coercions._no_text_coercion(
                element.element,
                extra=(
                    "Can't resolve label reference for ORDER BY / "
                    "GROUP BY / DISTINCT etc."
                ),
                exc_cls=exc.CompileError,
                err=err,
            )
        else:
            kwargs["render_label_as_label"] = col
            return self.process(
                col, within_columns_clause=within_columns_clause, **kwargs
            )

2532 

    def visit_label(
        self,
        label,
        add_to_result_map=None,
        within_label_clause=False,
        within_columns_clause=False,
        render_label_as_label=None,
        result_map_targets=(),
        **kw,
    ):
        """Render a Label construct: ``<expr> AS <name>`` within a columns
        clause, the bare label name when ``render_label_as_label`` selects
        this label (e.g. ORDER BY label), or just the wrapped element
        otherwise.
        """
        # only render labels within the columns clause
        # or ORDER BY clause of a select.  dialect-specific compilers
        # can modify this behavior.
        render_label_with_as = (
            within_columns_clause and not within_label_clause
        )
        render_label_only = render_label_as_label is label

        if render_label_only or render_label_with_as:
            if isinstance(label.name, elements._truncated_label):
                labelname = self._truncated_identifier("colident", label.name)
            else:
                labelname = label.name

            if render_label_with_as:
                if add_to_result_map is not None:
                    add_to_result_map(
                        labelname,
                        label.name,
                        (label, labelname) + label._alt_names + result_map_targets,
                        label.type,
                    )
                return (
                    label.element._compiler_dispatch(
                        self,
                        within_columns_clause=True,
                        within_label_clause=True,
                        **kw,
                    )
                    + OPERATORS[operators.as_]
                    + self.preparer.format_label(label, labelname)
                )
            elif render_label_only:
                return self.preparer.format_label(label, labelname)
        else:
            return label.element._compiler_dispatch(
                self, within_columns_clause=False, **kw
            )

2581 

2582 def _fallback_column_name(self, column): 

2583 raise exc.CompileError( 

2584 "Cannot compile Column object until its 'name' is assigned." 

2585 ) 

2586 

2587 def visit_lambda_element(self, element, **kw): 

2588 sql_element = element._resolved 

2589 return self.process(sql_element, **kw) 

2590 

    def visit_column(
        self,
        column: ColumnClause[Any],
        add_to_result_map: Optional[_ResultMapAppender] = None,
        include_table: bool = True,
        result_map_targets: Tuple[Any, ...] = (),
        ambiguous_table_name_map: Optional[_AmbiguousTableNameMap] = None,
        **kwargs: Any,
    ) -> str:
        """Render a column expression, optionally table- (and schema-)
        qualified, registering it in the result map when requested.

        Literal columns are escaped rather than quoted; truncated labels
        for both column and table names are resolved to their shortened
        identifiers.
        """
        name = orig_name = column.name
        if name is None:
            name = self._fallback_column_name(column)

        is_literal = column.is_literal
        if not is_literal and isinstance(name, elements._truncated_label):
            name = self._truncated_identifier("colident", name)

        if add_to_result_map is not None:
            targets = (column, name, column.key) + result_map_targets
            if column._tq_label:
                targets += (column._tq_label,)

            add_to_result_map(name, orig_name, targets, column.type)

        if is_literal:
            # note we are not currently accommodating for
            # literal_column(quoted_name('ident', True)) here
            name = self.escape_literal_column(name)
        else:
            name = self.preparer.quote(name)
        table = column.table
        if table is None or not include_table or not table.named_with_column:
            return name
        else:
            effective_schema = self.preparer.schema_for_object(table)

            if effective_schema:
                schema_prefix = (
                    self.preparer.quote_schema(effective_schema) + "."
                )
            else:
                schema_prefix = ""

            if TYPE_CHECKING:
                assert isinstance(table, NamedFromClause)
            tablename = table.name

            # rewrite the table name if it collides with a same-named
            # table in another schema within this statement
            if (
                not effective_schema
                and ambiguous_table_name_map
                and tablename in ambiguous_table_name_map
            ):
                tablename = ambiguous_table_name_map[tablename]

            if isinstance(tablename, elements._truncated_label):
                tablename = self._truncated_identifier("alias", tablename)

            return schema_prefix + self.preparer.quote(tablename) + "." + name

2649 

2650 def visit_collation(self, element, **kw): 

2651 return self.preparer.format_collation(element.collation) 

2652 

2653 def visit_fromclause(self, fromclause, **kwargs): 

2654 return fromclause.name 

2655 

2656 def visit_index(self, index, **kwargs): 

2657 return index.name 

2658 

2659 def visit_typeclause(self, typeclause, **kw): 

2660 kw["type_expression"] = typeclause 

2661 kw["identifier_preparer"] = self.preparer 

2662 return self.dialect.type_compiler_instance.process( 

2663 typeclause.type, **kw 

2664 ) 

2665 

2666 def post_process_text(self, text): 

2667 if self.preparer._double_percents: 

2668 text = text.replace("%", "%%") 

2669 return text 

2670 

2671 def escape_literal_column(self, text): 

2672 if self.preparer._double_percents: 

2673 text = text.replace("%", "%%") 

2674 return text 

2675 

    def visit_textclause(self, textclause, add_to_result_map=None, **kw):
        """Render a text() construct, substituting each ``:name`` token
        with its declared bound parameter (or an anonymous placeholder of
        the same name) and un-escaping escaped colons.
        """

        def do_bindparam(m):
            name = m.group(1)
            if name in textclause._bindparams:
                return self.process(textclause._bindparams[name], **kw)
            else:
                return self.bindparam_string(name, **kw)

        if not self.stack:
            self.isplaintext = True

        if add_to_result_map:
            # text() object is present in the columns clause of a
            # select().   Add a no-name entry to the result map so that
            # row[text()] produces a result
            add_to_result_map(None, None, (textclause,), sqltypes.NULLTYPE)

        # un-escape any \:params
        return BIND_PARAMS_ESC.sub(
            lambda m: m.group(1),
            BIND_PARAMS.sub(
                do_bindparam, self.post_process_text(textclause.text)
            ),
        )

2700 

    def visit_textual_select(
        self, taf, compound_index=None, asfrom=False, **kw
    ):
        """Render a TextualSelect (text() with column information),
        maintaining the compiler stack and populating the result map when
        this statement (or the first element of a compound) is outermost.
        """
        toplevel = not self.stack
        entry = self._default_stack_entry if toplevel else self.stack[-1]

        new_entry: _CompilerStackEntry = {
            "correlate_froms": set(),
            "asfrom_froms": set(),
            "selectable": taf,
        }
        self.stack.append(new_entry)

        if taf._independent_ctes:
            self._dispatch_independent_ctes(taf, kw)

        populate_result_map = (
            toplevel
            or (
                compound_index == 0
                and entry.get("need_result_map_for_compound", False)
            )
            or entry.get("need_result_map_for_nested", False)
        )

        if populate_result_map:
            self._ordered_columns = self._textual_ordered_columns = (
                taf.positional
            )

            # enable looser result column matching when the SQL text links to
            # Column objects by name only
            self._loose_column_name_matching = not taf.positional and bool(
                taf.column_args
            )

            for c in taf.column_args:
                self.process(
                    c,
                    within_columns_clause=True,
                    add_to_result_map=self._add_to_result_map,
                )

        text = self.process(taf.element, **kw)
        if self.ctes:
            nesting_level = len(self.stack) if not toplevel else None
            text = self._render_cte_clause(nesting_level=nesting_level) + text

        self.stack.pop(-1)

        return text

2752 

2753 def visit_null(self, expr: Null, **kw: Any) -> str: 

2754 return "NULL" 

2755 

2756 def visit_true(self, expr: True_, **kw: Any) -> str: 

2757 if self.dialect.supports_native_boolean: 

2758 return "true" 

2759 else: 

2760 return "1" 

2761 

2762 def visit_false(self, expr: False_, **kw: Any) -> str: 

2763 if self.dialect.supports_native_boolean: 

2764 return "false" 

2765 else: 

2766 return "0" 

2767 

2768 def _generate_delimited_list(self, elements, separator, **kw): 

2769 return separator.join( 

2770 s 

2771 for s in (c._compiler_dispatch(self, **kw) for c in elements) 

2772 if s 

2773 ) 

2774 

    def _generate_delimited_and_list(self, clauses, **kw):
        """Render *clauses* joined by AND, after boolean simplification
        through BooleanClauseList (collapsing True_/False_ singletons).
        """
        lcc, clauses = elements.BooleanClauseList._process_clauses_for_boolean(
            operators.and_,
            elements.True_._singleton,
            elements.False_._singleton,
            clauses,
        )
        if lcc == 1:
            # simplification reduced the list to a single clause
            return clauses[0]._compiler_dispatch(self, **kw)
        else:
            separator = OPERATORS[operators.and_]
            return separator.join(
                s
                for s in (c._compiler_dispatch(self, **kw) for c in clauses)
                if s
            )

2791 

2792 def visit_tuple(self, clauselist, **kw): 

2793 return "(%s)" % self.visit_clauselist(clauselist, **kw) 

2794 

2795 def visit_clauselist(self, clauselist, **kw): 

2796 sep = clauselist.operator 

2797 if sep is None: 

2798 sep = " " 

2799 else: 

2800 sep = OPERATORS[clauselist.operator] 

2801 

2802 return self._generate_delimited_list(clauselist.clauses, sep, **kw) 

2803 

    def visit_expression_clauselist(self, clauselist, **kw):
        """Render an ExpressionClauseList, delegating to a compiler-level
        operator dispatch hook when one is defined for the operator;
        an operator with no entry in OPERATORS raises
        UnsupportedCompilationError.
        """
        operator_ = clauselist.operator

        disp = self._get_operator_dispatch(
            operator_, "expression_clauselist", None
        )
        if disp:
            return disp(clauselist, operator_, **kw)

        try:
            opstring = OPERATORS[operator_]
        except KeyError as err:
            raise exc.UnsupportedCompilationError(self, operator_) from err
        else:
            kw["_in_operator_expression"] = True
            return self._generate_delimited_list(
                clauselist.clauses, opstring, **kw
            )

2822 

2823 def visit_case(self, clause, **kwargs): 

2824 x = "CASE " 

2825 if clause.value is not None: 

2826 x += clause.value._compiler_dispatch(self, **kwargs) + " " 

2827 for cond, result in clause.whens: 

2828 x += ( 

2829 "WHEN " 

2830 + cond._compiler_dispatch(self, **kwargs) 

2831 + " THEN " 

2832 + result._compiler_dispatch(self, **kwargs) 

2833 + " " 

2834 ) 

2835 if clause.else_ is not None: 

2836 x += ( 

2837 "ELSE " + clause.else_._compiler_dispatch(self, **kwargs) + " " 

2838 ) 

2839 x += "END" 

2840 return x 

2841 

2842 def visit_type_coerce(self, type_coerce, **kw): 

2843 return type_coerce.typed_expression._compiler_dispatch(self, **kw) 

2844 

2845 def visit_cast(self, cast, **kwargs): 

2846 type_clause = cast.typeclause._compiler_dispatch(self, **kwargs) 

2847 match = re.match("(.*)( COLLATE .*)", type_clause) 

2848 return "CAST(%s AS %s)%s" % ( 

2849 cast.clause._compiler_dispatch(self, **kwargs), 

2850 match.group(1) if match else type_clause, 

2851 match.group(2) if match else "", 

2852 ) 

2853 

2854 def _format_frame_clause(self, range_, **kw): 

2855 return "%s AND %s" % ( 

2856 ( 

2857 "UNBOUNDED PRECEDING" 

2858 if range_[0] is elements.RANGE_UNBOUNDED 

2859 else ( 

2860 "CURRENT ROW" 

2861 if range_[0] is elements.RANGE_CURRENT 

2862 else ( 

2863 "%s PRECEDING" 

2864 % ( 

2865 self.process( 

2866 elements.literal(abs(range_[0])), **kw 

2867 ), 

2868 ) 

2869 if range_[0] < 0 

2870 else "%s FOLLOWING" 

2871 % (self.process(elements.literal(range_[0]), **kw),) 

2872 ) 

2873 ) 

2874 ), 

2875 ( 

2876 "UNBOUNDED FOLLOWING" 

2877 if range_[1] is elements.RANGE_UNBOUNDED 

2878 else ( 

2879 "CURRENT ROW" 

2880 if range_[1] is elements.RANGE_CURRENT 

2881 else ( 

2882 "%s PRECEDING" 

2883 % ( 

2884 self.process( 

2885 elements.literal(abs(range_[1])), **kw 

2886 ), 

2887 ) 

2888 if range_[1] < 0 

2889 else "%s FOLLOWING" 

2890 % (self.process(elements.literal(range_[1]), **kw),) 

2891 ) 

2892 ) 

2893 ), 

2894 ) 

2895 

2896 def visit_over(self, over, **kwargs): 

2897 text = over.element._compiler_dispatch(self, **kwargs) 

2898 if over.range_ is not None: 

2899 range_ = "RANGE BETWEEN %s" % self._format_frame_clause( 

2900 over.range_, **kwargs 

2901 ) 

2902 elif over.rows is not None: 

2903 range_ = "ROWS BETWEEN %s" % self._format_frame_clause( 

2904 over.rows, **kwargs 

2905 ) 

2906 elif over.groups is not None: 

2907 range_ = "GROUPS BETWEEN %s" % self._format_frame_clause( 

2908 over.groups, **kwargs 

2909 ) 

2910 else: 

2911 range_ = None 

2912 

2913 return "%s OVER (%s)" % ( 

2914 text, 

2915 " ".join( 

2916 [ 

2917 "%s BY %s" 

2918 % (word, clause._compiler_dispatch(self, **kwargs)) 

2919 for word, clause in ( 

2920 ("PARTITION", over.partition_by), 

2921 ("ORDER", over.order_by), 

2922 ) 

2923 if clause is not None and len(clause) 

2924 ] 

2925 + ([range_] if range_ else []) 

2926 ), 

2927 ) 

2928 

2929 def visit_withingroup(self, withingroup, **kwargs): 

2930 return "%s WITHIN GROUP (ORDER BY %s)" % ( 

2931 withingroup.element._compiler_dispatch(self, **kwargs), 

2932 withingroup.order_by._compiler_dispatch(self, **kwargs), 

2933 ) 

2934 

2935 def visit_funcfilter(self, funcfilter, **kwargs): 

2936 return "%s FILTER (WHERE %s)" % ( 

2937 funcfilter.func._compiler_dispatch(self, **kwargs), 

2938 funcfilter.criterion._compiler_dispatch(self, **kwargs), 

2939 ) 

2940 

2941 def visit_extract(self, extract, **kwargs): 

2942 field = self.extract_map.get(extract.field, extract.field) 

2943 return "EXTRACT(%s FROM %s)" % ( 

2944 field, 

2945 extract.expr._compiler_dispatch(self, **kwargs), 

2946 ) 

2947 

2948 def visit_scalar_function_column(self, element, **kw): 

2949 compiled_fn = self.visit_function(element.fn, **kw) 

2950 compiled_col = self.visit_column(element, **kw) 

2951 return "(%s).%s" % (compiled_fn, compiled_col) 

2952 

    def visit_function(
        self,
        func: Function[Any],
        add_to_result_map: Optional[_ResultMapAppender] = None,
        **kwargs: Any,
    ) -> str:
        """Render a SQL function call, e.g. ``some_func(expr)``.

        Dispatches to a ``visit_<name>_func`` hook when the compiler
        defines one; otherwise renders the (possibly package-qualified)
        function name followed by its argument list.
        """
        if add_to_result_map is not None:
            # register the function itself in the result map under its name
            add_to_result_map(func.name, func.name, (func.name,), func.type)

        # per-function compiler hook, e.g. visit_now_func for func.now()
        disp = getattr(self, "visit_%s_func" % func.name.lower(), None)

        text: str

        if disp:
            text = disp(func, **kwargs)
        else:
            # known generic functions render via the FUNCTIONS lookup,
            # keyed on the deannotated class
            name = FUNCTIONS.get(func._deannotate().__class__, None)
            if name:
                if func._has_args:
                    name += "%(expr)s"
            else:
                name = func.name
                # quote the name when it contains illegal characters or
                # was created as an explicitly quoted name
                name = (
                    self.preparer.quote(name)
                    if self.preparer._requires_quotes_illegal_chars(name)
                    or isinstance(name, elements.quoted_name)
                    else name
                )
                name = name + "%(expr)s"
            text = ".".join(
                [
                    (
                        self.preparer.quote(tok)
                        # NOTE(review): this tests ``name`` rather than
                        # ``tok`` for quoted_name; presumably intentional
                        # (quote package tokens whenever the function name
                        # itself is a quoted_name) -- confirm before changing
                        if self.preparer._requires_quotes_illegal_chars(tok)
                        or isinstance(name, elements.quoted_name)
                        else tok
                    )
                    for tok in func.packagenames
                ]
                + [name]
            ) % {"expr": self.function_argspec(func, **kwargs)}

        if func._with_ordinality:
            text += " WITH ORDINALITY"
        return text

2998 

    def visit_next_value_func(self, next_value, **kw):
        """Render a ``next_value(seq)`` call via ``visit_sequence()``."""
        # NOTE(review): **kw is not forwarded to visit_sequence; appears
        # deliberate here -- confirm against dialect overrides before changing
        return self.visit_sequence(next_value.sequence)

3001 

3002 def visit_sequence(self, sequence, **kw): 

3003 raise NotImplementedError( 

3004 "Dialect '%s' does not support sequence increments." 

3005 % self.dialect.name 

3006 ) 

3007 

3008 def function_argspec(self, func: Function[Any], **kwargs: Any) -> str: 

3009 return func.clause_expr._compiler_dispatch(self, **kwargs) 

3010 

    def visit_compound_select(
        self, cs, asfrom=False, compound_index=None, **kwargs
    ):
        """Render a compound SELECT (UNION / INTERSECT / EXCEPT ...).

        Each component select is dispatched with its positional
        ``compound_index``; GROUP BY / ORDER BY / row-limiting clauses
        that apply to the compound as a whole are appended after, and any
        collected CTEs are prepended.
        """
        # toplevel means this is the outermost statement being compiled
        toplevel = not self.stack

        compile_state = cs._compile_state_factory(cs, self, **kwargs)

        if toplevel and not self.compile_state:
            self.compile_state = compile_state

        compound_stmt = compile_state.statement

        entry = self._default_stack_entry if toplevel else self.stack[-1]
        # only the first element of the compound contributes result-map
        # entries, and only when the enclosing context requests them
        need_result_map = toplevel or (
            not compound_index
            and entry.get("need_result_map_for_compound", False)
        )

        # indicates there is already a CompoundSelect in play
        if compound_index == 0:
            entry["select_0"] = cs

        self.stack.append(
            {
                "correlate_froms": entry["correlate_froms"],
                "asfrom_froms": entry["asfrom_froms"],
                "selectable": cs,
                "compile_state": compile_state,
                "need_result_map_for_compound": need_result_map,
            }
        )

        if compound_stmt._independent_ctes:
            self._dispatch_independent_ctes(compound_stmt, kwargs)

        # e.g. "UNION ALL", "INTERSECT", ...
        keyword = self.compound_keywords[cs.keyword]

        text = (" " + keyword + " ").join(
            (
                c._compiler_dispatch(
                    self, asfrom=asfrom, compound_index=i, **kwargs
                )
                for i, c in enumerate(cs.selects)
            )
        )

        kwargs["include_table"] = False
        text += self.group_by_clause(cs, **dict(asfrom=asfrom, **kwargs))
        text += self.order_by_clause(cs, **kwargs)
        if cs._has_row_limiting_clause:
            text += self._row_limit_clause(cs, **kwargs)

        if self.ctes:
            # nested compounds render their CTEs locally; the toplevel
            # statement prepends the complete WITH clause
            nesting_level = len(self.stack) if not toplevel else None
            text = (
                self._render_cte_clause(
                    nesting_level=nesting_level,
                    include_following_stack=True,
                )
                + text
            )

        self.stack.pop(-1)
        return text

3075 

3076 def _row_limit_clause(self, cs, **kwargs): 

3077 if cs._fetch_clause is not None: 

3078 return self.fetch_clause(cs, **kwargs) 

3079 else: 

3080 return self.limit_clause(cs, **kwargs) 

3081 

3082 def _get_operator_dispatch(self, operator_, qualifier1, qualifier2): 

3083 attrname = "visit_%s_%s%s" % ( 

3084 operator_.__name__, 

3085 qualifier1, 

3086 "_" + qualifier2 if qualifier2 else "", 

3087 ) 

3088 return getattr(self, attrname, None) 

3089 

    def visit_unary(
        self, unary, add_to_result_map=None, result_map_targets=(), **kw
    ):
        """Render a unary expression via its operator or modifier.

        Exactly one of ``unary.operator`` (prefix, e.g. NOT) or
        ``unary.modifier`` (postfix, e.g. DESC) must be present; a
        per-operator ``visit_<op>_unary_<kind>`` hook takes precedence
        over the generic OPERATORS rendering.
        """
        if add_to_result_map is not None:
            # pass result-map registration through to the inner element
            result_map_targets += (unary,)
            kw["add_to_result_map"] = add_to_result_map
            kw["result_map_targets"] = result_map_targets

        if unary.operator:
            if unary.modifier:
                raise exc.CompileError(
                    "Unary expression does not support operator "
                    "and modifier simultaneously"
                )
            disp = self._get_operator_dispatch(
                unary.operator, "unary", "operator"
            )
            if disp:
                return disp(unary, unary.operator, **kw)
            else:
                return self._generate_generic_unary_operator(
                    unary, OPERATORS[unary.operator], **kw
                )
        elif unary.modifier:
            disp = self._get_operator_dispatch(
                unary.modifier, "unary", "modifier"
            )
            if disp:
                return disp(unary, unary.modifier, **kw)
            else:
                return self._generate_generic_unary_modifier(
                    unary, OPERATORS[unary.modifier], **kw
                )
        else:
            raise exc.CompileError(
                "Unary expression has no operator or modifier"
            )

3127 

    def visit_truediv_binary(self, binary, operator, **kw):
        """Render true division.

        On dialects where ``/`` floors integer operands, the right
        operand is wrapped in a CAST to a numeric type so the division
        produces a true-division result.
        """
        if self.dialect.div_is_floordiv:
            return (
                self.process(binary.left, **kw)
                + " / "
                # TODO: would need a fast cast again here,
                # unless we want to use an implicit cast like "+ 0.0"
                + self.process(
                    elements.Cast(
                        binary.right,
                        (
                            # reuse the operand's own type when it already
                            # has Numeric affinity
                            binary.right.type
                            if binary.right.type._type_affinity
                            is sqltypes.Numeric
                            else sqltypes.Numeric()
                        ),
                    ),
                    **kw,
                )
            )
        else:
            return (
                self.process(binary.left, **kw)
                + " / "
                + self.process(binary.right, **kw)
            )

3154 

3155 def visit_floordiv_binary(self, binary, operator, **kw): 

3156 if ( 

3157 self.dialect.div_is_floordiv 

3158 and binary.right.type._type_affinity is sqltypes.Integer 

3159 ): 

3160 return ( 

3161 self.process(binary.left, **kw) 

3162 + " / " 

3163 + self.process(binary.right, **kw) 

3164 ) 

3165 else: 

3166 return "FLOOR(%s)" % ( 

3167 self.process(binary.left, **kw) 

3168 + " / " 

3169 + self.process(binary.right, **kw) 

3170 ) 

3171 

3172 def visit_is_true_unary_operator(self, element, operator, **kw): 

3173 if ( 

3174 element._is_implicitly_boolean 

3175 or self.dialect.supports_native_boolean 

3176 ): 

3177 return self.process(element.element, **kw) 

3178 else: 

3179 return "%s = 1" % self.process(element.element, **kw) 

3180 

3181 def visit_is_false_unary_operator(self, element, operator, **kw): 

3182 if ( 

3183 element._is_implicitly_boolean 

3184 or self.dialect.supports_native_boolean 

3185 ): 

3186 return "NOT %s" % self.process(element.element, **kw) 

3187 else: 

3188 return "%s = 0" % self.process(element.element, **kw) 

3189 

3190 def visit_not_match_op_binary(self, binary, operator, **kw): 

3191 return "NOT %s" % self.visit_binary( 

3192 binary, override_operator=operators.match_op 

3193 ) 

3194 

3195 def visit_not_in_op_binary(self, binary, operator, **kw): 

3196 # The brackets are required in the NOT IN operation because the empty 

3197 # case is handled using the form "(col NOT IN (null) OR 1 = 1)". 

3198 # The presence of the OR makes the brackets required. 

3199 return "(%s)" % self._generate_generic_binary( 

3200 binary, OPERATORS[operator], **kw 

3201 ) 

3202 

3203 def visit_empty_set_op_expr(self, type_, expand_op, **kw): 

3204 if expand_op is operators.not_in_op: 

3205 if len(type_) > 1: 

3206 return "(%s)) OR (1 = 1" % ( 

3207 ", ".join("NULL" for element in type_) 

3208 ) 

3209 else: 

3210 return "NULL) OR (1 = 1" 

3211 elif expand_op is operators.in_op: 

3212 if len(type_) > 1: 

3213 return "(%s)) AND (1 != 1" % ( 

3214 ", ".join("NULL" for element in type_) 

3215 ) 

3216 else: 

3217 return "NULL) AND (1 != 1" 

3218 else: 

3219 return self.visit_empty_set_expr(type_) 

3220 

3221 def visit_empty_set_expr(self, element_types, **kw): 

3222 raise NotImplementedError( 

3223 "Dialect '%s' does not support empty set expression." 

3224 % self.dialect.name 

3225 ) 

3226 

    def _literal_execute_expanding_parameter_literal_binds(
        self, parameter, values, bind_expression_template=None
    ):
        """Render an expanding (IN) parameter entirely as inline literals.

        Returns ``(to_update, replacement_expression)`` where
        ``to_update`` is always empty (no bound parameters remain) and
        ``replacement_expression`` is the SQL text to splice in place of
        the POSTCOMPILE token.
        """
        typ_dialect_impl = parameter.type._unwrapped_dialect_impl(self.dialect)

        if not values:
            # empty IN expression. note we don't need to use
            # bind_expression_template here because there are no
            # expressions to render.

            if typ_dialect_impl._is_tuple_type:
                replacement_expression = (
                    "VALUES " if self.dialect.tuple_in_values else ""
                ) + self.visit_empty_set_op_expr(
                    parameter.type.types, parameter.expand_op
                )

            else:
                replacement_expression = self.visit_empty_set_op_expr(
                    [parameter.type], parameter.expand_op
                )

        elif typ_dialect_impl._is_tuple_type or (
            # a NULL-typed parameter holding sequences of non-string
            # elements is treated as a tuple-IN as well
            typ_dialect_impl._isnull
            and isinstance(values[0], collections_abc.Sequence)
            and not isinstance(values[0], (str, bytes))
        ):
            if typ_dialect_impl._has_bind_expression:
                raise NotImplementedError(
                    "bind_expression() on TupleType not supported with "
                    "literal_binds"
                )

            # render each tuple element as "(lit, lit, ...)"
            replacement_expression = (
                "VALUES " if self.dialect.tuple_in_values else ""
            ) + ", ".join(
                "(%s)"
                % (
                    ", ".join(
                        self.render_literal_value(value, param_type)
                        for value, param_type in zip(
                            tuple_element, parameter.type.types
                        )
                    )
                )
                for i, tuple_element in enumerate(values)
            )
        else:
            if bind_expression_template:
                # the type has a bind_expression(); re-split the wrapped
                # POSTCOMPILE token to recover the SQL surrounding each
                # literal (e.g. a CAST wrapper)
                post_compile_pattern = self._post_compile_pattern
                m = post_compile_pattern.search(bind_expression_template)
                assert m and m.group(
                    2
                ), "unexpected format for expanding parameter"

                tok = m.group(2).split("~~")
                be_left, be_right = tok[1], tok[3]
                replacement_expression = ", ".join(
                    "%s%s%s"
                    % (
                        be_left,
                        self.render_literal_value(value, parameter.type),
                        be_right,
                    )
                    for value in values
                )
            else:
                replacement_expression = ", ".join(
                    self.render_literal_value(value, parameter.type)
                    for value in values
                )

        return (), replacement_expression

3300 

    def _literal_execute_expanding_parameter(self, name, parameter, values):
        """Expand an IN-style parameter into individual bound parameters.

        Returns ``(to_update, replacement_expression)`` where
        ``to_update`` is a list of ``(param_name, value)`` pairs to add
        to the execution parameters and ``replacement_expression`` is the
        SQL text replacing the POSTCOMPILE token.
        """
        if parameter.literal_execute:
            # literal_execute parameters render inline instead
            return self._literal_execute_expanding_parameter_literal_binds(
                parameter, values
            )

        dialect = self.dialect
        typ_dialect_impl = parameter.type._unwrapped_dialect_impl(dialect)

        if self._numeric_binds:
            bind_template = self.compilation_bindtemplate
        else:
            bind_template = self.bindtemplate

        if (
            self.dialect._bind_typing_render_casts
            and typ_dialect_impl.render_bind_cast
        ):
            # wrap each rendered placeholder in the dialect's bind cast

            def _render_bindtemplate(name):
                return self.render_bind_cast(
                    parameter.type,
                    typ_dialect_impl,
                    bind_template % {"name": name},
                )

        else:

            def _render_bindtemplate(name):
                return bind_template % {"name": name}

        if not values:
            # empty sequence: render the dialect's empty-set expression
            to_update = []
            if typ_dialect_impl._is_tuple_type:
                replacement_expression = self.visit_empty_set_op_expr(
                    parameter.type.types, parameter.expand_op
                )
            else:
                replacement_expression = self.visit_empty_set_op_expr(
                    [parameter.type], parameter.expand_op
                )

        elif typ_dialect_impl._is_tuple_type or (
            typ_dialect_impl._isnull
            and isinstance(values[0], collections_abc.Sequence)
            and not isinstance(values[0], (str, bytes))
        ):
            assert not typ_dialect_impl._is_array
            # one parameter per tuple element: name_1_1, name_1_2, ...
            to_update = [
                ("%s_%s_%s" % (name, i, j), value)
                for i, tuple_element in enumerate(values, 1)
                for j, value in enumerate(tuple_element, 1)
            ]

            # NOTE(review): the flat-index math below assumes every tuple
            # has the same length as the current tuple_element -- confirm
            replacement_expression = (
                "VALUES " if dialect.tuple_in_values else ""
            ) + ", ".join(
                "(%s)"
                % (
                    ", ".join(
                        _render_bindtemplate(
                            to_update[i * len(tuple_element) + j][0]
                        )
                        for j, value in enumerate(tuple_element)
                    )
                )
                for i, tuple_element in enumerate(values)
            )
        else:
            # scalar expansion: name_1, name_2, ...
            to_update = [
                ("%s_%s" % (name, i), value)
                for i, value in enumerate(values, 1)
            ]
            replacement_expression = ", ".join(
                _render_bindtemplate(key) for key, value in to_update
            )

        return to_update, replacement_expression

3379 

    def visit_binary(
        self,
        binary,
        override_operator=None,
        eager_grouping=False,
        from_linter=None,
        lateral_from_linter=None,
        **kw,
    ):
        """Render a binary expression, dispatching on its operator.

        Comparison operators additionally feed the from-linter graph
        (cartesian product detection).  A per-operator
        ``visit_<op>_binary`` hook takes precedence over the generic
        OPERATORS string.
        """
        if from_linter and operators.is_comparison(binary.operator):
            # record an edge between the FROM objects on either side so
            # the linter can detect un-joined tables
            if lateral_from_linter is not None:
                enclosing_lateral = kw["enclosing_lateral"]
                lateral_from_linter.edges.update(
                    itertools.product(
                        _de_clone(
                            binary.left._from_objects + [enclosing_lateral]
                        ),
                        _de_clone(
                            binary.right._from_objects + [enclosing_lateral]
                        ),
                    )
                )
            else:
                from_linter.edges.update(
                    itertools.product(
                        _de_clone(binary.left._from_objects),
                        _de_clone(binary.right._from_objects),
                    )
                )

        # don't allow "? = ?" to render
        if (
            self.ansi_bind_rules
            and isinstance(binary.left, elements.BindParameter)
            and isinstance(binary.right, elements.BindParameter)
        ):
            kw["literal_execute"] = True

        operator_ = override_operator or binary.operator
        disp = self._get_operator_dispatch(operator_, "binary", None)
        if disp:
            return disp(binary, operator_, **kw)
        else:
            try:
                opstring = OPERATORS[operator_]
            except KeyError as err:
                raise exc.UnsupportedCompilationError(self, operator_) from err
            else:
                return self._generate_generic_binary(
                    binary,
                    opstring,
                    from_linter=from_linter,
                    lateral_from_linter=lateral_from_linter,
                    **kw,
                )

3435 

3436 def visit_function_as_comparison_op_binary(self, element, operator, **kw): 

3437 return self.process(element.sql_function, **kw) 

3438 

3439 def visit_mod_binary(self, binary, operator, **kw): 

3440 if self.preparer._double_percents: 

3441 return ( 

3442 self.process(binary.left, **kw) 

3443 + " %% " 

3444 + self.process(binary.right, **kw) 

3445 ) 

3446 else: 

3447 return ( 

3448 self.process(binary.left, **kw) 

3449 + " % " 

3450 + self.process(binary.right, **kw) 

3451 ) 

3452 

3453 def visit_custom_op_binary(self, element, operator, **kw): 

3454 kw["eager_grouping"] = operator.eager_grouping 

3455 return self._generate_generic_binary( 

3456 element, 

3457 " " + self.escape_literal_column(operator.opstring) + " ", 

3458 **kw, 

3459 ) 

3460 

3461 def visit_custom_op_unary_operator(self, element, operator, **kw): 

3462 return self._generate_generic_unary_operator( 

3463 element, self.escape_literal_column(operator.opstring) + " ", **kw 

3464 ) 

3465 

3466 def visit_custom_op_unary_modifier(self, element, operator, **kw): 

3467 return self._generate_generic_unary_modifier( 

3468 element, " " + self.escape_literal_column(operator.opstring), **kw 

3469 ) 

3470 

    def _generate_generic_binary(
        self,
        binary: BinaryExpression[Any],
        opstring: str,
        eager_grouping: bool = False,
        **kw: Any,
    ) -> str:
        """Render ``left <op> right``, parenthesizing the result when it
        is nested inside another operator expression and eager grouping
        was requested."""
        # capture whether we are already inside an operator expression
        # before flagging the kw for the operands
        _in_operator_expression = kw.get("_in_operator_expression", False)

        kw["_in_operator_expression"] = True
        kw["_binary_op"] = binary.operator
        text = (
            binary.left._compiler_dispatch(
                self, eager_grouping=eager_grouping, **kw
            )
            + opstring
            + binary.right._compiler_dispatch(
                self, eager_grouping=eager_grouping, **kw
            )
        )

        if _in_operator_expression and eager_grouping:
            text = "(%s)" % text
        return text

3495 

3496 def _generate_generic_unary_operator(self, unary, opstring, **kw): 

3497 return opstring + unary.element._compiler_dispatch(self, **kw) 

3498 

3499 def _generate_generic_unary_modifier(self, unary, opstring, **kw): 

3500 return unary.element._compiler_dispatch(self, **kw) + opstring 

3501 

    @util.memoized_property
    def _like_percent_literal(self):
        """A literal ``'%'`` column used to build LIKE patterns.

        Memoized per compiler; shared by the contains / startswith /
        endswith operator renderers below.
        """
        return elements.literal_column("'%'", type_=sqltypes.STRINGTYPE)

3505 

3506 def visit_ilike_case_insensitive_operand(self, element, **kw): 

3507 return f"lower({element.element._compiler_dispatch(self, **kw)})" 

3508 

3509 def visit_contains_op_binary(self, binary, operator, **kw): 

3510 binary = binary._clone() 

3511 percent = self._like_percent_literal 

3512 binary.right = percent.concat(binary.right).concat(percent) 

3513 return self.visit_like_op_binary(binary, operator, **kw) 

3514 

3515 def visit_not_contains_op_binary(self, binary, operator, **kw): 

3516 binary = binary._clone() 

3517 percent = self._like_percent_literal 

3518 binary.right = percent.concat(binary.right).concat(percent) 

3519 return self.visit_not_like_op_binary(binary, operator, **kw) 

3520 

3521 def visit_icontains_op_binary(self, binary, operator, **kw): 

3522 binary = binary._clone() 

3523 percent = self._like_percent_literal 

3524 binary.left = ilike_case_insensitive(binary.left) 

3525 binary.right = percent.concat( 

3526 ilike_case_insensitive(binary.right) 

3527 ).concat(percent) 

3528 return self.visit_ilike_op_binary(binary, operator, **kw) 

3529 

3530 def visit_not_icontains_op_binary(self, binary, operator, **kw): 

3531 binary = binary._clone() 

3532 percent = self._like_percent_literal 

3533 binary.left = ilike_case_insensitive(binary.left) 

3534 binary.right = percent.concat( 

3535 ilike_case_insensitive(binary.right) 

3536 ).concat(percent) 

3537 return self.visit_not_ilike_op_binary(binary, operator, **kw) 

3538 

3539 def visit_startswith_op_binary(self, binary, operator, **kw): 

3540 binary = binary._clone() 

3541 percent = self._like_percent_literal 

3542 binary.right = percent._rconcat(binary.right) 

3543 return self.visit_like_op_binary(binary, operator, **kw) 

3544 

3545 def visit_not_startswith_op_binary(self, binary, operator, **kw): 

3546 binary = binary._clone() 

3547 percent = self._like_percent_literal 

3548 binary.right = percent._rconcat(binary.right) 

3549 return self.visit_not_like_op_binary(binary, operator, **kw) 

3550 

3551 def visit_istartswith_op_binary(self, binary, operator, **kw): 

3552 binary = binary._clone() 

3553 percent = self._like_percent_literal 

3554 binary.left = ilike_case_insensitive(binary.left) 

3555 binary.right = percent._rconcat(ilike_case_insensitive(binary.right)) 

3556 return self.visit_ilike_op_binary(binary, operator, **kw) 

3557 

3558 def visit_not_istartswith_op_binary(self, binary, operator, **kw): 

3559 binary = binary._clone() 

3560 percent = self._like_percent_literal 

3561 binary.left = ilike_case_insensitive(binary.left) 

3562 binary.right = percent._rconcat(ilike_case_insensitive(binary.right)) 

3563 return self.visit_not_ilike_op_binary(binary, operator, **kw) 

3564 

3565 def visit_endswith_op_binary(self, binary, operator, **kw): 

3566 binary = binary._clone() 

3567 percent = self._like_percent_literal 

3568 binary.right = percent.concat(binary.right) 

3569 return self.visit_like_op_binary(binary, operator, **kw) 

3570 

3571 def visit_not_endswith_op_binary(self, binary, operator, **kw): 

3572 binary = binary._clone() 

3573 percent = self._like_percent_literal 

3574 binary.right = percent.concat(binary.right) 

3575 return self.visit_not_like_op_binary(binary, operator, **kw) 

3576 

3577 def visit_iendswith_op_binary(self, binary, operator, **kw): 

3578 binary = binary._clone() 

3579 percent = self._like_percent_literal 

3580 binary.left = ilike_case_insensitive(binary.left) 

3581 binary.right = percent.concat(ilike_case_insensitive(binary.right)) 

3582 return self.visit_ilike_op_binary(binary, operator, **kw) 

3583 

3584 def visit_not_iendswith_op_binary(self, binary, operator, **kw): 

3585 binary = binary._clone() 

3586 percent = self._like_percent_literal 

3587 binary.left = ilike_case_insensitive(binary.left) 

3588 binary.right = percent.concat(ilike_case_insensitive(binary.right)) 

3589 return self.visit_not_ilike_op_binary(binary, operator, **kw) 

3590 

3591 def visit_like_op_binary(self, binary, operator, **kw): 

3592 escape = binary.modifiers.get("escape", None) 

3593 

3594 return "%s LIKE %s" % ( 

3595 binary.left._compiler_dispatch(self, **kw), 

3596 binary.right._compiler_dispatch(self, **kw), 

3597 ) + ( 

3598 " ESCAPE " + self.render_literal_value(escape, sqltypes.STRINGTYPE) 

3599 if escape is not None 

3600 else "" 

3601 ) 

3602 

3603 def visit_not_like_op_binary(self, binary, operator, **kw): 

3604 escape = binary.modifiers.get("escape", None) 

3605 return "%s NOT LIKE %s" % ( 

3606 binary.left._compiler_dispatch(self, **kw), 

3607 binary.right._compiler_dispatch(self, **kw), 

3608 ) + ( 

3609 " ESCAPE " + self.render_literal_value(escape, sqltypes.STRINGTYPE) 

3610 if escape is not None 

3611 else "" 

3612 ) 

3613 

3614 def visit_ilike_op_binary(self, binary, operator, **kw): 

3615 if operator is operators.ilike_op: 

3616 binary = binary._clone() 

3617 binary.left = ilike_case_insensitive(binary.left) 

3618 binary.right = ilike_case_insensitive(binary.right) 

3619 # else we assume ilower() has been applied 

3620 

3621 return self.visit_like_op_binary(binary, operator, **kw) 

3622 

3623 def visit_not_ilike_op_binary(self, binary, operator, **kw): 

3624 if operator is operators.not_ilike_op: 

3625 binary = binary._clone() 

3626 binary.left = ilike_case_insensitive(binary.left) 

3627 binary.right = ilike_case_insensitive(binary.right) 

3628 # else we assume ilower() has been applied 

3629 

3630 return self.visit_not_like_op_binary(binary, operator, **kw) 

3631 

3632 def visit_between_op_binary(self, binary, operator, **kw): 

3633 symmetric = binary.modifiers.get("symmetric", False) 

3634 return self._generate_generic_binary( 

3635 binary, " BETWEEN SYMMETRIC " if symmetric else " BETWEEN ", **kw 

3636 ) 

3637 

3638 def visit_not_between_op_binary(self, binary, operator, **kw): 

3639 symmetric = binary.modifiers.get("symmetric", False) 

3640 return self._generate_generic_binary( 

3641 binary, 

3642 " NOT BETWEEN SYMMETRIC " if symmetric else " NOT BETWEEN ", 

3643 **kw, 

3644 ) 

3645 

3646 def visit_regexp_match_op_binary( 

3647 self, binary: BinaryExpression[Any], operator: Any, **kw: Any 

3648 ) -> str: 

3649 raise exc.CompileError( 

3650 "%s dialect does not support regular expressions" 

3651 % self.dialect.name 

3652 ) 

3653 

3654 def visit_not_regexp_match_op_binary( 

3655 self, binary: BinaryExpression[Any], operator: Any, **kw: Any 

3656 ) -> str: 

3657 raise exc.CompileError( 

3658 "%s dialect does not support regular expressions" 

3659 % self.dialect.name 

3660 ) 

3661 

3662 def visit_regexp_replace_op_binary( 

3663 self, binary: BinaryExpression[Any], operator: Any, **kw: Any 

3664 ) -> str: 

3665 raise exc.CompileError( 

3666 "%s dialect does not support regular expression replacements" 

3667 % self.dialect.name 

3668 ) 

3669 

    def visit_bindparam(
        self,
        bindparam,
        within_columns_clause=False,
        literal_binds=False,
        skip_bind_expression=False,
        literal_execute=False,
        render_postcompile=False,
        **kwargs,
    ):
        """Render a BindParameter, registering it in ``self.binds``.

        Handles type-level bind_expression wrapping, literal-bind
        rendering, expanding (IN) parameters, and detection of illegal
        reuse of a previously-seen parameter name.
        """

        if not skip_bind_expression:
            impl = bindparam.type.dialect_impl(self.dialect)
            if impl._has_bind_expression:
                # the type wraps the parameter in SQL (e.g. a CAST);
                # render that wrapper with the bare parameter inside
                bind_expression = impl.bind_expression(bindparam)
                wrapped = self.process(
                    bind_expression,
                    skip_bind_expression=True,
                    within_columns_clause=within_columns_clause,
                    literal_binds=literal_binds and not bindparam.expanding,
                    literal_execute=literal_execute,
                    render_postcompile=render_postcompile,
                    **kwargs,
                )
                if bindparam.expanding:
                    # for postcompile w/ expanding, move the "wrapped" part
                    # of this into the inside

                    m = re.match(
                        r"^(.*)\(__\[POSTCOMPILE_(\S+?)\]\)(.*)$", wrapped
                    )
                    assert m, "unexpected format for expanding parameter"
                    wrapped = "(__[POSTCOMPILE_%s~~%s~~REPL~~%s~~])" % (
                        m.group(2),
                        m.group(1),
                        m.group(3),
                    )

                if literal_binds:
                    ret = self.render_literal_bindparam(
                        bindparam,
                        within_columns_clause=True,
                        bind_expression_template=wrapped,
                        **kwargs,
                    )
                    return "(%s)" % ret

                return wrapped

        if not literal_binds:
            literal_execute = (
                literal_execute
                or bindparam.literal_execute
                or (within_columns_clause and self.ansi_bind_rules)
            )
            post_compile = literal_execute or bindparam.expanding
        else:
            post_compile = False

        if literal_binds:
            ret = self.render_literal_bindparam(
                bindparam, within_columns_clause=True, **kwargs
            )
            if bindparam.expanding:
                ret = "(%s)" % ret
            return ret

        name = self._truncate_bindparam(bindparam)

        if name in self.binds:
            # a parameter with the same rendered name was already seen;
            # verify the reuse is legal
            existing = self.binds[name]
            if existing is not bindparam:
                if (
                    (existing.unique or bindparam.unique)
                    and not existing.proxy_set.intersection(
                        bindparam.proxy_set
                    )
                    and not existing._cloned_set.intersection(
                        bindparam._cloned_set
                    )
                ):
                    raise exc.CompileError(
                        "Bind parameter '%s' conflicts with "
                        "unique bind parameter of the same name" % name
                    )
                elif existing.expanding != bindparam.expanding:
                    raise exc.CompileError(
                        "Can't reuse bound parameter name '%s' in both "
                        "'expanding' (e.g. within an IN expression) and "
                        "non-expanding contexts. If this parameter is to "
                        "receive a list/array value, set 'expanding=True' on "
                        "it for expressions that aren't IN, otherwise use "
                        "a different parameter name." % (name,)
                    )
                elif existing._is_crud or bindparam._is_crud:
                    if existing._is_crud and bindparam._is_crud:
                        # TODO: this condition is not well understood.
                        # see tests in test/sql/test_update.py
                        raise exc.CompileError(
                            "Encountered unsupported case when compiling an "
                            "INSERT or UPDATE statement. If this is a "
                            "multi-table "
                            "UPDATE statement, please provide string-named "
                            "arguments to the "
                            "values() method with distinct names; support for "
                            "multi-table UPDATE statements that "
                            "target multiple tables for UPDATE is very "
                            "limited",
                        )
                    else:
                        raise exc.CompileError(
                            f"bindparam() name '{bindparam.key}' is reserved "
                            "for automatic usage in the VALUES or SET "
                            "clause of this "
                            "insert/update statement. Please use a "
                            "name other than column name when using "
                            "bindparam() "
                            "with insert() or update() (for example, "
                            f"'b_{bindparam.key}')."
                        )

        # register under both the original key and the truncated name
        self.binds[bindparam.key] = self.binds[name] = bindparam

        # if we are given a cache key that we're going to match against,
        # relate the bindparam here to one that is most likely present
        # in the "extracted params" portion of the cache key. this is used
        # to set up a positional mapping that is used to determine the
        # correct parameters for a subsequent use of this compiled with
        # a different set of parameter values. here, we accommodate for
        # parameters that may have been cloned both before and after the cache
        # key was been generated.
        ckbm_tuple = self._cache_key_bind_match

        if ckbm_tuple:
            ckbm, cksm = ckbm_tuple
            for bp in bindparam._cloned_set:
                if bp.key in cksm:
                    cb = cksm[bp.key]
                    ckbm[cb].append(bindparam)

        if bindparam.isoutparam:
            self.has_out_parameters = True

        if post_compile:
            if render_postcompile:
                self._render_postcompile = True

            if literal_execute:
                self.literal_execute_params |= {bindparam}
            else:
                self.post_compile_params |= {bindparam}

        ret = self.bindparam_string(
            name,
            post_compile=post_compile,
            expanding=bindparam.expanding,
            bindparam_type=bindparam.type,
            **kwargs,
        )

        if bindparam.expanding:
            # expanding parameters are always parenthesized
            ret = "(%s)" % ret

        return ret

3834 

    def render_bind_cast(self, type_, dbapi_type, sqltext):
        """Render a dialect-specific cast around a bound parameter.

        Dialects whose type impls set ``render_bind_cast`` must override
        this hook; the base compiler provides no implementation.
        """
        raise NotImplementedError()

3837 

    def render_literal_bindparam(
        self,
        bindparam,
        render_literal_value=NO_ARG,
        bind_expression_template=None,
        **kw,
    ):
        """Render a bound parameter as an inline literal value.

        ``render_literal_value`` may supply an explicit value; otherwise
        the parameter's effective value is used.  Expanding parameters
        render as a comma-separated list of literals.
        """
        if render_literal_value is not NO_ARG:
            value = render_literal_value
        else:
            if bindparam.value is None and bindparam.callable is None:
                op = kw.get("_binary_op", None)
                if op and op not in (operators.is_, operators.is_not):
                    # comparing an inline NULL with =, != etc. almost
                    # certainly isn't what was intended; warn once
                    util.warn_limited(
                        "Bound parameter '%s' rendering literal NULL in a SQL "
                        "expression; comparisons to NULL should not use "
                        "operators outside of 'is' or 'is not'",
                        (bindparam.key,),
                    )
                return self.process(sqltypes.NULLTYPE, **kw)
            value = bindparam.effective_value

        if bindparam.expanding:
            leep = self._literal_execute_expanding_parameter_literal_binds
            to_update, replacement_expr = leep(
                bindparam,
                value,
                bind_expression_template=bind_expression_template,
            )
            return replacement_expr
        else:
            return self.render_literal_value(value, bindparam.type)

3870 

    def render_literal_value(
        self, value: Any, type_: sqltypes.TypeEngine[Any]
    ) -> str:
        """Render the value of a bind parameter as a quoted literal. 

        This is used for statement sections that do not accept bind parameters 
        on the target driver/database. 

        This should be implemented by subclasses using the quoting services 
        of the DBAPI. 

        Raises ``exc.CompileError`` when no literal processor exists for
        the type, or when the processor fails on the given value.
        """

        if value is None and not type_.should_evaluate_none:
            # issue #10535 - handle NULL in the compiler without placing
            # this onto each type, except for "evaluate None" types
            # (e.g. JSON)
            return self.process(elements.Null._instance())

        processor = type_._cached_literal_processor(self.dialect)
        if processor:
            try:
                return processor(value)
            except Exception as e:
                # surface the offending value/type; the original error
                # remains attached as the cause
                raise exc.CompileError(
                    f"Could not render literal value "
                    f'"{sql_util._repr_single_value(value)}" '
                    f"with datatype "
                    f"{type_}; see parent stack trace for "
                    "more detail."
                ) from e

        else:
            raise exc.CompileError(
                f"No literal value renderer is available for literal value "
                f'"{sql_util._repr_single_value(value)}" '
                f"with datatype {type_}"
            )

3909 

3910 def _truncate_bindparam(self, bindparam): 

3911 if bindparam in self.bind_names: 

3912 return self.bind_names[bindparam] 

3913 

3914 bind_name = bindparam.key 

3915 if isinstance(bind_name, elements._truncated_label): 

3916 bind_name = self._truncated_identifier("bindparam", bind_name) 

3917 

3918 # add to bind_names for translation 

3919 self.bind_names[bindparam] = bind_name 

3920 

3921 return bind_name 

3922 

3923 def _truncated_identifier( 

3924 self, ident_class: str, name: _truncated_label 

3925 ) -> str: 

3926 if (ident_class, name) in self.truncated_names: 

3927 return self.truncated_names[(ident_class, name)] 

3928 

3929 anonname = name.apply_map(self.anon_map) 

3930 

3931 if len(anonname) > self.label_length - 6: 

3932 counter = self._truncated_counters.get(ident_class, 1) 

3933 truncname = ( 

3934 anonname[0 : max(self.label_length - 6, 0)] 

3935 + "_" 

3936 + hex(counter)[2:] 

3937 ) 

3938 self._truncated_counters[ident_class] = counter + 1 

3939 else: 

3940 truncname = anonname 

3941 self.truncated_names[(ident_class, name)] = truncname 

3942 return truncname 

3943 

3944 def _anonymize(self, name: str) -> str: 

3945 return name % self.anon_map 

3946 

def bindparam_string(
    self,
    name: str,
    post_compile: bool = False,
    expanding: bool = False,
    escaped_from: Optional[str] = None,
    bindparam_type: Optional[TypeEngine[Any]] = None,
    accumulate_bind_names: Optional[Set[str]] = None,
    visited_bindparam: Optional[List[str]] = None,
    **kw: Any,
) -> str:
    """Render the placeholder text for a bind parameter named ``name``.

    Handles escaping of characters unusable in a parameter name,
    ``__[POSTCOMPILE_...]`` placeholders for post-compile / expanding
    parameters, and optional bind casts applied via
    :meth:`.render_bind_cast`.
    """
    # TODO: accumulate_bind_names is passed by crud.py to gather
    # names on a per-value basis, visited_bindparam is passed by
    # visit_insert() to collect all parameters in the statement.
    # see if this gathering can be simplified somehow
    if accumulate_bind_names is not None:
        accumulate_bind_names.add(name)
    if visited_bindparam is not None:
        visited_bindparam.append(name)

    if not escaped_from:
        if self._bind_translate_re.search(name):
            # not quite the translate use case as we want to
            # also get a quick boolean if we even found
            # unusual characters in the name
            new_name = self._bind_translate_re.sub(
                lambda m: self._bind_translate_chars[m.group(0)],
                name,
            )
            escaped_from = name
            name = new_name

    if escaped_from:
        # record original name -> escaped name in the compiler-level map
        self.escaped_bind_names = self.escaped_bind_names.union(
            {escaped_from: name}
        )
    if post_compile:
        ret = "__[POSTCOMPILE_%s]" % name
        if expanding:
            # for expanding, bound parameters or literal values will be
            # rendered per item
            return ret

        # otherwise, for non-expanding "literal execute", apply
        # bind casts as determined by the datatype
        if bindparam_type is not None:
            type_impl = bindparam_type._unwrapped_dialect_impl(
                self.dialect
            )
            if type_impl.render_literal_cast:
                ret = self.render_bind_cast(bindparam_type, type_impl, ret)
        return ret
    elif self.state is CompilerState.COMPILING:
        # compile-time-only rendering uses the compilation template
        ret = self.compilation_bindtemplate % {"name": name}
    else:
        ret = self.bindtemplate % {"name": name}

    if (
        bindparam_type is not None
        and self.dialect._bind_typing_render_casts
    ):
        type_impl = bindparam_type._unwrapped_dialect_impl(self.dialect)
        if type_impl.render_bind_cast:
            ret = self.render_bind_cast(bindparam_type, type_impl, ret)

    return ret

4013 

4014 def _dispatch_independent_ctes(self, stmt, kw): 

4015 local_kw = kw.copy() 

4016 local_kw.pop("cte_opts", None) 

4017 for cte, opt in zip( 

4018 stmt._independent_ctes, stmt._independent_ctes_opts 

4019 ): 

4020 cte._compiler_dispatch(self, cte_opts=opt, **local_kw) 

4021 

def visit_cte(
    self,
    cte: CTE,
    asfrom: bool = False,
    ashint: bool = False,
    fromhints: Optional[_FromHintsType] = None,
    visiting_cte: Optional[CTE] = None,
    from_linter: Optional[FromLinter] = None,
    cte_opts: selectable._CTEOpts = selectable._CTEOpts(False),
    **kwargs: Any,
) -> Optional[str]:
    """Render a CTE, registering its WITH-clause text in the compiler's
    CTE collections.

    Returns the CTE's alias name when rendered as a FROM element
    (``asfrom=True``); otherwise ``None`` once the CTE text has been
    collected.  Handles same-named CTE deduplication / conflict
    detection and the "nest_here" relocation of nested CTEs.
    """
    self_ctes = self._init_cte_state()
    assert self_ctes is self.ctes

    kwargs["visiting_cte"] = cte

    cte_name = cte.name

    if isinstance(cte_name, elements._truncated_label):
        cte_name = self._truncated_identifier("alias", cte_name)

    is_new_cte = True
    embedded_in_current_named_cte = False

    _reference_cte = cte._get_reference_cte()

    nesting = cte.nesting or cte_opts.nesting

    # check for CTE already encountered
    if _reference_cte in self.level_name_by_cte:
        cte_level, _, existing_cte_opts = self.level_name_by_cte[
            _reference_cte
        ]
        assert _ == cte_name

        cte_level_name = (cte_level, cte_name)
        existing_cte = self.ctes_by_level_name[cte_level_name]

        # check if we are receiving it here with a specific
        # "nest_here" location; if so, move it to this location

        if cte_opts.nesting:
            if existing_cte_opts.nesting:
                raise exc.CompileError(
                    "CTE is stated as 'nest_here' in "
                    "more than one location"
                )

            old_level_name = (cte_level, cte_name)
            cte_level = len(self.stack) if nesting else 1
            cte_level_name = new_level_name = (cte_level, cte_name)

            del self.ctes_by_level_name[old_level_name]
            self.ctes_by_level_name[new_level_name] = existing_cte
            self.level_name_by_cte[_reference_cte] = new_level_name + (
                cte_opts,
            )

    else:
        cte_level = len(self.stack) if nesting else 1
        cte_level_name = (cte_level, cte_name)

        if cte_level_name in self.ctes_by_level_name:
            existing_cte = self.ctes_by_level_name[cte_level_name]
        else:
            existing_cte = None

    if existing_cte is not None:
        embedded_in_current_named_cte = visiting_cte is existing_cte

        # we've generated a same-named CTE that we are enclosed in,
        # or this is the same CTE. just return the name.
        if cte is existing_cte._restates or cte is existing_cte:
            is_new_cte = False
        elif existing_cte is cte._restates:
            # we've generated a same-named CTE that is
            # enclosed in us - we take precedence, so
            # discard the text for the "inner".
            del self_ctes[existing_cte]

            existing_cte_reference_cte = existing_cte._get_reference_cte()

            assert existing_cte_reference_cte is _reference_cte
            assert existing_cte_reference_cte is existing_cte

            del self.level_name_by_cte[existing_cte_reference_cte]
        else:
            if (
                # if the two CTEs have the same hash, which we expect
                # here means that one/both is an annotated of the other
                (hash(cte) == hash(existing_cte))
                # or...
                or (
                    (
                        # if they are clones, i.e. they came from the ORM
                        # or some other visit method
                        cte._is_clone_of is not None
                        or existing_cte._is_clone_of is not None
                    )
                    # and are deep-copy identical
                    and cte.compare(existing_cte)
                )
            ):
                # then consider these two CTEs the same
                is_new_cte = False
            else:
                # otherwise these are two CTEs that either will render
                # differently, or were indicated separately by the user,
                # with the same name
                raise exc.CompileError(
                    "Multiple, unrelated CTEs found with "
                    "the same name: %r" % cte_name
                )

    if not asfrom and not is_new_cte:
        return None

    if cte._cte_alias is not None:
        pre_alias_cte = cte._cte_alias
        cte_pre_alias_name = cte._cte_alias.name
        if isinstance(cte_pre_alias_name, elements._truncated_label):
            cte_pre_alias_name = self._truncated_identifier(
                "alias", cte_pre_alias_name
            )
    else:
        pre_alias_cte = cte
        cte_pre_alias_name = None

    if is_new_cte:
        self.ctes_by_level_name[cte_level_name] = cte
        self.level_name_by_cte[_reference_cte] = cte_level_name + (
            cte_opts,
        )

        if pre_alias_cte not in self.ctes:
            self.visit_cte(pre_alias_cte, **kwargs)

        if not cte_pre_alias_name and cte not in self_ctes:
            if cte.recursive:
                self.ctes_recursive = True
            text = self.preparer.format_alias(cte, cte_name)
            if cte.recursive or cte.element.name_cte_columns:
                col_source = cte.element

                # TODO: can we get at the .columns_plus_names collection
                # that is already (or will be?) generated for the SELECT
                # rather than calling twice?
                recur_cols = [
                    # TODO: proxy_name is not technically safe,
                    # see test_cte->
                    # test_with_recursive_no_name_currently_buggy. not
                    # clear what should be done with such a case
                    fallback_label_name or proxy_name
                    for (
                        _,
                        proxy_name,
                        fallback_label_name,
                        c,
                        repeated,
                    ) in (col_source._generate_columns_plus_names(True))
                    if not repeated
                ]

                text += "(%s)" % (
                    ", ".join(
                        self.preparer.format_label_name(
                            ident, anon_map=self.anon_map
                        )
                        for ident in recur_cols
                    )
                )

            assert kwargs.get("subquery", False) is False

            if not self.stack:
                # toplevel, this is a stringify of the
                # cte directly. just compile the inner
                # the way alias() does.
                return cte.element._compiler_dispatch(
                    self, asfrom=asfrom, **kwargs
                )
            else:
                prefixes = self._generate_prefixes(
                    cte, cte._prefixes, **kwargs
                )
                inner = cte.element._compiler_dispatch(
                    self, asfrom=True, **kwargs
                )

                text += " AS %s\n(%s)" % (prefixes, inner)

                if cte._suffixes:
                    text += " " + self._generate_prefixes(
                        cte, cte._suffixes, **kwargs
                    )

                self_ctes[cte] = text

    if asfrom:
        if from_linter:
            from_linter.froms[cte._de_clone()] = cte_name

        if not is_new_cte and embedded_in_current_named_cte:
            return self.preparer.format_alias(cte, cte_name)

        if cte_pre_alias_name:
            text = self.preparer.format_alias(cte, cte_pre_alias_name)
            if self.preparer._requires_quotes(cte_name):
                cte_name = self.preparer.quote(cte_name)
            text += self.get_render_as_alias_suffix(cte_name)
            return text  # type: ignore[no-any-return]
        else:
            return self.preparer.format_alias(cte, cte_name)

    return None

4237 

def visit_table_valued_alias(self, element, **kw):
    """Render a table-valued alias, dispatching on whether it is
    LATERAL; implicit-join elements disable cartesian-product linting."""
    if element.joins_implicitly:
        kw["from_linter"] = None
    return (
        self.visit_lateral(element, **kw)
        if element._is_lateral
        else self.visit_alias(element, **kw)
    )

4245 

def visit_table_valued_column(self, element, **kw):
    """A table-valued column renders the same way as a plain column."""
    rendered = self.visit_column(element, **kw)
    return rendered

4248 

def visit_alias(
    self,
    alias,
    asfrom=False,
    ashint=False,
    iscrud=False,
    fromhints=None,
    subquery=False,
    lateral=False,
    enclosing_alias=None,
    from_linter=None,
    **kwargs,
):
    """Render an alias construct.

    When ``asfrom``/``ashint`` is set, renders the inner element followed
    by the ``AS <name>`` suffix (plus derived-column list and FROM hints
    where applicable); otherwise dispatches straight to the inner
    element.  ``enclosing_alias`` prevents re-aliasing when an alias
    directly wraps the element currently being compiled.
    """
    if lateral:
        if "enclosing_lateral" not in kwargs:
            # if lateral is set and enclosing_lateral is not
            # present, we assume we are being called directly
            # from visit_lateral() and we need to set enclosing_lateral.
            assert alias._is_lateral
            kwargs["enclosing_lateral"] = alias

        # for lateral objects, we track a second from_linter that is...
        # lateral! to the level above us.
        if (
            from_linter
            and "lateral_from_linter" not in kwargs
            and "enclosing_lateral" in kwargs
        ):
            kwargs["lateral_from_linter"] = from_linter

    if enclosing_alias is not None and enclosing_alias.element is alias:
        # the enclosing alias directly wraps this alias; compile the
        # inner element without adding another AS clause
        inner = alias.element._compiler_dispatch(
            self,
            asfrom=asfrom,
            ashint=ashint,
            iscrud=iscrud,
            fromhints=fromhints,
            lateral=lateral,
            enclosing_alias=alias,
            **kwargs,
        )
        if subquery and (asfrom or lateral):
            inner = "(%s)" % (inner,)
        return inner
    else:
        kwargs["enclosing_alias"] = alias

    if asfrom or ashint:
        if isinstance(alias.name, elements._truncated_label):
            alias_name = self._truncated_identifier("alias", alias.name)
        else:
            alias_name = alias.name

        if ashint:
            return self.preparer.format_alias(alias, alias_name)
        elif asfrom:
            if from_linter:
                from_linter.froms[alias._de_clone()] = alias_name

            inner = alias.element._compiler_dispatch(
                self, asfrom=True, lateral=lateral, **kwargs
            )
            if subquery:
                inner = "(%s)" % (inner,)

            ret = inner + self.get_render_as_alias_suffix(
                self.preparer.format_alias(alias, alias_name)
            )

            # derived-column alias syntax, e.g. "AS name(col1, col2)",
            # optionally including column types
            if alias._supports_derived_columns and alias._render_derived:
                ret += "(%s)" % (
                    ", ".join(
                        "%s%s"
                        % (
                            self.preparer.quote(col.name),
                            (
                                " %s"
                                % self.dialect.type_compiler_instance.process(
                                    col.type, **kwargs
                                )
                                if alias._render_derived_w_types
                                else ""
                            ),
                        )
                        for col in alias.c
                    )
                )

            if fromhints and alias in fromhints:
                ret = self.format_from_hint_text(
                    ret, alias, fromhints[alias], iscrud
                )

            return ret
    else:
        # note we cancel the "subquery" flag here as well
        return alias.element._compiler_dispatch(
            self, lateral=lateral, **kwargs
        )

4348 

def visit_subquery(self, subquery, **kw):
    """Render a subquery; sets the ``subquery`` flag so visit_alias
    parenthesizes the inner element as needed."""
    return self.visit_alias(subquery, **{**kw, "subquery": True})

4352 

def visit_lateral(self, lateral_, **kw):
    """Render a LATERAL expression: the LATERAL keyword followed by the
    aliased element."""
    kw["lateral"] = True
    return "LATERAL " + self.visit_alias(lateral_, **kw)

4356 

def visit_tablesample(self, tablesample, asfrom=False, **kw):
    """Render ``<alias> TABLESAMPLE <method> [REPEATABLE (<seed>)]``."""
    aliased = self.visit_alias(tablesample, asfrom=True, **kw)
    sampling = tablesample._get_method()._compiler_dispatch(self, **kw)
    text = "%s TABLESAMPLE %s" % (aliased, sampling)

    seed = tablesample.seed
    if seed is not None:
        text += " REPEATABLE (%s)" % (seed._compiler_dispatch(self, **kw))

    return text

4369 

4370 def _render_values(self, element, **kw): 

4371 kw.setdefault("literal_binds", element.literal_binds) 

4372 tuples = ", ".join( 

4373 self.process( 

4374 elements.Tuple( 

4375 types=element._column_types, *elem 

4376 ).self_group(), 

4377 **kw, 

4378 ) 

4379 for chunk in element._data 

4380 for elem in chunk 

4381 ) 

4382 return f"VALUES {tuples}" 

4383 

def visit_values(
    self, element, asfrom=False, from_linter=None, visiting_cte=None, **kw
):
    """Render a VALUES construct, optionally named/aliased when used as
    a FROM element, with LATERAL support."""
    if element._independent_ctes:
        self._dispatch_independent_ctes(element, kw)

    rendered = self._render_values(element, **kw)

    # resolve the alias name for the VALUES element, if any
    if element._unnamed:
        values_name = None
    else:
        raw_name = element.name
        if isinstance(raw_name, elements._truncated_label):
            values_name = self._truncated_identifier("values", raw_name)
        else:
            values_name = raw_name

    lateral_prefix = "LATERAL " if element._is_lateral else ""

    if asfrom:
        if from_linter:
            linter_label = (
                values_name
                if values_name is not None
                else "(unnamed VALUES element)"
            )
            from_linter.froms[element._de_clone()] = linter_label

        if visiting_cte is not None and visiting_cte.element is element:
            # VALUES as the body of a CTE: no aliasing here, but
            # LATERAL is not legal in that position
            if element._is_lateral:
                raise exc.CompileError(
                    "Can't use a LATERAL VALUES expression inside of a CTE"
                )
        elif values_name:
            kw["include_table"] = False
            column_list = ", ".join(
                col._compiler_dispatch(self, **kw)
                for col in element.columns
            )
            alias_suffix = self.get_render_as_alias_suffix(
                self.preparer.quote(values_name)
            )
            rendered = "%s(%s)%s (%s)" % (
                lateral_prefix,
                rendered,
                alias_suffix,
                column_list,
            )
        else:
            rendered = "%s(%s)" % (lateral_prefix, rendered)
    return rendered

4432 

def visit_scalar_values(self, element, **kw):
    """Render a scalar VALUES expression, wrapped in parenthesis."""
    inner = self._render_values(element, **kw)
    return "(%s)" % (inner,)

4435 

def get_render_as_alias_suffix(self, alias_name_text):
    """Return the ``AS <name>`` suffix appended when aliasing."""
    return " AS {}".format(alias_name_text)

4438 

def _add_to_result_map(
    self,
    keyname: str,
    name: str,
    objects: Tuple[Any, ...],
    type_: TypeEngine[Any],
) -> None:
    """Record one result-column entry in ``self._result_columns``.

    A ``None`` or ``"*"`` keyname marks the result set as ad-hoc
    textual / non-positionally-ordered.
    """
    # note objects must be non-empty for cursor.py to handle the
    # collection properly
    assert objects

    if keyname in (None, "*"):
        self._ordered_columns = False
        self._ad_hoc_textual = True

    if type_._is_tuple_type:
        raise exc.CompileError(
            "Most backends don't support SELECTing "
            "from a tuple() object. If this is an ORM query, "
            "consider using the Bundle object."
        )

    entry = ResultColumnsEntry(keyname, name, objects, type_)
    self._result_columns.append(entry)

4463 

4464 def _label_returning_column( 

4465 self, stmt, column, populate_result_map, column_clause_args=None, **kw 

4466 ): 

4467 """Render a column with necessary labels inside of a RETURNING clause. 

4468 

4469 This method is provided for individual dialects in place of calling 

4470 the _label_select_column method directly, so that the two use cases 

4471 of RETURNING vs. SELECT can be disambiguated going forward. 

4472 

4473 .. versionadded:: 1.4.21 

4474 

4475 """ 

4476 return self._label_select_column( 

4477 None, 

4478 column, 

4479 populate_result_map, 

4480 False, 

4481 {} if column_clause_args is None else column_clause_args, 

4482 **kw, 

4483 ) 

4484 

def _label_select_column(
    self,
    select,
    column,
    populate_result_map,
    asfrom,
    column_clause_args,
    name=None,
    proxy_name=None,
    fallback_label_name=None,
    within_columns_clause=True,
    column_is_repeated=False,
    need_column_expressions=False,
    include_table=True,
):
    """produce labeled columns present in a select().

    Decides whether the given column expression is rendered with an
    ``AS <label>`` clause, and wires up the ``add_to_result_map``
    callable that records result-column metadata when
    ``populate_result_map`` is set.
    """
    impl = column.type.dialect_impl(self.dialect)

    if impl._has_column_expression and (
        need_column_expressions or populate_result_map
    ):
        col_expr = impl.column_expression(column)
    else:
        col_expr = column

    if populate_result_map:
        # pass an "add_to_result_map" callable into the compilation
        # of embedded columns. this collects information about the
        # column as it will be fetched in the result and is coordinated
        # with cursor.description when the query is executed.
        add_to_result_map = self._add_to_result_map

        # if the SELECT statement told us this column is a repeat,
        # wrap the callable with one that prevents the addition of the
        # targets
        if column_is_repeated:
            _add_to_result_map = add_to_result_map

            def add_to_result_map(keyname, name, objects, type_):
                _add_to_result_map(keyname, name, (keyname,), type_)

        # if we redefined col_expr for type expressions, wrap the
        # callable with one that adds the original column to the targets
        elif col_expr is not column:
            _add_to_result_map = add_to_result_map

            def add_to_result_map(keyname, name, objects, type_):
                _add_to_result_map(
                    keyname, name, (column,) + objects, type_
                )

    else:
        add_to_result_map = None

    # this method is used by some of the dialects for RETURNING,
    # which has different inputs. _label_returning_column was added
    # as the better target for this now however for 1.4 we will keep
    # _label_select_column directly compatible with this use case.
    # these assertions right now set up the current expected inputs
    assert within_columns_clause, (
        "_label_select_column is only relevant within "
        "the columns clause of a SELECT or RETURNING"
    )
    if isinstance(column, elements.Label):
        if col_expr is not column:
            result_expr = _CompileLabel(
                col_expr, column.name, alt_names=(column.element,)
            )
        else:
            result_expr = col_expr

    elif name:
        # here, _columns_plus_names has determined there's an explicit
        # label name we need to use. this is the default for
        # tablenames_plus_columnnames as well as when columns are being
        # deduplicated on name

        assert (
            proxy_name is not None
        ), "proxy_name is required if 'name' is passed"

        result_expr = _CompileLabel(
            col_expr,
            name,
            alt_names=(
                proxy_name,
                # this is a hack to allow legacy result column lookups
                # to work as they did before; this goes away in 2.0.
                # TODO: this only seems to be tested indirectly
                # via test/orm/test_deprecations.py. should be a
                # resultset test for this
                column._tq_label,
            ),
        )
    else:
        # determine here whether this column should be rendered in
        # a labelled context or not, as we were given no required label
        # name from the caller. Here we apply heuristics based on the kind
        # of SQL expression involved.

        if col_expr is not column:
            # type-specific expression wrapping the given column,
            # so we render a label
            render_with_label = True
        elif isinstance(column, elements.ColumnClause):
            # table-bound column, we render its name as a label if we are
            # inside of a subquery only
            render_with_label = (
                asfrom
                and not column.is_literal
                and column.table is not None
            )
        elif isinstance(column, elements.TextClause):
            render_with_label = False
        elif isinstance(column, elements.UnaryExpression):
            # unary expression. notes added as of #12681
            #
            # By convention, the visit_unary() method
            # itself does not add an entry to the result map, and relies
            # upon either the inner expression creating a result map
            # entry, or if not, by creating a label here that produces
            # the result map entry. Where that happens is based on whether
            # or not the element immediately inside the unary is a
            # NamedColumn subclass or not.
            #
            # Now, this also impacts how the SELECT is written; if
            # we decide to generate a label here, we get the usual
            # "~(x+y) AS anon_1" thing in the columns clause. If we
            # don't, we don't get an AS at all, we get like
            # "~table.column".
            #
            # But here is the important thing as of modernish (like 1.4)
            # versions of SQLAlchemy - **whether or not the AS <label>
            # is present in the statement is not actually important**.
            # We target result columns **positionally** for a fully
            # compiled ``Select()`` object; before 1.4 we needed those
            # labels to match in cursor.description etc etc but now it
            # really doesn't matter.
            # So really, we could set render_with_label True in all cases.
            # Or we could just have visit_unary() populate the result map
            # in all cases.
            #
            # What we're doing here is strictly trying to not rock the
            # boat too much with when we do/don't render "AS label";
            # labels being present helps in the edge cases that we
            # "fall back" to named cursor.description matching, labels
            # not being present for columns keeps us from having awkward
            # phrases like "SELECT DISTINCT table.x AS x".
            render_with_label = (
                (
                    # exception case to detect if we render "not boolean"
                    # as "not <col>" for native boolean or "<col> = 1"
                    # for non-native boolean. this is controlled by
                    # visit_is_<true|false>_unary_operator
                    column.operator
                    in (operators.is_false, operators.is_true)
                    and not self.dialect.supports_native_boolean
                )
                or column._wraps_unnamed_column()
                or asfrom
            )
        elif (
            # general class of expressions that don't have a SQL-column
            # addressable name. includes scalar selects, bind parameters,
            # SQL functions, others
            not isinstance(column, elements.NamedColumn)
            # deeper check that indicates there's no natural "name" to
            # this element, which accommodates for custom SQL constructs
            # that might have a ".name" attribute (but aren't SQL
            # functions) but are not implementing this more recently added
            # base class. in theory the "NamedColumn" check should be
            # enough, however here we seek to maintain legacy behaviors
            # as well.
            and column._non_anon_label is None
        ):
            render_with_label = True
        else:
            render_with_label = False

        if render_with_label:
            if not fallback_label_name:
                # used by the RETURNING case right now. we generate it
                # here as 3rd party dialects may be referring to
                # _label_select_column method directly instead of the
                # just-added _label_returning_column method
                assert not column_is_repeated
                fallback_label_name = column._anon_name_label

            fallback_label_name = (
                elements._truncated_label(fallback_label_name)
                if not isinstance(
                    fallback_label_name, elements._truncated_label
                )
                else fallback_label_name
            )

            result_expr = _CompileLabel(
                col_expr, fallback_label_name, alt_names=(proxy_name,)
            )
        else:
            result_expr = col_expr

    column_clause_args.update(
        within_columns_clause=within_columns_clause,
        add_to_result_map=add_to_result_map,
        include_table=include_table,
    )
    return result_expr._compiler_dispatch(self, **column_clause_args)

4693 

def format_from_hint_text(self, sqltext, table, hint, iscrud):
    """Append dialect FROM-hint text to rendered table text, if the
    dialect produces any."""
    hint_fragment = self.get_from_hint_text(table, hint)
    if not hint_fragment:
        return sqltext
    return sqltext + " " + hint_fragment

4699 

def get_select_hint_text(self, byfroms):
    """Dialect hook for statement-level SELECT hint text.

    The base implementation returns ``None`` (no hint rendered).
    """
    return None

4702 

def get_from_hint_text(
    self, table: FromClause, text: Optional[str]
) -> Optional[str]:
    """Dialect hook for per-FROM-element hint text.

    The base implementation returns ``None`` (no hint rendered).
    """
    return None

4707 

def get_crud_hint_text(self, table, text):
    """Dialect hook for hint text on CRUD (INSERT/UPDATE/DELETE)
    statements.

    The base implementation returns ``None`` (no hint rendered).
    """
    return None

4710 

def get_statement_hint_text(self, hint_texts):
    """Join per-statement hint fragments into one space-separated
    string."""
    separator = " "
    return separator.join(hint_texts)

4713 

# stack entry used when compiling at the top level, i.e. when
# ``self.stack`` is empty; supplies empty "froms" collections
_default_stack_entry: _CompilerStackEntry

if not typing.TYPE_CHECKING:
    _default_stack_entry = util.immutabledict(
        [("correlate_froms", frozenset()), ("asfrom_froms", frozenset())]
    )

4720 

4721 def _display_froms_for_select( 

4722 self, select_stmt, asfrom, lateral=False, **kw 

4723 ): 

4724 # utility method to help external dialects 

4725 # get the correct from list for a select. 

4726 # specifically the oracle dialect needs this feature 

4727 # right now. 

4728 toplevel = not self.stack 

4729 entry = self._default_stack_entry if toplevel else self.stack[-1] 

4730 

4731 compile_state = select_stmt._compile_state_factory(select_stmt, self) 

4732 

4733 correlate_froms = entry["correlate_froms"] 

4734 asfrom_froms = entry["asfrom_froms"] 

4735 

4736 if asfrom and not lateral: 

4737 froms = compile_state._get_display_froms( 

4738 explicit_correlate_froms=correlate_froms.difference( 

4739 asfrom_froms 

4740 ), 

4741 implicit_correlate_froms=(), 

4742 ) 

4743 else: 

4744 froms = compile_state._get_display_froms( 

4745 explicit_correlate_froms=correlate_froms, 

4746 implicit_correlate_froms=asfrom_froms, 

4747 ) 

4748 return froms 

4749 

# dialect hook point: e.g. Oracle / SQL Server assign a callable here to
# restructure the SELECT before compilation (see visit_select)
translate_select_structure: Any = None
"""if not ``None``, should be a callable which accepts ``(select_stmt,
**kw)`` and returns a select object. this is used for structural changes
mostly to accommodate for LIMIT/OFFSET schemes

"""

4756 

4757 def visit_select( 

4758 self, 

4759 select_stmt, 

4760 asfrom=False, 

4761 insert_into=False, 

4762 fromhints=None, 

4763 compound_index=None, 

4764 select_wraps_for=None, 

4765 lateral=False, 

4766 from_linter=None, 

4767 **kwargs, 

4768 ): 

4769 assert select_wraps_for is None, ( 

4770 "SQLAlchemy 1.4 requires use of " 

4771 "the translate_select_structure hook for structural " 

4772 "translations of SELECT objects" 

4773 ) 

4774 

4775 # initial setup of SELECT. the compile_state_factory may now 

4776 # be creating a totally different SELECT from the one that was 

4777 # passed in. for ORM use this will convert from an ORM-state 

4778 # SELECT to a regular "Core" SELECT. other composed operations 

4779 # such as computation of joins will be performed. 

4780 

4781 kwargs["within_columns_clause"] = False 

4782 

4783 compile_state = select_stmt._compile_state_factory( 

4784 select_stmt, self, **kwargs 

4785 ) 

4786 kwargs["ambiguous_table_name_map"] = ( 

4787 compile_state._ambiguous_table_name_map 

4788 ) 

4789 

4790 select_stmt = compile_state.statement 

4791 

4792 toplevel = not self.stack 

4793 

4794 if toplevel and not self.compile_state: 

4795 self.compile_state = compile_state 

4796 

4797 is_embedded_select = compound_index is not None or insert_into 

4798 

4799 # translate step for Oracle, SQL Server which often need to 

4800 # restructure the SELECT to allow for LIMIT/OFFSET and possibly 

4801 # other conditions 

4802 if self.translate_select_structure: 

4803 new_select_stmt = self.translate_select_structure( 

4804 select_stmt, asfrom=asfrom, **kwargs 

4805 ) 

4806 

4807 # if SELECT was restructured, maintain a link to the originals 

4808 # and assemble a new compile state 

4809 if new_select_stmt is not select_stmt: 

4810 compile_state_wraps_for = compile_state 

4811 select_wraps_for = select_stmt 

4812 select_stmt = new_select_stmt 

4813 

4814 compile_state = select_stmt._compile_state_factory( 

4815 select_stmt, self, **kwargs 

4816 ) 

4817 select_stmt = compile_state.statement 

4818 

4819 entry = self._default_stack_entry if toplevel else self.stack[-1] 

4820 

4821 populate_result_map = need_column_expressions = ( 

4822 toplevel 

4823 or entry.get("need_result_map_for_compound", False) 

4824 or entry.get("need_result_map_for_nested", False) 

4825 ) 

4826 

4827 # indicates there is a CompoundSelect in play and we are not the 

4828 # first select 

4829 if compound_index: 

4830 populate_result_map = False 

4831 

4832 # this was first proposed as part of #3372; however, it is not 

4833 # reached in current tests and could possibly be an assertion 

4834 # instead. 

4835 if not populate_result_map and "add_to_result_map" in kwargs: 

4836 del kwargs["add_to_result_map"] 

4837 

4838 froms = self._setup_select_stack( 

4839 select_stmt, compile_state, entry, asfrom, lateral, compound_index 

4840 ) 

4841 

4842 column_clause_args = kwargs.copy() 

4843 column_clause_args.update( 

4844 {"within_label_clause": False, "within_columns_clause": False} 

4845 ) 

4846 

4847 text = "SELECT " # we're off to a good start ! 

4848 

4849 if select_stmt._hints: 

4850 hint_text, byfrom = self._setup_select_hints(select_stmt) 

4851 if hint_text: 

4852 text += hint_text + " " 

4853 else: 

4854 byfrom = None 

4855 

4856 if select_stmt._independent_ctes: 

4857 self._dispatch_independent_ctes(select_stmt, kwargs) 

4858 

4859 if select_stmt._prefixes: 

4860 text += self._generate_prefixes( 

4861 select_stmt, select_stmt._prefixes, **kwargs 

4862 ) 

4863 

4864 text += self.get_select_precolumns(select_stmt, **kwargs) 

4865 # the actual list of columns to print in the SELECT column list. 

4866 inner_columns = [ 

4867 c 

4868 for c in [ 

4869 self._label_select_column( 

4870 select_stmt, 

4871 column, 

4872 populate_result_map, 

4873 asfrom, 

4874 column_clause_args, 

4875 name=name, 

4876 proxy_name=proxy_name, 

4877 fallback_label_name=fallback_label_name, 

4878 column_is_repeated=repeated, 

4879 need_column_expressions=need_column_expressions, 

4880 ) 

4881 for ( 

4882 name, 

4883 proxy_name, 

4884 fallback_label_name, 

4885 column, 

4886 repeated, 

4887 ) in compile_state.columns_plus_names 

4888 ] 

4889 if c is not None 

4890 ] 

4891 

4892 if populate_result_map and select_wraps_for is not None: 

4893 # if this select was generated from translate_select, 

4894 # rewrite the targeted columns in the result map 

4895 

4896 translate = dict( 

4897 zip( 

4898 [ 

4899 name 

4900 for ( 

4901 key, 

4902 proxy_name, 

4903 fallback_label_name, 

4904 name, 

4905 repeated, 

4906 ) in compile_state.columns_plus_names 

4907 ], 

4908 [ 

4909 name 

4910 for ( 

4911 key, 

4912 proxy_name, 

4913 fallback_label_name, 

4914 name, 

4915 repeated, 

4916 ) in compile_state_wraps_for.columns_plus_names 

4917 ], 

4918 ) 

4919 ) 

4920 

4921 self._result_columns = [ 

4922 ResultColumnsEntry( 

4923 key, name, tuple(translate.get(o, o) for o in obj), type_ 

4924 ) 

4925 for key, name, obj, type_ in self._result_columns 

4926 ] 

4927 

4928 text = self._compose_select_body( 

4929 text, 

4930 select_stmt, 

4931 compile_state, 

4932 inner_columns, 

4933 froms, 

4934 byfrom, 

4935 toplevel, 

4936 kwargs, 

4937 ) 

4938 

4939 if select_stmt._statement_hints: 

4940 per_dialect = [ 

4941 ht 

4942 for (dialect_name, ht) in select_stmt._statement_hints 

4943 if dialect_name in ("*", self.dialect.name) 

4944 ] 

4945 if per_dialect: 

4946 text += " " + self.get_statement_hint_text(per_dialect) 

4947 

4948 # In compound query, CTEs are shared at the compound level 

4949 if self.ctes and (not is_embedded_select or toplevel): 

4950 nesting_level = len(self.stack) if not toplevel else None 

4951 text = self._render_cte_clause(nesting_level=nesting_level) + text 

4952 

4953 if select_stmt._suffixes: 

4954 text += " " + self._generate_prefixes( 

4955 select_stmt, select_stmt._suffixes, **kwargs 

4956 ) 

4957 

4958 self.stack.pop(-1) 

4959 

4960 return text 

4961 

4962 def _setup_select_hints( 

4963 self, select: Select[Any] 

4964 ) -> Tuple[str, _FromHintsType]: 

4965 byfrom = { 

4966 from_: hinttext 

4967 % {"name": from_._compiler_dispatch(self, ashint=True)} 

4968 for (from_, dialect), hinttext in select._hints.items() 

4969 if dialect in ("*", self.dialect.name) 

4970 } 

4971 hint_text = self.get_select_hint_text(byfrom) 

4972 return hint_text, byfrom 

4973 

    def _setup_select_stack(
        self, select, compile_state, entry, asfrom, lateral, compound_index
    ):
        """Push a compiler stack entry for a SELECT and compute its FROM
        list.

        Validates column counts across elements of a compound SELECT,
        determines the "display" FROM elements with correlation applied,
        appends a new stack entry for this SELECT, and returns the FROM
        list for rendering.
        """
        correlate_froms = entry["correlate_froms"]
        asfrom_froms = entry["asfrom_froms"]

        if compound_index == 0:
            # remember the first SELECT of a compound so subsequent
            # elements can be checked against its column count
            entry["select_0"] = select
        elif compound_index:
            select_0 = entry["select_0"]
            numcols = len(select_0._all_selected_columns)

            if len(compile_state.columns_plus_names) != numcols:
                raise exc.CompileError(
                    "All selectables passed to "
                    "CompoundSelect must have identical numbers of "
                    "columns; select #%d has %d columns, select "
                    "#%d has %d"
                    % (
                        1,
                        numcols,
                        compound_index + 1,
                        len(select._all_selected_columns),
                    )
                )

        if asfrom and not lateral:
            # SELECT is rendered as a non-lateral FROM element; only
            # explicitly-requested correlation applies
            froms = compile_state._get_display_froms(
                explicit_correlate_froms=correlate_froms.difference(
                    asfrom_froms
                ),
                implicit_correlate_froms=(),
            )
        else:
            froms = compile_state._get_display_froms(
                explicit_correlate_froms=correlate_froms,
                implicit_correlate_froms=asfrom_froms,
            )

        new_correlate_froms = set(_from_objects(*froms))
        all_correlate_froms = new_correlate_froms.union(correlate_froms)

        # the new stack entry carries this SELECT's FROMs forward so that
        # nested SELECTs can correlate against them
        new_entry: _CompilerStackEntry = {
            "asfrom_froms": new_correlate_froms,
            "correlate_froms": all_correlate_froms,
            "selectable": select,
            "compile_state": compile_state,
        }
        self.stack.append(new_entry)

        return froms

5025 

    def _compose_select_body(
        self,
        text,
        select,
        compile_state,
        inner_columns,
        froms,
        byfrom,
        toplevel,
        kwargs,
    ):
        """Assemble the body of a SELECT statement onto ``text``: the
        column list, FROM, WHERE, GROUP BY, HAVING, ORDER BY,
        row-limiting, and FOR UPDATE clauses, in that order.

        Returns the completed statement text.
        """
        text += ", ".join(inner_columns)

        # set up the cartesian-product linter only when linting is
        # enabled and we are at the topmost statement
        if self.linting & COLLECT_CARTESIAN_PRODUCTS:
            from_linter = FromLinter({}, set())
            warn_linting = self.linting & WARN_LINTING
            if toplevel:
                self.from_linter = from_linter
        else:
            from_linter = None
            warn_linting = False

        # adjust the whitespace for no inner columns, part of #9440,
        # so that a no-col SELECT comes out as "SELECT WHERE..." or
        # "SELECT FROM ...".
        # while it would be better to have built the SELECT starting string
        # without trailing whitespace first, then add whitespace only if inner
        # cols were present, this breaks compatibility with various custom
        # compilation schemes that are currently being tested.
        if not inner_columns:
            text = text.rstrip()

        if froms:
            text += " \nFROM "

            # when per-FROM hints are present, pass the hint map through
            # to each FROM element's compilation
            if select._hints:
                text += ", ".join(
                    [
                        f._compiler_dispatch(
                            self,
                            asfrom=True,
                            fromhints=byfrom,
                            from_linter=from_linter,
                            **kwargs,
                        )
                        for f in froms
                    ]
                )
            else:
                text += ", ".join(
                    [
                        f._compiler_dispatch(
                            self,
                            asfrom=True,
                            from_linter=from_linter,
                            **kwargs,
                        )
                        for f in froms
                    ]
                )
        else:
            # no FROM elements; delegate to the dialect-level default
            text += self.default_from()

        if select._where_criteria:
            t = self._generate_delimited_and_list(
                select._where_criteria, from_linter=from_linter, **kwargs
            )
            if t:
                text += " \nWHERE " + t

        if warn_linting:
            assert from_linter is not None
            from_linter.warn()

        if select._group_by_clauses:
            text += self.group_by_clause(select, **kwargs)

        if select._having_criteria:
            t = self._generate_delimited_and_list(
                select._having_criteria, **kwargs
            )
            if t:
                text += " \nHAVING " + t

        if select._order_by_clauses:
            text += self.order_by_clause(select, **kwargs)

        if select._has_row_limiting_clause:
            text += self._row_limit_clause(select, **kwargs)

        if select._for_update_arg is not None:
            text += self.for_update_clause(select, **kwargs)

        return text

5120 

5121 def _generate_prefixes(self, stmt, prefixes, **kw): 

5122 clause = " ".join( 

5123 prefix._compiler_dispatch(self, **kw) 

5124 for prefix, dialect_name in prefixes 

5125 if dialect_name in (None, "*") or dialect_name == self.dialect.name 

5126 ) 

5127 if clause: 

5128 clause += " " 

5129 return clause 

5130 

5131 def _render_cte_clause( 

5132 self, 

5133 nesting_level=None, 

5134 include_following_stack=False, 

5135 ): 

5136 """ 

5137 include_following_stack 

5138 Also render the nesting CTEs on the next stack. Useful for 

5139 SQL structures like UNION or INSERT that can wrap SELECT 

5140 statements containing nesting CTEs. 

5141 """ 

5142 if not self.ctes: 

5143 return "" 

5144 

5145 ctes: MutableMapping[CTE, str] 

5146 

5147 if nesting_level and nesting_level > 1: 

5148 ctes = util.OrderedDict() 

5149 for cte in list(self.ctes.keys()): 

5150 cte_level, cte_name, cte_opts = self.level_name_by_cte[ 

5151 cte._get_reference_cte() 

5152 ] 

5153 nesting = cte.nesting or cte_opts.nesting 

5154 is_rendered_level = cte_level == nesting_level or ( 

5155 include_following_stack and cte_level == nesting_level + 1 

5156 ) 

5157 if not (nesting and is_rendered_level): 

5158 continue 

5159 

5160 ctes[cte] = self.ctes[cte] 

5161 

5162 else: 

5163 ctes = self.ctes 

5164 

5165 if not ctes: 

5166 return "" 

5167 ctes_recursive = any([cte.recursive for cte in ctes]) 

5168 

5169 cte_text = self.get_cte_preamble(ctes_recursive) + " " 

5170 cte_text += ", \n".join([txt for txt in ctes.values()]) 

5171 cte_text += "\n " 

5172 

5173 if nesting_level and nesting_level > 1: 

5174 for cte in list(ctes.keys()): 

5175 cte_level, cte_name, cte_opts = self.level_name_by_cte[ 

5176 cte._get_reference_cte() 

5177 ] 

5178 del self.ctes[cte] 

5179 del self.ctes_by_level_name[(cte_level, cte_name)] 

5180 del self.level_name_by_cte[cte._get_reference_cte()] 

5181 

5182 return cte_text 

5183 

5184 def get_cte_preamble(self, recursive): 

5185 if recursive: 

5186 return "WITH RECURSIVE" 

5187 else: 

5188 return "WITH" 

5189 

5190 def get_select_precolumns(self, select: Select[Any], **kw: Any) -> str: 

5191 """Called when building a ``SELECT`` statement, position is just 

5192 before column list. 

5193 

5194 """ 

5195 if select._distinct_on: 

5196 util.warn_deprecated( 

5197 "DISTINCT ON is currently supported only by the PostgreSQL " 

5198 "dialect. Use of DISTINCT ON for other backends is currently " 

5199 "silently ignored, however this usage is deprecated, and will " 

5200 "raise CompileError in a future release for all backends " 

5201 "that do not support this syntax.", 

5202 version="1.4", 

5203 ) 

5204 return "DISTINCT " if select._distinct else "" 

5205 

5206 def group_by_clause(self, select, **kw): 

5207 """allow dialects to customize how GROUP BY is rendered.""" 

5208 

5209 group_by = self._generate_delimited_list( 

5210 select._group_by_clauses, OPERATORS[operators.comma_op], **kw 

5211 ) 

5212 if group_by: 

5213 return " GROUP BY " + group_by 

5214 else: 

5215 return "" 

5216 

5217 def order_by_clause(self, select, **kw): 

5218 """allow dialects to customize how ORDER BY is rendered.""" 

5219 

5220 order_by = self._generate_delimited_list( 

5221 select._order_by_clauses, OPERATORS[operators.comma_op], **kw 

5222 ) 

5223 

5224 if order_by: 

5225 return " ORDER BY " + order_by 

5226 else: 

5227 return "" 

5228 

5229 def for_update_clause(self, select, **kw): 

5230 return " FOR UPDATE" 

5231 

5232 def returning_clause( 

5233 self, 

5234 stmt: UpdateBase, 

5235 returning_cols: Sequence[_ColumnsClauseElement], 

5236 *, 

5237 populate_result_map: bool, 

5238 **kw: Any, 

5239 ) -> str: 

5240 columns = [ 

5241 self._label_returning_column( 

5242 stmt, 

5243 column, 

5244 populate_result_map, 

5245 fallback_label_name=fallback_label_name, 

5246 column_is_repeated=repeated, 

5247 name=name, 

5248 proxy_name=proxy_name, 

5249 **kw, 

5250 ) 

5251 for ( 

5252 name, 

5253 proxy_name, 

5254 fallback_label_name, 

5255 column, 

5256 repeated, 

5257 ) in stmt._generate_columns_plus_names( 

5258 True, cols=base._select_iterables(returning_cols) 

5259 ) 

5260 ] 

5261 

5262 return "RETURNING " + ", ".join(columns) 

5263 

5264 def limit_clause(self, select, **kw): 

5265 text = "" 

5266 if select._limit_clause is not None: 

5267 text += "\n LIMIT " + self.process(select._limit_clause, **kw) 

5268 if select._offset_clause is not None: 

5269 if select._limit_clause is None: 

5270 text += "\n LIMIT -1" 

5271 text += " OFFSET " + self.process(select._offset_clause, **kw) 

5272 return text 

5273 

5274 def fetch_clause( 

5275 self, 

5276 select, 

5277 fetch_clause=None, 

5278 require_offset=False, 

5279 use_literal_execute_for_simple_int=False, 

5280 **kw, 

5281 ): 

5282 if fetch_clause is None: 

5283 fetch_clause = select._fetch_clause 

5284 fetch_clause_options = select._fetch_clause_options 

5285 else: 

5286 fetch_clause_options = {"percent": False, "with_ties": False} 

5287 

5288 text = "" 

5289 

5290 if select._offset_clause is not None: 

5291 offset_clause = select._offset_clause 

5292 if ( 

5293 use_literal_execute_for_simple_int 

5294 and select._simple_int_clause(offset_clause) 

5295 ): 

5296 offset_clause = offset_clause.render_literal_execute() 

5297 offset_str = self.process(offset_clause, **kw) 

5298 text += "\n OFFSET %s ROWS" % offset_str 

5299 elif require_offset: 

5300 text += "\n OFFSET 0 ROWS" 

5301 

5302 if fetch_clause is not None: 

5303 if ( 

5304 use_literal_execute_for_simple_int 

5305 and select._simple_int_clause(fetch_clause) 

5306 ): 

5307 fetch_clause = fetch_clause.render_literal_execute() 

5308 text += "\n FETCH FIRST %s%s ROWS %s" % ( 

5309 self.process(fetch_clause, **kw), 

5310 " PERCENT" if fetch_clause_options["percent"] else "", 

5311 "WITH TIES" if fetch_clause_options["with_ties"] else "ONLY", 

5312 ) 

5313 return text 

5314 

5315 def visit_table( 

5316 self, 

5317 table, 

5318 asfrom=False, 

5319 iscrud=False, 

5320 ashint=False, 

5321 fromhints=None, 

5322 use_schema=True, 

5323 from_linter=None, 

5324 ambiguous_table_name_map=None, 

5325 enclosing_alias=None, 

5326 **kwargs, 

5327 ): 

5328 if from_linter: 

5329 from_linter.froms[table] = table.fullname 

5330 

5331 if asfrom or ashint: 

5332 effective_schema = self.preparer.schema_for_object(table) 

5333 

5334 if use_schema and effective_schema: 

5335 ret = ( 

5336 self.preparer.quote_schema(effective_schema) 

5337 + "." 

5338 + self.preparer.quote(table.name) 

5339 ) 

5340 else: 

5341 ret = self.preparer.quote(table.name) 

5342 

5343 if ( 

5344 ( 

5345 enclosing_alias is None 

5346 or enclosing_alias.element is not table 

5347 ) 

5348 and not effective_schema 

5349 and ambiguous_table_name_map 

5350 and table.name in ambiguous_table_name_map 

5351 ): 

5352 anon_name = self._truncated_identifier( 

5353 "alias", ambiguous_table_name_map[table.name] 

5354 ) 

5355 

5356 ret = ret + self.get_render_as_alias_suffix( 

5357 self.preparer.format_alias(None, anon_name) 

5358 ) 

5359 

5360 if fromhints and table in fromhints: 

5361 ret = self.format_from_hint_text( 

5362 ret, table, fromhints[table], iscrud 

5363 ) 

5364 return ret 

5365 else: 

5366 return "" 

5367 

5368 def visit_join(self, join, asfrom=False, from_linter=None, **kwargs): 

5369 if from_linter: 

5370 from_linter.edges.update( 

5371 itertools.product( 

5372 _de_clone(join.left._from_objects), 

5373 _de_clone(join.right._from_objects), 

5374 ) 

5375 ) 

5376 

5377 if join.full: 

5378 join_type = " FULL OUTER JOIN " 

5379 elif join.isouter: 

5380 join_type = " LEFT OUTER JOIN " 

5381 else: 

5382 join_type = " JOIN " 

5383 return ( 

5384 join.left._compiler_dispatch( 

5385 self, asfrom=True, from_linter=from_linter, **kwargs 

5386 ) 

5387 + join_type 

5388 + join.right._compiler_dispatch( 

5389 self, asfrom=True, from_linter=from_linter, **kwargs 

5390 ) 

5391 + " ON " 

5392 # TODO: likely need asfrom=True here? 

5393 + join.onclause._compiler_dispatch( 

5394 self, from_linter=from_linter, **kwargs 

5395 ) 

5396 ) 

5397 

5398 def _setup_crud_hints(self, stmt, table_text): 

5399 dialect_hints = { 

5400 table: hint_text 

5401 for (table, dialect), hint_text in stmt._hints.items() 

5402 if dialect in ("*", self.dialect.name) 

5403 } 

5404 if stmt.table in dialect_hints: 

5405 table_text = self.format_from_hint_text( 

5406 table_text, stmt.table, dialect_hints[stmt.table], True 

5407 ) 

5408 return dialect_hints, table_text 

5409 

    # within the realm of "insertmanyvalues sentinel columns",
    # these lookups match different kinds of Column() configurations
    # to specific backend capabilities.  they are broken into two
    # lookups, one for autoincrement columns and the other for non
    # autoincrement columns
    _sentinel_col_non_autoinc_lookup = util.immutabledict(
        {
            _SentinelDefaultCharacterization.CLIENTSIDE: (
                InsertmanyvaluesSentinelOpts._SUPPORTED_OR_NOT
            ),
            _SentinelDefaultCharacterization.SENTINEL_DEFAULT: (
                InsertmanyvaluesSentinelOpts._SUPPORTED_OR_NOT
            ),
            _SentinelDefaultCharacterization.NONE: (
                InsertmanyvaluesSentinelOpts._SUPPORTED_OR_NOT
            ),
            _SentinelDefaultCharacterization.IDENTITY: (
                InsertmanyvaluesSentinelOpts.IDENTITY
            ),
            _SentinelDefaultCharacterization.SEQUENCE: (
                InsertmanyvaluesSentinelOpts.SEQUENCE
            ),
        }
    )
    # the autoincrement lookup inherits all of the above entries, but
    # overrides the NONE characterization with the AUTOINCREMENT
    # capability flag
    _sentinel_col_autoinc_lookup = _sentinel_col_non_autoinc_lookup.union(
        {
            _SentinelDefaultCharacterization.NONE: (
                InsertmanyvaluesSentinelOpts.AUTOINCREMENT
            ),
        }
    )

5441 

    def _get_sentinel_column_for_table(
        self, table: Table
    ) -> Optional[Sequence[Column[Any]]]:
        """given a :class:`.Table`, return a usable sentinel column or
        columns for this dialect if any.

        Return None if no sentinel columns could be identified, or raise an
        error if a column was marked as a sentinel explicitly but isn't
        compatible with this dialect.

        """

        sentinel_opts = self.dialect.insertmanyvalues_implicit_sentinel
        sentinel_characteristics = table._sentinel_column_characteristics

        sent_cols = sentinel_characteristics.columns

        if sent_cols is None:
            return None

        # choose the capability lookup based on whether the candidate
        # sentinel column is autoincrement
        if sentinel_characteristics.is_autoinc:
            bitmask = self._sentinel_col_autoinc_lookup.get(
                sentinel_characteristics.default_characterization, 0
            )
        else:
            bitmask = self._sentinel_col_non_autoinc_lookup.get(
                sentinel_characteristics.default_characterization, 0
            )

        # the dialect's capabilities include this kind of sentinel
        if sentinel_opts & bitmask:
            return sent_cols

        if sentinel_characteristics.is_explicit:
            # a column was explicitly marked as insert_sentinel=True,
            # however it is not compatible with this dialect.  they should
            # not indicate this column as a sentinel if they need to include
            # this dialect.

            # TODO: do we want non-primary key explicit sentinel cols
            # that can gracefully degrade for some backends?
            # insert_sentinel="degrade" perhaps.  not for the initial
            # release.  I am hoping people are generally not dealing with
            # this sentinel business at all.

            # if is_explicit is True, there will be only one sentinel column.

            raise exc.InvalidRequestError(
                f"Column {sent_cols[0]} can't be explicitly "
                "marked as a sentinel column when using the "
                f"{self.dialect.name} dialect, as the "
                "particular type of default generation on this column is "
                "not currently compatible with this dialect's specific "
                f"INSERT..RETURNING syntax which can receive the "
                "server-generated value in "
                "a deterministic way. To remove this error, remove "
                "insert_sentinel=True from primary key autoincrement "
                "columns; these columns are automatically used as "
                "sentinels for supported dialects in any case."
            )

        return None

5503 

    def _deliver_insertmanyvalues_batches(
        self,
        statement: str,
        parameters: _DBAPIMultiExecuteParams,
        compiled_parameters: List[_MutableCoreSingleExecuteParams],
        generic_setinputsizes: Optional[_GenericSetInputSizesType],
        batch_size: int,
        sort_by_parameter_order: bool,
        schema_translate_map: Optional[SchemaTranslateMapType],
    ) -> Iterator[_InsertManyValuesBatch]:
        """Split an "insertmanyvalues" INSERT into executable batches.

        Yields :class:`._InsertManyValuesBatch` entries, each carrying the
        statement text with its VALUES clause expanded for that batch plus
        the corresponding flattened parameters.  Falls back to yielding
        one single-row statement per parameter set when multi-row VALUES
        can't be used for this dialect/statement combination.
        """
        imv = self._insertmanyvalues
        assert imv is not None

        # extractor for the sentinel values of each compiled parameter
        # set, if sentinel keys are in play
        if not imv.sentinel_param_keys:
            _sentinel_from_params = None
        else:
            _sentinel_from_params = operator.itemgetter(
                *imv.sentinel_param_keys
            )

        lenparams = len(parameters)
        if imv.is_default_expr and not self.dialect.supports_default_metavalue:
            # backend doesn't support
            # INSERT INTO table (pk_col) VALUES (DEFAULT), (DEFAULT), ...
            # at the moment this is basically SQL Server due to
            # not being able to use DEFAULT for identity column
            # just yield out that many single statements! still
            # faster than a whole connection.execute() call ;)
            #
            # note we still are taking advantage of the fact that we know
            # we are using RETURNING.  The generalized approach of fetching
            # cursor.lastrowid etc. still goes through the more heavyweight
            # "ExecutionContext per statement" system as it isn't usable
            # as a generic "RETURNING" approach
            use_row_at_a_time = True
            downgraded = False
        elif not self.dialect.supports_multivalues_insert or (
            sort_by_parameter_order
            and self._result_columns
            and (imv.sentinel_columns is None or imv.includes_upsert_behaviors)
        ):
            # deterministic order was requested and the compiler could
            # not organize sentinel columns for this dialect/statement.
            # use row at a time
            use_row_at_a_time = True
            downgraded = True
        else:
            use_row_at_a_time = False
            downgraded = False

        if use_row_at_a_time:
            # one batch per parameter set, statement text unmodified
            for batchnum, (param, compiled_param) in enumerate(
                cast(
                    "Sequence[Tuple[_DBAPISingleExecuteParams, _MutableCoreSingleExecuteParams]]",  # noqa: E501
                    zip(parameters, compiled_parameters),
                ),
                1,
            ):
                yield _InsertManyValuesBatch(
                    statement,
                    param,
                    generic_setinputsizes,
                    [param],
                    (
                        [_sentinel_from_params(compiled_param)]
                        if _sentinel_from_params
                        else []
                    ),
                    1,
                    batchnum,
                    lenparams,
                    sort_by_parameter_order,
                    downgraded,
                )
            return

        if schema_translate_map:
            rst = functools.partial(
                self.preparer._render_schema_translates,
                schema_translate_map=schema_translate_map,
            )
        else:
            rst = None

        imv_single_values_expr = imv.single_values_expr
        if rst:
            imv_single_values_expr = rst(imv_single_values_expr)

        # collapse the single-row VALUES expression in the statement to a
        # token that is re-expanded per batch below
        executemany_values = f"({imv_single_values_expr})"
        statement = statement.replace(executemany_values, "__EXECMANY_TOKEN__")

        # Use optional insertmanyvalues_max_parameters
        # to further shrink the batch size so that there are no more than
        # insertmanyvalues_max_parameters params.
        # Currently used by SQL Server, which limits statements to 2100 bound
        # parameters (actually 2099).
        max_params = self.dialect.insertmanyvalues_max_parameters
        if max_params:
            total_num_of_params = len(self.bind_names)
            num_params_per_batch = len(imv.insert_crud_params)
            num_params_outside_of_batch = (
                total_num_of_params - num_params_per_batch
            )
            batch_size = min(
                batch_size,
                (
                    (max_params - num_params_outside_of_batch)
                    // num_params_per_batch
                ),
            )

        batches = cast("List[Sequence[Any]]", list(parameters))
        compiled_batches = cast(
            "List[Sequence[Any]]", list(compiled_parameters)
        )

        processed_setinputsizes: Optional[_GenericSetInputSizesType] = None
        batchnum = 1
        total_batches = lenparams // batch_size + (
            1 if lenparams % batch_size else 0
        )

        insert_crud_params = imv.insert_crud_params
        assert insert_crud_params is not None

        if rst:
            # apply schema translation to each rendered column expression
            insert_crud_params = [
                (col, key, rst(expr), st)
                for col, key, expr, st in insert_crud_params
            ]

        escaped_bind_names: Mapping[str, str]
        expand_pos_lower_index = expand_pos_upper_index = 0

        if not self.positional:
            # named-parameter dialects: build a per-row VALUES template
            # whose bind names carry an __EXECMANY_INDEX__ placeholder to
            # be replaced with the row index per batch
            if self.escaped_bind_names:
                escaped_bind_names = self.escaped_bind_names
            else:
                escaped_bind_names = {}

            all_keys = set(parameters[0])

            def apply_placeholders(keys, formatted):
                # rewrite each bind name in the formatted expression to
                # its indexed placeholder form
                for key in keys:
                    key = escaped_bind_names.get(key, key)
                    formatted = formatted.replace(
                        self.bindtemplate % {"name": key},
                        self.bindtemplate
                        % {"name": f"{key}__EXECMANY_INDEX__"},
                    )
                return formatted

            if imv.embed_values_counter:
                imv_values_counter = ", _IMV_VALUES_COUNTER"
            else:
                imv_values_counter = ""
            formatted_values_clause = f"""({', '.join(
                apply_placeholders(bind_keys, formatted)
                for _, _, formatted, bind_keys in insert_crud_params
            )}{imv_values_counter})"""

            keys_to_replace = all_keys.intersection(
                escaped_bind_names.get(key, key)
                for _, _, _, bind_keys in insert_crud_params
                for key in bind_keys
            )
            # parameters outside the VALUES clause are constant across
            # the batch; taken from the first parameter set
            base_parameters = {
                key: parameters[0][key]
                for key in all_keys.difference(keys_to_replace)
            }
            executemany_values_w_comma = ""
        else:
            # positional dialects: repeat the single-row VALUES expression
            # per row; parameters are flattened into one tuple per batch
            formatted_values_clause = ""
            keys_to_replace = set()
            base_parameters = {}

            if imv.embed_values_counter:
                executemany_values_w_comma = (
                    f"({imv_single_values_expr}, _IMV_VALUES_COUNTER), "
                )
            else:
                executemany_values_w_comma = f"({imv_single_values_expr}), "

            all_names_we_will_expand: Set[str] = set()
            for elem in imv.insert_crud_params:
                all_names_we_will_expand.update(elem[3])

            # get the start and end position in a particular list
            # of parameters where we will be doing the "expanding".
            # statements can have params on either side or both sides,
            # given RETURNING and CTEs
            if all_names_we_will_expand:
                positiontup = self.positiontup
                assert positiontup is not None

                all_expand_positions = {
                    idx
                    for idx, name in enumerate(positiontup)
                    if name in all_names_we_will_expand
                }
                expand_pos_lower_index = min(all_expand_positions)
                expand_pos_upper_index = max(all_expand_positions) + 1
                # expanded positions must be contiguous
                assert (
                    len(all_expand_positions)
                    == expand_pos_upper_index - expand_pos_lower_index
                )

            if self._numeric_binds:
                # numeric binds are renumbered per batch; normalize the
                # template to %s markers for later re-numbering
                escaped = re.escape(self._numeric_binds_identifier_char)
                executemany_values_w_comma = re.sub(
                    rf"{escaped}\d+", "%s", executemany_values_w_comma
                )

        while batches:
            batch = batches[0:batch_size]
            compiled_batch = compiled_batches[0:batch_size]

            batches[0:batch_size] = []
            compiled_batches[0:batch_size] = []

            if batches:
                current_batch_size = batch_size
            else:
                current_batch_size = len(batch)

            if generic_setinputsizes:
                # if setinputsizes is present, expand this collection to
                # suit the batch length as well
                # currently this will be mssql+pyodbc for internal dialects
                processed_setinputsizes = [
                    (new_key, len_, typ)
                    for new_key, len_, typ in (
                        (f"{key}_{index}", len_, typ)
                        for index in range(current_batch_size)
                        for key, len_, typ in generic_setinputsizes
                    )
                ]

            replaced_parameters: Any
            if self.positional:
                num_ins_params = imv.num_positional_params_counted

                batch_iterator: Iterable[Sequence[Any]]
                extra_params_left: Sequence[Any]
                extra_params_right: Sequence[Any]

                if num_ins_params == len(batch[0]):
                    extra_params_left = extra_params_right = ()
                    batch_iterator = batch
                else:
                    extra_params_left = batch[0][:expand_pos_lower_index]
                    extra_params_right = batch[0][expand_pos_upper_index:]
                    batch_iterator = (
                        b[expand_pos_lower_index:expand_pos_upper_index]
                        for b in batch
                    )

                if imv.embed_values_counter:
                    expanded_values_string = (
                        "".join(
                            executemany_values_w_comma.replace(
                                "_IMV_VALUES_COUNTER", str(i)
                            )
                            for i, _ in enumerate(batch)
                        )
                    )[:-2]
                else:
                    expanded_values_string = (
                        (executemany_values_w_comma * current_batch_size)
                    )[:-2]

                if self._numeric_binds and num_ins_params > 0:
                    # numeric will always number the parameters inside of
                    # VALUES (and thus order self.positiontup) to be higher
                    # than non-VALUES parameters, no matter where in the
                    # statement those non-VALUES parameters appear (this is
                    # ensured in _process_numeric by numbering first all
                    # params that are not in _values_bindparam)
                    # therefore all extra params are always
                    # on the left side and numbered lower than the VALUES
                    # parameters
                    assert not extra_params_right

                    start = expand_pos_lower_index + 1
                    end = num_ins_params * (current_batch_size) + start

                    # need to format here, since statement may contain
                    # unescaped %, while values_string contains just (%s, %s)
                    positions = tuple(
                        f"{self._numeric_binds_identifier_char}{i}"
                        for i in range(start, end)
                    )
                    expanded_values_string = expanded_values_string % positions

                replaced_statement = statement.replace(
                    "__EXECMANY_TOKEN__", expanded_values_string
                )

                replaced_parameters = tuple(
                    itertools.chain.from_iterable(batch_iterator)
                )

                replaced_parameters = (
                    extra_params_left
                    + replaced_parameters
                    + extra_params_right
                )

            else:
                # named parameters: render one indexed VALUES clause per
                # row and suffix each per-row key with its row index
                replaced_values_clauses = []
                replaced_parameters = base_parameters.copy()

                for i, param in enumerate(batch):
                    fmv = formatted_values_clause.replace(
                        "EXECMANY_INDEX__", str(i)
                    )
                    if imv.embed_values_counter:
                        fmv = fmv.replace("_IMV_VALUES_COUNTER", str(i))

                    replaced_values_clauses.append(fmv)
                    replaced_parameters.update(
                        {f"{key}__{i}": param[key] for key in keys_to_replace}
                    )

                replaced_statement = statement.replace(
                    "__EXECMANY_TOKEN__",
                    ", ".join(replaced_values_clauses),
                )

            yield _InsertManyValuesBatch(
                replaced_statement,
                replaced_parameters,
                processed_setinputsizes,
                batch,
                (
                    [_sentinel_from_params(cb) for cb in compiled_batch]
                    if _sentinel_from_params
                    else []
                ),
                current_batch_size,
                batchnum,
                total_batches,
                sort_by_parameter_order,
                False,
            )
            batchnum += 1

    def visit_insert(
        self, insert_stmt, visited_bindparam=None, visiting_cte=None, **kw
    ):
        """Compile an :class:`.Insert` construct into an INSERT string.

        Handles prefixes, hints, independent CTEs, INSERT..FROM SELECT,
        multi-row VALUES, DEFAULT VALUES and RETURNING, and populates
        ``self._insertmanyvalues`` with the state needed for batched
        "insertmanyvalues" execution, including sentinel-column
        bookkeeping used to sort RETURNING rows.
        """
        compile_state = insert_stmt._compile_state_factory(
            insert_stmt, self, **kw
        )
        insert_stmt = compile_state.statement

        if visiting_cte is not None:
            kw["visiting_cte"] = visiting_cte
            toplevel = False
        else:
            toplevel = not self.stack

        if toplevel:
            self.isinsert = True
            if not self.dml_compile_state:
                self.dml_compile_state = compile_state
            if not self.compile_state:
                self.compile_state = compile_state

        self.stack.append(
            {
                "correlate_froms": set(),
                "asfrom_froms": set(),
                "selectable": insert_stmt,
            }
        )

        counted_bindparam = 0

        # reset any incoming "visited_bindparam" collection
        visited_bindparam = None

        # for positional, insertmanyvalues needs to know how many
        # bound parameters are in the VALUES sequence; there's no simple
        # rule because default expressions etc. can have zero or more
        # params inside them. After multiple attempts to figure this out,
        # this very simplistic "count after" works and is
        # likely the least amount of callcounts, though looks clumsy
        if self.positional and visiting_cte is None:
            # if we are inside a CTE, don't count parameters
            # here since they won't be for insertmanyvalues. keep
            # visited_bindparam at None so no counting happens.
            # see #9173
            visited_bindparam = []

        crud_params_struct = crud._get_crud_params(
            self,
            insert_stmt,
            compile_state,
            toplevel,
            visited_bindparam=visited_bindparam,
            **kw,
        )

        if self.positional and visited_bindparam is not None:
            counted_bindparam = len(visited_bindparam)
            if self._numeric_binds:
                if self._values_bindparam is not None:
                    self._values_bindparam += visited_bindparam
                else:
                    self._values_bindparam = visited_bindparam

        crud_params_single = crud_params_struct.single_params

        if (
            not crud_params_single
            and not self.dialect.supports_default_values
            and not self.dialect.supports_default_metavalue
            and not self.dialect.supports_empty_insert
        ):
            raise exc.CompileError(
                "The '%s' dialect with current database "
                "version settings does not support empty "
                "inserts." % self.dialect.name
            )

        if compile_state._has_multi_parameters:
            if not self.dialect.supports_multivalues_insert:
                raise exc.CompileError(
                    "The '%s' dialect with current database "
                    "version settings does not support "
                    "in-place multirow inserts." % self.dialect.name
                )
            elif (
                self.implicit_returning or insert_stmt._returning
            ) and insert_stmt._sort_by_parameter_order:
                raise exc.CompileError(
                    "RETURNING cannot be deterministically sorted when "
                    "using an INSERT which includes multi-row values()."
                )
            crud_params_single = crud_params_struct.single_params
        else:
            crud_params_single = crud_params_struct.single_params

        preparer = self.preparer
        supports_default_values = self.dialect.supports_default_values

        text = "INSERT "

        if insert_stmt._prefixes:
            text += self._generate_prefixes(
                insert_stmt, insert_stmt._prefixes, **kw
            )

        text += "INTO "
        table_text = preparer.format_table(insert_stmt.table)

        if insert_stmt._hints:
            _, table_text = self._setup_crud_hints(insert_stmt, table_text)

        if insert_stmt._independent_ctes:
            self._dispatch_independent_ctes(insert_stmt, kw)

        text += table_text

        if crud_params_single or not supports_default_values:
            text += " (%s)" % ", ".join(
                [expr for _, expr, _, _ in crud_params_single]
            )

        # look for insertmanyvalues attributes that would have been configured
        # by crud.py as it scanned through the columns to be part of the
        # INSERT
        use_insertmanyvalues = crud_params_struct.use_insertmanyvalues
        named_sentinel_params: Optional[Sequence[str]] = None
        add_sentinel_cols = None
        implicit_sentinel = False

        returning_cols = self.implicit_returning or insert_stmt._returning
        if returning_cols:
            add_sentinel_cols = crud_params_struct.use_sentinel_columns
            if add_sentinel_cols is not None:
                assert use_insertmanyvalues

                # search for the sentinel column explicitly present
                # in the INSERT columns list, and additionally check that
                # this column has a bound parameter name set up that's in the
                # parameter list. If both of these cases are present, it means
                # we will have a client side value for the sentinel in each
                # parameter set.

                _params_by_col = {
                    col: param_names
                    for col, _, _, param_names in crud_params_single
                }
                named_sentinel_params = []
                for _add_sentinel_col in add_sentinel_cols:
                    if _add_sentinel_col not in _params_by_col:
                        named_sentinel_params = None
                        break
                    param_name = self._within_exec_param_key_getter(
                        _add_sentinel_col
                    )
                    if param_name not in _params_by_col[_add_sentinel_col]:
                        named_sentinel_params = None
                        break
                    named_sentinel_params.append(param_name)

                if named_sentinel_params is None:
                    # if we are not going to have a client side value for
                    # the sentinel in the parameter set, that means it's
                    # an autoincrement, an IDENTITY, or a server-side SQL
                    # expression like nextval('seqname'). So this is
                    # an "implicit" sentinel; we will look for it in
                    # RETURNING
                    # only, and then sort on it. For this case on PG,
                    # SQL Server we have to use a special INSERT form
                    # that guarantees the server side function lines up with
                    # the entries in the VALUES.
                    if (
                        self.dialect.insertmanyvalues_implicit_sentinel
                        & InsertmanyvaluesSentinelOpts.ANY_AUTOINCREMENT
                    ):
                        implicit_sentinel = True
                    else:
                        # here, we are not using a sentinel at all
                        # and we are likely the SQLite dialect.
                        # The first add_sentinel_col that we have should not
                        # be marked as "insert_sentinel=True". if it was,
                        # an error should have been raised in
                        # _get_sentinel_column_for_table.
                        assert not add_sentinel_cols[0]._insert_sentinel, (
                            "sentinel selection rules should have prevented "
                            "us from getting here for this dialect"
                        )

                # always put the sentinel columns last. even if they are
                # in the returning list already, they will be there twice
                # then.
                returning_cols = list(returning_cols) + list(add_sentinel_cols)

            returning_clause = self.returning_clause(
                insert_stmt,
                returning_cols,
                populate_result_map=toplevel,
            )

            if self.returning_precedes_values:
                text += " " + returning_clause

        else:
            returning_clause = None

        if insert_stmt.select is not None:
            # placed here by crud.py
            select_text = self.process(
                self.stack[-1]["insert_from_select"], insert_into=True, **kw
            )

            if self.ctes and self.dialect.cte_follows_insert:
                nesting_level = len(self.stack) if not toplevel else None
                text += " %s%s" % (
                    self._render_cte_clause(
                        nesting_level=nesting_level,
                        include_following_stack=True,
                    ),
                    select_text,
                )
            else:
                text += " %s" % select_text
        elif not crud_params_single and supports_default_values:
            text += " DEFAULT VALUES"
            if use_insertmanyvalues:
                self._insertmanyvalues = _InsertManyValues(
                    True,
                    self.dialect.default_metavalue_token,
                    crud_params_single,
                    counted_bindparam,
                    sort_by_parameter_order=(
                        insert_stmt._sort_by_parameter_order
                    ),
                    includes_upsert_behaviors=(
                        insert_stmt._post_values_clause is not None
                    ),
                    sentinel_columns=add_sentinel_cols,
                    num_sentinel_columns=(
                        len(add_sentinel_cols) if add_sentinel_cols else 0
                    ),
                    implicit_sentinel=implicit_sentinel,
                )
        elif compile_state._has_multi_parameters:
            text += " VALUES %s" % (
                ", ".join(
                    "(%s)"
                    % (", ".join(value for _, _, value, _ in crud_param_set))
                    for crud_param_set in crud_params_struct.all_multi_params
                ),
            )
        elif use_insertmanyvalues:
            if (
                implicit_sentinel
                and (
                    self.dialect.insertmanyvalues_implicit_sentinel
                    & InsertmanyvaluesSentinelOpts.USE_INSERT_FROM_SELECT
                )
                # this is checking if we have
                # INSERT INTO table (id) VALUES (DEFAULT).
                and not (crud_params_struct.is_default_metavalue_only)
            ):
                # if we have a sentinel column that is server generated,
                # then for selected backends render the VALUES list as a
                # subquery. This is the orderable form supported by
                # PostgreSQL and in fewer cases SQL Server
                embed_sentinel_value = True

                render_bind_casts = (
                    self.dialect.insertmanyvalues_implicit_sentinel
                    & InsertmanyvaluesSentinelOpts.RENDER_SELECT_COL_CASTS
                )

                add_sentinel_set = add_sentinel_cols or ()

                insert_single_values_expr = ", ".join(
                    [
                        value
                        for col, _, value, _ in crud_params_single
                        if col not in add_sentinel_set
                    ]
                )

                colnames = ", ".join(
                    f"p{i}"
                    for i, cp in enumerate(crud_params_single)
                    if cp[0] not in add_sentinel_set
                )

                if render_bind_casts:
                    # render casts for the SELECT list. For PG, we are
                    # already rendering bind casts in the parameter list,
                    # selectively for the more "tricky" types like ARRAY.
                    # however, even for the "easy" types, if the parameter
                    # is NULL for every entry, PG gives up and says
                    # "it must be TEXT", which fails for other easy types
                    # like ints. So we cast on this side too.
                    colnames_w_cast = ", ".join(
                        (
                            self.render_bind_cast(
                                col.type,
                                col.type._unwrapped_dialect_impl(self.dialect),
                                f"p{i}",
                            )
                            if col not in add_sentinel_set
                            else expr
                        )
                        for i, (col, _, expr, _) in enumerate(
                            crud_params_single
                        )
                    )
                else:
                    colnames_w_cast = ", ".join(
                        (f"p{i}" if col not in add_sentinel_set else expr)
                        for i, (col, _, expr, _) in enumerate(
                            crud_params_single
                        )
                    )

                insert_crud_params = [
                    elem
                    for elem in crud_params_single
                    if elem[0] not in add_sentinel_set
                ]

                text += (
                    f" SELECT {colnames_w_cast} FROM "
                    f"(VALUES ({insert_single_values_expr})) "
                    f"AS imp_sen({colnames}, sen_counter) "
                    "ORDER BY sen_counter"
                )

            else:
                # otherwise, if no sentinel or backend doesn't support
                # orderable subquery form, use a plain VALUES list
                embed_sentinel_value = False
                insert_crud_params = crud_params_single
                insert_single_values_expr = ", ".join(
                    [value for _, _, value, _ in crud_params_single]
                )

                text += f" VALUES ({insert_single_values_expr})"

            self._insertmanyvalues = _InsertManyValues(
                is_default_expr=False,
                single_values_expr=insert_single_values_expr,
                insert_crud_params=insert_crud_params,
                num_positional_params_counted=counted_bindparam,
                sort_by_parameter_order=(insert_stmt._sort_by_parameter_order),
                includes_upsert_behaviors=(
                    insert_stmt._post_values_clause is not None
                ),
                sentinel_columns=add_sentinel_cols,
                num_sentinel_columns=(
                    len(add_sentinel_cols) if add_sentinel_cols else 0
                ),
                sentinel_param_keys=named_sentinel_params,
                implicit_sentinel=implicit_sentinel,
                embed_values_counter=embed_sentinel_value,
            )

        else:
            insert_single_values_expr = ", ".join(
                [value for _, _, value, _ in crud_params_single]
            )

            text += f" VALUES ({insert_single_values_expr})"

        if insert_stmt._post_values_clause is not None:
            post_values_clause = self.process(
                insert_stmt._post_values_clause, **kw
            )
            if post_values_clause:
                text += " " + post_values_clause

        if returning_clause and not self.returning_precedes_values:
            text += " " + returning_clause

        if self.ctes and not self.dialect.cte_follows_insert:
            nesting_level = len(self.stack) if not toplevel else None
            text = (
                self._render_cte_clause(
                    nesting_level=nesting_level,
                    include_following_stack=True,
                )
                + text
            )

        self.stack.pop(-1)

        return text

6241 

6242 def update_limit_clause(self, update_stmt): 

6243 """Provide a hook for MySQL to add LIMIT to the UPDATE""" 

6244 return None 

6245 

6246 def delete_limit_clause(self, delete_stmt): 

6247 """Provide a hook for MySQL to add LIMIT to the DELETE""" 

6248 return None 

6249 

6250 def update_tables_clause(self, update_stmt, from_table, extra_froms, **kw): 

6251 """Provide a hook to override the initial table clause 

6252 in an UPDATE statement. 

6253 

6254 MySQL overrides this. 

6255 

6256 """ 

6257 kw["asfrom"] = True 

6258 return from_table._compiler_dispatch(self, iscrud=True, **kw) 

6259 

6260 def update_from_clause( 

6261 self, update_stmt, from_table, extra_froms, from_hints, **kw 

6262 ): 

6263 """Provide a hook to override the generation of an 

6264 UPDATE..FROM clause. 

6265 

6266 MySQL and MSSQL override this. 

6267 

6268 """ 

6269 raise NotImplementedError( 

6270 "This backend does not support multiple-table " 

6271 "criteria within UPDATE" 

6272 ) 

6273 

    def visit_update(
        self,
        update_stmt: Update,
        visiting_cte: Optional[CTE] = None,
        **kw: Any,
    ) -> str:
        """Compile an :class:`.Update` construct into an UPDATE string.

        Builds the compile state, renders prefixes, the SET clause from
        crud parameters, then the optional FROM / WHERE / LIMIT /
        RETURNING and CTE clauses, maintaining the compiler stack and
        from-linter state along the way.
        """
        compile_state = update_stmt._compile_state_factory(
            update_stmt, self, **kw
        )
        if TYPE_CHECKING:
            assert isinstance(compile_state, UpdateDMLState)
        update_stmt = compile_state.statement  # type: ignore[assignment]

        if visiting_cte is not None:
            kw["visiting_cte"] = visiting_cte
            toplevel = False
        else:
            toplevel = not self.stack

        if toplevel:
            self.isupdate = True
            if not self.dml_compile_state:
                self.dml_compile_state = compile_state
            if not self.compile_state:
                self.compile_state = compile_state

        if self.linting & COLLECT_CARTESIAN_PRODUCTS:
            from_linter = FromLinter({}, set())
            warn_linting = self.linting & WARN_LINTING
            if toplevel:
                self.from_linter = from_linter
        else:
            from_linter = None
            warn_linting = False

        extra_froms = compile_state._extra_froms
        is_multitable = bool(extra_froms)

        if is_multitable:
            # main table might be a JOIN
            main_froms = set(_from_objects(update_stmt.table))
            render_extra_froms = [
                f for f in extra_froms if f not in main_froms
            ]
            correlate_froms = main_froms.union(extra_froms)
        else:
            render_extra_froms = []
            correlate_froms = {update_stmt.table}

        self.stack.append(
            {
                "correlate_froms": correlate_froms,
                "asfrom_froms": correlate_froms,
                "selectable": update_stmt,
            }
        )

        text = "UPDATE "

        if update_stmt._prefixes:
            text += self._generate_prefixes(
                update_stmt, update_stmt._prefixes, **kw
            )

        table_text = self.update_tables_clause(
            update_stmt,
            update_stmt.table,
            render_extra_froms,
            from_linter=from_linter,
            **kw,
        )
        crud_params_struct = crud._get_crud_params(
            self, update_stmt, compile_state, toplevel, **kw
        )
        crud_params = crud_params_struct.single_params

        if update_stmt._hints:
            dialect_hints, table_text = self._setup_crud_hints(
                update_stmt, table_text
            )
        else:
            dialect_hints = None

        if update_stmt._independent_ctes:
            self._dispatch_independent_ctes(update_stmt, kw)

        text += table_text

        text += " SET "
        text += ", ".join(
            expr + "=" + value
            for _, expr, value, _ in cast(
                "List[Tuple[Any, str, str, Any]]", crud_params
            )
        )

        if self.implicit_returning or update_stmt._returning:
            if self.returning_precedes_values:
                text += " " + self.returning_clause(
                    update_stmt,
                    self.implicit_returning or update_stmt._returning,
                    populate_result_map=toplevel,
                )

        if extra_froms:
            extra_from_text = self.update_from_clause(
                update_stmt,
                update_stmt.table,
                render_extra_froms,
                dialect_hints,
                from_linter=from_linter,
                **kw,
            )
            if extra_from_text:
                text += " " + extra_from_text

        if update_stmt._where_criteria:
            t = self._generate_delimited_and_list(
                update_stmt._where_criteria, from_linter=from_linter, **kw
            )
            if t:
                text += " WHERE " + t

        limit_clause = self.update_limit_clause(update_stmt)
        if limit_clause:
            text += " " + limit_clause

        if (
            self.implicit_returning or update_stmt._returning
        ) and not self.returning_precedes_values:
            text += " " + self.returning_clause(
                update_stmt,
                self.implicit_returning or update_stmt._returning,
                populate_result_map=toplevel,
            )

        if self.ctes:
            nesting_level = len(self.stack) if not toplevel else None
            text = self._render_cte_clause(nesting_level=nesting_level) + text

        if warn_linting:
            assert from_linter is not None
            from_linter.warn(stmt_type="UPDATE")

        self.stack.pop(-1)

        return text  # type: ignore[no-any-return]

6421 

6422 def delete_extra_from_clause( 

6423 self, delete_stmt, from_table, extra_froms, from_hints, **kw 

6424 ): 

6425 """Provide a hook to override the generation of an 

6426 DELETE..FROM clause. 

6427 

6428 This can be used to implement DELETE..USING for example. 

6429 

6430 MySQL and MSSQL override this. 

6431 

6432 """ 

6433 raise NotImplementedError( 

6434 "This backend does not support multiple-table " 

6435 "criteria within DELETE" 

6436 ) 

6437 

6438 def delete_table_clause(self, delete_stmt, from_table, extra_froms, **kw): 

6439 return from_table._compiler_dispatch( 

6440 self, asfrom=True, iscrud=True, **kw 

6441 ) 

6442 

    def visit_delete(self, delete_stmt, visiting_cte=None, **kw):
        """Compile a :class:`.Delete` construct into a DELETE string.

        Builds the compile state, renders prefixes, the table clause,
        optional multi-table extra FROMs, WHERE, LIMIT, RETURNING and
        CTE clauses, maintaining the compiler stack and from-linter
        state along the way.
        """
        compile_state = delete_stmt._compile_state_factory(
            delete_stmt, self, **kw
        )
        delete_stmt = compile_state.statement

        if visiting_cte is not None:
            kw["visiting_cte"] = visiting_cte
            toplevel = False
        else:
            toplevel = not self.stack

        if toplevel:
            self.isdelete = True
            if not self.dml_compile_state:
                self.dml_compile_state = compile_state
            if not self.compile_state:
                self.compile_state = compile_state

        if self.linting & COLLECT_CARTESIAN_PRODUCTS:
            from_linter = FromLinter({}, set())
            warn_linting = self.linting & WARN_LINTING
            if toplevel:
                self.from_linter = from_linter
        else:
            from_linter = None
            warn_linting = False

        extra_froms = compile_state._extra_froms

        correlate_froms = {delete_stmt.table}.union(extra_froms)
        self.stack.append(
            {
                "correlate_froms": correlate_froms,
                "asfrom_froms": correlate_froms,
                "selectable": delete_stmt,
            }
        )

        text = "DELETE "

        if delete_stmt._prefixes:
            text += self._generate_prefixes(
                delete_stmt, delete_stmt._prefixes, **kw
            )

        text += "FROM "

        try:
            table_text = self.delete_table_clause(
                delete_stmt,
                delete_stmt.table,
                extra_froms,
                from_linter=from_linter,
            )
        except TypeError:
            # anticipate 3rd party dialects that don't include **kw
            # TODO: remove in 2.1
            table_text = self.delete_table_clause(
                delete_stmt, delete_stmt.table, extra_froms
            )
            if from_linter:
                _ = self.process(delete_stmt.table, from_linter=from_linter)

        crud._get_crud_params(self, delete_stmt, compile_state, toplevel, **kw)

        if delete_stmt._hints:
            dialect_hints, table_text = self._setup_crud_hints(
                delete_stmt, table_text
            )
        else:
            dialect_hints = None

        if delete_stmt._independent_ctes:
            self._dispatch_independent_ctes(delete_stmt, kw)

        text += table_text

        if (
            self.implicit_returning or delete_stmt._returning
        ) and self.returning_precedes_values:
            text += " " + self.returning_clause(
                delete_stmt,
                self.implicit_returning or delete_stmt._returning,
                populate_result_map=toplevel,
            )

        if extra_froms:
            extra_from_text = self.delete_extra_from_clause(
                delete_stmt,
                delete_stmt.table,
                extra_froms,
                dialect_hints,
                from_linter=from_linter,
                **kw,
            )
            if extra_from_text:
                text += " " + extra_from_text

        if delete_stmt._where_criteria:
            t = self._generate_delimited_and_list(
                delete_stmt._where_criteria, from_linter=from_linter, **kw
            )
            if t:
                text += " WHERE " + t

        limit_clause = self.delete_limit_clause(delete_stmt)
        if limit_clause:
            text += " " + limit_clause

        if (
            self.implicit_returning or delete_stmt._returning
        ) and not self.returning_precedes_values:
            text += " " + self.returning_clause(
                delete_stmt,
                self.implicit_returning or delete_stmt._returning,
                populate_result_map=toplevel,
            )

        if self.ctes:
            nesting_level = len(self.stack) if not toplevel else None
            text = self._render_cte_clause(nesting_level=nesting_level) + text

        if warn_linting:
            assert from_linter is not None
            from_linter.warn(stmt_type="DELETE")

        self.stack.pop(-1)

        return text

6573 

6574 def visit_savepoint(self, savepoint_stmt, **kw): 

6575 return "SAVEPOINT %s" % self.preparer.format_savepoint(savepoint_stmt) 

6576 

6577 def visit_rollback_to_savepoint(self, savepoint_stmt, **kw): 

6578 return "ROLLBACK TO SAVEPOINT %s" % self.preparer.format_savepoint( 

6579 savepoint_stmt 

6580 ) 

6581 

6582 def visit_release_savepoint(self, savepoint_stmt, **kw): 

6583 return "RELEASE SAVEPOINT %s" % self.preparer.format_savepoint( 

6584 savepoint_stmt 

6585 ) 

6586 

6587 

class StrSQLCompiler(SQLCompiler):
    """SQL compiler used when a Core element is stringified directly.

    ``str(statement)`` without an explicit dialect lands here.  A handful
    of non-standard constructs are given plain-text renderings so that
    basic stringification succeeds; anything substantially custom or
    dialect-specific still requires
    :meth:`_expression.ClauseElement.compile` against a real dialect.

    .. seealso::

        :ref:`faq_sql_expression_string`

    """

    def _fallback_column_name(self, column):
        # a column whose name can't be determined still gets a placeholder
        return "<name unknown>"

    @util.preload_module("sqlalchemy.engine.url")
    def visit_unsupported_compilation(self, element, err, **kw):
        # when the element names a specific dialect, try that dialect's
        # own compiler before giving up
        if element.stringify_dialect != "default":
            url = util.preloaded.engine_url
            dialect = url.URL.create(element.stringify_dialect).get_dialect()()

            compiler = dialect.statement_compiler(
                dialect, None, _supporting_against=self
            )
            if not isinstance(compiler, StrSQLCompiler):
                return compiler.process(element, **kw)

        return super().visit_unsupported_compilation(element, err)

    def visit_getitem_binary(self, binary, operator, **kw):
        lhs = self.process(binary.left, **kw)
        rhs = self.process(binary.right, **kw)
        return "%s[%s]" % (lhs, rhs)

    def visit_json_getitem_op_binary(self, binary, operator, **kw):
        # JSON subscripting renders as plain bracket indexing
        return self.visit_getitem_binary(binary, operator, **kw)

    def visit_json_path_getitem_op_binary(self, binary, operator, **kw):
        # JSON path subscripting renders as plain bracket indexing
        return self.visit_getitem_binary(binary, operator, **kw)

    def visit_sequence(self, sequence, **kw):
        formatted = self.preparer.format_sequence(sequence)
        return f"<next sequence value: {formatted}>"

    def returning_clause(
        self,
        stmt: UpdateBase,
        returning_cols: Sequence[_ColumnsClauseElement],
        *,
        populate_result_map: bool,
        **kw: Any,
    ) -> str:
        rendered = []
        for c in base._select_iterables(returning_cols):
            rendered.append(
                self._label_select_column(None, c, True, False, {})
            )
        return "RETURNING " + ", ".join(rendered)

    def update_from_clause(
        self, update_stmt, from_table, extra_froms, from_hints, **kw
    ):
        kw["asfrom"] = True
        rendered = [
            t._compiler_dispatch(self, fromhints=from_hints, **kw)
            for t in extra_froms
        ]
        return "FROM " + ", ".join(rendered)

    def delete_extra_from_clause(
        self, delete_stmt, from_table, extra_froms, from_hints, **kw
    ):
        kw["asfrom"] = True
        rendered = [
            t._compiler_dispatch(self, fromhints=from_hints, **kw)
            for t in extra_froms
        ]
        return ", " + ", ".join(rendered)

    def visit_empty_set_expr(self, element_types, **kw):
        return "SELECT 1 WHERE 1!=1"

    def get_from_hint_text(self, table, text):
        return "[%s]" % text

    def visit_regexp_match_op_binary(self, binary, operator, **kw):
        return self._generate_generic_binary(binary, " <regexp> ", **kw)

    def visit_not_regexp_match_op_binary(self, binary, operator, **kw):
        return self._generate_generic_binary(binary, " <not regexp> ", **kw)

    def visit_regexp_replace_op_binary(self, binary, operator, **kw):
        lhs = binary.left._compiler_dispatch(self, **kw)
        rhs = binary.right._compiler_dispatch(self, **kw)
        return "<regexp replace>(%s, %s)" % (lhs, rhs)

    def visit_try_cast(self, cast, **kwargs):
        clause = cast.clause._compiler_dispatch(self, **kwargs)
        typeclause = cast.typeclause._compiler_dispatch(self, **kwargs)
        return "TRY_CAST(%s AS %s)" % (clause, typeclause)

6697 

6698 

class DDLCompiler(Compiled):
    """Default compiler for DDL constructs such as CREATE TABLE,
    DROP TABLE, CREATE INDEX and constraint clauses."""

    # marks this Compiled as producing DDL rather than a SQL query
    is_ddl = True

    if TYPE_CHECKING:

        def __init__(
            self,
            dialect: Dialect,
            statement: ExecutableDDLElement,
            schema_translate_map: Optional[SchemaTranslateMapType] = ...,
            render_schema_translate: bool = ...,
            compile_kwargs: Mapping[str, Any] = ...,
        ): ...

    @util.ro_memoized_property
    def sql_compiler(self) -> SQLCompiler:
        # a statement compiler used for SQL expressions embedded in DDL,
        # e.g. index expressions; shares this compiler's schema translate map
        return self.dialect.statement_compiler(
            self.dialect, None, schema_translate_map=self.schema_translate_map
        )

    @util.memoized_property
    def type_compiler(self):
        return self.dialect.type_compiler_instance

    def construct_params(
        self,
        params: Optional[_CoreSingleExecuteParams] = None,
        extracted_parameters: Optional[Sequence[BindParameter[Any]]] = None,
        escape_names: bool = True,
    ) -> Optional[_MutableCoreSingleExecuteParams]:
        # DDL statements carry no bound parameters
        return None

6730 

    def visit_ddl(self, ddl, **kwargs):
        """Render a textual :class:`.DDL` statement, interpolating
        ``%(table)s`` / ``%(schema)s`` / ``%(fullname)s`` tokens from
        the DDL target when that target is a :class:`.Table`."""
        # table events can substitute table and schema name
        context = ddl.context
        if isinstance(ddl.target, schema.Table):
            # copy so token defaults don't leak back into the DDL object
            context = context.copy()

            preparer = self.preparer
            path = preparer.format_table_seq(ddl.target)
            if len(path) == 1:
                table, sch = path[0], ""
            else:
                table, sch = path[-1], path[0]

            context.setdefault("table", table)
            context.setdefault("schema", sch)
            context.setdefault("fullname", preparer.format_table(ddl.target))

        return self.sql_compiler.post_process_text(ddl.statement % context)

6749 

6750 def visit_create_schema(self, create, **kw): 

6751 text = "CREATE SCHEMA " 

6752 if create.if_not_exists: 

6753 text += "IF NOT EXISTS " 

6754 return text + self.preparer.format_schema(create.element) 

6755 

6756 def visit_drop_schema(self, drop, **kw): 

6757 text = "DROP SCHEMA " 

6758 if drop.if_exists: 

6759 text += "IF EXISTS " 

6760 text += self.preparer.format_schema(drop.element) 

6761 if drop.cascade: 

6762 text += " CASCADE" 

6763 return text 

6764 

    def visit_create_table(self, create, **kw):
        """Render a full CREATE TABLE statement: table prefixes, optional
        IF NOT EXISTS, the column list and table-level constraints."""
        table = create.element
        preparer = self.preparer

        text = "\nCREATE "
        if table._prefixes:
            text += " ".join(table._prefixes) + " "

        text += "TABLE "
        if create.if_not_exists:
            text += "IF NOT EXISTS "

        text += preparer.format_table(table) + " "

        create_table_suffix = self.create_table_suffix(table)
        if create_table_suffix:
            text += create_table_suffix + " "

        text += "("

        separator = "\n"

        # if only one primary key, specify it along with the column
        first_pk = False
        for create_column in create.columns:
            column = create_column.element
            try:
                processed = self.process(
                    create_column, first_pk=column.primary_key and not first_pk
                )
                # process() returns None for columns omitted from DDL
                # (e.g. system columns); skip those entirely
                if processed is not None:
                    text += separator
                    separator = ", \n"
                    text += "\t" + processed
                if column.primary_key:
                    first_pk = True
            except exc.CompileError as ce:
                # re-raise with table/column context for diagnostics
                raise exc.CompileError(
                    "(in table '%s', column '%s'): %s"
                    % (table.description, column.name, ce.args[0])
                ) from ce

        const = self.create_table_constraints(
            table,
            _include_foreign_key_constraints=create.include_foreign_key_constraints,  # noqa
        )
        if const:
            text += separator + "\t" + const

        text += "\n)%s\n\n" % self.post_create_table(table)
        return text

6816 

6817 def visit_create_column(self, create, first_pk=False, **kw): 

6818 column = create.element 

6819 

6820 if column.system: 

6821 return None 

6822 

6823 text = self.get_column_specification(column, first_pk=first_pk) 

6824 const = " ".join( 

6825 self.process(constraint) for constraint in column.constraints 

6826 ) 

6827 if const: 

6828 text += " " + const 

6829 

6830 return text 

6831 

    def create_table_constraints(
        self, table, _include_foreign_key_constraints=None, **kw
    ):
        """Render the table-level constraint clauses for CREATE TABLE.

        The primary key is rendered first, followed by the remaining
        constraints in the table's sorted order, skipping any foreign key
        constraint not listed in ``_include_foreign_key_constraints``
        (when given) and any constraint deferred via ``use_alter`` on
        dialects supporting ALTER.
        """
        # On some DB order is significant: visit PK first, then the
        # other constraints (engine.ReflectionTest.testbasic failed on FB2)
        constraints = []
        if table.primary_key:
            constraints.append(table.primary_key)

        all_fkcs = table.foreign_key_constraints
        if _include_foreign_key_constraints is not None:
            omit_fkcs = all_fkcs.difference(_include_foreign_key_constraints)
        else:
            omit_fkcs = set()

        constraints.extend(
            [
                c
                for c in table._sorted_constraints
                if c is not table.primary_key and c not in omit_fkcs
            ]
        )

        # constraints whose process() returns None are dropped from the
        # joined output
        return ", \n\t".join(
            p
            for p in (
                self.process(constraint)
                for constraint in constraints
                if (constraint._should_create_for_compiler(self))
                and (
                    not self.dialect.supports_alter
                    or not getattr(constraint, "use_alter", False)
                )
            )
            if p is not None
        )

6868 

6869 def visit_drop_table(self, drop, **kw): 

6870 text = "\nDROP TABLE " 

6871 if drop.if_exists: 

6872 text += "IF EXISTS " 

6873 return text + self.preparer.format_table(drop.element) 

6874 

6875 def visit_drop_view(self, drop, **kw): 

6876 return "\nDROP VIEW " + self.preparer.format_table(drop.element) 

6877 

6878 def _verify_index_table(self, index: Index) -> None: 

6879 if index.table is None: 

6880 raise exc.CompileError( 

6881 "Index '%s' is not associated with any table." % index.name 

6882 ) 

6883 

    def visit_create_index(
        self, create, include_schema=False, include_table_schema=True, **kw
    ):
        """Render CREATE [UNIQUE] INDEX; the index must be named and
        associated with a table."""
        index = create.element
        self._verify_index_table(index)
        preparer = self.preparer
        text = "CREATE "
        if index.unique:
            text += "UNIQUE "
        if index.name is None:
            raise exc.CompileError(
                "CREATE INDEX requires that the index have a name"
            )

        text += "INDEX "
        if create.if_not_exists:
            text += "IF NOT EXISTS "

        text += "%s ON %s (%s)" % (
            self._prepared_index_name(index, include_schema=include_schema),
            preparer.format_table(
                index.table, use_schema=include_table_schema
            ),
            ", ".join(
                # index expressions render as literal SQL with bind
                # parameters inlined
                self.sql_compiler.process(
                    expr, include_table=False, literal_binds=True
                )
                for expr in index.expressions
            ),
        )
        return text

6915 

    def visit_drop_index(self, drop, **kw):
        """Render a DROP INDEX statement, honoring IF EXISTS.

        :raises .CompileError: if the index has no name.
        """
        index = drop.element

        if index.name is None:
            raise exc.CompileError(
                "DROP INDEX requires that the index have a name"
            )
        text = "\nDROP INDEX "
        if drop.if_exists:
            text += "IF EXISTS "

        return text + self._prepared_index_name(index, include_schema=True)

6928 

6929 def _prepared_index_name( 

6930 self, index: Index, include_schema: bool = False 

6931 ) -> str: 

6932 if index.table is not None: 

6933 effective_schema = self.preparer.schema_for_object(index.table) 

6934 else: 

6935 effective_schema = None 

6936 if include_schema and effective_schema: 

6937 schema_name = self.preparer.quote_schema(effective_schema) 

6938 else: 

6939 schema_name = None 

6940 

6941 index_name: str = self.preparer.format_index(index) 

6942 

6943 if schema_name: 

6944 index_name = schema_name + "." + index_name 

6945 return index_name 

6946 

6947 def visit_add_constraint(self, create, **kw): 

6948 return "ALTER TABLE %s ADD %s" % ( 

6949 self.preparer.format_table(create.element.table), 

6950 self.process(create.element), 

6951 ) 

6952 

6953 def visit_set_table_comment(self, create, **kw): 

6954 return "COMMENT ON TABLE %s IS %s" % ( 

6955 self.preparer.format_table(create.element), 

6956 self.sql_compiler.render_literal_value( 

6957 create.element.comment, sqltypes.String() 

6958 ), 

6959 ) 

6960 

6961 def visit_drop_table_comment(self, drop, **kw): 

6962 return "COMMENT ON TABLE %s IS NULL" % self.preparer.format_table( 

6963 drop.element 

6964 ) 

6965 

6966 def visit_set_column_comment(self, create, **kw): 

6967 return "COMMENT ON COLUMN %s IS %s" % ( 

6968 self.preparer.format_column( 

6969 create.element, use_table=True, use_schema=True 

6970 ), 

6971 self.sql_compiler.render_literal_value( 

6972 create.element.comment, sqltypes.String() 

6973 ), 

6974 ) 

6975 

6976 def visit_drop_column_comment(self, drop, **kw): 

6977 return "COMMENT ON COLUMN %s IS NULL" % self.preparer.format_column( 

6978 drop.element, use_table=True 

6979 ) 

6980 

    def visit_set_constraint_comment(self, create, **kw):
        """Constraint comments are unsupported in the base compiler;
        dialects that support them override this method."""
        raise exc.UnsupportedCompilationError(self, type(create))

6983 

    def visit_drop_constraint_comment(self, drop, **kw):
        """Dropping constraint comments is unsupported in the base
        compiler; dialects that support it override this method."""
        raise exc.UnsupportedCompilationError(self, type(drop))

6986 

6987 def get_identity_options(self, identity_options: IdentityOptions) -> str: 

6988 text = [] 

6989 if identity_options.increment is not None: 

6990 text.append("INCREMENT BY %d" % identity_options.increment) 

6991 if identity_options.start is not None: 

6992 text.append("START WITH %d" % identity_options.start) 

6993 if identity_options.minvalue is not None: 

6994 text.append("MINVALUE %d" % identity_options.minvalue) 

6995 if identity_options.maxvalue is not None: 

6996 text.append("MAXVALUE %d" % identity_options.maxvalue) 

6997 if identity_options.nominvalue is not None: 

6998 text.append("NO MINVALUE") 

6999 if identity_options.nomaxvalue is not None: 

7000 text.append("NO MAXVALUE") 

7001 if identity_options.cache is not None: 

7002 text.append("CACHE %d" % identity_options.cache) 

7003 if identity_options.cycle is not None: 

7004 text.append("CYCLE" if identity_options.cycle else "NO CYCLE") 

7005 return " ".join(text) 

7006 

7007 def visit_create_sequence(self, create, prefix=None, **kw): 

7008 text = "CREATE SEQUENCE " 

7009 if create.if_not_exists: 

7010 text += "IF NOT EXISTS " 

7011 text += self.preparer.format_sequence(create.element) 

7012 

7013 if prefix: 

7014 text += prefix 

7015 options = self.get_identity_options(create.element) 

7016 if options: 

7017 text += " " + options 

7018 return text 

7019 

7020 def visit_drop_sequence(self, drop, **kw): 

7021 text = "DROP SEQUENCE " 

7022 if drop.if_exists: 

7023 text += "IF EXISTS " 

7024 return text + self.preparer.format_sequence(drop.element) 

7025 

    def visit_drop_constraint(self, drop, **kw):
        """Render ALTER TABLE ... DROP CONSTRAINT, honoring IF EXISTS
        and CASCADE.

        :raises .CompileError: if the constraint has no usable name
         (either unnamed, or the naming convention produced no name).
        """
        constraint = drop.element
        if constraint.name is not None:
            formatted_name = self.preparer.format_constraint(constraint)
        else:
            formatted_name = None

        if formatted_name is None:
            raise exc.CompileError(
                "Can't emit DROP CONSTRAINT for constraint %r; "
                "it has no name" % drop.element
            )
        return "ALTER TABLE %s DROP CONSTRAINT %s%s%s" % (
            self.preparer.format_table(drop.element.table),
            "IF EXISTS " if drop.if_exists else "",
            formatted_name,
            " CASCADE" if drop.cascade else "",
        )

7044 

    def get_column_specification(self, column, **kwargs):
        """Render the full column clause used inside CREATE TABLE:
        name, type, DEFAULT, computed/identity clauses and NOT NULL.

        Dialects frequently override this method.
        """
        colspec = (
            self.preparer.format_column(column)
            + " "
            + self.dialect.type_compiler_instance.process(
                column.type, type_expression=column
            )
        )
        default = self.get_column_default_string(column)
        if default is not None:
            colspec += " DEFAULT " + default

        if column.computed is not None:
            colspec += " " + self.process(column.computed)

        if (
            column.identity is not None
            and self.dialect.supports_identity_columns
        ):
            colspec += " " + self.process(column.identity)

        # NOT NULL is implied by a rendered IDENTITY clause, so it is
        # skipped when an identity clause was emitted above
        if not column.nullable and (
            not column.identity or not self.dialect.supports_identity_columns
        ):
            colspec += " NOT NULL"
        return colspec

7071 

    def create_table_suffix(self, table):
        """Hook for text rendered after "CREATE TABLE", before the table
        name; the base implementation renders nothing."""
        return ""

7074 

    def post_create_table(self, table):
        """Hook for text appended after the CREATE TABLE column list;
        the base implementation renders nothing."""
        return ""

7077 

    def get_column_default_string(self, column: Column[Any]) -> Optional[str]:
        """Return the DEFAULT clause text for *column*, or ``None`` when
        the column has no server-side default expressible in DDL."""
        if isinstance(column.server_default, schema.DefaultClause):
            return self.render_default_string(column.server_default.arg)
        else:
            return None

7083 

    def render_default_string(self, default: Union[Visitable, str]) -> str:
        """Render a server default as DDL text.

        Plain strings render as quoted string literals; SQL expressions
        compile with literal binds, since DDL accepts no parameters.
        """
        if isinstance(default, str):
            return self.sql_compiler.render_literal_value(
                default, sqltypes.STRINGTYPE
            )
        else:
            return self.sql_compiler.process(default, literal_binds=True)

7091 

7092 def visit_table_or_column_check_constraint(self, constraint, **kw): 

7093 if constraint.is_column_level: 

7094 return self.visit_column_check_constraint(constraint) 

7095 else: 

7096 return self.visit_check_constraint(constraint) 

7097 

7098 def visit_check_constraint(self, constraint, **kw): 

7099 text = self.define_constraint_preamble(constraint, **kw) 

7100 text += self.define_check_body(constraint, **kw) 

7101 text += self.define_constraint_deferrability(constraint) 

7102 return text 

7103 

7104 def visit_column_check_constraint(self, constraint, **kw): 

7105 text = self.define_constraint_preamble(constraint, **kw) 

7106 text += self.define_check_body(constraint, **kw) 

7107 text += self.define_constraint_deferrability(constraint) 

7108 return text 

7109 

7110 def visit_primary_key_constraint( 

7111 self, constraint: PrimaryKeyConstraint, **kw: Any 

7112 ) -> str: 

7113 if len(constraint) == 0: 

7114 return "" 

7115 text = self.define_constraint_preamble(constraint, **kw) 

7116 text += self.define_primary_key_body(constraint, **kw) 

7117 text += self.define_constraint_deferrability(constraint) 

7118 return text 

7119 

7120 def visit_foreign_key_constraint( 

7121 self, constraint: ForeignKeyConstraint, **kw: Any 

7122 ) -> str: 

7123 text = self.define_constraint_preamble(constraint, **kw) 

7124 text += self.define_foreign_key_body(constraint, **kw) 

7125 text += self.define_constraint_match(constraint) 

7126 text += self.define_constraint_cascades(constraint) 

7127 text += self.define_constraint_deferrability(constraint) 

7128 return text 

7129 

    def define_constraint_remote_table(self, constraint, table, preparer):
        """Format the remote table clause of a CREATE CONSTRAINT clause.

        Overridden by dialects that must qualify the remote table
        differently from the default table formatting.
        """

        return preparer.format_table(table)

7134 

7135 def visit_unique_constraint( 

7136 self, constraint: UniqueConstraint, **kw: Any 

7137 ) -> str: 

7138 if len(constraint) == 0: 

7139 return "" 

7140 text = self.define_constraint_preamble(constraint, **kw) 

7141 text += self.define_unique_body(constraint, **kw) 

7142 text += self.define_constraint_deferrability(constraint) 

7143 return text 

7144 

7145 def define_constraint_preamble( 

7146 self, constraint: Constraint, **kw: Any 

7147 ) -> str: 

7148 text = "" 

7149 if constraint.name is not None: 

7150 formatted_name = self.preparer.format_constraint(constraint) 

7151 if formatted_name is not None: 

7152 text += "CONSTRAINT %s " % formatted_name 

7153 return text 

7154 

7155 def define_primary_key_body( 

7156 self, constraint: PrimaryKeyConstraint, **kw: Any 

7157 ) -> str: 

7158 text = "" 

7159 text += "PRIMARY KEY " 

7160 text += "(%s)" % ", ".join( 

7161 self.preparer.quote(c.name) 

7162 for c in ( 

7163 constraint.columns_autoinc_first 

7164 if constraint._implicit_generated 

7165 else constraint.columns 

7166 ) 

7167 ) 

7168 return text 

7169 

7170 def define_foreign_key_body( 

7171 self, constraint: ForeignKeyConstraint, **kw: Any 

7172 ) -> str: 

7173 preparer = self.preparer 

7174 remote_table = list(constraint.elements)[0].column.table 

7175 text = "FOREIGN KEY(%s) REFERENCES %s (%s)" % ( 

7176 ", ".join( 

7177 preparer.quote(f.parent.name) for f in constraint.elements 

7178 ), 

7179 self.define_constraint_remote_table( 

7180 constraint, remote_table, preparer 

7181 ), 

7182 ", ".join( 

7183 preparer.quote(f.column.name) for f in constraint.elements 

7184 ), 

7185 ) 

7186 return text 

7187 

7188 def define_unique_body( 

7189 self, constraint: UniqueConstraint, **kw: Any 

7190 ) -> str: 

7191 text = "UNIQUE %s(%s)" % ( 

7192 self.define_unique_constraint_distinct(constraint, **kw), 

7193 ", ".join(self.preparer.quote(c.name) for c in constraint), 

7194 ) 

7195 return text 

7196 

7197 def define_check_body(self, constraint: CheckConstraint, **kw: Any) -> str: 

7198 text = "CHECK (%s)" % self.sql_compiler.process( 

7199 constraint.sqltext, include_table=False, literal_binds=True 

7200 ) 

7201 return text 

7202 

    def define_unique_constraint_distinct(
        self, constraint: UniqueConstraint, **kw: Any
    ) -> str:
        """Hook for a dialect-specific qualifier rendered between
        ``UNIQUE`` and the column list; the base implementation renders
        nothing."""
        return ""

7207 

7208 def define_constraint_cascades( 

7209 self, constraint: ForeignKeyConstraint 

7210 ) -> str: 

7211 text = "" 

7212 if constraint.ondelete is not None: 

7213 text += self.define_constraint_ondelete_cascade(constraint) 

7214 

7215 if constraint.onupdate is not None: 

7216 text += self.define_constraint_onupdate_cascade(constraint) 

7217 return text 

7218 

7219 def define_constraint_ondelete_cascade( 

7220 self, constraint: ForeignKeyConstraint 

7221 ) -> str: 

7222 return " ON DELETE %s" % self.preparer.validate_sql_phrase( 

7223 constraint.ondelete, FK_ON_DELETE 

7224 ) 

7225 

7226 def define_constraint_onupdate_cascade( 

7227 self, constraint: ForeignKeyConstraint 

7228 ) -> str: 

7229 return " ON UPDATE %s" % self.preparer.validate_sql_phrase( 

7230 constraint.onupdate, FK_ON_UPDATE 

7231 ) 

7232 

7233 def define_constraint_deferrability(self, constraint: Constraint) -> str: 

7234 text = "" 

7235 if constraint.deferrable is not None: 

7236 if constraint.deferrable: 

7237 text += " DEFERRABLE" 

7238 else: 

7239 text += " NOT DEFERRABLE" 

7240 if constraint.initially is not None: 

7241 text += " INITIALLY %s" % self.preparer.validate_sql_phrase( 

7242 constraint.initially, FK_INITIALLY 

7243 ) 

7244 return text 

7245 

7246 def define_constraint_match(self, constraint: ForeignKeyConstraint) -> str: 

7247 text = "" 

7248 if constraint.match is not None: 

7249 text += " MATCH %s" % constraint.match 

7250 return text 

7251 

7252 def visit_computed_column(self, generated, **kw): 

7253 text = "GENERATED ALWAYS AS (%s)" % self.sql_compiler.process( 

7254 generated.sqltext, include_table=False, literal_binds=True 

7255 ) 

7256 if generated.persisted is True: 

7257 text += " STORED" 

7258 elif generated.persisted is False: 

7259 text += " VIRTUAL" 

7260 return text 

7261 

7262 def visit_identity_column(self, identity, **kw): 

7263 text = "GENERATED %s AS IDENTITY" % ( 

7264 "ALWAYS" if identity.always else "BY DEFAULT", 

7265 ) 

7266 options = self.get_identity_options(identity) 

7267 if options: 

7268 text += " (%s)" % options 

7269 return text 

7270 

7271 

class GenericTypeCompiler(TypeCompiler):
    """Render type strings for the generic SQL types.

    The uppercase ``visit_XYZ`` methods render SQL type names directly;
    the lowercase ``visit_xyz`` methods route the "camelcase" generic
    types to an uppercase equivalent.  Dialects override individual
    methods to provide database-specific type names.
    """

    def visit_FLOAT(self, type_: sqltypes.Float[Any], **kw: Any) -> str:
        return "FLOAT"

    def visit_DOUBLE(self, type_: sqltypes.Double[Any], **kw: Any) -> str:
        return "DOUBLE"

    def visit_DOUBLE_PRECISION(
        self, type_: sqltypes.DOUBLE_PRECISION[Any], **kw: Any
    ) -> str:
        return "DOUBLE PRECISION"

    def visit_REAL(self, type_: sqltypes.REAL[Any], **kw: Any) -> str:
        return "REAL"

    def visit_NUMERIC(self, type_: sqltypes.Numeric[Any], **kw: Any) -> str:
        # scale is only rendered when precision is also present, since
        # NUMERIC(scale) alone is not valid SQL
        if type_.precision is None:
            return "NUMERIC"
        elif type_.scale is None:
            return "NUMERIC(%(precision)s)" % {"precision": type_.precision}
        else:
            return "NUMERIC(%(precision)s, %(scale)s)" % {
                "precision": type_.precision,
                "scale": type_.scale,
            }

    def visit_DECIMAL(self, type_: sqltypes.DECIMAL[Any], **kw: Any) -> str:
        if type_.precision is None:
            return "DECIMAL"
        elif type_.scale is None:
            return "DECIMAL(%(precision)s)" % {"precision": type_.precision}
        else:
            return "DECIMAL(%(precision)s, %(scale)s)" % {
                "precision": type_.precision,
                "scale": type_.scale,
            }

    def visit_INTEGER(self, type_: sqltypes.Integer, **kw: Any) -> str:
        return "INTEGER"

    def visit_SMALLINT(self, type_: sqltypes.SmallInteger, **kw: Any) -> str:
        return "SMALLINT"

    def visit_BIGINT(self, type_: sqltypes.BigInteger, **kw: Any) -> str:
        return "BIGINT"

    def visit_TIMESTAMP(self, type_: sqltypes.TIMESTAMP, **kw: Any) -> str:
        return "TIMESTAMP"

    def visit_DATETIME(self, type_: sqltypes.DateTime, **kw: Any) -> str:
        return "DATETIME"

    def visit_DATE(self, type_: sqltypes.Date, **kw: Any) -> str:
        return "DATE"

    def visit_TIME(self, type_: sqltypes.Time, **kw: Any) -> str:
        return "TIME"

    def visit_CLOB(self, type_: sqltypes.CLOB, **kw: Any) -> str:
        return "CLOB"

    def visit_NCLOB(self, type_: sqltypes.Text, **kw: Any) -> str:
        return "NCLOB"

    def _render_string_type(
        self, name: str, length: Optional[int], collation: Optional[str]
    ) -> str:
        """Render ``NAME(length) COLLATE "collation"``, omitting the
        parts whose value is unset."""
        text = name
        if length:
            text += f"({length})"
        if collation:
            text += f' COLLATE "{collation}"'
        return text

    def visit_CHAR(self, type_: sqltypes.CHAR, **kw: Any) -> str:
        return self._render_string_type("CHAR", type_.length, type_.collation)

    def visit_NCHAR(self, type_: sqltypes.NCHAR, **kw: Any) -> str:
        return self._render_string_type("NCHAR", type_.length, type_.collation)

    def visit_VARCHAR(self, type_: sqltypes.String, **kw: Any) -> str:
        return self._render_string_type(
            "VARCHAR", type_.length, type_.collation
        )

    def visit_NVARCHAR(self, type_: sqltypes.NVARCHAR, **kw: Any) -> str:
        return self._render_string_type(
            "NVARCHAR", type_.length, type_.collation
        )

    def visit_TEXT(self, type_: sqltypes.Text, **kw: Any) -> str:
        return self._render_string_type("TEXT", type_.length, type_.collation)

    def visit_UUID(self, type_: sqltypes.Uuid[Any], **kw: Any) -> str:
        return "UUID"

    def visit_BLOB(self, type_: sqltypes.LargeBinary, **kw: Any) -> str:
        return "BLOB"

    def visit_BINARY(self, type_: sqltypes.BINARY, **kw: Any) -> str:
        # conditional expression replaces the legacy "x and y or z"
        # idiom; a None or zero length renders no parenthesized length
        return "BINARY" + ("(%d)" % type_.length if type_.length else "")

    def visit_VARBINARY(self, type_: sqltypes.VARBINARY, **kw: Any) -> str:
        return "VARBINARY" + ("(%d)" % type_.length if type_.length else "")

    def visit_BOOLEAN(self, type_: sqltypes.Boolean, **kw: Any) -> str:
        return "BOOLEAN"

    def visit_uuid(self, type_: sqltypes.Uuid[Any], **kw: Any) -> str:
        # without native UUID support, the value is stored as a
        # 32-character hex string
        if not type_.native_uuid or not self.dialect.supports_native_uuid:
            return self._render_string_type("CHAR", length=32, collation=None)
        else:
            return self.visit_UUID(type_, **kw)

    def visit_large_binary(
        self, type_: sqltypes.LargeBinary, **kw: Any
    ) -> str:
        return self.visit_BLOB(type_, **kw)

    def visit_boolean(self, type_: sqltypes.Boolean, **kw: Any) -> str:
        return self.visit_BOOLEAN(type_, **kw)

    def visit_time(self, type_: sqltypes.Time, **kw: Any) -> str:
        return self.visit_TIME(type_, **kw)

    def visit_datetime(self, type_: sqltypes.DateTime, **kw: Any) -> str:
        return self.visit_DATETIME(type_, **kw)

    def visit_date(self, type_: sqltypes.Date, **kw: Any) -> str:
        return self.visit_DATE(type_, **kw)

    def visit_big_integer(self, type_: sqltypes.BigInteger, **kw: Any) -> str:
        return self.visit_BIGINT(type_, **kw)

    def visit_small_integer(
        self, type_: sqltypes.SmallInteger, **kw: Any
    ) -> str:
        return self.visit_SMALLINT(type_, **kw)

    def visit_integer(self, type_: sqltypes.Integer, **kw: Any) -> str:
        return self.visit_INTEGER(type_, **kw)

    def visit_real(self, type_: sqltypes.REAL[Any], **kw: Any) -> str:
        return self.visit_REAL(type_, **kw)

    def visit_float(self, type_: sqltypes.Float[Any], **kw: Any) -> str:
        return self.visit_FLOAT(type_, **kw)

    def visit_double(self, type_: sqltypes.Double[Any], **kw: Any) -> str:
        return self.visit_DOUBLE(type_, **kw)

    def visit_numeric(self, type_: sqltypes.Numeric[Any], **kw: Any) -> str:
        return self.visit_NUMERIC(type_, **kw)

    def visit_string(self, type_: sqltypes.String, **kw: Any) -> str:
        return self.visit_VARCHAR(type_, **kw)

    def visit_unicode(self, type_: sqltypes.Unicode, **kw: Any) -> str:
        return self.visit_VARCHAR(type_, **kw)

    def visit_text(self, type_: sqltypes.Text, **kw: Any) -> str:
        return self.visit_TEXT(type_, **kw)

    def visit_unicode_text(
        self, type_: sqltypes.UnicodeText, **kw: Any
    ) -> str:
        return self.visit_TEXT(type_, **kw)

    def visit_enum(self, type_: sqltypes.Enum, **kw: Any) -> str:
        return self.visit_VARCHAR(type_, **kw)

    def visit_null(self, type_, **kw):
        raise exc.CompileError(
            "Can't generate DDL for %r; "
            "did you forget to specify a "
            "type on this Column?" % type_
        )

    def visit_type_decorator(
        self, type_: TypeDecorator[Any], **kw: Any
    ) -> str:
        # render the decorator's dialect-level implementation type
        return self.process(type_.type_engine(self.dialect), **kw)

    def visit_user_defined(
        self, type_: UserDefinedType[Any], **kw: Any
    ) -> str:
        return type_.get_col_spec(**kw)

7459 

7460 

class StrSQLTypeCompiler(GenericTypeCompiler):
    """Type compiler used when stringifying without a dialect; renders
    a best-effort name for any type instead of raising for unknown
    ones."""

    def process(self, type_, **kw):
        # objects lacking compiler dispatch fall through to the
        # catch-all renderer rather than raising
        try:
            _compiler_dispatch = type_._compiler_dispatch
        except AttributeError:
            return self._visit_unknown(type_, **kw)
        else:
            return _compiler_dispatch(self, **kw)

    def __getattr__(self, key):
        # any visit_XYZ not defined on GenericTypeCompiler resolves to
        # the catch-all renderer instead of raising AttributeError
        if key.startswith("visit_"):
            return self._visit_unknown
        else:
            raise AttributeError(key)

    def _visit_unknown(self, type_, **kw):
        # an all-caps class name is taken to be the SQL type name itself
        if type_.__class__.__name__ == type_.__class__.__name__.upper():
            return type_.__class__.__name__
        else:
            return repr(type_)

    def visit_null(self, type_, **kw):
        return "NULL"

    def visit_user_defined(self, type_, **kw):
        # fall back to repr when the type has no get_col_spec
        try:
            get_col_spec = type_.get_col_spec
        except AttributeError:
            return repr(type_)
        else:
            return get_col_spec(**kw)

7492 

7493 

class _SchemaForObjectCallable(Protocol):
    """Callable returning the effective schema string for a schema item."""

    def __call__(self, __obj: Any) -> str: ...

7496 

7497 

class _BindNameForColProtocol(Protocol):
    """Callable returning the bind parameter name to use for a column."""

    def __call__(self, col: ColumnClause[Any]) -> str: ...

7500 

7501 

class IdentifierPreparer:
    """Handle quoting and case-folding of identifiers based on options."""

    # words that always require quoting when used as identifiers
    reserved_words = RESERVED_WORDS

    # regexp matching identifiers that need no quoting
    legal_characters = LEGAL_CHARACTERS

    # characters that may not begin an unquoted identifier
    illegal_initial_characters = ILLEGAL_INITIAL_CHARACTERS

    # opening / closing quote characters, configured in __init__
    initial_quote: str

    final_quote: str

    # cache of identifier -> quoted (or passed-through) form
    _strings: MutableMapping[str, str]

    schema_for_object: _SchemaForObjectCallable = operator.attrgetter("schema")
    """Return the .schema attribute for an object.

    For the default IdentifierPreparer, the schema for an object is always
    the value of the ".schema" attribute. if the preparer is replaced
    with one that has a non-empty schema_translate_map, the value of the
    ".schema" attribute is rendered a symbol that will be converted to a
    real schema name from the mapping post-compile.

    """

    # set True by _with_schema_translate when the map has a None key
    _includes_none_schema_translate: bool = False

7529 

    def __init__(
        self,
        dialect: Dialect,
        initial_quote: str = '"',
        final_quote: Optional[str] = None,
        escape_quote: str = '"',
        quote_case_sensitive_collations: bool = True,
        omit_schema: bool = False,
    ):
        """Construct a new ``IdentifierPreparer`` object.

        initial_quote
          Character that begins a delimited identifier.

        final_quote
          Character that ends a delimited identifier. Defaults to
          `initial_quote`.

        escape_quote
          Character used to escape an embedded quote character inside
          a delimited identifier.

        omit_schema
          Prevent prepending schema name. Useful for databases that do
          not support schemae.
        """

        self.dialect = dialect
        self.initial_quote = initial_quote
        self.final_quote = final_quote or self.initial_quote
        self.escape_quote = escape_quote
        # an embedded quote is escaped by doubling it
        self.escape_to_quote = self.escape_quote * 2
        self.omit_schema = omit_schema
        self.quote_case_sensitive_collations = quote_case_sensitive_collations
        self._strings = {}
        # "format"/"pyformat" paramstyles treat "%" specially, so literal
        # percent signs in identifiers must be doubled
        self._double_percents = self.dialect.paramstyle in (
            "format",
            "pyformat",
        )

7565 

    def _with_schema_translate(self, schema_translate_map):
        """Return a copy of this preparer whose ``schema_for_object``
        renders symbolic placeholders (``__[SCHEMA_<name>]``) in place
        of real schema names, for post-compile substitution by
        :meth:`._render_schema_translates`.
        """
        prep = self.__class__.__new__(self.__class__)
        prep.__dict__.update(self.__dict__)

        includes_none = None in schema_translate_map

        def symbol_getter(obj):
            name = obj.schema
            if obj._use_schema_map and (name is not None or includes_none):
                # square brackets would collide with the placeholder
                # syntax parsed back out by _render_schema_translates
                if name is not None and ("[" in name or "]" in name):
                    raise exc.CompileError(
                        "Square bracket characters ([]) not supported "
                        "in schema translate name '%s'" % name
                    )
                return quoted_name(
                    "__[SCHEMA_%s]" % (name or "_none"), quote=False
                )
            else:
                return obj.schema

        prep.schema_for_object = symbol_getter
        prep._includes_none_schema_translate = includes_none
        return prep

7589 

    def _render_schema_translates(
        self, statement: str, schema_translate_map: SchemaTranslateMapType
    ) -> str:
        """Replace ``__[SCHEMA_<name>]`` placeholders in a compiled
        statement with real, quoted schema names from the given map.

        :raises .InvalidRequestError: if the presence of a ``None`` key
         differs between this map and the one in effect at compile time.
        """
        d = schema_translate_map
        if None in d:
            if not self._includes_none_schema_translate:
                raise exc.InvalidRequestError(
                    "schema translate map which previously did not have "
                    "`None` present as a key now has `None` present; compiled "
                    "statement may lack adequate placeholders. Please use "
                    "consistent keys in successive "
                    "schema_translate_map dictionaries."
                )

            # None was rendered as the "_none" placeholder at compile time
            d["_none"] = d[None]  # type: ignore[index]

        def replace(m):
            name = m.group(2)
            if name in d:
                effective_schema = d[name]
            else:
                if name in (None, "_none"):
                    raise exc.InvalidRequestError(
                        "schema translate map which previously had `None` "
                        "present as a key now no longer has it present; don't "
                        "know how to apply schema for compiled statement. "
                        "Please use consistent keys in successive "
                        "schema_translate_map dictionaries."
                    )
                # a name not present in the map passes through unchanged
                effective_schema = name

            if not effective_schema:
                effective_schema = self.dialect.default_schema_name
                if not effective_schema:
                    # TODO: no coverage here
                    raise exc.CompileError(
                        "Dialect has no default schema name; can't "
                        "use None as dynamic schema target."
                    )
            return self.quote_schema(effective_schema)

        return re.sub(r"(__\[SCHEMA_([^\]]+)\])", replace, statement)

7632 

7633 def _escape_identifier(self, value: str) -> str: 

7634 """Escape an identifier. 

7635 

7636 Subclasses should override this to provide database-dependent 

7637 escaping behavior. 

7638 """ 

7639 

7640 value = value.replace(self.escape_quote, self.escape_to_quote) 

7641 if self._double_percents: 

7642 value = value.replace("%", "%%") 

7643 return value 

7644 

    def _unescape_identifier(self, value: str) -> str:
        """Canonicalize an escaped identifier.

        Reverses the quote-doubling applied by
        :meth:`._escape_identifier`.  Subclasses should override this to
        provide database-dependent unescaping behavior that reverses
        _escape_identifier.
        """

        return value.replace(self.escape_to_quote, self.escape_quote)

7653 

7654 def validate_sql_phrase(self, element, reg): 

7655 """keyword sequence filter. 

7656 

7657 a filter for elements that are intended to represent keyword sequences, 

7658 such as "INITIALLY", "INITIALLY DEFERRED", etc. no special characters 

7659 should be present. 

7660 

7661 .. versionadded:: 1.3 

7662 

7663 """ 

7664 

7665 if element is not None and not reg.match(element): 

7666 raise exc.CompileError( 

7667 "Unexpected SQL phrase: %r (matching against %r)" 

7668 % (element, reg.pattern) 

7669 ) 

7670 return element 

7671 

7672 def quote_identifier(self, value: str) -> str: 

7673 """Quote an identifier. 

7674 

7675 Subclasses should override this to provide database-dependent 

7676 quoting behavior. 

7677 """ 

7678 

7679 return ( 

7680 self.initial_quote 

7681 + self._escape_identifier(value) 

7682 + self.final_quote 

7683 ) 

7684 

7685 def _requires_quotes(self, value: str) -> bool: 

7686 """Return True if the given identifier requires quoting.""" 

7687 lc_value = value.lower() 

7688 return ( 

7689 lc_value in self.reserved_words 

7690 or value[0] in self.illegal_initial_characters 

7691 or not self.legal_characters.match(str(value)) 

7692 or (lc_value != value) 

7693 ) 

7694 

    def _requires_quotes_illegal_chars(self, value):
        """Return True if the given identifier requires quoting, but
        not taking case convention into account."""
        return not self.legal_characters.match(str(value))

7699 

    def quote_schema(self, schema: str, force: Any = None) -> str:
        """Conditionally quote a schema name.


        The name is quoted if it is a reserved word, contains quote-necessary
        characters, or is an instance of :class:`.quoted_name` which includes
        ``quote`` set to ``True``.

        Subclasses can override this to provide database-dependent
        quoting behavior for schema names.

        :param schema: string schema name
        :param force: unused

        .. deprecated:: 0.9

            The :paramref:`.IdentifierPreparer.quote_schema.force`
            parameter is deprecated and will be removed in a future
            release.  This flag has no effect on the behavior of the
            :meth:`.IdentifierPreparer.quote` method; please refer to
            :class:`.quoted_name`.

        """
        if force is not None:
            # not using the util.deprecated_params() decorator in this
            # case because of the additional function call overhead on this
            # very performance-critical spot.
            util.warn_deprecated(
                "The IdentifierPreparer.quote_schema.force parameter is "
                "deprecated and will be removed in a future release. This "
                "flag has no effect on the behavior of the "
                "IdentifierPreparer.quote method; please refer to "
                "quoted_name().",
                # deprecated 0.9. warning from 1.3
                version="0.9",
            )

        # identical conditional quoting rules as for any identifier
        return self.quote(schema)

7738 

    def quote(self, ident: str, force: Any = None) -> str:
        """Conditionally quote an identifier.

        The identifier is quoted if it is a reserved word, contains
        quote-necessary characters, or is an instance of
        :class:`.quoted_name` which includes ``quote`` set to ``True``.

        Subclasses can override this to provide database-dependent
        quoting behavior for identifier names.

        :param ident: string identifier
        :param force: unused

        .. deprecated:: 0.9

            The :paramref:`.IdentifierPreparer.quote.force`
            parameter is deprecated and will be removed in a future
            release.  This flag has no effect on the behavior of the
            :meth:`.IdentifierPreparer.quote` method; please refer to
            :class:`.quoted_name`.

        """
        if force is not None:
            # not using the util.deprecated_params() decorator in this
            # case because of the additional function call overhead on this
            # very performance-critical spot.
            util.warn_deprecated(
                "The IdentifierPreparer.quote.force parameter is "
                "deprecated and will be removed in a future release. This "
                "flag has no effect on the behavior of the "
                "IdentifierPreparer.quote method; please refer to "
                "quoted_name().",
                # deprecated 0.9. warning from 1.3
                version="0.9",
            )

        # a quoted_name object carries its own quote flag, which takes
        # precedence over the conditional rules below
        force = getattr(ident, "quote", None)

        if force is None:
            # memoize the quoted (or passed-through) form per identifier
            if ident in self._strings:
                return self._strings[ident]
            else:
                if self._requires_quotes(ident):
                    self._strings[ident] = self.quote_identifier(ident)
                else:
                    self._strings[ident] = ident
                return self._strings[ident]
        elif force:
            return self.quote_identifier(ident)
        else:
            return ident

7790 

7791 def format_collation(self, collation_name): 

7792 if self.quote_case_sensitive_collations: 

7793 return self.quote(collation_name) 

7794 else: 

7795 return collation_name 

7796 

7797 def format_sequence( 

7798 self, sequence: schema.Sequence, use_schema: bool = True 

7799 ) -> str: 

7800 name = self.quote(sequence.name) 

7801 

7802 effective_schema = self.schema_for_object(sequence) 

7803 

7804 if ( 

7805 not self.omit_schema 

7806 and use_schema 

7807 and effective_schema is not None 

7808 ): 

7809 name = self.quote_schema(effective_schema) + "." + name 

7810 return name 

7811 

7812 def format_label( 

7813 self, label: Label[Any], name: Optional[str] = None 

7814 ) -> str: 

7815 return self.quote(name or label.name) 

7816 

7817 def format_alias( 

7818 self, alias: Optional[AliasedReturnsRows], name: Optional[str] = None 

7819 ) -> str: 

7820 if name is None: 

7821 assert alias is not None 

7822 return self.quote(alias.name) 

7823 else: 

7824 return self.quote(name) 

7825 

7826 def format_savepoint(self, savepoint, name=None): 

7827 # Running the savepoint name through quoting is unnecessary 

7828 # for all known dialects. This is here to support potential 

7829 # third party use cases 

7830 ident = name or savepoint.ident 

7831 if self._requires_quotes(ident): 

7832 ident = self.quote_identifier(ident) 

7833 return ident 

7834 

    @util.preload_module("sqlalchemy.sql.naming")
    def format_constraint(
        self, constraint: Union[Constraint, Index], _alembic_quote: bool = True
    ) -> Optional[str]:
        """Render the name of a constraint or index, applying
        naming-convention defaults, length truncation, and quoting.

        Returns ``None`` when the constraint has no explicit name and
        the naming convention system produces none either.
        """
        naming = util.preloaded.sql_naming

        if constraint.name is _NONE_NAME:
            # no explicit name was given; consult the naming-convention
            # machinery for a generated one
            name = naming._constraint_name_for_table(
                constraint, constraint.table
            )

            if name is None:
                return None
        else:
            name = constraint.name

        assert name is not None
        if constraint.__visit_name__ == "index":
            # indexes may have their own, shorter max name length on
            # some dialects
            return self.truncate_and_render_index_name(
                name, _alembic_quote=_alembic_quote
            )
        else:
            return self.truncate_and_render_constraint_name(
                name, _alembic_quote=_alembic_quote
            )

7860 

7861 def truncate_and_render_index_name( 

7862 self, name: str, _alembic_quote: bool = True 

7863 ) -> str: 

7864 # calculate these at format time so that ad-hoc changes 

7865 # to dialect.max_identifier_length etc. can be reflected 

7866 # as IdentifierPreparer is long lived 

7867 max_ = ( 

7868 self.dialect.max_index_name_length 

7869 or self.dialect.max_identifier_length 

7870 ) 

7871 return self._truncate_and_render_maxlen_name( 

7872 name, max_, _alembic_quote 

7873 ) 

7874 

7875 def truncate_and_render_constraint_name( 

7876 self, name: str, _alembic_quote: bool = True 

7877 ) -> str: 

7878 # calculate these at format time so that ad-hoc changes 

7879 # to dialect.max_identifier_length etc. can be reflected 

7880 # as IdentifierPreparer is long lived 

7881 max_ = ( 

7882 self.dialect.max_constraint_name_length 

7883 or self.dialect.max_identifier_length 

7884 ) 

7885 return self._truncate_and_render_maxlen_name( 

7886 name, max_, _alembic_quote 

7887 ) 

7888 

7889 def _truncate_and_render_maxlen_name( 

7890 self, name: str, max_: int, _alembic_quote: bool 

7891 ) -> str: 

7892 if isinstance(name, elements._truncated_label): 

7893 if len(name) > max_: 

7894 name = name[0 : max_ - 8] + "_" + util.md5_hex(name)[-4:] 

7895 else: 

7896 self.dialect.validate_identifier(name) 

7897 

7898 if not _alembic_quote: 

7899 return name 

7900 else: 

7901 return self.quote(name) 

7902 

7903 def format_index(self, index: Index) -> str: 

7904 name = self.format_constraint(index) 

7905 assert name is not None 

7906 return name 

7907 

7908 def format_table( 

7909 self, 

7910 table: FromClause, 

7911 use_schema: bool = True, 

7912 name: Optional[str] = None, 

7913 ) -> str: 

7914 """Prepare a quoted table and schema name.""" 

7915 if name is None: 

7916 if TYPE_CHECKING: 

7917 assert isinstance(table, NamedFromClause) 

7918 name = table.name 

7919 

7920 result = self.quote(name) 

7921 

7922 effective_schema = self.schema_for_object(table) 

7923 

7924 if not self.omit_schema and use_schema and effective_schema: 

7925 result = self.quote_schema(effective_schema) + "." + result 

7926 return result 

7927 

    def format_schema(self, name):
        """Prepare a quoted schema name.

        :param name: the schema name to quote
        """

        return self.quote(name)

7932 

7933 def format_label_name( 

7934 self, 

7935 name, 

7936 anon_map=None, 

7937 ): 

7938 """Prepare a quoted column name.""" 

7939 

7940 if anon_map is not None and isinstance( 

7941 name, elements._truncated_label 

7942 ): 

7943 name = name.apply_map(anon_map) 

7944 

7945 return self.quote(name) 

7946 

7947 def format_column( 

7948 self, 

7949 column: ColumnElement[Any], 

7950 use_table: bool = False, 

7951 name: Optional[str] = None, 

7952 table_name: Optional[str] = None, 

7953 use_schema: bool = False, 

7954 anon_map: Optional[Mapping[str, Any]] = None, 

7955 ) -> str: 

7956 """Prepare a quoted column name.""" 

7957 

7958 if name is None: 

7959 name = column.name 

7960 assert name is not None 

7961 

7962 if anon_map is not None and isinstance( 

7963 name, elements._truncated_label 

7964 ): 

7965 name = name.apply_map(anon_map) 

7966 

7967 if not getattr(column, "is_literal", False): 

7968 if use_table: 

7969 return ( 

7970 self.format_table( 

7971 column.table, use_schema=use_schema, name=table_name 

7972 ) 

7973 + "." 

7974 + self.quote(name) 

7975 ) 

7976 else: 

7977 return self.quote(name) 

7978 else: 

7979 # literal textual elements get stuck into ColumnClause a lot, 

7980 # which shouldn't get quoted 

7981 

7982 if use_table: 

7983 return ( 

7984 self.format_table( 

7985 column.table, use_schema=use_schema, name=table_name 

7986 ) 

7987 + "." 

7988 + name 

7989 ) 

7990 else: 

7991 return name 

7992 

7993 def format_table_seq(self, table, use_schema=True): 

7994 """Format table name and schema as a tuple.""" 

7995 

7996 # Dialects with more levels in their fully qualified references 

7997 # ('database', 'owner', etc.) could override this and return 

7998 # a longer sequence. 

7999 

8000 effective_schema = self.schema_for_object(table) 

8001 

8002 if not self.omit_schema and use_schema and effective_schema: 

8003 return ( 

8004 self.quote_schema(effective_schema), 

8005 self.format_table(table, use_schema=False), 

8006 ) 

8007 else: 

8008 return (self.format_table(table, use_schema=False),) 

8009 

    @util.memoized_property
    def _r_identifiers(self):
        # regular expression which parses a dotted identifier chain such
        # as ``"schema"."table"."column"``; each segment is either a
        # quoted run (group 1, allowing escaped embedded final quotes)
        # or an unquoted dot-free run (group 2), followed by a dot or
        # end of string.  Built lazily and cached since the quote
        # characters are fixed per preparer.
        initial, final, escaped_final = (
            re.escape(s)
            for s in (
                self.initial_quote,
                self.final_quote,
                self._escape_identifier(self.final_quote),
            )
        )
        r = re.compile(
            r"(?:"
            r"(?:%(initial)s((?:%(escaped)s|[^%(final)s])+)%(final)s"
            r"|([^\.]+))(?=\.|$))+"
            % {"initial": initial, "final": final, "escaped": escaped_final}
        )
        return r

8027 

8028 def unformat_identifiers(self, identifiers: str) -> Sequence[str]: 

8029 """Unpack 'schema.table.column'-like strings into components.""" 

8030 

8031 r = self._r_identifiers 

8032 return [ 

8033 self._unescape_identifier(i) 

8034 for i in [a or b for a, b in r.findall(identifiers)] 

8035 ]