Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlparse/sql.py: 36%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1#
2# Copyright (C) 2009-2020 the sqlparse authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of python-sqlparse and is released under
6# the BSD License: https://opensource.org/licenses/BSD-3-Clause
8"""This module contains classes representing syntactical elements of SQL."""
10import re
12from sqlparse import tokens as T
13from sqlparse.utils import imt, remove_quotes
class NameAliasMixin:
    """Mixin supplying ``get_real_name`` and ``get_alias`` to token groups."""

    def get_real_name(self):
        """Return the real (object) name of this identifier."""
        # For dotted names like "a.b" the lookup starts at the dot.
        dot_pos, _ = self.token_next_by(m=(T.Punctuation, '.'))
        return self._get_first_name(dot_pos, real_name=True)

    def get_alias(self):
        """Return the alias of this identifier, or ``None`` if it has none."""
        # Explicit form: "name AS alias"
        as_pos, as_kw = self.token_next_by(m=(T.Keyword, 'AS'))
        if as_kw is not None:
            return self._get_first_name(as_pos + 1, keywords=True)

        # Implicit form: "name alias" or "complicated expression alias"
        _, space = self.token_next_by(t=T.Whitespace)
        if space is not None and len(self.tokens) > 2:
            return self._get_first_name(reverse=True)
        return None
class Token:
    """Base class for every syntactical element in this module.

    A token wraps a single lexed item. Its two primary attributes are
    ``value`` (the unchanged text of the token) and ``ttype`` (the token
    type).
    """

    __slots__ = (
        'is_group',
        'is_keyword',
        'is_newline',
        'is_whitespace',
        'normalized',
        'parent',
        'ttype',
        'value',
    )

    def __init__(self, ttype, value):
        text = str(value)
        self.value = text
        self.ttype = ttype
        self.parent = None
        self.is_group = False
        self.is_keyword = ttype in T.Keyword
        self.is_whitespace = ttype in T.Whitespace
        self.is_newline = ttype in T.Newline
        # Keywords are compared case-insensitively, so store them upper-cased.
        self.normalized = text.upper() if self.is_keyword else text

    def __str__(self):
        return self.value

    # NOTE: __len__ is deliberately not defined here; it is pending a fix
    # for the TokenList __len__ bug.

    def __repr__(self):
        cls = self._get_repr_name()
        value = self._get_repr_value()

        # Switch to double quotes when the value itself is single-quoted.
        if value.startswith("'") and value.endswith("'"):
            q = '"'
        else:
            q = "'"
        return f"<{cls} {q}{value}{q} at 0x{id(self):2X}>"

    def _get_repr_name(self):
        """Return the last dotted component of the token type's string."""
        return str(self.ttype).split('.')[-1]

    def _get_repr_value(self):
        """Return a shortened, whitespace-collapsed rendering of the token."""
        text = str(self)
        if len(text) > 7:
            text = text[:6] + '...'
        return re.sub(r'\s+', ' ', text)

    def flatten(self):
        """Resolve subgroups; a plain token yields only itself."""
        yield self

    def match(self, ttype, values, regex=False):
        """Check whether this token matches the given arguments.

        *ttype* is a token type as defined in `sqlparse.tokens`. It is
        compared by identity; if it differs, ``False`` is returned.
        *values* is a collection of acceptable values, a single string, or
        ``None`` to accept any value. For keyword tokens the comparison is
        case-insensitive.
        If *regex* is ``True`` the values are treated as regular expressions
        and partial matches are allowed. Defaults to ``False``.
        """
        if self.ttype is not ttype:
            return False
        if values is None:
            return True

        if isinstance(values, str):
            values = (values,)

        if regex:
            # TODO: Add test for regex with is_keyword = false
            flags = re.IGNORECASE if self.is_keyword else 0
            return any(re.compile(expr, flags).search(self.normalized)
                       for expr in values)

        if self.is_keyword:
            values = (candidate.upper() for candidate in values)

        return self.normalized in values

    def within(self, group_cls):
        """Return ``True`` if any ancestor is an instance of *group_cls*.

        Useful for example to check whether an identifier sits inside a
        function: ``t.within(sql.Function)``.
        """
        ancestor = self.parent
        while ancestor:
            if isinstance(ancestor, group_cls):
                return True
            ancestor = ancestor.parent
        return False

    def is_child_of(self, other):
        """Return ``True`` if *other* is the direct parent of this token."""
        return self.parent == other

    def has_ancestor(self, other):
        """Return ``True`` if *other* appears in this token's ancestry."""
        ancestor = self.parent
        while ancestor:
            if ancestor == other:
                return True
            ancestor = ancestor.parent
        return False
class TokenList(Token):
    """A group of tokens.

    It has an additional instance attribute ``tokens`` which holds a
    list of child-tokens.
    """

    __slots__ = 'tokens'

    def __init__(self, tokens=None):
        self.tokens = tokens or []
        # Plain loop: re-parenting is a side effect, not list construction.
        for token in self.tokens:
            token.parent = self
        super().__init__(None, ''.join(token.value for token in self.tokens))
        self.is_group = True

    def __str__(self):
        return ''.join(token.value for token in self.flatten())

    # NOTE: __len__ is deliberately left undefined ("weird bug" upstream).
    # Token.within() and Token.has_ancestor() walk parents with a truthiness
    # test, which a __len__ returning 0 would affect.

    def __iter__(self):
        return iter(self.tokens)

    def __getitem__(self, item):
        return self.tokens[item]

    def _get_repr_name(self):
        return type(self).__name__

    def _pprint_tree(self, max_depth=None, depth=0, f=None, _pre=''):
        """Pretty-print the object tree.

        *max_depth* limits how deep groups are expanded (``None`` means no
        limit), *f* is the file object handed to ``print`` and *_pre* is the
        prefix accumulated from the enclosing levels.
        """
        token_count = len(self.tokens)
        for idx, token in enumerate(self.tokens):
            cls = token._get_repr_name()
            value = token._get_repr_value()

            last = idx == (token_count - 1)
            pre = '`- ' if last else '|- '

            q = '"' if value.startswith("'") and value.endswith("'") else "'"
            print(f"{_pre}{pre}{idx} {cls} {q}{value}{q}", file=f)

            if token.is_group and (max_depth is None or depth < max_depth):
                # The continuation prefix must be as wide as the 3-character
                # branch markers above so nested levels line up.
                parent_pre = '   ' if last else '|  '
                token._pprint_tree(max_depth, depth + 1, f, _pre + parent_pre)

    def get_token_at_offset(self, offset):
        """Returns the token that is on position offset.

        Offsets count characters over the flattened token values. ``None``
        is returned implicitly when *offset* lies outside the text.
        """
        idx = 0
        for token in self.flatten():
            end = idx + len(token.value)
            if idx <= offset < end:
                return token
            idx = end

    def flatten(self):
        """Generator yielding ungrouped tokens.

        This method is recursively called for all child tokens.
        """
        for token in self.tokens:
            if token.is_group:
                yield from token.flatten()
            else:
                yield token

    def get_sublists(self):
        """Generator yielding the direct children that are groups."""
        for token in self.tokens:
            if token.is_group:
                yield token

    @property
    def _groupable_tokens(self):
        return self.tokens

    def _token_matching(self, funcs, start=0, end=None, reverse=False):
        """Return ``(index, token)`` of the next token accepted by *funcs*.

        *funcs* is a single predicate or a list/tuple of predicates; a token
        is accepted as soon as one predicate returns a true value. The
        search covers ``range(start, end)``. With *reverse* it runs backwards
        from ``start - 2`` because ``token_next`` has already added one to
        the index. ``(None, None)`` is returned when nothing matches or
        *start* is ``None``.
        """
        if start is None:
            # Always hand back a 2-tuple so callers can unpack or index the
            # result without a special case.
            return None, None

        if not isinstance(funcs, (list, tuple)):
            funcs = (funcs,)

        if reverse:
            assert end is None
            indexes = range(start - 2, -1, -1)
        else:
            if end is None:
                end = len(self.tokens)
            indexes = range(start, end)
        for idx in indexes:
            token = self.tokens[idx]
            for func in funcs:
                if func(token):
                    return idx, token
        return None, None

    def token_first(self, skip_ws=True, skip_cm=False):
        """Returns the first child token.

        If *skip_ws* is ``True`` (the default), whitespace
        tokens are ignored.

        if *skip_cm* is ``True`` (default: ``False``), comments are
        ignored too.
        """
        # this one is inconsistent, using Comment instead of T.Comment...
        def matcher(tk):
            return not ((skip_ws and tk.is_whitespace)
                        or (skip_cm and imt(tk, t=T.Comment, i=Comment)))
        return self._token_matching(matcher)[1]

    def token_next_by(self, i=None, m=None, t=None, idx=-1, end=None):
        """Return ``(index, token)`` of the next token after *idx* matching
        the instance (*i*), match (*m*) or type (*t*) criteria given to
        ``imt``; ``(None, None)`` if there is none before *end*.
        """
        idx += 1
        return self._token_matching(lambda tk: imt(tk, i, m, t), idx, end)

    def token_not_matching(self, funcs, idx):
        """Return ``(index, token)`` of the first token at or after *idx*
        rejected by at least one of *funcs*.
        """
        funcs = (funcs,) if not isinstance(funcs, (list, tuple)) else funcs
        # ``func=func`` binds each predicate at definition time.
        funcs = [lambda tk, func=func: not func(tk) for func in funcs]
        return self._token_matching(funcs, idx)

    def token_matching(self, funcs, idx):
        """Return the first token at or after *idx* accepted by *funcs*,
        or ``None``.
        """
        return self._token_matching(funcs, idx)[1]

    def token_prev(self, idx, skip_ws=True, skip_cm=False):
        """Returns ``(index, token)`` of the previous token relative to *idx*.

        If *skip_ws* is ``True`` (the default) whitespace tokens are ignored.
        If *skip_cm* is ``True`` comments are ignored.
        ``(None, None)`` is returned if there's no previous token.
        """
        return self.token_next(idx, skip_ws, skip_cm, _reverse=True)

    # TODO: May need to re-add default value to idx
    def token_next(self, idx, skip_ws=True, skip_cm=False, _reverse=False):
        """Returns ``(index, token)`` of the next token relative to *idx*.

        If *skip_ws* is ``True`` (the default) whitespace tokens are ignored.
        If *skip_cm* is ``True`` comments are ignored.
        ``(None, None)`` is returned if there's no next token or *idx* is
        ``None``.
        """
        if idx is None:
            return None, None
        idx += 1  # a lot of calling code currently pre-compensates for this

        def matcher(tk):
            return not ((skip_ws and tk.is_whitespace)
                        or (skip_cm and imt(tk, t=T.Comment, i=Comment)))
        return self._token_matching(matcher, idx, reverse=_reverse)

    def token_index(self, token, start=0):
        """Return list index of token.

        *start* may be an integer index or a token from which the search
        begins.
        """
        start = start if isinstance(start, int) else self.token_index(start)
        return start + self.tokens[start:].index(token)

    def group_tokens(self, grp_cls, start, end, include_end=True,
                     extend=False):
        """Replace tokens by an instance of *grp_cls*.

        *start* and *end* are indexes into ``self.tokens``; the token at
        *end* is part of the group unless *include_end* is false. With
        *extend* and a start token that already is a *grp_cls* instance, the
        following tokens are appended to that group instead of creating a
        new one. The (new or extended) group is returned.
        """
        start_idx = start
        start = self.tokens[start_idx]

        # ``include_end`` is a bool, so this adds 1 when the end is included.
        end_idx = end + include_end

        # will be needed later for new group_clauses
        # while skip_ws and tokens and tokens[-1].is_whitespace:
        #     tokens = tokens[:-1]

        if extend and isinstance(start, grp_cls):
            subtokens = self.tokens[start_idx + 1:end_idx]

            grp = start
            grp.tokens.extend(subtokens)
            del self.tokens[start_idx + 1:end_idx]
            grp.value += ''.join(token.value for token in subtokens)
        else:
            subtokens = self.tokens[start_idx:end_idx]
            grp = grp_cls(subtokens)
            self.tokens[start_idx:end_idx] = [grp]
            grp.parent = self

        for token in subtokens:
            token.parent = grp

        return grp

    def insert_before(self, where, token):
        """Inserts *token* before *where* (an index or a child token)."""
        if not isinstance(where, int):
            where = self.token_index(where)
        token.parent = self
        self.tokens.insert(where, token)

    def insert_after(self, where, token, skip_ws=True):
        """Inserts *token* after *where* (an index or a child token).

        The token is placed in front of the next token found by
        ``token_next`` (honouring *skip_ws*), or appended if there is none.
        """
        if not isinstance(where, int):
            where = self.token_index(where)
        nidx, next_ = self.token_next(where, skip_ws=skip_ws)
        token.parent = self
        if next_ is None:
            self.tokens.append(token)
        else:
            self.tokens.insert(nidx, token)

    def has_alias(self):
        """Returns ``True`` if an alias is present."""
        return self.get_alias() is not None

    def get_alias(self):
        """Returns the alias for this identifier or ``None``."""
        return None

    def get_name(self):
        """Returns the name of this identifier.

        This is either its alias or its real name. The returned value can
        be considered as the name under which the object corresponding to
        this identifier is known within the current statement.
        """
        return self.get_alias() or self.get_real_name()

    def get_real_name(self):
        """Returns the real name (object name) of this identifier."""
        return None

    def get_parent_name(self):
        """Return name of the parent object if any.

        A parent object is identified by the first occurring dot.
        """
        dot_idx, _ = self.token_next_by(m=(T.Punctuation, '.'))
        # token_prev tolerates ``dot_idx`` being None and then yields
        # (None, None).
        _, prev_ = self.token_prev(dot_idx)
        return remove_quotes(prev_.value) if prev_ is not None else None

    def _get_first_name(self, idx=None, reverse=False, keywords=False,
                        real_name=False):
        """Returns the name of the first token with a name.

        The search starts at *idx* (the whole list when *idx* is falsy) and
        runs backwards when *reverse* is set. Keyword tokens count as names
        when *keywords* is true. For nested identifiers or functions the
        real name is returned if *real_name* is set, the name otherwise.
        ``None`` is returned implicitly when no token qualifies.
        """
        tokens = self.tokens[idx:] if idx else self.tokens
        tokens = reversed(tokens) if reverse else tokens
        types = [T.Name, T.Wildcard, T.String.Symbol]

        if keywords:
            types.append(T.Keyword)

        for token in tokens:
            if token.ttype in types:
                return remove_quotes(token.value)
            elif isinstance(token, (Identifier, Function)):
                return token.get_real_name() if real_name else token.get_name()
class Statement(TokenList):
    """A single SQL statement."""

    def get_type(self):
        """Return the statement type as an upper-cased string.

        The result is the normalized first DML or DDL keyword. Leading
        whitespace and comments are skipped. ``'UNKNOWN'`` is returned when
        the first meaningful token is neither a DML nor a DDL keyword.
        """
        first = self.token_first(skip_cm=True)
        if first is None:
            # Nothing but whitespace/comments, or no tokens at all.
            return 'UNKNOWN'

        if first.ttype in (T.Keyword.DML, T.Keyword.DDL):
            return first.normalized

        if first.ttype == T.Keyword.CTE:
            # WITH is expected to be followed by an Identifier or an
            # IdentifierList holding the CTE definitions; the real DML
            # keyword (e.g. SELECT, INSERT) comes right after that.
            pos = self.token_index(first)
            while pos is not None:
                pos, candidate = self.token_next(pos, skip_ws=True)
                if isinstance(candidate, (Identifier, IdentifierList)):
                    pos, candidate = self.token_next(pos, skip_ws=True)

                    if (candidate is not None
                            and candidate.ttype == T.Keyword.DML):
                        return candidate.normalized

        # Most likely invalid syntax.
        return 'UNKNOWN'
class Identifier(NameAliasMixin, TokenList):
    """An identifier, optionally carrying an alias or a typecast."""

    def is_wildcard(self):
        """Return ``True`` if a wildcard token is among the children."""
        return self.token_next_by(t=T.Wildcard)[1] is not None

    def get_typecast(self):
        """Return the typecast of this object as a string, or ``None``."""
        marker_pos, _ = self.token_next_by(m=(T.Punctuation, '::'))
        _, following = self.token_next(marker_pos, skip_ws=False)
        if following:
            return following.value
        return None

    def get_ordering(self):
        """Return the ordering keyword in upper case, or ``None``."""
        order_kw = self.token_next_by(t=T.Keyword.Order)[1]
        if order_kw:
            return order_kw.normalized
        return None

    def get_array_indices(self):
        """Yield, per square-bracket group, the list of tokens inside it."""
        for child in self.tokens:
            if not isinstance(child, SquareBrackets):
                continue
            # Slice [1:-1] drops the opening and closing bracket tokens.
            yield child.tokens[1:-1]
class IdentifierList(TokenList):
    """A list of :class:`~sqlparse.sql.Identifier`\'s."""

    def get_identifiers(self):
        """Yield the child tokens that are neither whitespace nor commas."""
        for child in self.tokens:
            if child.is_whitespace or child.match(T.Punctuation, ','):
                continue
            yield child
class TypedLiteral(TokenList):
    """A typed literal, e.g. "date '2001-09-28'" or "interval '2 hours'"."""
    # Match specifications as (token type, value(s)); None accepts any value.
    M_OPEN = [(T.Name.Builtin, None), (T.Keyword, "TIMESTAMP")]
    M_CLOSE = (T.String.Single, None)
    M_EXTEND = (
        T.Keyword,
        ("DAY", "HOUR", "MINUTE", "MONTH", "SECOND", "YEAR"),
    )
class Parenthesis(TokenList):
    """A group of tokens enclosed in parentheses."""
    M_OPEN = (T.Punctuation, '(')
    M_CLOSE = (T.Punctuation, ')')

    @property
    def _groupable_tokens(self):
        # Everything except the first and last child token.
        inner = self.tokens[1:-1]
        return inner
class SquareBrackets(TokenList):
    """A group of tokens enclosed in square brackets."""
    M_OPEN = (T.Punctuation, '[')
    M_CLOSE = (T.Punctuation, ']')

    @property
    def _groupable_tokens(self):
        # Everything except the first and last child token.
        inner = self.tokens[1:-1]
        return inner
class Assignment(TokenList):
    """Token group for an assignment such as 'var := val;'."""
class If(TokenList):
    """An 'if' clause, possibly with 'else if' or 'else' parts."""
    M_OPEN = (T.Keyword, 'IF')
    M_CLOSE = (T.Keyword, 'END IF')
class For(TokenList):
    """A 'FOR' loop, opened by FOR or FOREACH and closed by END LOOP."""
    M_OPEN = (T.Keyword, ('FOR', 'FOREACH'))
    M_CLOSE = (T.Keyword, 'END LOOP')
class Comparison(TokenList):
    """A comparison, as used for example in WHERE clauses."""

    @property
    def left(self):
        """The first child token of the comparison."""
        first, *_ = self.tokens[:1] or [self.tokens[0]]
        return first

    @property
    def right(self):
        """The last child token of the comparison."""
        last = self.tokens[-1]
        return last
class Comment(TokenList):
    """A comment."""

    def is_multiline(self):
        """Return ``True`` if the first child token is a multiline comment.

        Always returns a real ``bool``; a comment group without tokens
        yields ``False`` rather than the empty token list.
        """
        return bool(self.tokens) \
            and self.tokens[0].ttype == T.Comment.Multiline
class Where(TokenList):
    """A WHERE clause, closed by the first keyword listed in M_CLOSE."""
    M_OPEN = (T.Keyword, 'WHERE')
    M_CLOSE = (
        T.Keyword,
        (
            'ORDER BY', 'GROUP BY', 'LIMIT', 'UNION', 'UNION ALL', 'EXCEPT',
            'INTERSECT', 'HAVING', 'RETURNING', 'INTO',
        ),
    )
class Over(TokenList):
    """An OVER clause; only an opening keyword is declared."""
    M_OPEN = (T.Keyword, 'OVER')
class Having(TokenList):
    """A HAVING clause, closed by ORDER BY or LIMIT."""
    M_OPEN = (T.Keyword, 'HAVING')
    M_CLOSE = (T.Keyword, ('ORDER BY', 'LIMIT'))
class Case(TokenList):
    """A CASE statement with one or more WHEN parts and an optional ELSE."""
    M_OPEN = (T.Keyword, 'CASE')
    M_CLOSE = (T.Keyword, 'END')

    def get_cases(self, skip_ws=False):
        """Return a list of 2-tuples ``(condition, value)``.

        Both elements are lists of tokens, except that the condition of an
        ELSE branch is ``None``. With *skip_ws* set, whitespace tokens are
        left out.
        """
        CONDITION = 1
        VALUE = 2

        cases = []
        state = CONDITION

        for tok in self.tokens:
            # First, let the current token update the collection state.
            if tok.match(T.Keyword, 'CASE'):
                continue
            if skip_ws and tok.ttype in T.Whitespace:
                continue

            if tok.match(T.Keyword, 'WHEN'):
                cases.append(([], []))
                state = CONDITION
            elif tok.match(T.Keyword, 'THEN'):
                state = VALUE
            elif tok.match(T.Keyword, 'ELSE'):
                cases.append((None, []))
                state = VALUE
            elif tok.match(T.Keyword, 'END'):
                state = None

            # A condition may start without a preceding WHEN.
            if state and not cases:
                cases.append(([], []))

            # Then file the token under the active half of the last case.
            if state == CONDITION:
                cases[-1][0].append(tok)
            elif state == VALUE:
                cases[-1][1].append(tok)

        return cases
class Function(NameAliasMixin, TokenList):
    """A function or procedure call."""

    def get_parameters(self):
        """Return a list of parameters.

        The parameters are taken from the first ``Parenthesis`` child. If
        that group contains an ``IdentifierList`` its identifiers are
        returned; otherwise every function, identifier, typed literal or
        literal token found directly inside the parentheses is returned.
        An empty list is returned when there is no parenthesis group.
        """
        _, parenthesis = self.token_next_by(i=Parenthesis)
        if parenthesis is None:
            # token_next_by reports "not found" as (None, None).
            return []

        result = []
        for token in parenthesis.tokens:
            if isinstance(token, IdentifierList):
                # Materialize the generator so every path returns a list.
                return list(token.get_identifiers())
            elif imt(token, i=(Function, Identifier, TypedLiteral),
                     t=T.Literal):
                result.append(token)
        return result

    def get_window(self):
        """Return the window if it exists, ``None`` otherwise.

        The window is the last child token of the ``Over`` group.
        """
        # token_next_by always returns a 2-tuple, which is truthy even when
        # nothing matched, so the token itself has to be checked.
        _, over_clause = self.token_next_by(i=Over)
        if over_clause is None:
            return None
        return over_clause.tokens[-1]
class Begin(TokenList):
    """A block delimited by BEGIN and END."""
    M_OPEN = (T.Keyword, 'BEGIN')
    M_CLOSE = (T.Keyword, 'END')
class Operation(TokenList):
    """Token group for operations."""
class Values(TokenList):
    """Token group for values."""
class Command(TokenList):
    """Token group for CLI commands."""