Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/genshi/template/markup.py: 12%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

223 statements  

1# -*- coding: utf-8 -*- 

2# 

3# Copyright (C) 2006-2010 Edgewall Software 

4# All rights reserved. 

5# 

6# This software is licensed as described in the file COPYING, which 

7# you should have received as part of this distribution. The terms 

8# are also available at http://genshi.edgewall.org/wiki/License. 

9# 

10# This software consists of voluntary contributions made by many 

11# individuals. For the exact contribution history, see the revision 

12# history and logs, available at http://genshi.edgewall.org/log/. 

13 

14"""Markup templating engine.""" 

15 

16from itertools import chain 

17 

18from genshi.core import Attrs, Markup, Namespace, Stream 

19from genshi.core import START, END, START_NS, END_NS, TEXT, PI, COMMENT 

20from genshi.input import XMLParser 

21from genshi.template.base import BadDirectiveError, Template, \ 

22 TemplateSyntaxError, _apply_directives, \ 

23 EXEC, INCLUDE, SUB 

24from genshi.template.eval import Suite 

25from genshi.template.interpolation import interpolate 

26from genshi.template.directives import * 

27from genshi.template.text import NewTextTemplate 

28 

29__all__ = ['MarkupTemplate'] 

30__docformat__ = 'restructuredtext en' 

31 

32 

33class MarkupTemplate(Template): 

34 """Implementation of the template language for XML-based templates. 

35  

36 >>> tmpl = MarkupTemplate('''<ul xmlns:py="http://genshi.edgewall.org/"> 

37 ... <li py:for="item in items">${item}</li> 

38 ... </ul>''') 

39 >>> print(tmpl.generate(items=[1, 2, 3])) 

40 <ul> 

41 <li>1</li><li>2</li><li>3</li> 

42 </ul> 

43 """ 

44 

45 DIRECTIVE_NAMESPACE = 'http://genshi.edgewall.org/' 

46 XINCLUDE_NAMESPACE = 'http://www.w3.org/2001/XInclude' 

47 

48 directives = [('def', DefDirective), 

49 ('match', MatchDirective), 

50 ('when', WhenDirective), 

51 ('otherwise', OtherwiseDirective), 

52 ('for', ForDirective), 

53 ('if', IfDirective), 

54 ('choose', ChooseDirective), 

55 ('with', WithDirective), 

56 ('replace', ReplaceDirective), 

57 ('content', ContentDirective), 

58 ('attrs', AttrsDirective), 

59 ('strip', StripDirective)] 

60 serializer = 'xml' 

61 _number_conv = Markup 

62 

63 def __init__(self, source, filepath=None, filename=None, loader=None, 

64 encoding=None, lookup='strict', allow_exec=True): 

65 Template.__init__(self, source, filepath=filepath, filename=filename, 

66 loader=loader, encoding=encoding, lookup=lookup, 

67 allow_exec=allow_exec) 

68 self.add_directives(self.DIRECTIVE_NAMESPACE, self) 

69 

70 def _init_filters(self): 

71 Template._init_filters(self) 

72 # Make sure the include filter comes after the match filter 

73 self.filters.remove(self._include) 

74 self.filters += [self._match, self._include] 

75 

76 def _parse(self, source, encoding): 

77 if not isinstance(source, Stream): 

78 source = XMLParser(source, filename=self.filename, 

79 encoding=encoding) 

80 stream = [] 

81 

82 for kind, data, pos in source: 

83 

84 if kind is TEXT: 

85 for kind, data, pos in interpolate(data, self.filepath, pos[1], 

86 pos[2], lookup=self.lookup): 

87 stream.append((kind, data, pos)) 

88 

89 elif kind is PI and data[0] == 'python': 

90 if not self.allow_exec: 

91 raise TemplateSyntaxError('Python code blocks not allowed', 

92 self.filepath, *pos[1:]) 

93 try: 

94 suite = Suite(data[1], self.filepath, pos[1], 

95 lookup=self.lookup) 

96 except SyntaxError as err: 

97 raise TemplateSyntaxError(err, self.filepath, 

98 pos[1] + (err.lineno or 1) - 1, 

99 pos[2] + (err.offset or 0)) 

100 stream.append((EXEC, suite, pos)) 

101 

102 elif kind is COMMENT: 

103 if not data.lstrip().startswith('!'): 

104 stream.append((kind, data, pos)) 

105 

106 else: 

107 stream.append((kind, data, pos)) 

108 

109 return stream 

110 

111 def _extract_directives(self, stream, namespace, factory): 

112 depth = 0 

113 dirmap = {} # temporary mapping of directives to elements 

114 new_stream = [] 

115 ns_prefix = {} # namespace prefixes in use 

116 

117 for kind, data, pos in stream: 

118 

119 if kind is START: 

120 tag, attrs = data 

121 directives = [] 

122 strip = False 

123 

124 if tag.namespace == namespace: 

125 cls = factory.get_directive(tag.localname) 

126 if cls is None: 

127 raise BadDirectiveError(tag.localname, 

128 self.filepath, pos[1]) 

129 args = dict([(name.localname, value) for name, value 

130 in attrs if not name.namespace]) 

131 directives.append((factory.get_directive_index(cls), cls, 

132 args, ns_prefix.copy(), pos)) 

133 strip = True 

134 

135 new_attrs = [] 

136 for name, value in attrs: 

137 if name.namespace == namespace: 

138 cls = factory.get_directive(name.localname) 

139 if cls is None: 

140 raise BadDirectiveError(name.localname, 

141 self.filepath, pos[1]) 

142 if type(value) is list and len(value) == 1: 

143 value = value[0][1] 

144 directives.append((factory.get_directive_index(cls), 

145 cls, value, ns_prefix.copy(), pos)) 

146 else: 

147 new_attrs.append((name, value)) 

148 new_attrs = Attrs(new_attrs) 

149 

150 if directives: 

151 directives.sort(key=lambda x: x[0]) 

152 dirmap[(depth, tag)] = (directives, len(new_stream), 

153 strip) 

154 

155 new_stream.append((kind, (tag, new_attrs), pos)) 

156 depth += 1 

157 

158 elif kind is END: 

159 depth -= 1 

160 new_stream.append((kind, data, pos)) 

161 

162 # If there have have directive attributes with the 

163 # corresponding start tag, move the events inbetween into 

164 # a "subprogram" 

165 if (depth, data) in dirmap: 

166 directives, offset, strip = dirmap.pop((depth, data)) 

167 substream = new_stream[offset:] 

168 if strip: 

169 substream = substream[1:-1] 

170 new_stream[offset:] = [ 

171 (SUB, (directives, substream), pos) 

172 ] 

173 

174 elif kind is SUB: 

175 directives, prev_substream = data 

176 substream = self._extract_directives(prev_substream, namespace, 

177 factory) 

178 

179 if (len(substream) == 1 and substream[0][0] is SUB 

180 # merge only if the direct substream has changed 

181 and (prev_substream[0][0] is not SUB 

182 or prev_substream[0][1][0] != substream[0][1][0])): 

183 added_directives, substream = substream[0][1] 

184 directives += added_directives 

185 

186 new_stream.append((kind, (directives, substream), pos)) 

187 

188 elif kind is START_NS: 

189 # Strip out the namespace declaration for template 

190 # directives 

191 prefix, uri = data 

192 ns_prefix[prefix] = uri 

193 if uri != namespace: 

194 new_stream.append((kind, data, pos)) 

195 

196 elif kind is END_NS: 

197 uri = ns_prefix.pop(data, None) 

198 if uri and uri != namespace: 

199 new_stream.append((kind, data, pos)) 

200 

201 else: 

202 new_stream.append((kind, data, pos)) 

203 

204 return new_stream 

205 

206 def _extract_includes(self, stream): 

207 streams = [[]] # stacked lists of events of the "compiled" template 

208 prefixes = {} 

209 fallbacks = [] 

210 includes = [] 

211 xinclude_ns = Namespace(self.XINCLUDE_NAMESPACE) 

212 

213 for kind, data, pos in stream: 

214 stream = streams[-1] 

215 

216 if kind is START: 

217 # Record any directive attributes in start tags 

218 tag, attrs = data 

219 if tag in xinclude_ns: 

220 if tag.localname == 'include': 

221 include_href = attrs.get('href') 

222 if not include_href: 

223 raise TemplateSyntaxError('Include misses required ' 

224 'attribute "href"', 

225 self.filepath, *pos[1:]) 

226 includes.append((include_href, attrs.get('parse'))) 

227 streams.append([]) 

228 elif tag.localname == 'fallback': 

229 streams.append([]) 

230 fallbacks.append(streams[-1]) 

231 else: 

232 stream.append((kind, (tag, attrs), pos)) 

233 

234 elif kind is END: 

235 if fallbacks and data == xinclude_ns['fallback']: 

236 fallback_stream = streams.pop() 

237 assert fallback_stream is fallbacks[-1] 

238 elif data == xinclude_ns['include']: 

239 fallback = None 

240 if len(fallbacks) == len(includes): 

241 fallback = fallbacks.pop() 

242 streams.pop() # discard anything between the include tags 

243 # and the fallback element 

244 stream = streams[-1] 

245 href, parse = includes.pop() 

246 try: 

247 cls = { 

248 'xml': MarkupTemplate, 

249 'text': NewTextTemplate 

250 }.get(parse) or self.__class__ 

251 except KeyError: 

252 raise TemplateSyntaxError('Invalid value for "parse" ' 

253 'attribute of include', 

254 self.filepath, *pos[1:]) 

255 stream.append((INCLUDE, (href, cls, fallback), pos)) 

256 else: 

257 stream.append((kind, data, pos)) 

258 

259 elif kind is START_NS and data[1] == xinclude_ns: 

260 # Strip out the XInclude namespace 

261 prefixes[data[0]] = data[1] 

262 

263 elif kind is END_NS and data in prefixes: 

264 prefixes.pop(data) 

265 

266 else: 

267 stream.append((kind, data, pos)) 

268 

269 assert len(streams) == 1 

270 return streams[0] 

271 

272 def _interpolate_attrs(self, stream): 

273 for kind, data, pos in stream: 

274 

275 if kind is START: 

276 # Record any directive attributes in start tags 

277 tag, attrs = data 

278 new_attrs = [] 

279 for name, value in attrs: 

280 if value: 

281 value = list(interpolate(value, self.filepath, pos[1], 

282 pos[2], lookup=self.lookup)) 

283 if len(value) == 1 and value[0][0] is TEXT: 

284 value = value[0][1] 

285 new_attrs.append((name, value)) 

286 data = tag, Attrs(new_attrs) 

287 

288 yield kind, data, pos 

289 

290 def _prepare(self, stream, inlined=None): 

291 return Template._prepare( 

292 self, self._extract_includes(self._interpolate_attrs(stream)), 

293 inlined=inlined) 

294 

295 def add_directives(self, namespace, factory): 

296 """Register a custom `DirectiveFactory` for a given namespace. 

297  

298 :param namespace: the namespace URI 

299 :type namespace: `basestring` 

300 :param factory: the directive factory to register 

301 :type factory: `DirectiveFactory` 

302 :since: version 0.6 

303 """ 

304 assert not self._prepared, 'Too late for adding directives, ' \ 

305 'template already prepared' 

306 self._stream = self._extract_directives(self._stream, namespace, 

307 factory) 

308 

309 def _match(self, stream, ctxt, start=0, end=None, **vars): 

310 """Internal stream filter that applies any defined match templates 

311 to the stream. 

312 """ 

313 match_templates = ctxt._match_templates 

314 

315 def _strip(stream, append): 

316 depth = 1 

317 while 1: 

318 event = next(stream) 

319 if event[0] is START: 

320 depth += 1 

321 elif event[0] is END: 

322 depth -= 1 

323 if depth > 0: 

324 yield event 

325 else: 

326 append(event) 

327 break 

328 

329 for event in stream: 

330 

331 # We (currently) only care about start and end events for matching 

332 # We might care about namespace events in the future, though 

333 if not match_templates or (event[0] is not START and 

334 event[0] is not END): 

335 yield event 

336 continue 

337 

338 for idx, (test, path, template, hints, namespaces, directives) \ 

339 in enumerate(match_templates): 

340 if idx < start or end is not None and idx >= end: 

341 continue 

342 

343 if test(event, namespaces, ctxt) is True: 

344 if 'match_once' in hints: 

345 del match_templates[idx] 

346 idx -= 1 

347 

348 # Let the remaining match templates know about the event so 

349 # they get a chance to update their internal state 

350 for test in [mt[0] for mt in match_templates[idx + 1:]]: 

351 test(event, namespaces, ctxt, updateonly=True) 

352 

353 # Consume and store all events until an end event 

354 # corresponding to this start event is encountered 

355 pre_end = idx + 1 

356 if 'match_once' not in hints and 'not_recursive' in hints: 

357 pre_end -= 1 

358 tail = [] 

359 inner = _strip(stream, tail.append) 

360 if pre_end > 0: 

361 inner = self._match(inner, ctxt, start=start, 

362 end=pre_end, **vars) 

363 content = self._include(chain([event], inner, tail), ctxt) 

364 if 'not_buffered' not in hints: 

365 content = list(content) 

366 content = Stream(content) 

367 

368 # Make the select() function available in the body of the 

369 # match template 

370 selected = [False] 

371 def select(path): 

372 selected[0] = True 

373 return content.select(path, namespaces, ctxt) 

374 vars = dict(select=select) 

375 

376 # Recursively process the output 

377 template = _apply_directives(template, directives, ctxt, 

378 vars) 

379 for event in self._match(self._flatten(template, ctxt, 

380 **vars), 

381 ctxt, start=idx + 1, **vars): 

382 yield event 

383 

384 # If the match template did not actually call select to 

385 # consume the matched stream, the original events need to 

386 # be consumed here or they'll get appended to the output 

387 if not selected[0]: 

388 for event in content: 

389 pass 

390 

391 # Let this match template and the remaining match 

392 # templates know about the last event in the 

393 # matched content, so they can update their 

394 # internal state accordingly 

395 for test in [mt[0] for mt in match_templates[idx:]]: 

396 test(tail[0], namespaces, ctxt, updateonly=True) 

397 

398 break 

399 

400 else: # no matches 

401 yield event