Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/ansible_core-2.17.0.dev0-py3.8.egg/ansible/parsing/splitter.py: 76%

125 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-11-30 06:38 +0000

1# (c) 2014 James Cammarata, <jcammarata@ansible.com> 

2# 

3# This file is part of Ansible 

4# 

5# Ansible is free software: you can redistribute it and/or modify 

6# it under the terms of the GNU General Public License as published by 

7# the Free Software Foundation, either version 3 of the License, or 

8# (at your option) any later version. 

9# 

10# Ansible is distributed in the hope that it will be useful, 

11# but WITHOUT ANY WARRANTY; without even the implied warranty of 

12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

13# GNU General Public License for more details. 

14# 

15# You should have received a copy of the GNU General Public License 

16# along with Ansible. If not, see <http://www.gnu.org/licenses/>. 

17 

18from __future__ import annotations 

19 

20import codecs 

21import re 

22 

23from ansible.errors import AnsibleParserError 

24from ansible.module_utils.common.text.converters import to_text 

25from ansible.parsing.quoting import unquote 

26 

27 

28# Decode escapes adapted from rspeer's answer here: 

29# http://stackoverflow.com/questions/4020539/process-escape-sequences-in-a-string-in-python 

30_HEXCHAR = '[a-fA-F0-9]' 

31_ESCAPE_SEQUENCE_RE = re.compile(r''' 

32 ( \\U{0} # 8-digit hex escapes 

33 | \\u{1} # 4-digit hex escapes 

34 | \\x{2} # 2-digit hex escapes 

35 | \\N\{{[^}}]+\}} # Unicode characters by name 

36 | \\[\\'"abfnrtv] # Single-character escapes 

37 )'''.format(_HEXCHAR * 8, _HEXCHAR * 4, _HEXCHAR * 2), re.UNICODE | re.VERBOSE) 

38 

39 

40def _decode_escapes(s): 

41 def decode_match(match): 

42 return codecs.decode(match.group(0), 'unicode-escape') 

43 

44 return _ESCAPE_SEQUENCE_RE.sub(decode_match, s) 

45 

46 

def parse_kv(args, check_raw=False):
    '''
    Build a dict of options from a string of whitespace separated
    key=value items.

    Items without a usable "=" (none at all, or only a leading/escaped one)
    are treated as free-form parameters. When check_raw is True, key=value
    items whose key is not one of the shell/command options listed below are
    treated as free-form too. All free-form items are recombined with
    join_args() and stored under the '_raw_params' key, which is only present
    when at least one such item was found.
    '''
    # FIXME: make the retrieval of this list of shell/command options a function, so the list is centralized
    known_options = ('creates', 'removes', 'chdir', 'executable', 'warn', 'stdin', 'stdin_add_newline', 'strip_empty_ends')

    args = to_text(args, nonstring='passthru')

    options = {}
    if args is None:
        return options

    free_form = []
    for original in split_args(args):
        decoded = _decode_escapes(original)

        # position of the first "=" that is neither the leading character
        # nor preceded by a backslash
        split_at = next(
            (i for i in range(1, len(decoded)) if decoded[i] == '=' and decoded[i - 1] != '\\'),
            None,
        )

        if split_at is None:
            if "=" in decoded:
                # only escaped (or leading) equals are present, so
                # unescape them and keep the item as a raw param
                free_form.append(decoded.replace('\\=', '='))
            else:
                free_form.append(original)
            continue

        key = decoded[:split_at]
        value = decoded[split_at + 1:]

        if check_raw and key not in known_options:
            free_form.append(original)
        else:
            options[key.strip()] = unquote(value.strip())

    # recombine the free-form params, if any were found, and assign
    # them to a special option for use later by the shell/command module
    if free_form:
        options[u'_raw_params'] = join_args(free_form)

    return options

94 

95 

96def _get_quote_state(token, quote_char): 

97 ''' 

98 the goal of this block is to determine if the quoted string 

99 is unterminated in which case it needs to be put back together 

100 ''' 

101 # the char before the current one, used to see if 

102 # the current character is escaped 

103 prev_char = None 

104 for idx, cur_char in enumerate(token): 

105 if idx > 0: 

106 prev_char = token[idx - 1] 

107 if cur_char in '"\'' and prev_char != '\\': 

108 if quote_char: 

109 if cur_char == quote_char: 

110 quote_char = None 

111 else: 

112 quote_char = cur_char 

113 return quote_char 

114 

115 

116def _count_jinja2_blocks(token, cur_depth, open_token, close_token): 

117 ''' 

118 this function counts the number of opening/closing blocks for a 

119 given opening/closing type and adjusts the current depth for that 

120 block based on the difference 

121 ''' 

122 num_open = token.count(open_token) 

123 num_close = token.count(close_token) 

124 if num_open != num_close: 

125 cur_depth += (num_open - num_close) 

126 if cur_depth < 0: 

127 cur_depth = 0 

128 return cur_depth 

129 

130 

def join_args(s):
    '''
    Reassemble the pieces produced by split_args() into a single string.

    Pieces are separated by a single space, except that no space is
    inserted at the very start or directly after a piece that left the
    result ending in a newline, so the original line structure is kept.
    '''
    joined = ''
    for piece in s:
        needs_space = bool(joined) and not joined.endswith('\n')
        joined += (' ' + piece) if needs_space else piece
    return joined

143 

144 

def split_args(args):
    '''
    Splits args on whitespace, but intelligently reassembles
    those that may have been split over a jinja2 block or quotes.

    When used in a remote module, we won't ever have to be concerned about
    jinja2 blocks, however this function is/will be used in the
    core portions as well before the args are templated.

    example input: a=b c="foo bar"
    example output: ['a=b', 'c="foo bar"']

    Basically this is a variation shlex that has some more intelligence for
    how Ansible needs to use it.

    A falsy args value (None, empty string) yields an empty list. Runs of
    extra spaces and the newlines the input was split on are kept by
    appending them to the preceding param, so join_args() can rebuild
    the text.

    Raises AnsibleParserError when, after all input has been consumed, a
    quote is still open or a jinja2 {{ }}, {% %} or {# #} block is still
    unclosed.
    '''

    if not args:
        return []

    # the list of params parsed out of the arg string
    # this is going to be the result value when we are done
    params = []

    # Initial split on newlines
    items = args.split('\n')

    # iterate over the tokens, and reassemble any that may have been
    # split on a space inside a jinja2 block.
    # ex if tokens are "{{", "foo", "}}" these go together

    # These variables are used
    # to keep track of the state of the parsing, since blocks and quotes
    # may be nested within each other.
    # They are deliberately NOT reset per line, so that a quote or block
    # opened on one line can be closed on a later one.

    quote_char = None
    inside_quotes = False
    print_depth = 0  # used to count nested jinja2 {{ }} blocks
    block_depth = 0  # used to count nested jinja2 {% %} blocks
    comment_depth = 0  # used to count nested jinja2 {# #} blocks

    # now we loop over each split chunk, coalescing tokens if the white space
    # split occurred within quotes or a jinja2 block of some kind
    for (itemidx, item) in enumerate(items):

        # we split on spaces and newlines separately, so that we
        # can tell which character we split on for reassembly
        # inside quotation characters
        tokens = item.split(' ')

        # reset for every line; set when a lone backslash token is seen
        # outside of quotes, which suppresses re-adding the newline below
        line_continuation = False
        for (idx, token) in enumerate(tokens):

            # Empty entries means we have subsequent spaces
            # We want to hold onto them so we can reconstruct them later
            if len(token) == 0 and idx != 0:
                # Make sure there is a params item to store result in.
                if not params:
                    params.append('')

                params[-1] += ' '
                continue

            # if we hit a line continuation character, but
            # we're not inside quotes, ignore it and continue
            # on to the next token while setting a flag
            if token == '\\' and not inside_quotes:
                line_continuation = True
                continue

            # store the previous quoting state for checking later
            was_inside_quotes = inside_quotes
            quote_char = _get_quote_state(token, quote_char)
            inside_quotes = quote_char is not None

            # multiple conditions may append a token to the list of params,
            # so we keep track with this flag to make sure it only happens once
            # append means add to the end of the list, don't append means concatenate
            # it to the end of the last token
            appended = False

            # if we're inside quotes now, but weren't before, append the token
            # to the end of the list, since we'll tack on more to it later
            # otherwise, if we're inside any jinja2 block, inside quotes, or we were
            # inside quotes (but aren't now) concat this token to the last param
            if inside_quotes and not was_inside_quotes and not (print_depth or block_depth or comment_depth):
                params.append(token)
                appended = True
            elif print_depth or block_depth or comment_depth or inside_quotes or was_inside_quotes:
                if idx == 0 and was_inside_quotes:
                    # first token of a line that continues a quoted string:
                    # the separating newline was already added to params[-1],
                    # so no spacer is needed
                    params[-1] = "%s%s" % (params[-1], token)
                else:
                    # restore the single space this token was split on,
                    # unless it is the first token of its line
                    spacer = ''
                    if idx > 0:
                        spacer = ' '
                    params[-1] = "%s%s%s" % (params[-1], spacer, token)
                appended = True

            # if the number of paired block tags is not the same, the depth has changed, so we calculate that here
            # and may append the current token to the params (if we haven't previously done so)
            prev_print_depth = print_depth
            print_depth = _count_jinja2_blocks(token, print_depth, "{{", "}}")
            if print_depth != prev_print_depth and not appended:
                params.append(token)
                appended = True

            prev_block_depth = block_depth
            block_depth = _count_jinja2_blocks(token, block_depth, "{%", "%}")
            if block_depth != prev_block_depth and not appended:
                params.append(token)
                appended = True

            prev_comment_depth = comment_depth
            comment_depth = _count_jinja2_blocks(token, comment_depth, "{#", "#}")
            if comment_depth != prev_comment_depth and not appended:
                params.append(token)
                appended = True

            # finally, if we're at zero depth for all blocks and not inside quotes, and have not
            # yet appended anything to the list of params, we do so now
            if not (print_depth or block_depth or comment_depth) and not inside_quotes and not appended and token != '':
                params.append(token)

        # if this was the last token in the list, and we have more than
        # one item (meaning we split on newlines), add a newline back here
        # to preserve the original structure
        if len(items) > 1 and itemidx != len(items) - 1 and not line_continuation:
            # Make sure there is a params item to store result in.
            if not params:
                params.append('')

            params[-1] += '\n'

    # If we're done and things are not at zero depth or we're still inside quotes,
    # raise an error to indicate that the args were unbalanced
    if print_depth or block_depth or comment_depth or inside_quotes:
        raise AnsibleParserError(u"failed at splitting arguments, either an unbalanced jinja2 block or quotes: {0}".format(args))

    return params