Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlparse/sql.py: 36%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

340 statements  

1# 

2# Copyright (C) 2009-2020 the sqlparse authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of python-sqlparse and is released under 

6# the BSD License: https://opensource.org/licenses/BSD-3-Clause 

7 

8"""This module contains classes representing syntactical elements of SQL.""" 

9 

10import re 

11 

12from sqlparse import tokens as T 

13from sqlparse.utils import imt, remove_quotes 

14 

15 

class NameAliasMixin:
    """Mixin supplying ``get_real_name`` and ``get_alias``.

    The host class must provide ``tokens``, ``token_next_by`` and
    ``_get_first_name``.
    """

    def get_real_name(self):
        """Return the object name of this identifier.

        For a dotted name such as ``a.b`` the search for the name starts
        at the first ``.`` punctuation token.
        """
        dot_position, _ = self.token_next_by(m=(T.Punctuation, '.'))
        return self._get_first_name(dot_position, real_name=True)

    def get_alias(self):
        """Return the alias of this identifier, or ``None`` if it has none."""
        # Explicit form: "name AS alias" -- the alias follows the keyword.
        as_position, as_keyword = self.token_next_by(m=(T.Keyword, 'AS'))
        if as_keyword is not None:
            return self._get_first_name(as_position + 1, keywords=True)

        # Implicit form: "name alias" or "complicated expression alias".
        # Requires some whitespace and more than two child tokens; the
        # alias is then the last token that carries a name.
        _, whitespace = self.token_next_by(t=T.Whitespace)
        has_whitespace = whitespace is not None
        if len(self.tokens) > 2 and has_whitespace:
            return self._get_first_name(reverse=True)

        return None

37 

38 

class Token:
    """Base class for every other class in this module.

    A token stores the text it was created from in ``value`` (always a
    ``str``) and its token type in ``ttype``.  A handful of flags derived
    from the type are computed once in the constructor.
    """

    __slots__ = (
        'is_group',
        'is_keyword',
        'is_newline',
        'is_whitespace',
        'normalized',
        'parent',
        'ttype',
        'value',
    )

    def __init__(self, ttype, value):
        """Create a token of type *ttype* from *value* (coerced to str)."""
        text = str(value)
        self.ttype = ttype
        self.value = text
        self.parent = None
        self.is_group = False
        self.is_keyword = ttype in T.Keyword
        self.is_whitespace = ttype in T.Whitespace
        self.is_newline = ttype in T.Newline
        # Keywords are normalized to upper case; other text is kept as is.
        if self.is_keyword:
            self.normalized = text.upper()
        else:
            self.normalized = text

    def __str__(self):
        return self.value

    # NOTE: ``__len__`` is not defined here; the original carried it only
    # as commented-out code marked "Pending tokenlist __len__ bug fix".

    def __repr__(self):
        name = self._get_repr_name()
        shown = self._get_repr_value()

        # Use double quotes when the shown value is itself single-quoted.
        single_quoted = shown.startswith("'") and shown.endswith("'")
        quote = '"' if single_quoted else "'"
        return f"<{name} {quote}{shown}{quote} at 0x{id(self):2X}>"

    def _get_repr_name(self):
        """Return the last dotted component of ``str(self.ttype)``."""
        return str(self.ttype).rsplit('.', 1)[-1]

    def _get_repr_value(self):
        """Return a short form of ``str(self)`` for use in ``repr``.

        Text longer than seven characters is cut to six characters plus
        ``...``; afterwards every whitespace run becomes one space.
        """
        text = str(self)
        shortened = text[:6] + '...' if len(text) > 7 else text
        return re.sub(r'\s+', ' ', shortened)

    def flatten(self):
        """Yield this token itself (a plain token has no subgroups)."""
        yield self

    def match(self, ttype, values, regex=False):
        """Check whether this token matches the given arguments.

        *ttype* is compared to the token's type by identity; on a
        mismatch ``False`` is returned.  When *values* is ``None`` only
        the type is checked.

        *values* is either a single string or an iterable of strings.
        The token matches when its normalized value is one of them; for
        keyword tokens the comparison ignores case.

        With *regex* set to ``True`` each value is treated as a regular
        expression and searched for in the normalized value, so partial
        matches are allowed.
        """
        if self.ttype is not ttype:
            return False
        if values is None:
            return True

        candidates = (values,) if isinstance(values, str) else values

        if regex:
            # TODO: Add test for regex with is_keyword = false
            flags = re.IGNORECASE if self.is_keyword else 0
            return any(
                re.compile(candidate, flags).search(self.normalized)
                for candidate in candidates
            )

        if self.is_keyword:
            return any(
                self.normalized == candidate.upper()
                for candidate in candidates
            )
        return self.normalized in candidates

    def within(self, group_cls):
        """Return ``True`` if any ancestor is an instance of *group_cls*.

        Example: ``t.within(sql.Function)`` tells whether an identifier
        sits inside a function.
        """
        ancestor = self.parent
        while ancestor:
            if isinstance(ancestor, group_cls):
                return True
            ancestor = ancestor.parent
        return False

    def is_child_of(self, other):
        """Return ``True`` if *other* is the direct parent of this token."""
        return self.parent == other

    def has_ancestor(self, other):
        """Return ``True`` if *other* appears in this token's ancestry."""
        ancestor = self.parent
        while ancestor:
            if ancestor == other:
                return True
            ancestor = ancestor.parent
        return False

153 

154 

class TokenList(Token):
    """A group of tokens.

    In addition to the attributes inherited from :class:`Token`, a group
    has a ``tokens`` attribute holding the list of its child tokens.
    """

    __slots__ = 'tokens'

    def __init__(self, tokens=None):
        """Create a group owning *tokens* (an empty list if omitted/empty).

        Each child's ``parent`` is set to the new group and the group's
        ``value`` is the concatenation of the children's values.
        """
        self.tokens = tokens or []
        # Plain loop: a list comprehension here would only build a
        # throwaway list of ``None`` values.
        for token in self.tokens:
            token.parent = self
        super().__init__(None, ''.join(token.value for token in self.tokens))
        self.is_group = True

    def __str__(self):
        return ''.join(token.value for token in self.flatten())

    # NOTE: ``__len__`` is not defined here; the original carried it only
    # as commented-out code marked "weird bug".

    def __iter__(self):
        return iter(self.tokens)

    def __getitem__(self, item):
        return self.tokens[item]

    def _get_repr_name(self):
        """Return the class name (groups are created with ttype None)."""
        return type(self).__name__

    def _pprint_tree(self, max_depth=None, depth=0, f=None, _pre=''):
        """Pretty-print the object tree.

        One line per child is printed to *f* (passed to ``print`` as its
        ``file`` argument).  Groups are descended into while *depth* is
        below *max_depth*; ``None`` means no limit.  *_pre* is the prefix
        accumulated by the recursive calls.
        """
        # NOTE(review): the prefix literals below are copied verbatim from
        # the listing this block was recovered from, which collapses
        # indentation; confirm their exact widths against the original.
        token_count = len(self.tokens)
        for idx, token in enumerate(self.tokens):
            cls = token._get_repr_name()
            value = token._get_repr_value()

            last = idx == (token_count - 1)
            pre = '`- ' if last else '|- '

            q = '"' if value.startswith("'") and value.endswith("'") else "'"
            print(f"{_pre}{pre}{idx} {cls} {q}{value}{q}", file=f)

            if token.is_group and (max_depth is None or depth < max_depth):
                parent_pre = ' ' if last else '| '
                token._pprint_tree(max_depth, depth + 1, f, _pre + parent_pre)

    def get_token_at_offset(self, offset):
        """Return the ungrouped token covering character position *offset*.

        Positions are counted over the concatenated values of the
        flattened tokens.  ``None`` is returned if no token covers
        *offset*.
        """
        idx = 0
        for token in self.flatten():
            end = idx + len(token.value)
            if idx <= offset < end:
                return token
            idx = end
        return None

    def flatten(self):
        """Generator yielding ungrouped tokens.

        This method is recursively called for all child groups.
        """
        for token in self.tokens:
            if token.is_group:
                yield from token.flatten()
            else:
                yield token

    def get_sublists(self):
        """Yield the direct children that are groups."""
        for token in self.tokens:
            if token.is_group:
                yield token

    @property
    def _groupable_tokens(self):
        """The child tokens; some subclasses in this module narrow this."""
        return self.tokens

    def _token_matching(self, funcs, start=0, end=None, reverse=False):
        """Return ``(index, token)`` of the next token accepted by *funcs*.

        *funcs* is a callable or a list/tuple of callables; a token is
        accepted as soon as one of them returns a true value for it.

        Forward search covers ``range(start, end)`` (*end* defaults to the
        number of tokens).  With *reverse* the search walks backwards from
        index ``start - 2`` down to ``0``; *end* must then be ``None``.

        ``(None, None)`` is returned when nothing matches or when *start*
        is ``None``.
        """
        if start is None:
            # Always hand back a pair: callers unpack or index the result.
            return None, None

        if not isinstance(funcs, (list, tuple)):
            funcs = (funcs,)

        if reverse:
            assert end is None
            # ``start - 2``: token_next() has already added one to the
            # index it was given, so this begins just before that index.
            indexes = range(start - 2, -1, -1)
        else:
            if end is None:
                end = len(self.tokens)
            indexes = range(start, end)
        for idx in indexes:
            token = self.tokens[idx]
            for func in funcs:
                if func(token):
                    return idx, token
        return None, None

    def token_first(self, skip_ws=True, skip_cm=False):
        """Return the first child token, or ``None`` if there is none.

        If *skip_ws* is ``True`` (the default), whitespace tokens are
        ignored.

        If *skip_cm* is ``True`` (default: ``False``), comments are
        ignored too.
        """
        # This one is inconsistent, using Comment instead of T.Comment...
        def matcher(tk):
            return not ((skip_ws and tk.is_whitespace)
                        or (skip_cm and imt(tk, t=T.Comment, i=Comment)))
        return self._token_matching(matcher)[1]

    def token_next_by(self, i=None, m=None, t=None, idx=-1, end=None):
        """Return ``(index, token)`` of the next token accepted by ``imt``.

        *i*, *m* and *t* are handed to ``imt`` unchanged.  The search
        starts at the token after *idx* and stops before *end*;
        ``(None, None)`` is returned when nothing matches.
        """
        idx += 1
        return self._token_matching(lambda tk: imt(tk, i, m, t), idx, end)

    def token_not_matching(self, funcs, idx):
        """Return ``(index, token)`` of the first token from *idx* on for
        which at least one of *funcs* returns a false value.
        """
        funcs = (funcs,) if not isinstance(funcs, (list, tuple)) else funcs
        # ``func=func`` binds each callable now instead of at call time.
        funcs = [lambda tk, func=func: not func(tk) for func in funcs]
        return self._token_matching(funcs, idx)

    def token_matching(self, funcs, idx):
        """Return the first token from *idx* on accepted by *funcs*.

        Only the token is returned (``None`` if there is no match).
        """
        return self._token_matching(funcs, idx)[1]

    def token_prev(self, idx, skip_ws=True, skip_cm=False):
        """Return ``(index, token)`` of the previous token relative to *idx*.

        If *skip_ws* is ``True`` (the default) whitespace tokens are ignored.
        If *skip_cm* is ``True`` comments are ignored.
        ``(None, None)`` is returned if there's no previous token.
        """
        return self.token_next(idx, skip_ws, skip_cm, _reverse=True)

    # TODO: May need to re-add default value to idx
    def token_next(self, idx, skip_ws=True, skip_cm=False, _reverse=False):
        """Return ``(index, token)`` of the next token relative to *idx*.

        If *skip_ws* is ``True`` (the default) whitespace tokens are ignored.
        If *skip_cm* is ``True`` comments are ignored.
        ``(None, None)`` is returned if there's no next token or if *idx*
        is ``None``.  *_reverse* searches backwards instead (this is how
        ``token_prev`` is implemented).
        """
        if idx is None:
            return None, None
        idx += 1  # a lot of calling code currently pre-compensates for this

        def matcher(tk):
            return not ((skip_ws and tk.is_whitespace)
                        or (skip_cm and imt(tk, t=T.Comment, i=Comment)))
        return self._token_matching(matcher, idx, reverse=_reverse)

    def token_index(self, token, start=0):
        """Return the list index of *token*.

        *start* is either an integer index or a token whose index is
        looked up first; the search begins there.  ``list.index`` raises
        ``ValueError`` if *token* is not found.
        """
        start = start if isinstance(start, int) else self.token_index(start)
        return start + self.tokens[start:].index(token)

    def group_tokens(self, grp_cls, start, end, include_end=True,
                     extend=False):
        """Replace a run of child tokens by an instance of *grp_cls*.

        *start* and *end* are indexes into ``tokens``; the token at *end*
        is part of the run unless *include_end* is false.

        If *extend* is true and the token at *start* already is a
        *grp_cls* instance, the following tokens of the run are moved
        into that existing group instead of creating a new one.

        The (new or extended) group is returned.
        """
        start_idx = start
        start = self.tokens[start_idx]

        # ``include_end`` is a bool, so this adds 1 when the end token is
        # to be included in the slice.
        end_idx = end + include_end

        # will be needed later for new group_clauses
        # while skip_ws and tokens and tokens[-1].is_whitespace:
        #     tokens = tokens[:-1]

        if extend and isinstance(start, grp_cls):
            subtokens = self.tokens[start_idx + 1:end_idx]

            grp = start
            grp.tokens.extend(subtokens)
            del self.tokens[start_idx + 1:end_idx]
            # Keep the cached value in step with the added children.
            grp.value += ''.join(token.value for token in subtokens)
        else:
            subtokens = self.tokens[start_idx:end_idx]
            grp = grp_cls(subtokens)
            self.tokens[start_idx:end_idx] = [grp]
            grp.parent = self

        for token in subtokens:
            token.parent = grp

        return grp

    def insert_before(self, where, token):
        """Insert *token* before *where* (an index or a child token)."""
        if not isinstance(where, int):
            where = self.token_index(where)
        token.parent = self
        self.tokens.insert(where, token)

    def insert_after(self, where, token, skip_ws=True):
        """Insert *token* after *where* (an index or a child token).

        The token is placed directly in front of the next token found by
        ``token_next(where, skip_ws=skip_ws)``, or appended when there is
        no such token.
        """
        if not isinstance(where, int):
            where = self.token_index(where)
        nidx, next_ = self.token_next(where, skip_ws=skip_ws)
        token.parent = self
        if next_ is None:
            self.tokens.append(token)
        else:
            self.tokens.insert(nidx, token)

    def has_alias(self):
        """Return ``True`` if an alias is present."""
        return self.get_alias() is not None

    def get_alias(self):
        """Return the alias for this identifier or ``None``.

        This base implementation always returns ``None``.
        """
        return None

    def get_name(self):
        """Return the name of this identifier.

        This is either its alias or its real name. The returned value can
        be considered as the name under which the object corresponding to
        this identifier is known within the current statement.
        """
        return self.get_alias() or self.get_real_name()

    def get_real_name(self):
        """Return the real name (object name) of this identifier.

        This base implementation always returns ``None``.
        """
        return None

    def get_parent_name(self):
        """Return the name of the parent object, if any.

        A parent object is identified by the first occurring dot: the
        result is the unquoted value of the token preceding it, or
        ``None`` when there is no dot or nothing before it.
        """
        dot_idx, _ = self.token_next_by(m=(T.Punctuation, '.'))
        _, prev_ = self.token_prev(dot_idx)
        return remove_quotes(prev_.value) if prev_ is not None else None

    def _get_first_name(self, idx=None, reverse=False, keywords=False,
                        real_name=False):
        """Return the name of the first token that has a name.

        The child tokens from *idx* on (all of them when *idx* is falsy)
        are scanned, back to front if *reverse* is true.  A token of type
        ``Name``, ``Wildcard`` or ``String.Symbol`` -- and ``Keyword`` as
        well when *keywords* is true -- yields its unquoted value.  An
        ``Identifier`` or ``Function`` yields its real name if
        *real_name* is true, otherwise its name.  ``None`` is returned
        when no token qualifies.
        """
        tokens = self.tokens[idx:] if idx else self.tokens
        tokens = reversed(tokens) if reverse else tokens
        types = [T.Name, T.Wildcard, T.String.Symbol]

        if keywords:
            types.append(T.Keyword)

        for token in tokens:
            if token.ttype in types:
                return remove_quotes(token.value)
            elif isinstance(token, (Identifier, Function)):
                return token.get_real_name() if real_name else token.get_name()
        return None

406 

407 

class Statement(TokenList):
    """A single SQL statement."""

    def get_type(self):
        """Return the type of this statement as an upper-case string.

        The result is the normalized form of the first DML or DDL
        keyword.  Leading whitespace and comments are skipped.  If the
        statement starts with a CTE keyword, the DML keyword that follows
        the CTE definitions is reported instead.  In every other case
        ``"UNKNOWN"`` is returned.
        """
        first = self.token_first(skip_cm=True)
        if first is None:
            # Nothing but whitespace/comments, or no tokens at all.
            return 'UNKNOWN'

        if first.ttype in (T.Keyword.DML, T.Keyword.DDL):
            return first.normalized

        if first.ttype == T.Keyword.CTE:
            # The CTE keyword should be followed by an Identifier or an
            # IdentifierList holding the CTE definitions; the actual DML
            # keyword (e.g. SELECT, INSERT) comes after that.
            position = self.token_index(first)
            while position is not None:
                position, candidate = self.token_next(position, skip_ws=True)
                if isinstance(candidate, (Identifier, IdentifierList)):
                    position, candidate = self.token_next(
                        position, skip_ws=True)

                found_dml = (candidate is not None
                             and candidate.ttype == T.Keyword.DML)
                if found_dml:
                    return candidate.normalized

        # Probably invalid syntax.
        return 'UNKNOWN'

446 

447 

class Identifier(NameAliasMixin, TokenList):
    """An identifier, optionally carrying an alias or a typecast."""

    def is_wildcard(self):
        """Return ``True`` if a wildcard token is among the children."""
        return self.token_next_by(t=T.Wildcard)[1] is not None

    def get_typecast(self):
        """Return the typecast of this object as a string, or ``None``.

        The typecast is the value of the token directly following the
        ``::`` punctuation (whitespace is not skipped).
        """
        marker_position, _ = self.token_next_by(m=(T.Punctuation, '::'))
        _, cast_token = self.token_next(marker_position, skip_ws=False)
        if not cast_token:
            return None
        return cast_token.value

    def get_ordering(self):
        """Return the ordering keyword in normalized form, or ``None``."""
        _, order_token = self.token_next_by(t=T.Keyword.Order)
        if not order_token:
            return None
        return order_token.normalized

    def get_array_indices(self):
        """Yield, per square-bracket child, the list of tokens inside it."""
        for child in self.tokens:
            if not isinstance(child, SquareBrackets):
                continue
            # Slice off the first and last token (the brackets).
            yield child.tokens[1:-1]

477 

478 

class IdentifierList(TokenList):
    """A list of :class:`~sqlparse.sql.Identifier` objects."""

    def get_identifiers(self):
        """Yield the child tokens that are neither whitespace nor commas."""
        for child in self.tokens:
            if child.is_whitespace:
                continue
            if child.match(T.Punctuation, ','):
                continue
            yield child

490 

491 

class TypedLiteral(TokenList):
    """A typed literal, e.g. "date '2001-09-28'" or "interval '2 hours'"."""

    # (token type, value) patterns; presumably consumed by the grouping
    # code elsewhere in the package -- verify against the caller.
    M_OPEN = [
        (T.Name.Builtin, None),
        (T.Keyword, "TIMESTAMP"),
    ]
    M_CLOSE = (T.String.Single, None)
    M_EXTEND = (
        T.Keyword,
        ("DAY", "HOUR", "MINUTE", "MONTH", "SECOND", "YEAR"),
    )

497 

498 

class Parenthesis(TokenList):
    """The tokens enclosed in a pair of parentheses (both included)."""

    M_OPEN = (T.Punctuation, '(')
    M_CLOSE = (T.Punctuation, ')')

    @property
    def _groupable_tokens(self):
        """The child tokens without the first and the last one."""
        inner = self.tokens[1:-1]
        return inner

507 

508 

class SquareBrackets(TokenList):
    """The tokens enclosed in a pair of square brackets (both included)."""

    M_OPEN = (T.Punctuation, '[')
    M_CLOSE = (T.Punctuation, ']')

    @property
    def _groupable_tokens(self):
        """The child tokens without the first and the last one."""
        inner = self.tokens[1:-1]
        return inner

517 

518 

class Assignment(TokenList):
    """Group for an assignment such as ``var := val;``."""

521 

522 

class If(TokenList):
    """An ``IF`` clause, possibly with ``ELSE IF`` or ``ELSE`` parts."""

    # Opening and closing keyword patterns as (token type, value).
    M_OPEN = (T.Keyword, 'IF')
    M_CLOSE = (T.Keyword, 'END IF')

527 

528 

class For(TokenList):
    """A ``FOR`` (or ``FOREACH``) loop."""

    # Opening and closing keyword patterns as (token type, value(s)).
    M_OPEN = (T.Keyword, ('FOR', 'FOREACH'))
    M_CLOSE = (T.Keyword, 'END LOOP')

533 

534 

class Comparison(TokenList):
    """A comparison, as used for example in WHERE clauses."""

    @property
    def left(self):
        """The first child token of the comparison."""
        children = self.tokens
        return children[0]

    @property
    def right(self):
        """The last child token of the comparison."""
        children = self.tokens
        return children[-1]

545 

546 

class Comment(TokenList):
    """A comment."""

    def is_multiline(self):
        """Return ``True`` if the first child is a multiline comment token.

        Always returns a ``bool``; a comment group without any tokens
        yields ``False``.
        """
        # bool() keeps the result a real boolean: ``self.tokens and ...``
        # alone would hand back the empty list for an empty group.
        return (bool(self.tokens)
                and self.tokens[0].ttype == T.Comment.Multiline)

552 

553 

class Where(TokenList):
    """Group for a ``WHERE`` clause."""

    # Opening keyword and the keywords listed as closing the clause.
    M_OPEN = (T.Keyword, 'WHERE')
    M_CLOSE = (
        T.Keyword,
        (
            'ORDER BY', 'GROUP BY', 'LIMIT', 'UNION', 'UNION ALL', 'EXCEPT',
            'INTERSECT', 'HAVING', 'RETURNING', 'INTO',
        ),
    )

560 

561 

class Over(TokenList):
    """Group for an ``OVER`` clause."""

    # Opening keyword pattern; no closing pattern is declared.
    M_OPEN = (T.Keyword, 'OVER')

565 

566 

class Having(TokenList):
    """Group for a ``HAVING`` clause."""

    # Opening keyword and the keywords listed as closing the clause.
    M_OPEN = (T.Keyword, 'HAVING')
    M_CLOSE = (T.Keyword, ('ORDER BY', 'LIMIT'))

571 

572 

class Case(TokenList):
    """A CASE statement with one or more WHEN parts and maybe an ELSE."""

    M_OPEN = (T.Keyword, 'CASE')
    M_CLOSE = (T.Keyword, 'END')

    def get_cases(self, skip_ws=False):
        """Return a list of 2-tuples ``(condition, value)``.

        Both members are lists of tokens, except that the condition of
        an ELSE part is ``None``.  The WHEN, THEN and ELSE keyword tokens
        themselves are included in the list that is being filled when
        they are seen.  With *skip_ws* whitespace tokens are left out.
        """
        # Index into the current (condition, value) pair that receives
        # tokens; ``None`` once END has been reached.
        condition_slot, value_slot = 0, 1

        cases = []
        slot = condition_slot

        for child in self.tokens:
            if child.match(T.Keyword, 'CASE'):
                continue
            if skip_ws and child.ttype in T.Whitespace:
                continue

            # Switch the target slot according to the current keyword.
            if child.match(T.Keyword, 'WHEN'):
                cases.append(([], []))
                slot = condition_slot
            elif child.match(T.Keyword, 'THEN'):
                slot = value_slot
            elif child.match(T.Keyword, 'ELSE'):
                cases.append((None, []))
                slot = value_slot
            elif child.match(T.Keyword, 'END'):
                slot = None

            if slot is None:
                continue

            # First condition without a preceding WHEN.
            if not cases:
                cases.append(([], []))

            cases[-1][slot].append(child)

        return cases

621 

622 

class Function(NameAliasMixin, TokenList):
    """A function or procedure call."""

    def get_parameters(self):
        """Return the parameters of the call.

        The first ``Parenthesis`` child is inspected.  If it contains an
        ``IdentifierList``, the result of that list's
        ``get_identifiers()`` is returned.  Otherwise a list is returned
        holding the children accepted by ``imt`` for the instance types
        ``Function``, ``Identifier``, ``TypedLiteral`` or the token type
        ``T.Literal``.  An empty list is returned when there is no
        parenthesis at all.
        """
        _, parenthesis = self.token_next_by(i=Parenthesis)
        if parenthesis is None:
            # No argument list to inspect.
            return []

        result = []
        for token in parenthesis.tokens:
            if isinstance(token, IdentifierList):
                return token.get_identifiers()
            elif imt(token, i=(Function, Identifier, TypedLiteral),
                     t=T.Literal):
                result.append(token)
        return result

    def get_window(self):
        """Return the window if it exists, otherwise ``None``.

        The window is the last child token of the ``Over`` group.
        """
        # token_next_by() always returns a pair, and ``(None, None)`` is
        # truthy -- so the token itself must be tested, not the pair.
        _, over_clause = self.token_next_by(i=Over)
        if over_clause is None:
            return None
        return over_clause.tokens[-1]

644 

645 

class Begin(TokenList):
    """Group for a ``BEGIN``/``END`` block."""

    # Opening and closing keyword patterns as (token type, value).
    M_OPEN = (T.Keyword, 'BEGIN')
    M_CLOSE = (T.Keyword, 'END')

650 

651 

class Operation(TokenList):
    """Group holding the tokens of an operation."""

654 

655 

class Values(TokenList):
    """Group holding the tokens of a list of values."""

658 

659 

class Command(TokenList):
    """Group holding the tokens of a CLI command."""