Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlparse/sql.py: 36%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1#
2# Copyright (C) 2009-2020 the sqlparse authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of python-sqlparse and is released under
6# the BSD License: https://opensource.org/licenses/BSD-3-Clause
8"""This module contains classes representing syntactical elements of SQL."""
10import re
12from sqlparse import tokens as T
13from sqlparse.utils import imt, remove_quotes
class NameAliasMixin:
    """Mixin that supplies ``get_real_name`` and ``get_alias``."""

    def get_real_name(self):
        """Return the real (object) name of this identifier."""
        # For a dotted name such as "a.b" the lookup starts at the first
        # dot; without a dot the index is None and all tokens are searched.
        found = self.token_next_by(m=(T.Punctuation, '.'))
        return self._get_first_name(found[0], real_name=True)

    def get_alias(self):
        """Return the alias of this identifier, or ``None`` if there is none."""
        # Explicit form: "name AS alias"
        as_idx, as_kw = self.token_next_by(m=(T.Keyword, 'AS'))
        if as_kw is not None:
            return self._get_first_name(as_idx + 1, keywords=True)

        # Implicit form: "name alias" or
        # "complicated column expression alias"
        _, whitespace = self.token_next_by(t=T.Whitespace)
        if whitespace is None or len(self.tokens) <= 2:
            return None
        return self._get_first_name(reverse=True)
class Token:
    """Base class for every other class in this module.

    A token is a single lexical unit with two primary attributes:
    ``value`` holds the unchanged text of the token and ``ttype`` holds
    its token type.
    """

    __slots__ = ('value', 'ttype', 'parent', 'normalized', 'is_keyword',
                 'is_group', 'is_whitespace', 'is_newline')

    def __init__(self, ttype, value):
        text = str(value)
        self.value = text
        self.ttype = ttype
        self.parent = None
        self.is_group = False
        self.is_keyword = ttype in T.Keyword
        self.is_whitespace = self.ttype in T.Whitespace
        self.is_newline = self.ttype in T.Newline
        # Keywords are normalized to upper case; other values are kept.
        self.normalized = text.upper() if self.is_keyword else text

    def __str__(self):
        return self.value

    # Pending tokenlist __len__ bug fix
    # def __len__(self):
    #     return len(self.value)

    def __repr__(self):
        name = self._get_repr_name()
        shown = self._get_repr_value()

        # Use double quotes when the value is itself wrapped in single quotes.
        single_quoted = shown.startswith("'") and shown.endswith("'")
        quote = '"' if single_quoted else "'"
        return f"<{name} {quote}{shown}{quote} at 0x{id(self):2X}>"

    def _get_repr_name(self):
        """Return the last dotted component of the token type's string form."""
        return str(self.ttype).rpartition('.')[2]

    def _get_repr_value(self):
        """Return a shortened, single-line version of the token text."""
        text = str(self)
        if len(text) > 7:
            text = f"{text[:6]}..."
        return re.sub(r'\s+', ' ', text)

    def flatten(self):
        """Resolve subgroups; a plain token yields only itself."""
        yield self

    def match(self, ttype, values, regex=False):
        """Check whether this token matches the given arguments.

        *ttype* is a token type. If this token's type is not that exact
        type, ``False`` is returned.
        *values* is a list of possible values for this token; a match on
        any one of them returns ``True``. Except for keyword tokens the
        comparison is case-sensitive. A single string may be passed
        instead of a list, and ``None`` means only the type is checked.
        If *regex* is ``True`` (default ``False``) the values are treated
        as regular expressions and searched for in the normalized value.
        """
        if self.ttype is not ttype:
            return False
        if values is None:
            return True

        if isinstance(values, str):
            values = (values,)

        if regex:
            # TODO: Add test for regex with is_keyword = false
            flags = re.IGNORECASE if self.is_keyword else 0
            return any(re.compile(value, flags).search(self.normalized)
                       for value in values)

        if self.is_keyword:
            values = (value.upper() for value in values)

        return self.normalized in values

    def within(self, group_cls):
        """Return ``True`` if this token is within *group_cls*.

        Use this method for example to check if an identifier is within
        a function: ``t.within(sql.Function)``.
        """
        node = self
        while node.parent:
            node = node.parent
            if isinstance(node, group_cls):
                return True
        return False

    def is_child_of(self, other):
        """Return ``True`` if this token is a direct child of *other*."""
        return self.parent == other

    def has_ancestor(self, other):
        """Return ``True`` if *other* is in this token's ancestry."""
        node = self
        while node.parent:
            node = node.parent
            if node == other:
                return True
        return False
class TokenList(Token):
    """A group of tokens.

    It has an additional instance attribute ``tokens`` which holds a
    list of child-tokens.
    """

    __slots__ = 'tokens'

    def __init__(self, tokens=None):
        self.tokens = tokens or []
        for token in self.tokens:
            token.parent = self
        # The group's own value is the concatenated text of its children.
        super().__init__(None, str(self))
        self.is_group = True

    def __str__(self):
        return ''.join(token.value for token in self.flatten())

    # weird bug
    # def __len__(self):
    #     return len(self.tokens)

    def __iter__(self):
        return iter(self.tokens)

    def __getitem__(self, item):
        return self.tokens[item]

    def _get_repr_name(self):
        return type(self).__name__

    def _pprint_tree(self, max_depth=None, depth=0, f=None, _pre=''):
        """Pretty-print the object tree.

        *max_depth* limits how many levels of sub-groups are expanded
        (``None`` means no limit), *f* is the file object passed to
        ``print`` and *_pre* is the prefix accumulated from parent levels.
        """
        token_count = len(self.tokens)
        for idx, token in enumerate(self.tokens):
            cls = token._get_repr_name()
            value = token._get_repr_value()

            last = idx == (token_count - 1)
            pre = '`- ' if last else '|- '

            q = '"' if value.startswith("'") and value.endswith("'") else "'"
            print(f"{_pre}{pre}{idx} {cls} {q}{value}{q}", file=f)

            if token.is_group and (max_depth is None or depth < max_depth):
                # Keep the child prefix as wide as the 3-character branch
                # marker above so nested levels line up.
                parent_pre = '   ' if last else '|  '
                token._pprint_tree(max_depth, depth + 1, f, _pre + parent_pre)

    def get_token_at_offset(self, offset):
        """Return the flattened token covering character position *offset*.

        Returns ``None`` if *offset* lies outside the text of this group.
        """
        idx = 0
        for token in self.flatten():
            end = idx + len(token.value)
            if idx <= offset < end:
                return token
            idx = end
        return None

    def flatten(self):
        """Generator yielding ungrouped tokens.

        This method is recursively called for all child tokens.
        """
        for token in self.tokens:
            if token.is_group:
                yield from token.flatten()
            else:
                yield token

    def get_sublists(self):
        """Yield the direct children that are themselves groups."""
        for token in self.tokens:
            if token.is_group:
                yield token

    @property
    def _groupable_tokens(self):
        return self.tokens

    def _token_matching(self, funcs, start=0, end=None, reverse=False):
        """Find the next direct child accepted by any of *funcs*.

        *funcs* is a single predicate or a list/tuple of predicates.
        Returns ``(index, token)`` for the first hit and ``(None, None)``
        if nothing matches. When *start* is ``None`` a bare ``None`` is
        returned instead of a tuple.
        """
        if start is None:
            return None

        if not isinstance(funcs, (list, tuple)):
            funcs = (funcs,)

        if reverse:
            assert end is None
            # token_next() has already added 1 to the index it was given,
            # so the backwards scan starts two positions before *start*.
            indexes = range(start - 2, -1, -1)
        else:
            if end is None:
                end = len(self.tokens)
            indexes = range(start, end)
        for idx in indexes:
            token = self.tokens[idx]
            for func in funcs:
                if func(token):
                    return idx, token
        return None, None

    def token_first(self, skip_ws=True, skip_cm=False):
        """Returns the first child token.

        If *skip_ws* is ``True`` (the default), whitespace
        tokens are ignored.

        if *skip_cm* is ``True`` (default: ``False``), comments are
        ignored too.
        """
        # this one is inconsistent, using Comment instead of T.Comment...
        def matcher(tk):
            return not ((skip_ws and tk.is_whitespace)
                        or (skip_cm and imt(tk, t=T.Comment, i=Comment)))
        return self._token_matching(matcher)[1]

    def token_next_by(self, i=None, m=None, t=None, idx=-1, end=None):
        """Return ``(index, token)`` of the next child after *idx* matching
        instance *i*, match spec *m* or token type *t* (as tested by
        ``imt``), searching up to *end*.
        """
        idx += 1
        return self._token_matching(lambda tk: imt(tk, i, m, t), idx, end)

    def token_not_matching(self, funcs, idx):
        """Return ``(index, token)`` of the first child, starting at *idx*,
        that is rejected by at least one of *funcs*.
        """
        funcs = (funcs,) if not isinstance(funcs, (list, tuple)) else funcs

        def negate(func):
            # A factory gives every predicate its own closure; a lambda
            # written directly in the comprehension would bind late and
            # all of them would end up negating only the last predicate.
            return lambda tk: not func(tk)

        negated = [negate(func) for func in funcs]
        return self._token_matching(negated, idx)

    def token_matching(self, funcs, idx):
        """Return the first child, starting at *idx*, accepted by *funcs*."""
        return self._token_matching(funcs, idx)[1]

    def token_prev(self, idx, skip_ws=True, skip_cm=False):
        """Returns the previous token relative to *idx*.

        If *skip_ws* is ``True`` (the default) whitespace tokens are ignored.
        If *skip_cm* is ``True`` comments are ignored.
        The result is an ``(index, token)`` tuple; ``(None, None)`` is
        returned if there's no previous token.
        """
        return self.token_next(idx, skip_ws, skip_cm, _reverse=True)

    # TODO: May need to re-add default value to idx
    def token_next(self, idx, skip_ws=True, skip_cm=False, _reverse=False):
        """Returns the next token relative to *idx*.

        If *skip_ws* is ``True`` (the default) whitespace tokens are ignored.
        If *skip_cm* is ``True`` comments are ignored.
        The result is an ``(index, token)`` tuple; ``(None, None)`` is
        returned if *idx* is ``None`` or there's no next token.
        """
        if idx is None:
            return None, None
        idx += 1  # a lot of calling code currently pre-compensates for this

        def matcher(tk):
            return not ((skip_ws and tk.is_whitespace)
                        or (skip_cm and imt(tk, t=T.Comment, i=Comment)))
        return self._token_matching(matcher, idx, reverse=_reverse)

    def token_index(self, token, start=0):
        """Return list index of token.

        *start* is either an index or a token whose index is looked up
        first; the search begins at that position.
        """
        start = start if isinstance(start, int) else self.token_index(start)
        return start + self.tokens[start:].index(token)

    def group_tokens(self, grp_cls, start, end, include_end=True,
                     extend=False):
        """Replace tokens by an instance of *grp_cls*.

        *start* and *end* are indexes into ``self.tokens``; the token at
        *end* is part of the group unless *include_end* is false. With
        *extend* set and the start token already being a *grp_cls*, that
        existing group absorbs the following tokens instead of a new group
        being created. Returns the resulting group.
        """
        start_idx = start
        start = self.tokens[start_idx]

        # include_end is a bool, so this adds 1 when the end is included.
        end_idx = end + include_end

        # will be needed later for new group_clauses
        # while skip_ws and tokens and tokens[-1].is_whitespace:
        #     tokens = tokens[:-1]

        if extend and isinstance(start, grp_cls):
            subtokens = self.tokens[start_idx + 1:end_idx]

            grp = start
            grp.tokens.extend(subtokens)
            del self.tokens[start_idx + 1:end_idx]
            # Refresh the cached text now that the group has grown.
            grp.value = str(start)
        else:
            subtokens = self.tokens[start_idx:end_idx]
            grp = grp_cls(subtokens)
            self.tokens[start_idx:end_idx] = [grp]
            grp.parent = self

        for token in subtokens:
            token.parent = grp

        return grp

    def insert_before(self, where, token):
        """Inserts *token* before *where* (an index or a child token)."""
        if not isinstance(where, int):
            where = self.token_index(where)
        token.parent = self
        self.tokens.insert(where, token)

    def insert_after(self, where, token, skip_ws=True):
        """Inserts *token* after *where* (an index or a child token).

        The token is placed directly before the next token found after
        *where* (whitespace is skipped when *skip_ws* is true) or appended
        if there is no such token.
        """
        if not isinstance(where, int):
            where = self.token_index(where)
        nidx, next_ = self.token_next(where, skip_ws=skip_ws)
        token.parent = self
        if next_ is None:
            self.tokens.append(token)
        else:
            self.tokens.insert(nidx, token)

    def has_alias(self):
        """Returns ``True`` if an alias is present."""
        return self.get_alias() is not None

    def get_alias(self):
        """Returns the alias for this identifier or ``None``."""
        return None

    def get_name(self):
        """Returns the name of this identifier.

        This is either its alias or its real name. The returned value can
        be considered as the name under which the object corresponding to
        this identifier is known within the current statement.
        """
        return self.get_alias() or self.get_real_name()

    def get_real_name(self):
        """Returns the real name (object name) of this identifier."""
        return None

    def get_parent_name(self):
        """Return name of the parent object if any.

        A parent object is identified by the first occurring dot.
        """
        dot_idx, _ = self.token_next_by(m=(T.Punctuation, '.'))
        _, prev_ = self.token_prev(dot_idx)
        return remove_quotes(prev_.value) if prev_ is not None else None

    def _get_first_name(self, idx=None, reverse=False, keywords=False,
                        real_name=False):
        """Returns the name of the first token with a name.

        The search covers ``self.tokens`` from *idx* onwards (all tokens
        when *idx* is falsy), back to front if *reverse* is set. Keyword
        tokens count as names when *keywords* is true. For nested
        identifiers or functions the real name is returned if *real_name*
        is true, otherwise their name. Returns ``None`` if nothing is found.
        """
        tokens = self.tokens[idx:] if idx else self.tokens
        tokens = reversed(tokens) if reverse else tokens
        types = [T.Name, T.Wildcard, T.String.Symbol]

        if keywords:
            types.append(T.Keyword)

        for token in tokens:
            if token.ttype in types:
                return remove_quotes(token.value)
            elif isinstance(token, (Identifier, Function)):
                return token.get_real_name() if real_name else token.get_name()
        return None
class Statement(TokenList):
    """A single SQL statement."""

    def get_type(self):
        """Return the type of this statement.

        The result is the normalized value of the first DML or DDL
        keyword. For a statement starting with a CTE keyword it is the
        DML keyword that follows the CTE definitions. In every other
        case "UNKNOWN" is returned.

        Leading whitespace and comments are ignored.
        """
        unknown = 'UNKNOWN'

        first = self.token_first(skip_cm=True)
        if first is None:
            # An "empty" statement: no tokens at all, or nothing but
            # whitespace and comments.
            return unknown

        if first.ttype in (T.Keyword.DML, T.Keyword.DDL):
            return first.normalized

        if first.ttype == T.Keyword.CTE:
            # The WITH keyword should be followed by either an Identifier
            # or an IdentifierList containing the CTE definitions; the
            # actual DML keyword (e.g. SELECT, INSERT) will follow next.
            pos = self.token_index(first)
            while pos is not None:
                pos, candidate = self.token_next(pos, skip_ws=True)
                if not isinstance(candidate, (Identifier, IdentifierList)):
                    continue
                pos, candidate = self.token_next(pos, skip_ws=True)
                if candidate is not None \
                        and candidate.ttype == T.Keyword.DML:
                    return candidate.normalized

        # Hmm, probably invalid syntax, so return unknown.
        return unknown
class Identifier(NameAliasMixin, TokenList):
    """An identifier.

    Identifiers may carry an alias or a typecast.
    """

    def is_wildcard(self):
        """Return ``True`` if this identifier contains a wildcard."""
        return self.token_next_by(t=T.Wildcard)[1] is not None

    def get_typecast(self):
        """Return the typecast of this object as a string, or ``None``."""
        # The typecast is the token directly following the "::" marker.
        marker_idx, _ = self.token_next_by(m=(T.Punctuation, '::'))
        _, following = self.token_next(marker_idx, skip_ws=False)
        if following:
            return following.value
        return None

    def get_ordering(self):
        """Return the ordering as an uppercase string, or ``None``."""
        ordering = self.token_next_by(t=T.Keyword.Order)[1]
        if ordering:
            return ordering.normalized
        return None

    def get_array_indices(self):
        """Return an iterator of index token lists."""
        brackets = (tok for tok in self.tokens
                    if isinstance(tok, SquareBrackets))
        for group in brackets:
            # Slice with [1:-1] to drop the enclosing square brackets.
            yield group.tokens[1:-1]
class IdentifierList(TokenList):
    """A list of :class:`~sqlparse.sql.Identifier` objects."""

    def get_identifiers(self):
        """Yield the identifiers of this list.

        Whitespace tokens and comma punctuation are left out.
        """
        for token in self.tokens:
            if token.is_whitespace or token.match(T.Punctuation, ','):
                continue
            yield token
class TypedLiteral(TokenList):
    """A literal with a type, e.g. "date '2001-09-28'" or
    "interval '2 hours'".
    """
    # (ttype, values) match specs; presumably consumed by the grouping
    # code outside this module -- confirm against the caller.
    M_OPEN = [(T.Name.Builtin, None), (T.Keyword, "TIMESTAMP")]
    M_CLOSE = (T.String.Single, None)
    M_EXTEND = (
        T.Keyword,
        ("DAY", "HOUR", "MINUTE", "MONTH", "SECOND", "YEAR"),
    )
class Parenthesis(TokenList):
    """The tokens enclosed by a pair of parentheses."""
    M_OPEN = (T.Punctuation, '(')
    M_CLOSE = (T.Punctuation, ')')

    @property
    def _groupable_tokens(self):
        # Everything except the first and last child token.
        inner = slice(1, -1)
        return self.tokens[inner]
class SquareBrackets(TokenList):
    """The tokens enclosed by a pair of square brackets."""
    M_OPEN = (T.Punctuation, '[')
    M_CLOSE = (T.Punctuation, ']')

    @property
    def _groupable_tokens(self):
        # Everything except the first and last child token.
        inner = slice(1, -1)
        return self.tokens[inner]
class Assignment(TokenList):
    """Token group for an assignment such as 'var := val;'."""
class If(TokenList):
    """Token group for an 'if' clause, including any 'else if' or 'else'
    parts.
    """
    M_OPEN = (T.Keyword, 'IF')
    M_CLOSE = (T.Keyword, 'END IF')
class For(TokenList):
    """Token group for a 'FOR' loop."""
    M_OPEN = (T.Keyword, ('FOR', 'FOREACH'))
    M_CLOSE = (T.Keyword, 'END LOOP')
class Comparison(TokenList):
    """A comparison, as used for example in WHERE clauses."""

    @property
    def left(self):
        """The first child token of the comparison."""
        first, *_ = self.tokens[:1] or [self.tokens[0]]
        return first

    @property
    def right(self):
        """The last child token of the comparison."""
        *_, last = self.tokens[-1:] or [self.tokens[-1]]
        return last
class Comment(TokenList):
    """A comment."""

    def is_multiline(self):
        """Return ``True`` if the first child is a multiline comment token.

        Always returns a boolean; a comment group without any tokens
        yields ``False`` rather than the empty token list itself.
        """
        if not self.tokens:
            return False
        return self.tokens[0].ttype == T.Comment.Multiline
class Where(TokenList):
    """Token group for a WHERE clause."""
    M_OPEN = (T.Keyword, 'WHERE')
    M_CLOSE = (
        T.Keyword,
        ('ORDER BY', 'GROUP BY', 'LIMIT', 'UNION', 'UNION ALL', 'EXCEPT',
         'HAVING', 'RETURNING', 'INTO'),
    )
class Over(TokenList):
    """Token group for an OVER clause."""
    M_OPEN = (T.Keyword, 'OVER')
class Having(TokenList):
    """Token group for a HAVING clause."""
    M_OPEN = (T.Keyword, 'HAVING')
    M_CLOSE = (T.Keyword, ('ORDER BY', 'LIMIT'))
class Case(TokenList):
    """A CASE statement with one or more WHEN parts and possibly an ELSE."""
    M_OPEN = (T.Keyword, 'CASE')
    M_CLOSE = (T.Keyword, 'END')

    def get_cases(self, skip_ws=False):
        """Return a list of 2-tuples ``(condition, value)``.

        Both elements are lists of tokens. For an ELSE part the condition
        is ``None``. Whitespace tokens are dropped when *skip_ws* is true.
        """
        in_condition, in_value = 1, 2

        cases = []
        state = in_condition

        for tok in self.tokens:
            # The opening CASE keyword is never part of a case.
            if tok.match(T.Keyword, 'CASE'):
                continue
            if skip_ws and tok.ttype in T.Whitespace:
                continue

            # Switch state according to the keyword at hand.
            if tok.match(T.Keyword, 'WHEN'):
                cases.append(([], []))
                state = in_condition
            elif tok.match(T.Keyword, 'THEN'):
                state = in_value
            elif tok.match(T.Keyword, 'ELSE'):
                cases.append((None, []))
                state = in_value
            elif tok.match(T.Keyword, 'END'):
                state = None

            # A first condition that is not preceded by WHEN.
            if state and not cases:
                cases.append(([], []))

            # File the token under the part currently being collected.
            if state == in_condition:
                cases[-1][0].append(tok)
            elif state == in_value:
                cases[-1][1].append(tok)

        return cases
class Function(NameAliasMixin, TokenList):
    """A function or procedure call."""

    def get_parameters(self):
        """Return the parameters of this call.

        If the parenthesis group holds an ``IdentifierList``, the iterator
        returned by its ``get_identifiers()`` is returned. Otherwise the
        result is a list of the functions, identifiers, typed literals and
        literal tokens found directly inside the parenthesis. An empty
        list is returned when the call has no parenthesis group.
        """
        _, parenthesis = self.token_next_by(i=Parenthesis)
        if parenthesis is None:
            return []

        result = []
        for token in parenthesis.tokens:
            if isinstance(token, IdentifierList):
                return token.get_identifiers()
            if imt(token, i=(Function, Identifier, TypedLiteral),
                   t=T.Literal):
                result.append(token)
        return result

    def get_window(self):
        """Return the window if it exists, otherwise ``None``.

        The window is the last child token of the OVER clause.
        """
        # token_next_by() always returns a 2-tuple, which is truthy even
        # when nothing was found, so the token itself has to be checked.
        _, over_clause = self.token_next_by(i=Over)
        if over_clause is None:
            return None
        return over_clause.tokens[-1]
class Begin(TokenList):
    """Token group for a BEGIN/END block."""
    M_OPEN = (T.Keyword, 'BEGIN')
    M_CLOSE = (T.Keyword, 'END')
class Operation(TokenList):
    """Token group for an operation."""
class Values(TokenList):
    """Token group for values."""
class Command(TokenList):
    """Token group for a CLI command."""