Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/ansible_core-2.17.0.dev0-py3.8.egg/ansible/parsing/splitter.py: 76%
125 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-11-30 06:38 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-11-30 06:38 +0000
1# (c) 2014 James Cammarata, <jcammarata@ansible.com>
2#
3# This file is part of Ansible
4#
5# Ansible is free software: you can redistribute it and/or modify
6# it under the terms of the GNU General Public License as published by
7# the Free Software Foundation, either version 3 of the License, or
8# (at your option) any later version.
9#
10# Ansible is distributed in the hope that it will be useful,
11# but WITHOUT ANY WARRANTY; without even the implied warranty of
12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13# GNU General Public License for more details.
14#
15# You should have received a copy of the GNU General Public License
16# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
18from __future__ import annotations
20import codecs
21import re
23from ansible.errors import AnsibleParserError
24from ansible.module_utils.common.text.converters import to_text
25from ansible.parsing.quoting import unquote
28# Decode escapes adapted from rspeer's answer here:
29# http://stackoverflow.com/questions/4020539/process-escape-sequences-in-a-string-in-python
_HEXCHAR = '[a-fA-F0-9]'
# Matches exactly the backslash escape sequences that _decode_escapes() is
# willing to interpret; any other backslash sequence is left untouched.
_ESCAPE_SEQUENCE_RE = re.compile(r'''
    ( \\U{0}           # 8-digit hex escapes
    | \\u{1}           # 4-digit hex escapes
    | \\x{2}           # 2-digit hex escapes
    | \\N\{{[^}}]+\}}  # Unicode characters by name
    | \\[\\'"abfnrtv]  # Single-character escapes
    )'''.format(_HEXCHAR * 8, _HEXCHAR * 4, _HEXCHAR * 2), re.UNICODE | re.VERBOSE)


def _decode_escapes(s):
    '''
    Return ``s`` with every escape sequence matched by _ESCAPE_SEQUENCE_RE
    replaced by the character it denotes.

    Sequences that match the pattern but cannot be decoded by the
    ``unicode-escape`` codec (for example ``\\N{NO SUCH NAME}`` or a
    ``\\U`` escape beyond the Unicode range) are left exactly as written
    instead of raising UnicodeDecodeError, the same way sequences that do
    not match the pattern at all are left alone.
    '''
    def decode_match(match):
        sequence = match.group(0)
        try:
            return codecs.decode(sequence, 'unicode-escape')
        except UnicodeDecodeError:
            # syntactically an escape, but not a valid character: keep the
            # original text rather than failing on user-supplied input
            return sequence

    return _ESCAPE_SEQUENCE_RE.sub(decode_match, s)
def parse_kv(args, check_raw=False):
    '''
    Turn a string of ``key=value`` items into a dict.

    Items that are not ``key=value`` pairs are "free-form" parameters; when
    any are collected they are re-joined with join_args() and stored under
    the ``_raw_params`` key. With ``check_raw`` set to True, a ``key=value``
    item whose key is not one of the recognised shell/command options is
    also treated as free-form.
    '''
    args = to_text(args, nonstring='passthru')

    options = {}
    if args is None:
        return options

    # FIXME: make the retrieval of this list of shell/command options a function, so the list is centralized
    known_keys = ('creates', 'removes', 'chdir', 'executable', 'warn', 'stdin', 'stdin_add_newline', 'strip_empty_ends')

    raw_params = []
    for original in split_args(args):
        decoded = _decode_escapes(original)

        if "=" not in decoded:
            raw_params.append(original)
            continue

        # locate the first '=' (searching from index 1, so never the very
        # first character) that is not preceded by a backslash
        sep = decoded.find('=', 1)
        while sep != -1 and decoded[sep - 1] == '\\':
            sep = decoded.find('=', sep + 1)

        if sep == -1:
            # every '=' was escaped (or leading): unescape them and keep
            # the whole item as a free-form parameter
            raw_params.append(decoded.replace('\\=', '='))
            continue

        key = decoded[:sep]
        value = decoded[sep + 1:]

        if check_raw and key not in known_keys:
            raw_params.append(original)
        else:
            options[key.strip()] = unquote(value.strip())

    # stitch the free-form parameters back together under a special key
    # for later use by the shell/command module
    if raw_params:
        options[u'_raw_params'] = join_args(raw_params)

    return options
def _get_quote_state(token, quote_char):
    '''
    Scan ``token`` and return the quote character that is still open once
    the whole token has been read, or None when no quote is open.

    ``quote_char`` is the quote already open before this token (falsy when
    there is none). A quote character directly preceded by a backslash is
    treated as escaped and ignored; while a quote is open, the other kind
    of quote character has no effect.
    '''
    previous = None
    for current in token:
        if current in '"\'' and previous != '\\':
            if not quote_char:
                # nothing open yet: this character opens a quoted section
                quote_char = current
            elif current == quote_char:
                # matching character closes the open section
                quote_char = None
        previous = current
    return quote_char
def _count_jinja2_blocks(token, cur_depth, open_token, close_token):
    '''
    Return the nesting depth after ``token``, given the depth before it.

    The depth moves by the number of ``open_token`` occurrences minus the
    number of ``close_token`` occurrences in ``token``. When that
    difference is non-zero the result is clamped so it never drops below
    zero; when it is zero ``cur_depth`` is returned unchanged.
    '''
    delta = token.count(open_token) - token.count(close_token)
    if delta == 0:
        return cur_depth
    return max(cur_depth + delta, 0)
def join_args(s):
    '''
    Join the list of parameters ``s`` back into a single command string.

    Consecutive items are separated by a single space, except that no
    space is inserted at the very start or directly after an item that
    ends in a newline.
    '''
    pieces = []
    # True while the text collected so far is empty or ends with '\n',
    # i.e. while the next item must be added without a leading space
    at_line_start = True
    for p in s:
        if not at_line_start:
            pieces.append(' ')
        pieces.append(p)
        # an empty item adds no characters of its own, so it cannot change
        # whether the collected text ends at a line start
        if p:
            at_line_start = p.endswith('\n')
    return ''.join(pieces)
def split_args(args):
    '''
    Split ``args`` on spaces and newlines, gluing back together any pieces
    that were separated inside a quoted string or a jinja2 block
    ({{ }}, {% %} or {# #}).

    example input: a=b c="foo bar"
    example output: ['a=b', 'c="foo bar"']

    Extra spaces and the newlines of the input are kept by attaching them
    to the end of the preceding parameter. Raises AnsibleParserError when
    the input ends inside an open quote or an unclosed jinja2 block.
    '''
    if not args:
        return []

    # the resulting list of parameters
    params = []

    # newlines are split first and spaces second, so that we always know
    # which of the two separated a pair of tokens when re-assembling them
    lines = args.split('\n')
    last_line = len(lines) - 1

    # parsing state carried across tokens and lines
    quote_char = None
    inside_quotes = False
    # one nesting depth per kind of jinja2 block, in the order listed
    jinja_pairs = (("{{", "}}"), ("{%", "%}"), ("{#", "#}"))
    depths = [0, 0, 0]

    for line_no, line in enumerate(lines):
        line_continuation = False

        for idx, token in enumerate(line.split(' ')):
            # an empty token (other than the first) marks a run of spaces;
            # keep the space by attaching it to the previous parameter
            if idx != 0 and not token:
                if not params:
                    params.append('')
                params[-1] += ' '
                continue

            # a lone backslash outside quotes is a line continuation:
            # remember it and drop the token
            if token == '\\' and not inside_quotes:
                line_continuation = True
                continue

            # update the quoting state, remembering what it was before
            was_inside_quotes = inside_quotes
            quote_char = _get_quote_state(token, quote_char)
            inside_quotes = quote_char is not None

            # `appended` records that this token has already been placed
            # (as a new parameter or glued onto the last one), so that the
            # checks below never place it twice
            in_jinja = any(depths)
            if inside_quotes and not was_inside_quotes and not in_jinja:
                # a quote opens here: start a new parameter that later
                # tokens will be glued onto
                params.append(token)
                appended = True
            elif in_jinja or inside_quotes or was_inside_quotes:
                # continuing (or closing) a quoted string or jinja2 block:
                # glue onto the last parameter, restoring the space we split
                # on unless this is the first token of the line
                spacer = ' ' if idx > 0 else ''
                params[-1] = params[-1] + spacer + token
                appended = True
            else:
                appended = False

            # a change in any jinja2 depth means the token opens or closes a
            # block; if it has not been placed yet it becomes a new parameter
            for slot, (open_tag, close_tag) in enumerate(jinja_pairs):
                new_depth = _count_jinja2_blocks(token, depths[slot], open_tag, close_tag)
                if new_depth != depths[slot] and not appended:
                    params.append(token)
                    appended = True
                depths[slot] = new_depth

            # plain token outside any quote or block that nothing above has
            # placed: it is a parameter of its own
            if not appended and not inside_quotes and not any(depths) and token != '':
                params.append(token)

        # put back the newline we split on (there is none after the last
        # line, and none when the line ended with a continuation)
        if line_no != last_line and not line_continuation:
            if not params:
                params.append('')
            params[-1] += '\n'

    # ending inside a quote or a jinja2 block means the input was unbalanced
    if any(depths) or inside_quotes:
        raise AnsibleParserError(u"failed at splitting arguments, either an unbalanced jinja2 block or quotes: {0}".format(args))

    return params