Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/sqlparse/sql.py: 54%
336 statements
« prev ^ index » next coverage.py v7.4.4, created at 2024-04-04 06:18 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2024-04-04 06:18 +0000
1#
2# Copyright (C) 2009-2020 the sqlparse authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of python-sqlparse and is released under
6# the BSD License: https://opensource.org/licenses/BSD-3-Clause
8"""This module contains classes representing syntactical elements of SQL."""
10import re
12from sqlparse import tokens as T
13from sqlparse.utils import imt, remove_quotes
class NameAliasMixin:
    """Mixin providing ``get_real_name`` and ``get_alias``.

    It relies on the host class supplying ``token_next_by``,
    ``_get_first_name`` and a ``tokens`` list.
    """

    def get_real_name(self):
        """Return the real (object) name of this identifier."""
        # For a dotted name such as "a.b", start the lookup at the dot.
        dot_pos = self.token_next_by(m=(T.Punctuation, '.'))[0]
        return self._get_first_name(dot_pos, real_name=True)

    def get_alias(self):
        """Return the alias of this identifier, or ``None`` if it has none."""
        # Explicit form: "name AS alias"
        as_pos, as_kw = self.token_next_by(m=(T.Keyword, 'AS'))
        if as_kw is not None:
            return self._get_first_name(as_pos + 1, keywords=True)

        # Implicit form: "name alias" or "complicated column expression alias"
        whitespace = self.token_next_by(t=T.Whitespace)[1]
        if whitespace is None or len(self.tokens) <= 2:
            return None
        return self._get_first_name(reverse=True)
class Token:
    """Base class for all other classes in this module.

    It represents a single token and has two instance attributes:
    ``value`` is the unchanged value of the token and ``ttype`` is
    the type of the token.
    """

    __slots__ = ('value', 'ttype', 'parent', 'normalized', 'is_keyword',
                 'is_group', 'is_whitespace')

    def __init__(self, ttype, value):
        """Create a token of type *ttype* holding ``str(value)``."""
        value = str(value)
        self.value = value
        self.ttype = ttype
        self.parent = None
        self.is_group = False
        self.is_keyword = ttype in T.Keyword
        self.is_whitespace = self.ttype in T.Whitespace
        # Keywords are compared case-insensitively, so keep an upper-cased
        # copy; every other token keeps its value unchanged.
        self.normalized = value.upper() if self.is_keyword else value

    def __str__(self):
        return self.value

    # Pending tokenlist __len__ bug fix
    # def __len__(self):
    #     return len(self.value)

    def __repr__(self):
        cls = self._get_repr_name()
        value = self._get_repr_value()

        # Use double quotes when the value itself is wrapped in single quotes.
        q = '"' if value.startswith("'") and value.endswith("'") else "'"
        return "<{cls} {q}{value}{q} at 0x{id:2X}>".format(
            cls=cls, q=q, value=value, id=id(self))

    def _get_repr_name(self):
        """Return the last dotted component of ``str(self.ttype)``."""
        return str(self.ttype).split('.')[-1]

    def _get_repr_value(self):
        """Return a short single-line form of the token text for ``repr``."""
        raw = str(self)
        if len(raw) > 7:
            raw = raw[:6] + '...'
        return re.sub(r'\s+', ' ', raw)

    def flatten(self):
        """Resolve subgroups."""
        yield self

    def match(self, ttype, values, regex=False):
        """Checks whether the token matches the given arguments.

        *ttype* is a token type. If this token doesn't match the given token
        type (the check is by identity), ``False`` is returned.
        *values* is a list of possible values for this token. The values
        are OR'ed together so if only one of the values matches ``True``
        is returned. Except for keyword tokens the comparison is
        case-sensitive. For convenience it's OK to pass in a single string.
        If *values* is ``None`` only the token type is checked.
        If *regex* is ``True`` (default is ``False``) the given values are
        treated as regular expressions.
        """
        type_matched = self.ttype is ttype
        if not type_matched or values is None:
            return type_matched

        if isinstance(values, str):
            values = (values,)

        if regex:
            # TODO: Add test for regex with is_keyword = false
            flag = re.IGNORECASE if self.is_keyword else 0
            return any(re.compile(v, flag).search(self.normalized)
                       for v in values)

        if self.is_keyword:
            values = (v.upper() for v in values)

        return self.normalized in values

    def within(self, group_cls):
        """Returns ``True`` if this token is within *group_cls*.

        Use this method for example to check if an identifier is within
        a function: ``t.within(sql.Function)``.
        """
        parent = self.parent
        # Compare against None explicitly: a parent must not end the walk
        # merely because it evaluates as falsy (e.g. if it defines __len__).
        while parent is not None:
            if isinstance(parent, group_cls):
                return True
            parent = parent.parent
        return False

    def is_child_of(self, other):
        """Returns ``True`` if this token is a direct child of *other*."""
        return self.parent == other

    def has_ancestor(self, other):
        """Returns ``True`` if *other* is in this tokens ancestry."""
        parent = self.parent
        # See within(): the walk stops only at the root (parent is None).
        while parent is not None:
            if parent == other:
                return True
            parent = parent.parent
        return False
class TokenList(Token):
    """A group of tokens.

    It has an additional instance attribute ``tokens`` which holds a
    list of child-tokens.
    """

    __slots__ = 'tokens'

    def __init__(self, tokens=None):
        """Create a group from *tokens* and make it the parent of each."""
        self.tokens = tokens or []
        for token in self.tokens:
            token.parent = self
        # The group has no token type; its value is the joined child text.
        super().__init__(None, str(self))
        self.is_group = True

    def __str__(self):
        return ''.join(token.value for token in self.flatten())

    # weird bug
    # def __len__(self):
    #     return len(self.tokens)

    def __iter__(self):
        return iter(self.tokens)

    def __getitem__(self, item):
        return self.tokens[item]

    def _get_repr_name(self):
        return type(self).__name__

    def _pprint_tree(self, max_depth=None, depth=0, f=None, _pre=''):
        """Pretty-print the object tree.

        *max_depth* limits how deep nested groups are expanded (``None``
        means no limit), *f* is passed to ``print`` as its ``file``
        argument and *_pre* is the prefix accumulated from outer levels.
        """
        token_count = len(self.tokens)
        for idx, token in enumerate(self.tokens):
            cls = token._get_repr_name()
            value = token._get_repr_value()

            last = idx == (token_count - 1)
            pre = '`- ' if last else '|- '

            q = '"' if value.startswith("'") and value.endswith("'") else "'"
            print("{_pre}{pre}{idx} {cls} {q}{value}{q}".format(
                _pre=_pre, pre=pre, idx=idx, cls=cls, q=q, value=value),
                file=f)

            if token.is_group and (max_depth is None or depth < max_depth):
                # Same width as the branch markers above so that nested
                # levels line up underneath their parent.
                parent_pre = '   ' if last else '|  '
                token._pprint_tree(max_depth, depth + 1, f, _pre + parent_pre)

    def get_token_at_offset(self, offset):
        """Returns the token that is on position offset.

        Offsets count characters over the flattened tokens; ``None`` is
        returned when *offset* lies outside of the text.
        """
        idx = 0
        for token in self.flatten():
            end = idx + len(token.value)
            if idx <= offset < end:
                return token
            idx = end
        return None

    def flatten(self):
        """Generator yielding ungrouped tokens.

        This method is recursively called for all child tokens.
        """
        for token in self.tokens:
            if token.is_group:
                yield from token.flatten()
            else:
                yield token

    def get_sublists(self):
        """Yield the direct children that are groups themselves."""
        for token in self.tokens:
            if token.is_group:
                yield token

    @property
    def _groupable_tokens(self):
        return self.tokens

    def _token_matching(self, funcs, start=0, end=None, reverse=False):
        """Return ``(index, token)`` of the next token accepted by *funcs*.

        *funcs* is a single predicate or a list/tuple of predicates; a
        token is accepted as soon as one of them returns a true value.
        ``(None, None)`` is returned when nothing matches or when *start*
        is ``None``, so the result can always be unpacked.
        """
        if start is None:
            return None, None

        if not isinstance(funcs, (list, tuple)):
            funcs = (funcs,)

        if reverse:
            assert end is None
            # token_next() passes idx + 1 as *start*, hence start - 2 is
            # the token directly before the caller's original index.
            indexes = range(start - 2, -1, -1)
        else:
            if end is None:
                end = len(self.tokens)
            indexes = range(start, end)
        for idx in indexes:
            token = self.tokens[idx]
            for func in funcs:
                if func(token):
                    return idx, token
        return None, None

    def token_first(self, skip_ws=True, skip_cm=False):
        """Returns the first child token.

        If *skip_ws* is ``True`` (the default), whitespace
        tokens are ignored.

        if *skip_cm* is ``True`` (default: ``False``), comments are
        ignored too.
        """
        # NOTE: comments are detected both by token type (T.Comment) and
        # by the Comment group class.
        def matcher(tk):
            return not ((skip_ws and tk.is_whitespace)
                        or (skip_cm and imt(tk, t=T.Comment, i=Comment)))
        return self._token_matching(matcher)[1]

    def token_next_by(self, i=None, m=None, t=None, idx=-1, end=None):
        """Return ``(index, token)`` of the first match after *idx*.

        *i*, *m* and *t* are forwarded unchanged to ``imt``; the search
        starts at ``idx + 1`` and stops before *end*.
        """
        idx += 1
        return self._token_matching(lambda tk: imt(tk, i, m, t), idx, end)

    def token_not_matching(self, funcs, idx):
        """Return ``(index, token)`` of the first token from *idx* on that
        is rejected by at least one of *funcs*.
        """
        funcs = (funcs,) if not isinstance(funcs, (list, tuple)) else funcs
        # Bind each predicate as a default argument; a plain closure would
        # make every lambda call the last predicate of the loop.
        funcs = [lambda tk, func=func: not func(tk) for func in funcs]
        return self._token_matching(funcs, idx)

    def token_matching(self, funcs, idx):
        """Return the first token from *idx* on accepted by *funcs*."""
        return self._token_matching(funcs, idx)[1]

    def token_prev(self, idx, skip_ws=True, skip_cm=False):
        """Returns the previous token relative to *idx*.

        If *skip_ws* is ``True`` (the default) whitespace tokens are ignored.
        If *skip_cm* is ``True`` comments are ignored.
        The result is an ``(index, token)`` tuple; ``(None, None)`` is
        returned if there's no previous token.
        """
        return self.token_next(idx, skip_ws, skip_cm, _reverse=True)

    # TODO: May need to re-add default value to idx
    def token_next(self, idx, skip_ws=True, skip_cm=False, _reverse=False):
        """Returns the next token relative to *idx*.

        If *skip_ws* is ``True`` (the default) whitespace tokens are ignored.
        If *skip_cm* is ``True`` comments are ignored.
        The result is an ``(index, token)`` tuple; ``(None, None)`` is
        returned if there's no next token or if *idx* is ``None``.
        """
        if idx is None:
            return None, None
        idx += 1  # a lot of calling code currently pre-compensates for this

        def matcher(tk):
            return not ((skip_ws and tk.is_whitespace)
                        or (skip_cm and imt(tk, t=T.Comment, i=Comment)))
        return self._token_matching(matcher, idx, reverse=_reverse)

    def token_index(self, token, start=0):
        """Return list index of token.

        *start* is either an index or a token whose index is looked up
        first; the search begins there.
        """
        start = start if isinstance(start, int) else self.token_index(start)
        return start + self.tokens[start:].index(token)

    def group_tokens(self, grp_cls, start, end, include_end=True,
                     extend=False):
        """Replace tokens by an instance of *grp_cls*.

        *start* and *end* are indexes into ``self.tokens``; the token at
        *end* is part of the group unless *include_end* is false. With
        *extend* set and the start token already being a *grp_cls*
        instance, the tokens are appended to that group instead of
        creating a new one. The group is returned.
        """
        start_idx = start
        start = self.tokens[start_idx]

        end_idx = end + include_end

        # will be needed later for new group_clauses
        # while skip_ws and tokens and tokens[-1].is_whitespace:
        #     tokens = tokens[:-1]

        if extend and isinstance(start, grp_cls):
            subtokens = self.tokens[start_idx + 1:end_idx]

            grp = start
            grp.tokens.extend(subtokens)
            del self.tokens[start_idx + 1:end_idx]
            # The cached text of the group is stale after extending it.
            grp.value = str(start)
        else:
            subtokens = self.tokens[start_idx:end_idx]
            grp = grp_cls(subtokens)
            self.tokens[start_idx:end_idx] = [grp]
            grp.parent = self

        for token in subtokens:
            token.parent = grp

        return grp

    def insert_before(self, where, token):
        """Inserts *token* before *where*."""
        if not isinstance(where, int):
            where = self.token_index(where)
        token.parent = self
        self.tokens.insert(where, token)

    def insert_after(self, where, token, skip_ws=True):
        """Inserts *token* after *where*.

        The token is placed in front of the next token found by
        ``token_next`` (honouring *skip_ws*), or appended if there is none.
        """
        if not isinstance(where, int):
            where = self.token_index(where)
        nidx, next_ = self.token_next(where, skip_ws=skip_ws)
        token.parent = self
        if next_ is None:
            self.tokens.append(token)
        else:
            self.tokens.insert(nidx, token)

    def has_alias(self):
        """Returns ``True`` if an alias is present."""
        return self.get_alias() is not None

    def get_alias(self):
        """Returns the alias for this identifier or ``None``."""
        return None

    def get_name(self):
        """Returns the name of this identifier.

        This is either its alias or its real name. The returned value can
        be considered as the name under which the object corresponding to
        this identifier is known within the current statement.
        """
        return self.get_alias() or self.get_real_name()

    def get_real_name(self):
        """Returns the real name (object name) of this identifier."""
        return None

    def get_parent_name(self):
        """Return name of the parent object if any.

        A parent object is identified by the first occurring dot.
        """
        dot_idx, _ = self.token_next_by(m=(T.Punctuation, '.'))
        _, prev_ = self.token_prev(dot_idx)
        return remove_quotes(prev_.value) if prev_ is not None else None

    def _get_first_name(self, idx=None, reverse=False, keywords=False,
                        real_name=False):
        """Returns the name of the first token with a name.

        The search covers ``self.tokens[idx:]`` (all tokens if *idx* is
        ``None`` or ``0``), back to front if *reverse* is set. With
        *keywords* set, keyword tokens count as names too. For nested
        identifiers or functions *real_name* selects ``get_real_name()``
        over ``get_name()``. ``None`` is returned if nothing is found.
        """
        tokens = self.tokens[idx:] if idx else self.tokens
        tokens = reversed(tokens) if reverse else tokens
        types = [T.Name, T.Wildcard, T.String.Symbol]

        if keywords:
            types.append(T.Keyword)

        for token in tokens:
            if token.ttype in types:
                return remove_quotes(token.value)
            elif isinstance(token, (Identifier, Function)):
                return token.get_real_name() if real_name else token.get_name()
        return None
class Statement(TokenList):
    """Represents a SQL statement."""

    def get_type(self):
        """Return the statement type as an upper-cased string.

        The result is the normalized first DML or DDL keyword. Leading
        whitespace and comments are skipped. For a statement starting
        with a CTE keyword, the DML keyword following the CTE definitions
        is used. ``"UNKNOWN"` is returned in every other case.
        """
        unknown = 'UNKNOWN'

        head = self.token_first(skip_cm=True)
        if head is None:
            # An "empty" statement: no tokens at all or only whitespace.
            return unknown

        if head.ttype in (T.Keyword.DML, T.Keyword.DDL):
            return head.normalized

        if head.ttype == T.Keyword.CTE:
            # The WITH keyword should be followed by either an Identifier or
            # an IdentifierList containing the CTE definitions; the actual
            # DML keyword (e.g. SELECT, INSERT) will follow next.
            pos = self.token_index(head)
            while pos is not None:
                pos, current = self.token_next(pos, skip_ws=True)
                if not isinstance(current, (Identifier, IdentifierList)):
                    continue
                pos, current = self.token_next(pos, skip_ws=True)
                if current is not None and current.ttype == T.Keyword.DML:
                    return current.normalized

        # Probably invalid syntax.
        return unknown
class Identifier(NameAliasMixin, TokenList):
    """Represents an identifier.

    Identifiers may have aliases or typecasts.
    """

    def is_wildcard(self):
        """Return ``True`` if this identifier contains a wildcard."""
        _, token = self.token_next_by(t=T.Wildcard)
        return token is not None

    def get_typecast(self):
        """Returns the typecast or ``None`` of this object as a string.

        The typecast is the value of the token directly following the
        first ``::`` punctuation token.
        """
        midx, _ = self.token_next_by(m=(T.Punctuation, '::'))
        # token_next() copes with midx being None (no '::' present).
        _, next_ = self.token_next(midx, skip_ws=False)
        # Test for None explicitly rather than relying on token truthiness.
        return next_.value if next_ is not None else None

    def get_ordering(self):
        """Returns the ordering or ``None`` as uppercase string."""
        _, ordering = self.token_next_by(t=T.Keyword.Order)
        return ordering.normalized if ordering is not None else None

    def get_array_indices(self):
        """Returns an iterator of index token lists"""
        for token in self.tokens:
            if isinstance(token, SquareBrackets):
                # Use [1:-1] index to discard the square brackets
                yield token.tokens[1:-1]
class IdentifierList(TokenList):
    """A list of :class:`~sqlparse.sql.Identifier` objects."""

    def get_identifiers(self):
        """Yield the members of this list.

        Whitespace tokens and ``,`` punctuation tokens are left out.
        """
        for candidate in self.tokens:
            if candidate.is_whitespace:
                continue
            if candidate.match(T.Punctuation, ','):
                continue
            yield candidate
class TypedLiteral(TokenList):
    """A typed literal, such as "date '2001-09-28'" or "interval '2 hours'"."""

    # (ttype, values) pairs describing the opening, closing and extending
    # tokens of the group.
    M_OPEN = [(T.Name.Builtin, None), (T.Keyword, "TIMESTAMP")]
    M_CLOSE = (T.String.Single, None)
    M_EXTEND = (
        T.Keyword,
        ("DAY", "HOUR", "MINUTE", "MONTH", "SECOND", "YEAR"),
    )
class Parenthesis(TokenList):
    """Tokens between parenthesis."""

    M_OPEN = (T.Punctuation, '(')
    M_CLOSE = (T.Punctuation, ')')

    @property
    def _groupable_tokens(self):
        # Everything except the first and the last child token.
        inner = slice(1, -1)
        return self.tokens[inner]
class SquareBrackets(TokenList):
    """Tokens between square brackets"""

    M_OPEN = (T.Punctuation, '[')
    M_CLOSE = (T.Punctuation, ']')

    @property
    def _groupable_tokens(self):
        # Everything except the first and the last child token.
        inner = slice(1, -1)
        return self.tokens[inner]
class Assignment(TokenList):
    """Token group for an assignment such as ``var := val;``."""
class If(TokenList):
    """An 'if' clause with possible 'else if' or 'else' parts."""

    M_OPEN = (T.Keyword, 'IF')
    M_CLOSE = (T.Keyword, 'END IF')
class For(TokenList):
    """A 'FOR' loop."""

    M_OPEN = (T.Keyword, ('FOR', 'FOREACH'))
    M_CLOSE = (T.Keyword, 'END LOOP')
class Comparison(TokenList):
    """A comparison used for example in WHERE clauses."""

    @property
    def left(self):
        """The first child token of the comparison."""
        children = self.tokens
        return children[0]

    @property
    def right(self):
        """The last child token of the comparison."""
        children = self.tokens
        return children[-1]
class Comment(TokenList):
    """A comment."""

    def is_multiline(self):
        """Return ``True`` if the first child is a multiline comment token.

        An empty group yields ``False``.
        """
        # bool() keeps the result a real boolean; a bare ``self.tokens and``
        # would hand back the empty list itself for an empty group.
        return bool(self.tokens) and (
            self.tokens[0].ttype == T.Comment.Multiline)
class Where(TokenList):
    """A WHERE clause."""

    M_OPEN = (T.Keyword, 'WHERE')
    M_CLOSE = (
        T.Keyword,
        (
            'ORDER BY', 'GROUP BY', 'LIMIT', 'UNION', 'UNION ALL', 'EXCEPT',
            'HAVING', 'RETURNING', 'INTO',
        ),
    )
class Having(TokenList):
    """A HAVING clause."""

    M_OPEN = (T.Keyword, 'HAVING')
    M_CLOSE = (T.Keyword, ('ORDER BY', 'LIMIT'))
class Case(TokenList):
    """A CASE statement with one or more WHEN and possibly an ELSE part."""

    M_OPEN = (T.Keyword, 'CASE')
    M_CLOSE = (T.Keyword, 'END')

    def get_cases(self, skip_ws=False):
        """Return the branches as a list of ``(condition, value)`` 2-tuples.

        Both parts are lists of tokens. For an ELSE branch the condition
        is ``None``. With *skip_ws* set, whitespace tokens are left out.
        """
        in_condition, in_value = 1, 2

        cases = []
        state = in_condition

        for tok in self.tokens:
            # The opening keyword never belongs to a branch.
            if tok.match(T.Keyword, 'CASE'):
                continue
            if skip_ws and tok.ttype in T.Whitespace:
                continue

            # Keywords switch the collecting state.
            if tok.match(T.Keyword, 'WHEN'):
                cases.append(([], []))
                state = in_condition
            elif tok.match(T.Keyword, 'THEN'):
                state = in_value
            elif tok.match(T.Keyword, 'ELSE'):
                cases.append((None, []))
                state = in_value
            elif tok.match(T.Keyword, 'END'):
                state = None

            # A condition may start without a preceding WHEN.
            if state and not cases:
                cases.append(([], []))

            # File the token under the part currently being collected.
            if state == in_condition:
                cases[-1][0].append(tok)
            elif state == in_value:
                cases[-1][1].append(tok)

        return cases
class Function(NameAliasMixin, TokenList):
    """A function or procedure call."""

    def get_parameters(self):
        """Return a list of parameters.

        The parameters are taken from the last child token, which is
        expected to be the parenthesis group. If it contains an
        ``IdentifierList`` the members of that list are returned;
        otherwise every function, identifier, typed literal or literal
        token found directly inside it.
        """
        parenthesis = self.tokens[-1]
        result = []
        for token in parenthesis.tokens:
            if isinstance(token, IdentifierList):
                # Materialize the generator so that every code path
                # returns a list, as documented.
                return list(token.get_identifiers())
            elif imt(token, i=(Function, Identifier, TypedLiteral),
                     t=T.Literal):
                result.append(token)
        return result
class Begin(TokenList):
    """A BEGIN/END block."""

    M_OPEN = (T.Keyword, 'BEGIN')
    M_CLOSE = (T.Keyword, 'END')
class Operation(TokenList):
    """Token group for an operation."""
class Values(TokenList):
    """Token group for values."""
class Command(TokenList):
    """Token group for a CLI command."""