1#
2# Copyright (C) 2009-2020 the sqlparse authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of python-sqlparse and is released under
6# the BSD License: https://opensource.org/licenses/BSD-3-Clause
7
8import re
9
10from sqlparse import sql
11from sqlparse import tokens as T
12from sqlparse.utils import split_unquoted_newlines
13
14
class StripCommentsFilter:
    """Filter that strips comments from a statement but keeps SQL hints."""

    @staticmethod
    def _process(tlist):
        """Strip the non-hint comments found in ``tlist``, in place.

        Every comment is either dropped or swapped for a whitespace token so
        that the tokens surrounding it do not run together. Comments whose
        type is one of the ``Hint`` comment types are left untouched.

        :param tlist: token list whose ``tokens`` list is modified in place.
        """
        def get_next_comment(idx=-1):
            # TODO(andi) Comment types should be unified, see related issue38
            return tlist.token_next_by(i=sql.Comment, t=T.Comment, idx=idx)

        def _get_insert_token(token):
            """Returns either a whitespace or the line breaks from token."""
            # See issue484 why line breaks should be preserved.
            # Note: The actual value for a line break is replaced by \n
            # in SerializerUnicode which will be executed in the
            # postprocessing state.
            # The pattern captures a run of CR/LF characters at the end of
            # the comment text, optionally followed by spaces.
            m = re.search(r'([\r\n]+) *$', token.value)
            if m is not None:
                return sql.Token(T.Whitespace.Newline, m.groups()[0])
            else:
                return sql.Token(T.Whitespace, ' ')

        sql_hints = (T.Comment.Multiline.Hint, T.Comment.Single.Hint)
        tidx, token = get_next_comment()
        while token:
            # skipping token remove if token is a SQL-Hint. issue262
            is_sql_hint = False
            if token.ttype in sql_hints:
                # A plain token that is itself typed as a hint.
                is_sql_hint = True
            elif isinstance(token, sql.Comment):
                # A grouped comment counts as a hint when its first child
                # token is typed as a hint.
                comment_tokens = token.tokens
                if len(comment_tokens) > 0:
                    if comment_tokens[0].ttype in sql_hints:
                        is_sql_hint = True

            if is_sql_hint:
                # using current index as start index to search next token for
                # preventing infinite loop in cases when token type is a
                # "SQL-Hint" and has to be skipped
                tidx, token = get_next_comment(idx=tidx)
                continue

            # Only the neighbouring tokens are used below; the returned
            # indexes (pidx, nidx) are not read.
            pidx, prev_ = tlist.token_prev(tidx, skip_ws=False)
            nidx, next_ = tlist.token_next(tidx, skip_ws=False)
            # The comment is removed when it has no previous or no next
            # token, when a neighbour is whitespace, when it directly follows
            # '(' or directly precedes ')'. In every other case (the else
            # branch) it is replaced in place by a whitespace token.
            if (
                prev_ is None or next_ is None
                or prev_.is_whitespace or prev_.match(T.Punctuation, '(')
                or next_.is_whitespace or next_.match(T.Punctuation, ')')
            ):
                # Insert a whitespace to ensure the following SQL produces
                # a valid SQL (see #425).
                if prev_ is not None and not prev_.match(T.Punctuation, '('):
                    tlist.tokens.insert(tidx, _get_insert_token(token))
                tlist.tokens.remove(token)
                # NOTE(review): presumably token_next_by() starts searching
                # after the given index, so stepping back keeps the token
                # that moved into this slot from being skipped -- confirm
                # against TokenList.token_next_by.
                tidx -= 1
            else:
                tlist.tokens[tidx] = _get_insert_token(token)

            # Continue the search for the next comment from the (possibly
            # adjusted) current index.
            tidx, token = get_next_comment(idx=tidx)

    def process(self, stmt):
        """Strip comments from ``stmt`` and all of its sublists.

        Sublists are handled first (recursively), then ``stmt`` itself.

        :param stmt: statement / token list to modify in place.
        :returns: the same ``stmt`` object.
        """
        # The list comprehension is used only for its side effects; the
        # resulting list is discarded.
        [self.process(sgroup) for sgroup in stmt.get_sublists()]
        StripCommentsFilter._process(stmt)
        return stmt
82
83
class StripWhitespaceFilter:
    """Filter that collapses whitespace tokens of a statement in place."""

    def _stripws(self, tlist):
        """Dispatch to the handler matching the class name of ``tlist``.

        The handler is looked up as ``_stripws_<lowercased class name>``;
        ``_stripws_default`` is used when no such method exists.
        """
        func_name = f'_stripws_{type(tlist).__name__}'
        func = getattr(self, func_name.lower(), self._stripws_default)
        func(tlist)

    @staticmethod
    def _stripws_default(tlist):
        """Normalize the values of the whitespace tokens in ``tlist``.

        A whitespace token becomes a single space unless it is the first
        token of the list or directly follows another whitespace token, in
        which case its value is blanked. Tokens are never removed here.
        """
        last_was_ws = False
        is_first_char = True
        for token in tlist.tokens:
            if token.is_whitespace:
                token.value = '' if last_was_ws or is_first_char else ' '
            last_was_ws = token.is_whitespace
            is_first_char = False

    def _stripws_identifierlist(self, tlist):
        """Drop whitespace directly preceding a comma, then normalize."""
        # Removes newlines before commas, see issue140
        last_nl = None
        # Iterate over a copy because tlist.tokens is mutated in the loop.
        for token in list(tlist.tokens):
            if last_nl and token.ttype is T.Punctuation and token.value == ',':
                tlist.tokens.remove(last_nl)
            last_nl = token if token.is_whitespace else None
        return self._stripws_default(tlist)

    def _stripws_parenthesis(self, tlist):
        """Remove whitespace just inside the parentheses, then normalize.

        Whitespace tokens right after the first token and right before the
        last token are removed. If the token before the last one is a group,
        its trailing whitespace tokens are removed as well. The length
        checks keep degenerate input (fewer than three tokens, or an inner
        group made up solely of whitespace) from raising ``IndexError``.
        """
        tokens = tlist.tokens
        while len(tokens) > 2 and tokens[1].is_whitespace:
            tokens.pop(1)
        while len(tokens) > 2 and tokens[-2].is_whitespace:
            tokens.pop(-2)
        if len(tokens) >= 2 and tokens[-2].is_group:
            # safe to remove the last whitespace of the inner group
            inner = tokens[-2].tokens
            while inner and inner[-1].is_whitespace:
                inner.pop(-1)
        self._stripws_default(tlist)

    def process(self, stmt, depth=0):
        """Strip whitespace in ``stmt`` and, recursively, in its sublists.

        :param stmt: statement / token list to modify in place.
        :param depth: nesting level of ``stmt``; a trailing whitespace
            token is only removed at depth 0.
        :returns: the same ``stmt`` object.
        """
        for sgroup in stmt.get_sublists():
            self.process(sgroup, depth + 1)
        self._stripws(stmt)
        if depth == 0 and stmt.tokens and stmt.tokens[-1].is_whitespace:
            stmt.tokens.pop(-1)
        return stmt
131
132
class SpacesAroundOperatorsFilter:
    """Filter that surrounds operator and comparison tokens with spaces."""

    @staticmethod
    def _process(tlist):
        """Insert single-space tokens around operators found in ``tlist``.

        A space is added after (respectively before) an operator only when
        a neighbouring token exists on that side and its type is not
        ``T.Whitespace``.
        """
        operator_types = (T.Operator, T.Comparison)

        def lacks_space(neighbour):
            # Falsy when there is no neighbour or it already is whitespace.
            return neighbour and neighbour.ttype != T.Whitespace

        pos, current = tlist.token_next_by(t=operator_types)
        while current:
            _, following = tlist.token_next(pos, skip_ws=False)
            if lacks_space(following):
                tlist.insert_after(pos, sql.Token(T.Whitespace, ' '))

            _, preceding = tlist.token_prev(pos, skip_ws=False)
            if lacks_space(preceding):
                tlist.insert_before(pos, sql.Token(T.Whitespace, ' '))
                # The operator moved one slot to the right.
                pos += 1

            pos, current = tlist.token_next_by(t=operator_types, idx=pos)

    def process(self, stmt):
        """Apply the spacing to every sublist of ``stmt``, then to ``stmt``.

        :returns: the same ``stmt`` object.
        """
        for sgroup in stmt.get_sublists():
            self.process(sgroup)
        SpacesAroundOperatorsFilter._process(stmt)
        return stmt
156
157
class StripTrailingSemicolonFilter:
    """Filter that drops trailing whitespace and semicolon tokens."""

    def process(self, stmt):
        """Pop tokens off the end of ``stmt`` while they are whitespace or ';'.

        :param stmt: statement whose ``tokens`` list is trimmed in place.
        :returns: the same ``stmt`` object.
        """
        remaining = stmt.tokens
        while remaining:
            tail = remaining[-1]
            if not (tail.is_whitespace or tail.value == ';'):
                break
            remaining.pop()
        return stmt
165
166
167# ---------------------------
168# postprocess
169
class SerializerUnicode:
    """Postprocessing step that renders a statement as text."""

    @staticmethod
    def process(stmt):
        """Return ``stmt`` as text with trailing whitespace stripped per line.

        The statement is split with ``split_unquoted_newlines``; each
        resulting line is right-stripped and the lines are joined with a
        single newline character.
        """
        trimmed = [line.rstrip() for line in split_unquoted_newlines(stmt)]
        return '\n'.join(trimmed)