1#
2# Copyright (C) 2009-2020 the sqlparse authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of python-sqlparse and is released under
6# the BSD License: https://opensource.org/licenses/BSD-3-Clause
7
8import re
9
10from sqlparse import sql, tokens as T
11from sqlparse.utils import split_unquoted_newlines
12
13
class StripCommentsFilter:
    """Removes comment tokens from statements.

    SQL hints (``T.Comment.*.Hint``) are kept because they carry
    execution-relevant information (issue262).  A removed comment is
    replaced by a single space -- or by its trailing line break -- so the
    remaining SQL stays valid (see issue425 and issue484).
    """

    @staticmethod
    def _process(tlist):
        """Strip comments from the direct children of *tlist* in place."""
        def get_next_comment(idx=-1):
            # TODO(andi) Comment types should be unified, see related issue38
            return tlist.token_next_by(i=sql.Comment, t=T.Comment, idx=idx)

        def _get_insert_token(token):
            """Return either a whitespace or the line breaks from token."""
            # See issue484 why line breaks should be preserved.
            # Note: The actual value for a line break is replaced by \n
            # in SerializerUnicode which will be executed in the
            # postprocessing state.
            m = re.search(r'([\r\n]+) *$', token.value)
            if m is not None:
                return sql.Token(T.Whitespace.Newline, m.groups()[0])
            return sql.Token(T.Whitespace, ' ')

        sql_hints = (T.Comment.Multiline.Hint, T.Comment.Single.Hint)

        def _is_sql_hint(token):
            """True for a bare hint token or a Comment group led by one."""
            if token.ttype in sql_hints:
                return True
            if isinstance(token, sql.Comment):
                children = token.tokens
                return bool(children) and children[0].ttype in sql_hints
            return False

        tidx, token = get_next_comment()
        while token:
            if _is_sql_hint(token):
                # Don't remove SQL hints (issue262).  Searching onward from
                # the current index prevents an infinite loop on the
                # skipped token.
                tidx, token = get_next_comment(idx=tidx)
                continue

            pidx, prev_ = tlist.token_prev(tidx, skip_ws=False)
            nidx, next_ = tlist.token_next(tidx, skip_ws=False)
            # Replace by whitespace if prev and next exist and if they're not
            # whitespaces. This doesn't apply if prev or next is a parenthesis.
            if (
                prev_ is None or next_ is None
                or prev_.is_whitespace or prev_.match(T.Punctuation, '(')
                or next_.is_whitespace or next_.match(T.Punctuation, ')')
            ):
                # Insert a whitespace to ensure the following SQL produces
                # a valid SQL (see #425).
                if prev_ is not None and not prev_.match(T.Punctuation, '('):
                    tlist.tokens.insert(tidx, _get_insert_token(token))
                tlist.tokens.remove(token)
                tidx -= 1
            else:
                tlist.tokens[tidx] = _get_insert_token(token)

            # Continue the scan from the current index (see hint note above).
            tidx, token = get_next_comment(idx=tidx)

    def process(self, stmt):
        """Strip comments from *stmt* and all nested groups; return *stmt*."""
        # Recurse into sublists first so nested comments are handled too.
        for sgroup in stmt.get_sublists():
            self.process(sgroup)
        StripCommentsFilter._process(stmt)
        return stmt
81
82
class StripWhitespaceFilter:
    """Collapses runs of whitespace tokens and trims group boundaries."""

    def _stripws(self, tlist):
        """Dispatch to a per-group-class handler, e.g.
        ``_stripws_parenthesis`` for ``sql.Parenthesis``, falling back to
        the generic implementation."""
        func_name = f'_stripws_{type(tlist).__name__}'
        func = getattr(self, func_name.lower(), self._stripws_default)
        func(tlist)

    @staticmethod
    def _stripws_default(tlist):
        """Blank out leading whitespace and collapse consecutive whitespace
        tokens into a single space.

        Emptied tokens disappear on serialization, which effectively
        removes them without mutating the token list structure.
        """
        last_was_ws = False
        is_first_char = True
        for token in tlist.tokens:
            if token.is_whitespace:
                token.value = '' if last_was_ws or is_first_char else ' '
            last_was_ws = token.is_whitespace
            is_first_char = False

    def _stripws_identifierlist(self, tlist):
        """Like the default, but first drop newlines before commas
        (see issue140)."""
        # Iterate over a copy since tokens are removed while iterating.
        last_nl = None
        for token in list(tlist.tokens):
            if last_nl and token.ttype is T.Punctuation and token.value == ',':
                tlist.tokens.remove(last_nl)
            last_nl = token if token.is_whitespace else None
        return self._stripws_default(tlist)

    def _stripws_parenthesis(self, tlist):
        """Remove whitespace just inside the parentheses, then apply the
        default stripping.

        ``tokens[0]``/``tokens[-1]`` are the parenthesis punctuation
        themselves, so the interior starts at index 1 and ends at -2.
        """
        while tlist.tokens[1].is_whitespace:
            tlist.tokens.pop(1)
        while tlist.tokens[-2].is_whitespace:
            tlist.tokens.pop(-2)
        if tlist.tokens[-2].is_group:
            # safe to remove the last whitespace of the trailing group
            while tlist.tokens[-2].tokens[-1].is_whitespace:
                tlist.tokens[-2].tokens.pop(-1)
        self._stripws_default(tlist)

    def process(self, stmt, depth=0):
        """Strip whitespace in *stmt* and nested groups; return *stmt*."""
        # Depth-first: clean nested groups before the enclosing list.
        for sgroup in stmt.get_sublists():
            self.process(sgroup, depth + 1)
        self._stripws(stmt)
        # At the top level, also drop one trailing whitespace token.
        if depth == 0 and stmt.tokens and stmt.tokens[-1].is_whitespace:
            stmt.tokens.pop(-1)
        return stmt
130
131
class SpacesAroundOperatorsFilter:
    """Ensures a single whitespace before and after operator/comparison
    tokens."""

    @staticmethod
    def _process(tlist):
        """Pad operators in the direct children of *tlist* in place."""
        ttypes = (T.Operator, T.Comparison)
        tidx, token = tlist.token_next_by(t=ttypes)
        while token:
            _, next_ = tlist.token_next(tidx, skip_ws=False)
            if next_ and next_.ttype != T.Whitespace:
                tlist.insert_after(tidx, sql.Token(T.Whitespace, ' '))

            _, prev_ = tlist.token_prev(tidx, skip_ws=False)
            if prev_ and prev_.ttype != T.Whitespace:
                tlist.insert_before(tidx, sql.Token(T.Whitespace, ' '))
                tidx += 1  # has to shift since token inserted before it

            # assert tlist.token_index(token) == tidx
            tidx, token = tlist.token_next_by(t=ttypes, idx=tidx)

    def process(self, stmt):
        """Pad operators in *stmt* and all nested groups; return *stmt*."""
        for sgroup in stmt.get_sublists():
            self.process(sgroup)
        SpacesAroundOperatorsFilter._process(stmt)
        return stmt
155
156
class StripTrailingSemicolonFilter:
    """Drops trailing whitespace and semicolon tokens from a statement."""

    def process(self, stmt):
        """Pop trailing ``;``/whitespace tokens in place; return *stmt*."""
        tokens = stmt.tokens
        while tokens:
            tail = tokens[-1]
            if not (tail.is_whitespace or tail.value == ';'):
                break
            tokens.pop()
        return stmt
164
165
166# ---------------------------
167# postprocess
168
class SerializerUnicode:
    """Serializes a statement to text with per-line trailing whitespace
    removed."""

    @staticmethod
    def process(stmt):
        """Return *stmt* rendered as a string, one ``\\n`` per line."""
        # Split only on newlines outside quoted strings, then trim each
        # line's right-hand whitespace before rejoining.
        trimmed = [line.rstrip() for line in split_unquoted_newlines(stmt)]
        return '\n'.join(trimmed)