Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlparse/sql.py: 36%


345 statements  

#
# Copyright (C) 2009-2020 the sqlparse authors and contributors
# <see AUTHORS file>
#
# This module is part of python-sqlparse and is released under
# the BSD License: https://opensource.org/licenses/BSD-3-Clause

"""This module contains classes representing syntactical elements of SQL."""

import re

from sqlparse import tokens as T
from sqlparse.utils import imt, remove_quotes

class NameAliasMixin:
    """Implements get_real_name and get_alias."""

    def get_real_name(self):
        """Returns the real name (object name) of this identifier."""
        # a.b
        dot_idx, _ = self.token_next_by(m=(T.Punctuation, '.'))
        return self._get_first_name(dot_idx, real_name=True)

    def get_alias(self):
        """Returns the alias for this identifier or ``None``."""

        # "name AS alias"
        kw_idx, kw = self.token_next_by(m=(T.Keyword, 'AS'))
        if kw is not None:
            return self._get_first_name(kw_idx + 1, keywords=True)

        # "name alias" or "complicated column expression alias"
        _, ws = self.token_next_by(t=T.Whitespace)
        if len(self.tokens) > 2 and ws is not None:
            return self._get_first_name(reverse=True)
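A quick usage sketch (added for illustration, not part of sql.py; it relies
only on the public sqlparse.parse() entry point, and the token index assumes
sqlparse's usual grouping of this statement):

    >>> import sqlparse
    >>> ident = sqlparse.parse('SELECT a.b AS c FROM t')[0].tokens[2]
    >>> ident.get_real_name(), ident.get_alias()
    ('b', 'c')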

class Token:
    """Base class for all other classes in this module.

    It represents a single token and has two instance attributes:
    ``value`` is the unchanged value of the token and ``ttype`` is
    the type of the token.
    """

    __slots__ = ('value', 'ttype', 'parent', 'normalized', 'is_keyword',
                 'is_group', 'is_whitespace', 'is_newline')

    def __init__(self, ttype, value):
        value = str(value)
        self.value = value
        self.ttype = ttype
        self.parent = None
        self.is_group = False
        self.is_keyword = ttype in T.Keyword
        self.is_whitespace = self.ttype in T.Whitespace
        self.is_newline = self.ttype in T.Newline
        self.normalized = value.upper() if self.is_keyword else value

    def __str__(self):
        return self.value

    # Pending tokenlist __len__ bug fix
    # def __len__(self):
    #     return len(self.value)

    def __repr__(self):
        cls = self._get_repr_name()
        value = self._get_repr_value()

        q = '"' if value.startswith("'") and value.endswith("'") else "'"
        return "<{cls} {q}{value}{q} at 0x{id:2X}>".format(
            id=id(self), **locals())

    def _get_repr_name(self):
        return str(self.ttype).split('.')[-1]

    def _get_repr_value(self):
        raw = str(self)
        if len(raw) > 7:
            raw = raw[:6] + '...'
        return re.sub(r'\s+', ' ', raw)

    def flatten(self):
        """Resolve subgroups."""
        yield self

    def match(self, ttype, values, regex=False):
        """Checks whether the token matches the given arguments.

        *ttype* is a token type; if this token's type doesn't match it,
        ``False`` is returned.
        *values* is a list of possible values for this token. The values
        are OR'ed together, so if just one of the values matches, ``True``
        is returned. Except for keyword tokens the comparison is
        case-sensitive. For convenience it's OK to pass in a single string.
        If *regex* is ``True`` (default is ``False``) the given values are
        treated as regular expressions.
        """
        type_matched = self.ttype is ttype
        if not type_matched or values is None:
            return type_matched

        if isinstance(values, str):
            values = (values,)

        if regex:
            # TODO: Add test for regex with is_keyword = False
            flag = re.IGNORECASE if self.is_keyword else 0
            values = (re.compile(v, flag) for v in values)

            for pattern in values:
                if pattern.search(self.normalized):
                    return True
            return False

        if self.is_keyword:
            values = (v.upper() for v in values)

        return self.normalized in values
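An illustrative doctest (mine, not from the sqlparse sources) showing that
keyword matching compares against ``normalized`` case-insensitively and that
*values* are OR'ed:

    >>> import sqlparse
    >>> from sqlparse import tokens as T
    >>> tok = sqlparse.parse('select 1')[0].token_first()
    >>> tok.match(T.Keyword.DML, 'SELECT')
    True
    >>> tok.match(T.Keyword.DML, ['INSERT', 'UPDATE'])
    False
    >>> tok.match(T.Keyword.DML, r'SEL.*', regex=True)
    True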

    def within(self, group_cls):
        """Returns ``True`` if this token is within *group_cls*.

        Use this method for example to check if an identifier is within
        a function: ``t.within(sql.Function)``.
        """
        parent = self.parent
        while parent:
            if isinstance(parent, group_cls):
                return True
            parent = parent.parent
        return False

    def is_child_of(self, other):
        """Returns ``True`` if this token is a direct child of *other*."""
        return self.parent == other

    def has_ancestor(self, other):
        """Returns ``True`` if *other* is in this token's ancestry."""
        parent = self.parent
        while parent:
            if parent == other:
                return True
            parent = parent.parent
        return False
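A small sketch of the ancestry checks (illustrative only; the leaf lookup via
flatten() assumes sqlparse's usual grouping of the function call):

    >>> import sqlparse
    >>> from sqlparse import sql
    >>> stmt = sqlparse.parse('SELECT count(x) FROM t')[0]
    >>> x = [t for t in stmt.flatten() if t.value == 'x'][0]
    >>> x.within(sql.Function), x.within(sql.Where)
    (True, False)
    >>> x.has_ancestor(stmt)
    True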

class TokenList(Token):
    """A group of tokens.

    It has an additional instance attribute ``tokens`` which holds a
    list of child-tokens.
    """

    __slots__ = 'tokens'

    def __init__(self, tokens=None):
        self.tokens = tokens or []
        [setattr(token, 'parent', self) for token in self.tokens]
        super().__init__(None, str(self))
        self.is_group = True

    def __str__(self):
        return ''.join(token.value for token in self.flatten())

    # weird bug
    # def __len__(self):
    #     return len(self.tokens)

    def __iter__(self):
        return iter(self.tokens)

    def __getitem__(self, item):
        return self.tokens[item]

    def _get_repr_name(self):
        return type(self).__name__

    def _pprint_tree(self, max_depth=None, depth=0, f=None, _pre=''):
        """Pretty-print the object tree."""
        token_count = len(self.tokens)
        for idx, token in enumerate(self.tokens):
            cls = token._get_repr_name()
            value = token._get_repr_value()

            last = idx == (token_count - 1)
            pre = '`- ' if last else '|- '

            q = '"' if value.startswith("'") and value.endswith("'") else "'"
            print(f"{_pre}{pre}{idx} {cls} {q}{value}{q}", file=f)

            if token.is_group and (max_depth is None or depth < max_depth):
                parent_pre = '   ' if last else '|  '
                token._pprint_tree(max_depth, depth + 1, f, _pre + parent_pre)

    def get_token_at_offset(self, offset):
        """Returns the token that is on position offset."""
        idx = 0
        for token in self.flatten():
            end = idx + len(token.value)
            if idx <= offset < end:
                return token
            idx = end

    def flatten(self):
        """Generator yielding ungrouped tokens.

        This method is recursively called for all child tokens.
        """
        for token in self.tokens:
            if token.is_group:
                yield from token.flatten()
            else:
                yield token
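A usage sketch (not part of the module) for flatten() and
get_token_at_offset(); offsets count characters of the rendered SQL:

    >>> import sqlparse
    >>> stmt = sqlparse.parse('SELECT foo FROM bar')[0]
    >>> [t.value for t in stmt.flatten()]
    ['SELECT', ' ', 'foo', ' ', 'FROM', ' ', 'bar']
    >>> stmt.get_token_at_offset(7).value
    'foo'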

    def get_sublists(self):
        for token in self.tokens:
            if token.is_group:
                yield token

    @property
    def _groupable_tokens(self):
        return self.tokens

    def _token_matching(self, funcs, start=0, end=None, reverse=False):
        """Returns the next token that matches one of the given functions."""
        if start is None:
            return None

        if not isinstance(funcs, (list, tuple)):
            funcs = (funcs,)

        if reverse:
            assert end is None
            indexes = range(start - 2, -1, -1)
        else:
            if end is None:
                end = len(self.tokens)
            indexes = range(start, end)
        for idx in indexes:
            token = self.tokens[idx]
            for func in funcs:
                if func(token):
                    return idx, token
        return None, None

    def token_first(self, skip_ws=True, skip_cm=False):
        """Returns the first child token.

        If *skip_ws* is ``True`` (the default), whitespace
        tokens are ignored.

        If *skip_cm* is ``True`` (default: ``False``), comments are
        ignored too.
        """
        # this one is inconsistent, using Comment instead of T.Comment...
        def matcher(tk):
            return not ((skip_ws and tk.is_whitespace)
                        or (skip_cm and imt(tk, t=T.Comment, i=Comment)))
        return self._token_matching(matcher)[1]

    def token_next_by(self, i=None, m=None, t=None, idx=-1, end=None):
        idx += 1
        return self._token_matching(lambda tk: imt(tk, i, m, t), idx, end)

    def token_not_matching(self, funcs, idx):
        funcs = (funcs,) if not isinstance(funcs, (list, tuple)) else funcs
        funcs = [lambda tk: not func(tk) for func in funcs]
        return self._token_matching(funcs, idx)

    def token_matching(self, funcs, idx):
        return self._token_matching(funcs, idx)[1]

    def token_prev(self, idx, skip_ws=True, skip_cm=False):
        """Returns the previous token relative to *idx*.

        If *skip_ws* is ``True`` (the default) whitespace tokens are ignored.
        If *skip_cm* is ``True`` comments are ignored.
        ``None`` is returned if there's no previous token.
        """
        return self.token_next(idx, skip_ws, skip_cm, _reverse=True)

    # TODO: May need to re-add default value to idx
    def token_next(self, idx, skip_ws=True, skip_cm=False, _reverse=False):
        """Returns the next token relative to *idx*.

        If *skip_ws* is ``True`` (the default) whitespace tokens are ignored.
        If *skip_cm* is ``True`` comments are ignored.
        ``None`` is returned if there's no next token.
        """
        if idx is None:
            return None, None
        idx += 1  # a lot of calling code currently pre-compensates for this

        def matcher(tk):
            return not ((skip_ws and tk.is_whitespace)
                        or (skip_cm and imt(tk, t=T.Comment, i=Comment)))
        return self._token_matching(matcher, idx, reverse=_reverse)

    def token_index(self, token, start=0):
        """Return list index of token."""
        start = start if isinstance(start, int) else self.token_index(start)
        return start + self.tokens[start:].index(token)
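A navigation sketch (illustrative, not from the sources) tying the index-based
helpers together; note that token_next/token_prev return ``(index, token)``
pairs and skip whitespace by default:

    >>> import sqlparse
    >>> from sqlparse import tokens as T
    >>> stmt = sqlparse.parse('SELECT x FROM t')[0]
    >>> idx = stmt.token_index(stmt.token_first())
    >>> nidx, nxt = stmt.token_next(idx)    # skips the whitespace token
    >>> nxt.value
    'x'
    >>> pidx, prev = stmt.token_prev(nidx)
    >>> prev.value
    'SELECT'
    >>> stmt.token_next_by(m=(T.Keyword, 'FROM'))[1].value
    'FROM'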

    def group_tokens(self, grp_cls, start, end, include_end=True,
                     extend=False):
        """Replace tokens by an instance of *grp_cls*."""
        start_idx = start
        start = self.tokens[start_idx]

        end_idx = end + include_end

        # will be needed later for new group_clauses
        # while skip_ws and tokens and tokens[-1].is_whitespace:
        #     tokens = tokens[:-1]

        if extend and isinstance(start, grp_cls):
            subtokens = self.tokens[start_idx + 1:end_idx]

            grp = start
            grp.tokens.extend(subtokens)
            del self.tokens[start_idx + 1:end_idx]
            grp.value = str(start)
        else:
            subtokens = self.tokens[start_idx:end_idx]
            grp = grp_cls(subtokens)
            self.tokens[start_idx:end_idx] = [grp]
            grp.parent = self

        for token in subtokens:
            token.parent = grp

        return grp

    def insert_before(self, where, token):
        """Inserts *token* before *where*."""
        if not isinstance(where, int):
            where = self.token_index(where)
        token.parent = self
        self.tokens.insert(where, token)

    def insert_after(self, where, token, skip_ws=True):
        """Inserts *token* after *where*."""
        if not isinstance(where, int):
            where = self.token_index(where)
        nidx, next_ = self.token_next(where, skip_ws=skip_ws)
        token.parent = self
        if next_ is None:
            self.tokens.append(token)
        else:
            self.tokens.insert(nidx, token)
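A small mutation sketch (mine, not part of sql.py): inserting a comment token
before position 0 reparents it and changes the rendered SQL accordingly:

    >>> import sqlparse
    >>> from sqlparse import sql, tokens as T
    >>> stmt = sqlparse.parse('SELECT 1')[0]
    >>> stmt.insert_before(0, sql.Token(T.Comment.Single, '-- demo\n'))
    >>> str(stmt)
    '-- demo\nSELECT 1'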

    def has_alias(self):
        """Returns ``True`` if an alias is present."""
        return self.get_alias() is not None

    def get_alias(self):
        """Returns the alias for this identifier or ``None``."""
        return None

    def get_name(self):
        """Returns the name of this identifier.

        This is either its alias or its real name. The returned value can
        be considered as the name under which the object corresponding to
        this identifier is known within the current statement.
        """
        return self.get_alias() or self.get_real_name()

    def get_real_name(self):
        """Returns the real name (object name) of this identifier."""
        return None

    def get_parent_name(self):
        """Return name of the parent object if any.

        A parent object is identified by the first occurring dot.
        """
        dot_idx, _ = self.token_next_by(m=(T.Punctuation, '.'))
        _, prev_ = self.token_prev(dot_idx)
        return remove_quotes(prev_.value) if prev_ is not None else None

    def _get_first_name(self, idx=None, reverse=False, keywords=False,
                        real_name=False):
        """Returns the name of the first token with a name."""

        tokens = self.tokens[idx:] if idx else self.tokens
        tokens = reversed(tokens) if reverse else tokens
        types = [T.Name, T.Wildcard, T.String.Symbol]

        if keywords:
            types.append(T.Keyword)

        for token in tokens:
            if token.ttype in types:
                return remove_quotes(token.value)
            elif isinstance(token, (Identifier, Function)):
                return token.get_real_name() if real_name else token.get_name()
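The three name accessors side by side (an illustrative doctest; the token
index assumes sqlparse's usual grouping of this statement):

    >>> import sqlparse
    >>> ident = sqlparse.parse('SELECT sc.tbl AS t FROM sc.tbl')[0].tokens[2]
    >>> ident.get_parent_name(), ident.get_real_name(), ident.get_name()
    ('sc', 'tbl', 't')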

class Statement(TokenList):
    """Represents a SQL statement."""

    def get_type(self):
        """Returns the type of a statement.

        The returned value is a string holding an upper-cased reprint of
        the first DML or DDL keyword. If the first token in this group
        isn't a DML or DDL keyword "UNKNOWN" is returned.

        Whitespaces and comments at the beginning of the statement
        are ignored.
        """
        token = self.token_first(skip_cm=True)
        if token is None:
            # An "empty" statement that either has no tokens at all
            # or only whitespace tokens.
            return 'UNKNOWN'

        elif token.ttype in (T.Keyword.DML, T.Keyword.DDL):
            return token.normalized

        elif token.ttype == T.Keyword.CTE:
            # The WITH keyword should be followed by either an Identifier or
            # an IdentifierList containing the CTE definitions; the actual
            # DML keyword (e.g. SELECT, INSERT) will follow next.
            tidx = self.token_index(token)
            while tidx is not None:
                tidx, token = self.token_next(tidx, skip_ws=True)
                if isinstance(token, (Identifier, IdentifierList)):
                    tidx, token = self.token_next(tidx, skip_ws=True)

                    if token is not None \
                            and token.ttype == T.Keyword.DML:
                        return token.normalized

        # Hmm, probably invalid syntax, so return unknown.
        return 'UNKNOWN'
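A usage sketch (not part of the module) covering the plain, commented, and
CTE cases of get_type():

    >>> import sqlparse
    >>> sqlparse.parse('select * from foo')[0].get_type()
    'SELECT'
    >>> sqlparse.parse('-- setup\nINSERT INTO t VALUES (1)')[0].get_type()
    'INSERT'
    >>> sqlparse.parse('WITH x AS (SELECT 1) SELECT * FROM x')[0].get_type()
    'SELECT'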

class Identifier(NameAliasMixin, TokenList):
    """Represents an identifier.

    Identifiers may have aliases or typecasts.
    """

    def is_wildcard(self):
        """Return ``True`` if this identifier contains a wildcard."""
        _, token = self.token_next_by(t=T.Wildcard)
        return token is not None

    def get_typecast(self):
        """Returns the typecast or ``None`` of this object as a string."""
        midx, marker = self.token_next_by(m=(T.Punctuation, '::'))
        nidx, next_ = self.token_next(midx, skip_ws=False)
        return next_.value if next_ else None

    def get_ordering(self):
        """Returns the ordering or ``None`` as uppercase string."""
        _, ordering = self.token_next_by(t=T.Keyword.Order)
        return ordering.normalized if ordering else None

    def get_array_indices(self):
        """Returns an iterator of index token lists."""

        for token in self.tokens:
            if isinstance(token, SquareBrackets):
                # Use [1:-1] index to discard the square brackets
                yield token.tokens[1:-1]
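Typecast and ordering accessors in action (illustrative doctest; token
positions assume sqlparse's usual grouping):

    >>> import sqlparse
    >>> stmt = sqlparse.parse('SELECT foo::integer FROM t')[0]
    >>> stmt.tokens[2].get_typecast()
    'integer'
    >>> stmt = sqlparse.parse('SELECT * FROM t ORDER BY x DESC')[0]
    >>> stmt.tokens[-1].get_ordering()
    'DESC'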

class IdentifierList(TokenList):
    """A list of :class:`~sqlparse.sql.Identifier`\'s."""

    def get_identifiers(self):
        """Returns the identifiers.

        Whitespaces and punctuations are not included in this generator.
        """
        for token in self.tokens:
            if not (token.is_whitespace or token.match(T.Punctuation, ',')):
                yield token
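For example (a sketch, not from the sources), a comma-separated select list
is grouped into an IdentifierList whose generator skips the separators:

    >>> import sqlparse
    >>> il = sqlparse.parse('SELECT a, b, c FROM t')[0].tokens[2]
    >>> [tok.value for tok in il.get_identifiers()]
    ['a', 'b', 'c']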

class TypedLiteral(TokenList):
    """A typed literal, such as "date '2001-09-28'" or "interval '2 hours'"."""
    M_OPEN = [(T.Name.Builtin, None), (T.Keyword, "TIMESTAMP")]
    M_CLOSE = T.String.Single, None
    M_EXTEND = T.Keyword, ("DAY", "HOUR", "MINUTE", "MONTH", "SECOND", "YEAR")


class Parenthesis(TokenList):
    """Tokens between parenthesis."""
    M_OPEN = T.Punctuation, '('
    M_CLOSE = T.Punctuation, ')'

    @property
    def _groupable_tokens(self):
        return self.tokens[1:-1]


class SquareBrackets(TokenList):
    """Tokens between square brackets."""
    M_OPEN = T.Punctuation, '['
    M_CLOSE = T.Punctuation, ']'

    @property
    def _groupable_tokens(self):
        return self.tokens[1:-1]


class Assignment(TokenList):
    """An assignment like 'var := val;'."""


class If(TokenList):
    """An 'if' clause with possible 'else if' or 'else' parts."""
    M_OPEN = T.Keyword, 'IF'
    M_CLOSE = T.Keyword, 'END IF'


class For(TokenList):
    """A 'FOR' loop."""
    M_OPEN = T.Keyword, ('FOR', 'FOREACH')
    M_CLOSE = T.Keyword, 'END LOOP'


class Comparison(TokenList):
    """A comparison used for example in WHERE clauses."""

    @property
    def left(self):
        return self.tokens[0]

    @property
    def right(self):
        return self.tokens[-1]
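A sketch of the left/right properties (mine, not from the sources): inside a
WHERE clause the comparison's operands are its first and last child tokens:

    >>> import sqlparse
    >>> from sqlparse import sql
    >>> where = sqlparse.parse('SELECT * FROM t WHERE a = b')[0].tokens[-1]
    >>> comp = [t for t in where.tokens if isinstance(t, sql.Comparison)][0]
    >>> comp.left.value, comp.right.value
    ('a', 'b')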

class Comment(TokenList):
    """A comment."""

    def is_multiline(self):
        return self.tokens and self.tokens[0].ttype == T.Comment.Multiline


class Where(TokenList):
    """A WHERE clause."""
    M_OPEN = T.Keyword, 'WHERE'
    M_CLOSE = T.Keyword, (
        'ORDER BY', 'GROUP BY', 'LIMIT', 'UNION', 'UNION ALL', 'EXCEPT',
        'HAVING', 'RETURNING', 'INTO')


class Over(TokenList):
    """An OVER clause."""
    M_OPEN = T.Keyword, 'OVER'


class Having(TokenList):
    """A HAVING clause."""
    M_OPEN = T.Keyword, 'HAVING'
    M_CLOSE = T.Keyword, ('ORDER BY', 'LIMIT')


class Case(TokenList):
    """A CASE statement with one or more WHEN and possibly an ELSE part."""
    M_OPEN = T.Keyword, 'CASE'
    M_CLOSE = T.Keyword, 'END'

    def get_cases(self, skip_ws=False):
        """Returns a list of 2-tuples (condition, value).

        If an ELSE part exists, its condition is ``None``.
        """
        CONDITION = 1
        VALUE = 2

        ret = []
        mode = CONDITION

        for token in self.tokens:
            # Set mode from the current statement
            if token.match(T.Keyword, 'CASE'):
                continue

            elif skip_ws and token.ttype in T.Whitespace:
                continue

            elif token.match(T.Keyword, 'WHEN'):
                ret.append(([], []))
                mode = CONDITION

            elif token.match(T.Keyword, 'THEN'):
                mode = VALUE

            elif token.match(T.Keyword, 'ELSE'):
                ret.append((None, []))
                mode = VALUE

            elif token.match(T.Keyword, 'END'):
                mode = None

            # First condition without preceding WHEN
            if mode and not ret:
                ret.append(([], []))

            # Append token depending on the current mode
            if mode == CONDITION:
                ret[-1][0].append(token)

            elif mode == VALUE:
                ret[-1][1].append(token)

        # Return cases list
        return ret
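A get_cases() sketch (illustrative only; exact grouping of the branches can
vary between sqlparse versions). A CASE with one WHEN and an ELSE yields two
entries, and the ELSE entry carries ``None`` as its condition:

    >>> import sqlparse
    >>> case = sqlparse.parse('CASE WHEN x > 0 THEN 1 ELSE 0 END')[0].tokens[0]
    >>> cases = case.get_cases(skip_ws=True)
    >>> len(cases)
    2
    >>> cases[1][0] is None   # the ELSE branch has no condition
    True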

class Function(NameAliasMixin, TokenList):
    """A function or procedure call."""

    def get_parameters(self):
        """Return a list of parameters."""
        parenthesis = self.token_next_by(i=Parenthesis)[1]
        result = []
        for token in parenthesis.tokens:
            if isinstance(token, IdentifierList):
                return token.get_identifiers()
            elif imt(token, i=(Function, Identifier, TypedLiteral),
                     t=T.Literal):
                result.append(token)
        return result

    def get_window(self):
        """Return the window if it exists."""
        over_clause = self.token_next_by(i=Over)
        if not over_clause:
            return None
        return over_clause[1].tokens[-1]
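A get_parameters() sketch (not part of the module; the token index assumes
the call is grouped as a Function directly under the statement):

    >>> import sqlparse
    >>> func = sqlparse.parse('SELECT f(a, b) FROM t')[0].tokens[2]
    >>> [p.value for p in func.get_parameters()]
    ['a', 'b']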

class Begin(TokenList):
    """A BEGIN/END block."""
    M_OPEN = T.Keyword, 'BEGIN'
    M_CLOSE = T.Keyword, 'END'


class Operation(TokenList):
    """Grouping of operations."""


class Values(TokenList):
    """Grouping of values."""


class Command(TokenList):
    """Grouping of CLI commands."""