Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/sqlparse/sql.py: 54%

336 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2024-04-04 06:18 +0000

1# 

2# Copyright (C) 2009-2020 the sqlparse authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of python-sqlparse and is released under 

6# the BSD License: https://opensource.org/licenses/BSD-3-Clause 

7 

8"""This module contains classes representing syntactical elements of SQL.""" 

9 

10import re 

11 

12from sqlparse import tokens as T 

13from sqlparse.utils import imt, remove_quotes 

14 

15 

class NameAliasMixin:
    """Mixin providing ``get_real_name`` and ``get_alias``.

    The host class must supply ``tokens``, ``token_next_by`` and
    ``_get_first_name``.
    """

    def get_real_name(self):
        """Return the real (object) name of this identifier.

        For a dotted name such as ``a.b`` the lookup starts at the first
        dot, so the part following it is reported.
        """
        first_dot_idx, _ = self.token_next_by(m=(T.Punctuation, '.'))
        return self._get_first_name(first_dot_idx, real_name=True)

    def get_alias(self):
        """Return the alias of this identifier, or ``None`` if it has none."""
        # Explicit form: "name AS alias"
        as_idx, as_keyword = self.token_next_by(m=(T.Keyword, 'AS'))
        if as_keyword is not None:
            return self._get_first_name(as_idx + 1, keywords=True)

        # Implicit form: "name alias" or "complicated column expression alias"
        _, whitespace = self.token_next_by(t=T.Whitespace)
        if len(self.tokens) <= 2 or whitespace is None:
            return None
        return self._get_first_name(reverse=True)

37 

38 

class Token:
    """Base class for all other classes in this module.

    A token stores ``value`` (the unchanged text) and ``ttype`` (the token
    type).  It also tracks its ``parent`` group, a ``normalized`` value
    (upper-cased for keywords) and the flags ``is_keyword``, ``is_group``
    and ``is_whitespace``.
    """

    __slots__ = ('value', 'ttype', 'parent', 'normalized', 'is_keyword',
                 'is_group', 'is_whitespace')

    def __init__(self, ttype, value):
        """Create a token of type *ttype*; *value* is coerced to ``str``."""
        text = str(value)
        keyword = ttype in T.Keyword
        self.ttype = ttype
        self.value = text
        self.parent = None
        self.is_group = False
        self.is_keyword = keyword
        self.is_whitespace = ttype in T.Whitespace
        # Keywords are compared case-insensitively, so keep them upper-cased.
        self.normalized = text.upper() if keyword else text

    def __str__(self):
        return self.value

    # Pending tokenlist __len__ bug fix
    # def __len__(self):
    #     return len(self.value)

    def __repr__(self):
        name = self._get_repr_name()
        text = self._get_repr_value()
        # Switch to double quotes when the value itself is single-quoted.
        if text.startswith("'") and text.endswith("'"):
            quote = '"'
        else:
            quote = "'"
        return "<{0} {1}{2}{1} at 0x{3:2X}>".format(
            name, quote, text, id(self))

    def _get_repr_name(self):
        """Return the last dotted component of the token type's string."""
        return str(self.ttype).split('.')[-1]

    def _get_repr_value(self):
        """Return a shortened, single-line rendering of the token text."""
        text = str(self)
        shortened = text if len(text) <= 7 else text[:6] + '...'
        return re.sub(r'\s+', ' ', shortened)

    def flatten(self):
        """Resolve subgroups; a plain token yields only itself."""
        yield self

    def match(self, ttype, values, regex=False):
        """Check whether the token matches the given arguments.

        *ttype* is a token type.  If this token's type is not that exact
        type (identity comparison), ``False`` is returned.
        *values* is a list of possible values for this token.  The values
        are OR'ed together, so ``True`` is returned as soon as one of them
        matches.  Except for keyword tokens the comparison is
        case-sensitive.  For convenience it's OK to pass in a single
        string.  Passing ``None`` compares the type only.
        If *regex* is ``True`` (default is ``False``) the given values are
        treated as regular expressions and searched for in the normalized
        value.
        """
        if self.ttype is not ttype:
            return False
        if values is None:
            return True

        candidates = (values,) if isinstance(values, str) else values

        if regex:
            # TODO: Add test for regex with is_keyword = false
            flags = re.IGNORECASE if self.is_keyword else 0
            return any(re.compile(candidate, flags).search(self.normalized)
                       for candidate in candidates)

        if self.is_keyword:
            candidates = (candidate.upper() for candidate in candidates)
        return self.normalized in candidates

    def within(self, group_cls):
        """Return ``True`` if this token is within *group_cls*.

        Use this method for example to check if an identifier is within
        a function: ``t.within(sql.Function)``.
        """
        ancestor = self.parent
        while ancestor:
            if isinstance(ancestor, group_cls):
                return True
            ancestor = ancestor.parent
        return False

    def is_child_of(self, other):
        """Return ``True`` if this token is a direct child of *other*."""
        return self.parent == other

    def has_ancestor(self, other):
        """Return ``True`` if *other* is in this token's ancestry."""
        ancestor = self.parent
        while ancestor:
            if ancestor == other:
                return True
            ancestor = ancestor.parent
        return False

147 

148 

class TokenList(Token):
    """A group of tokens.

    It has an additional instance attribute ``tokens`` which holds a
    list of child-tokens.
    """

    __slots__ = 'tokens'

    def __init__(self, tokens=None):
        """Create a group from *tokens* and become the parent of each one.

        A falsy *tokens* argument (``None`` or an empty list) results in a
        new empty list.
        """
        self.tokens = tokens or []
        for token in self.tokens:
            token.parent = self
        # ``str(self)`` joins the flattened children, so ``self.tokens``
        # has to be assigned before the base initializer is called.
        super().__init__(None, str(self))
        self.is_group = True

    def __str__(self):
        return ''.join(token.value for token in self.flatten())

    # weird bug
    # def __len__(self):
    #     return len(self.tokens)

    def __iter__(self):
        return iter(self.tokens)

    def __getitem__(self, item):
        return self.tokens[item]

    def _get_repr_name(self):
        """Return the class name, used by ``repr`` and ``_pprint_tree``."""
        return type(self).__name__

    def _pprint_tree(self, max_depth=None, depth=0, f=None, _pre=''):
        """Pretty-print the object tree.

        *max_depth* limits how deep nested groups are expanded (``None``
        means unlimited), *depth* is the current nesting level, *f* is the
        file object handed to ``print`` and *_pre* is the prefix
        accumulated from the enclosing levels.
        """
        token_count = len(self.tokens)
        for idx, token in enumerate(self.tokens):
            cls = token._get_repr_name()
            value = token._get_repr_value()

            last = idx == (token_count - 1)
            pre = '`- ' if last else '|- '

            # Switch to double quotes when the value itself is single-quoted.
            q = '"' if value.startswith("'") and value.endswith("'") else "'"
            print("{_pre}{pre}{idx} {cls} {q}{value}{q}".format(
                _pre=_pre, pre=pre, idx=idx, cls=cls, q=q, value=value),
                file=f)

            if token.is_group and (max_depth is None or depth < max_depth):
                # Child prefixes are as wide as the 3-character branch
                # markers above so that nested levels line up.
                parent_pre = '   ' if last else '|  '
                token._pprint_tree(max_depth, depth + 1, f, _pre + parent_pre)

    def get_token_at_offset(self, offset):
        """Return the leaf token covering character position *offset*.

        Returns ``None`` when *offset* lies outside the flattened text.
        """
        idx = 0
        for token in self.flatten():
            end = idx + len(token.value)
            if idx <= offset < end:
                return token
            idx = end
        return None

    def flatten(self):
        """Generator yielding ungrouped tokens.

        This method is recursively called for all child tokens.
        """
        for token in self.tokens:
            if token.is_group:
                yield from token.flatten()
            else:
                yield token

    def get_sublists(self):
        """Yield the direct children that are groups themselves."""
        for token in self.tokens:
            if token.is_group:
                yield token

    @property
    def _groupable_tokens(self):
        return self.tokens

    def _token_matching(self, funcs, start=0, end=None, reverse=False):
        """Find the next token accepted by any of *funcs*.

        *funcs* is a single predicate or a list/tuple of predicates.  The
        search covers ``range(start, end)``, or runs backwards when
        *reverse* is true (in which case *end* must be ``None``).

        Returns an ``(index, token)`` tuple, or ``(None, None)`` when
        nothing matches or *start* is ``None``.
        """
        if start is None:
            # Keep the result shape uniform so callers can always unpack
            # or index it.
            return None, None

        if not isinstance(funcs, (list, tuple)):
            funcs = (funcs,)

        if reverse:
            assert end is None
            # ``token_next`` has already added 1 to the caller's index, so
            # the token before that index sits at ``start - 2``.
            indexes = range(start - 2, -1, -1)
        else:
            if end is None:
                end = len(self.tokens)
            indexes = range(start, end)
        for idx in indexes:
            token = self.tokens[idx]
            if any(func(token) for func in funcs):
                return idx, token
        return None, None

    def token_first(self, skip_ws=True, skip_cm=False):
        """Returns the first child token.

        If *skip_ws* is ``True`` (the default), whitespace
        tokens are ignored.

        if *skip_cm* is ``True`` (default: ``False``), comments are
        ignored too.
        """
        # this one is inconsistent, using Comment instead of T.Comment...
        def matcher(tk):
            return not ((skip_ws and tk.is_whitespace)
                        or (skip_cm and imt(tk, t=T.Comment, i=Comment)))
        return self._token_matching(matcher)[1]

    def token_next_by(self, i=None, m=None, t=None, idx=-1, end=None):
        """Return ``(index, token)`` of the next token accepted by ``imt``.

        *i*, *m* and *t* are passed straight to ``imt``.  The search
        starts right after *idx* and stops before *end*.  ``(None, None)``
        is returned if no token matches.
        """
        idx += 1
        return self._token_matching(lambda tk: imt(tk, i, m, t), idx, end)

    def token_not_matching(self, funcs, idx):
        """Return ``(index, token)`` of the next token rejected by a predicate.

        *funcs* is a single predicate or a list/tuple of predicates; the
        search starts at *idx*.  A token is returned as soon as at least
        one of the predicates is false for it.  ``(None, None)`` is
        returned if there is no such token.
        """
        funcs = (funcs,) if not isinstance(funcs, (list, tuple)) else funcs

        def negate(func):
            # A factory gives every lambda its own ``func``.  A lambda
            # written directly inside the comprehension would late-bind
            # and always call the last predicate.
            return lambda tk: not func(tk)

        negated = [negate(func) for func in funcs]
        return self._token_matching(negated, idx)

    def token_matching(self, funcs, idx):
        """Return the next token, from *idx* on, accepted by any of *funcs*.

        Only the token is returned (not its index); ``None`` if nothing
        matches.
        """
        return self._token_matching(funcs, idx)[1]

    def token_prev(self, idx, skip_ws=True, skip_cm=False):
        """Returns the previous token relative to *idx*.

        If *skip_ws* is ``True`` (the default) whitespace tokens are ignored.
        If *skip_cm* is ``True`` comments are ignored.
        The result is an ``(index, token)`` tuple; ``(None, None)`` is
        returned if there's no previous token.
        """
        return self.token_next(idx, skip_ws, skip_cm, _reverse=True)

    # TODO: May need to re-add default value to idx
    def token_next(self, idx, skip_ws=True, skip_cm=False, _reverse=False):
        """Returns the next token relative to *idx*.

        If *skip_ws* is ``True`` (the default) whitespace tokens are ignored.
        If *skip_cm* is ``True`` comments are ignored.
        The result is an ``(index, token)`` tuple; ``(None, None)`` is
        returned if there's no next token or if *idx* is ``None``.
        """
        if idx is None:
            return None, None
        idx += 1  # a lot of calling code currently pre-compensates for this

        def matcher(tk):
            return not ((skip_ws and tk.is_whitespace)
                        or (skip_cm and imt(tk, t=T.Comment, i=Comment)))
        return self._token_matching(matcher, idx, reverse=_reverse)

    def token_index(self, token, start=0):
        """Return list index of token.

        *start* is either an index or a token whose own position is used
        as the starting point.  ``ValueError`` from ``list.index`` is
        raised if *token* is not found.
        """
        start = start if isinstance(start, int) else self.token_index(start)
        return start + self.tokens[start:].index(token)

    def group_tokens(self, grp_cls, start, end, include_end=True,
                     extend=False):
        """Replace tokens by an instance of *grp_cls*.

        *start* and *end* are indices into ``self.tokens``; the token at
        *end* is part of the group unless *include_end* is false.  With
        *extend* set, and when the token at *start* already is a
        *grp_cls* instance, the following tokens are appended to that
        existing group instead of creating a new one.

        Returns the new or extended group.
        """
        start_idx = start
        start = self.tokens[start_idx]

        # ``include_end`` is a bool and therefore adds 0 or 1.
        end_idx = end + include_end

        # will be needed later for new group_clauses
        # while skip_ws and tokens and tokens[-1].is_whitespace:
        #     tokens = tokens[:-1]

        if extend and isinstance(start, grp_cls):
            subtokens = self.tokens[start_idx + 1:end_idx]

            grp = start
            grp.tokens.extend(subtokens)
            del self.tokens[start_idx + 1:end_idx]
            # The cached text must reflect the newly absorbed tokens.
            grp.value = str(start)
        else:
            subtokens = self.tokens[start_idx:end_idx]
            grp = grp_cls(subtokens)
            self.tokens[start_idx:end_idx] = [grp]
            grp.parent = self

        for token in subtokens:
            token.parent = grp

        return grp

    def insert_before(self, where, token):
        """Inserts *token* before *where* (an index or a child token)."""
        if not isinstance(where, int):
            where = self.token_index(where)
        token.parent = self
        self.tokens.insert(where, token)

    def insert_after(self, where, token, skip_ws=True):
        """Inserts *token* after *where* (an index or a child token).

        The token is placed in front of the next token following *where*
        (whitespace is skipped when *skip_ws* is true), or appended if
        there is no such token.
        """
        if not isinstance(where, int):
            where = self.token_index(where)
        nidx, next_ = self.token_next(where, skip_ws=skip_ws)
        token.parent = self
        if next_ is None:
            self.tokens.append(token)
        else:
            self.tokens.insert(nidx, token)

    def has_alias(self):
        """Returns ``True`` if an alias is present."""
        return self.get_alias() is not None

    def get_alias(self):
        """Returns the alias for this identifier or ``None``."""
        return None

    def get_name(self):
        """Returns the name of this identifier.

        This is either its alias or its real name. The returned value can
        be considered as the name under which the object corresponding to
        this identifier is known within the current statement.
        """
        return self.get_alias() or self.get_real_name()

    def get_real_name(self):
        """Returns the real name (object name) of this identifier."""
        return None

    def get_parent_name(self):
        """Return name of the parent object if any.

        A parent object is identified by the first occurring dot.
        """
        dot_idx, _ = self.token_next_by(m=(T.Punctuation, '.'))
        # ``token_prev`` copes with ``dot_idx`` being ``None``.
        _, prev_ = self.token_prev(dot_idx)
        return remove_quotes(prev_.value) if prev_ is not None else None

    def _get_first_name(self, idx=None, reverse=False, keywords=False,
                        real_name=False):
        """Returns the name of the first token with a name.

        Only tokens from *idx* onwards are considered (all tokens when
        *idx* is ``None`` or 0); with *reverse* they are scanned
        back-to-front.  *keywords* additionally accepts keyword tokens as
        names.  For nested identifiers and functions, *real_name* selects
        ``get_real_name()`` over ``get_name()``.  Returns ``None`` if no
        token qualifies.
        """
        tokens = self.tokens[idx:] if idx else self.tokens
        tokens = reversed(tokens) if reverse else tokens
        types = [T.Name, T.Wildcard, T.String.Symbol]

        if keywords:
            types.append(T.Keyword)

        for token in tokens:
            if token.ttype in types:
                return remove_quotes(token.value)
            elif isinstance(token, (Identifier, Function)):
                return token.get_real_name() if real_name else token.get_name()
        return None

401 

402 

class Statement(TokenList):
    """Represents a SQL statement."""

    def get_type(self):
        """Return the type of this statement as an upper-cased string.

        The result is the normalized first DML or DDL keyword.  For a
        statement starting with a CTE keyword, the DML keyword following
        the CTE definitions is reported instead.  ``'UNKNOWN'`` is
        returned when no such keyword is found.

        Whitespace and comments at the beginning of the statement are
        ignored.
        """
        first = self.token_first(skip_cm=True)

        # An "empty" statement: no tokens at all, or nothing but
        # whitespace.
        if first is None:
            return 'UNKNOWN'

        if first.ttype in (T.Keyword.DML, T.Keyword.DDL):
            return first.normalized

        if first.ttype == T.Keyword.CTE:
            # The WITH keyword should be followed by either an Identifier or
            # an IdentifierList containing the CTE definitions; the actual
            # DML keyword (e.g. SELECT, INSERT) will follow next.
            position = self.token_index(first)
            while position is not None:
                position, candidate = self.token_next(position, skip_ws=True)
                if not isinstance(candidate, (Identifier, IdentifierList)):
                    continue
                position, candidate = self.token_next(position, skip_ws=True)
                if candidate is not None \
                        and candidate.ttype == T.Keyword.DML:
                    return candidate.normalized

        # Hmm, probably invalid syntax, so return unknown.
        return 'UNKNOWN'

441 

442 

class Identifier(NameAliasMixin, TokenList):
    """Represents an identifier.

    Identifiers may have aliases or typecasts.
    """

    def is_wildcard(self):
        """Return ``True`` if this identifier contains a wildcard."""
        _, wildcard = self.token_next_by(t=T.Wildcard)
        return wildcard is not None

    def get_typecast(self):
        """Return the typecast of this object as a string, or ``None``.

        The typecast is the value of the token directly following the
        ``::`` marker (whitespace is not skipped).
        """
        marker_idx, _ = self.token_next_by(m=(T.Punctuation, '::'))
        _, type_token = self.token_next(marker_idx, skip_ws=False)
        if type_token:
            return type_token.value
        return None

    def get_ordering(self):
        """Return the ordering as an uppercase string, or ``None``."""
        _, order_keyword = self.token_next_by(t=T.Keyword.Order)
        if order_keyword:
            return order_keyword.normalized
        return None

    def get_array_indices(self):
        """Yield one token list per square-bracket child."""
        for child in self.tokens:
            if not isinstance(child, SquareBrackets):
                continue
            # Slice [1:-1] to discard the opening and closing bracket.
            yield child.tokens[1:-1]

472 

473 

class IdentifierList(TokenList):
    """A list of :class:`~sqlparse.sql.Identifier` objects."""

    def get_identifiers(self):
        """Yield the members of this list.

        Whitespace tokens and ``,`` punctuation are left out; every other
        child token is yielded.
        """
        for child in self.tokens:
            if child.is_whitespace or child.match(T.Punctuation, ','):
                continue
            yield child

485 

486 

class TypedLiteral(TokenList):
    """A typed literal, such as "date '2001-09-28'" or "interval '2 hours'"."""
    # Each entry is a (ttype, values) pair in the shape ``Token.match``
    # accepts.  Presumably consumed by the grouping code elsewhere in the
    # package -- confirm there.
    M_OPEN = [
        (T.Name.Builtin, None),
        (T.Keyword, "TIMESTAMP"),
    ]
    M_CLOSE = (T.String.Single, None)
    M_EXTEND = (
        T.Keyword,
        ("DAY", "HOUR", "MINUTE", "MONTH", "SECOND", "YEAR"),
    )

492 

493 

class Parenthesis(TokenList):
    """Tokens between parentheses, including the parentheses themselves."""
    # (ttype, value) pairs describing the opening and closing token.
    M_OPEN = (T.Punctuation, '(')
    M_CLOSE = (T.Punctuation, ')')

    @property
    def _groupable_tokens(self):
        # Everything except the first and the last child token.
        inner = self.tokens[1:-1]
        return inner

502 

503 

class SquareBrackets(TokenList):
    """Tokens between square brackets, including the brackets themselves."""
    # (ttype, value) pairs describing the opening and closing token.
    M_OPEN = (T.Punctuation, '[')
    M_CLOSE = (T.Punctuation, ']')

    @property
    def _groupable_tokens(self):
        # Everything except the first and the last child token.
        inner = self.tokens[1:-1]
        return inner

512 

513 

class Assignment(TokenList):
    """Token group for an assignment such as ``var := val;``."""

516 

517 

class If(TokenList):
    """An 'if' clause with possible 'else if' or 'else' parts."""
    # (ttype, value) pairs describing the opening and closing keyword.
    M_OPEN = (T.Keyword, 'IF')
    M_CLOSE = (T.Keyword, 'END IF')

522 

523 

class For(TokenList):
    """A 'FOR' (or 'FOREACH') loop."""
    # (ttype, values) pairs describing the opening and closing keyword.
    M_OPEN = (T.Keyword, ('FOR', 'FOREACH'))
    M_CLOSE = (T.Keyword, 'END LOOP')

528 

529 

class Comparison(TokenList):
    """A comparison used for example in WHERE clauses."""

    @property
    def left(self):
        """The first child token of the comparison."""
        children = self.tokens
        return children[0]

    @property
    def right(self):
        """The last child token of the comparison."""
        children = self.tokens
        return children[-1]

540 

541 

class Comment(TokenList):
    """A comment."""

    def is_multiline(self):
        """Return ``True`` if the first child is a multiline comment token.

        An empty group yields ``False``.  The result is always a ``bool``;
        a bare ``and`` expression would hand back the empty token list
        itself.
        """
        return bool(self.tokens
                    and self.tokens[0].ttype == T.Comment.Multiline)

547 

548 

class Where(TokenList):
    """A WHERE clause."""
    # (ttype, values) pairs: the opening keyword and the keywords that
    # close the clause.
    M_OPEN = (T.Keyword, 'WHERE')
    M_CLOSE = (
        T.Keyword,
        (
            'ORDER BY', 'GROUP BY', 'LIMIT', 'UNION', 'UNION ALL', 'EXCEPT',
            'HAVING', 'RETURNING', 'INTO',
        ),
    )

555 

556 

class Having(TokenList):
    """A HAVING clause."""
    # (ttype, values) pairs: the opening keyword and the keywords that
    # close the clause.
    M_OPEN = (T.Keyword, 'HAVING')
    M_CLOSE = (T.Keyword, ('ORDER BY', 'LIMIT'))

561 

562 

class Case(TokenList):
    """A CASE statement with one or more WHEN and possibly an ELSE part."""
    # (ttype, value) pairs describing the opening and closing keyword.
    M_OPEN = (T.Keyword, 'CASE')
    M_CLOSE = (T.Keyword, 'END')

    def get_cases(self, skip_ws=False):
        """Return a list of 2-tuples ``(condition, value)``.

        ``condition`` and ``value`` are lists of tokens.  For an ELSE
        branch ``condition`` is ``None``.  When *skip_ws* is true,
        whitespace tokens are left out of both lists.
        """
        IN_CONDITION = 1
        IN_VALUE = 2

        cases = []
        state = IN_CONDITION

        for tok in self.tokens:
            # The leading CASE keyword belongs to no branch.
            if tok.match(T.Keyword, 'CASE'):
                continue

            if skip_ws and tok.ttype in T.Whitespace:
                continue

            # Branch keywords switch the state; the keyword token itself
            # is still collected below under the new state.
            if tok.match(T.Keyword, 'WHEN'):
                cases.append(([], []))
                state = IN_CONDITION
            elif tok.match(T.Keyword, 'THEN'):
                state = IN_VALUE
            elif tok.match(T.Keyword, 'ELSE'):
                cases.append((None, []))
                state = IN_VALUE
            elif tok.match(T.Keyword, 'END'):
                state = None

            # First condition without preceding WHEN
            if state and not cases:
                cases.append(([], []))

            # File the token under the current branch.
            if state == IN_CONDITION:
                condition_tokens = cases[-1][0]
                condition_tokens.append(tok)
            elif state == IN_VALUE:
                value_tokens = cases[-1][1]
                value_tokens.append(tok)

        return cases

614 

615 

class Function(NameAliasMixin, TokenList):
    """A function or procedure call."""

    def get_parameters(self):
        """Return a list of parameters.

        The parameters are read from the last child token, which is
        expected to be the parenthesis group.  If that group holds an
        ``IdentifierList``, its identifiers are returned.  Otherwise every
        nested function, identifier, typed literal or literal token is
        collected.

        The result is always a ``list``, so ``len()`` and indexing work
        regardless of how many parameters there are.
        """
        parenthesis = self.tokens[-1]
        result = []
        for token in parenthesis.tokens:
            if isinstance(token, IdentifierList):
                # ``get_identifiers`` is a generator; materialize it to
                # honour the documented list return type.
                return list(token.get_identifiers())
            elif imt(token, i=(Function, Identifier, TypedLiteral),
                     t=T.Literal):
                result.append(token)
        return result

630 

631 

class Begin(TokenList):
    """A BEGIN/END block."""
    # (ttype, value) pairs describing the opening and closing keyword.
    M_OPEN = (T.Keyword, 'BEGIN')
    M_CLOSE = (T.Keyword, 'END')

636 

637 

class Operation(TokenList):
    """Token group for an operation."""

640 

641 

class Values(TokenList):
    """Token group for values."""

644 

645 

class Command(TokenList):
    """Token group for a CLI command."""