1"""Compiles nodes from the parser into Python code."""
2
3import typing as t
4from contextlib import contextmanager
5from functools import update_wrapper
6from io import StringIO
7from itertools import chain
8from keyword import iskeyword as is_python_keyword
9
10from markupsafe import escape
11from markupsafe import Markup
12
13from . import nodes
14from .exceptions import TemplateAssertionError
15from .idtracking import Symbols
16from .idtracking import VAR_LOAD_ALIAS
17from .idtracking import VAR_LOAD_PARAMETER
18from .idtracking import VAR_LOAD_RESOLVE
19from .idtracking import VAR_LOAD_UNDEFINED
20from .nodes import EvalContext
21from .optimizer import Optimizer
22from .utils import _PassArg
23from .utils import concat
24from .visitor import NodeVisitor
25
26if t.TYPE_CHECKING:
27 import typing_extensions as te
28
29 from .environment import Environment
30
31F = t.TypeVar("F", bound=t.Callable[..., t.Any])
32
33operators = {
34 "eq": "==",
35 "ne": "!=",
36 "gt": ">",
37 "gteq": ">=",
38 "lt": "<",
39 "lteq": "<=",
40 "in": "in",
41 "notin": "not in",
42}
43
44
45def optimizeconst(f: F) -> F:
46 def new_func(
47 self: "CodeGenerator", node: nodes.Expr, frame: "Frame", **kwargs: t.Any
48 ) -> t.Any:
49 # Only optimize if the frame is not volatile
50 if self.optimizer is not None and not frame.eval_ctx.volatile:
51 new_node = self.optimizer.visit(node, frame.eval_ctx)
52
53 if new_node != node:
54 return self.visit(new_node, frame)
55
56 return f(self, node, frame, **kwargs)
57
58 return update_wrapper(new_func, f) # type: ignore[return-value]
59
60
61def _make_binop(op: str) -> t.Callable[["CodeGenerator", nodes.BinExpr, "Frame"], None]:
62 @optimizeconst
63 def visitor(self: "CodeGenerator", node: nodes.BinExpr, frame: Frame) -> None:
64 if (
65 self.environment.sandboxed and op in self.environment.intercepted_binops # type: ignore
66 ):
67 self.write(f"environment.call_binop(context, {op!r}, ")
68 self.visit(node.left, frame)
69 self.write(", ")
70 self.visit(node.right, frame)
71 else:
72 self.write("(")
73 self.visit(node.left, frame)
74 self.write(f" {op} ")
75 self.visit(node.right, frame)
76
77 self.write(")")
78
79 return visitor
80
81
82def _make_unop(
83 op: str,
84) -> t.Callable[["CodeGenerator", nodes.UnaryExpr, "Frame"], None]:
85 @optimizeconst
86 def visitor(self: "CodeGenerator", node: nodes.UnaryExpr, frame: Frame) -> None:
87 if (
88 self.environment.sandboxed and op in self.environment.intercepted_unops # type: ignore
89 ):
90 self.write(f"environment.call_unop(context, {op!r}, ")
91 self.visit(node.node, frame)
92 else:
93 self.write("(" + op)
94 self.visit(node.node, frame)
95
96 self.write(")")
97
98 return visitor
99
100
101def generate(
102 node: nodes.Template,
103 environment: "Environment",
104 name: t.Optional[str],
105 filename: t.Optional[str],
106 stream: t.Optional[t.TextIO] = None,
107 defer_init: bool = False,
108 optimized: bool = True,
109) -> t.Optional[str]:
110 """Generate the python source for a node tree."""
111 if not isinstance(node, nodes.Template):
112 raise TypeError("Can't compile non template nodes")
113
114 generator = environment.code_generator_class(
115 environment, name, filename, stream, defer_init, optimized
116 )
117 generator.visit(node)
118
119 if stream is None:
120 return generator.stream.getvalue() # type: ignore
121
122 return None
123
124
125def has_safe_repr(value: t.Any) -> bool:
126 """Does the node have a safe representation?"""
127 if value is None or value is NotImplemented or value is Ellipsis:
128 return True
129
130 if type(value) in {bool, int, float, complex, range, str, Markup}:
131 return True
132
133 if type(value) in {tuple, list, set, frozenset}:
134 return all(has_safe_repr(v) for v in value)
135
136 if type(value) is dict: # noqa E721
137 return all(has_safe_repr(k) and has_safe_repr(v) for k, v in value.items())
138
139 return False
140
141
142def find_undeclared(
143 nodes: t.Iterable[nodes.Node], names: t.Iterable[str]
144) -> t.Set[str]:
145 """Check if the names passed are accessed undeclared. The return value
146 is a set of all the undeclared names from the sequence of names found.
147 """
148 visitor = UndeclaredNameVisitor(names)
149 try:
150 for node in nodes:
151 visitor.visit(node)
152 except VisitorExit:
153 pass
154 return visitor.undeclared
155
156
157class MacroRef:
158 def __init__(self, node: t.Union[nodes.Macro, nodes.CallBlock]) -> None:
159 self.node = node
160 self.accesses_caller = False
161 self.accesses_kwargs = False
162 self.accesses_varargs = False
163
164
165class Frame:
166 """Holds compile time information for us."""
167
168 def __init__(
169 self,
170 eval_ctx: EvalContext,
171 parent: t.Optional["Frame"] = None,
172 level: t.Optional[int] = None,
173 ) -> None:
174 self.eval_ctx = eval_ctx
175
176 # the parent of this frame
177 self.parent = parent
178
179 if parent is None:
180 self.symbols = Symbols(level=level)
181
182 # in some dynamic inheritance situations the compiler needs to add
183 # write tests around output statements.
184 self.require_output_check = False
185
186 # inside some tags we are using a buffer rather than yield statements.
187 # this for example affects {% filter %} or {% macro %}. If a frame
188 # is buffered this variable points to the name of the list used as
189 # buffer.
190 self.buffer: t.Optional[str] = None
191
192 # the name of the block we're in, otherwise None.
193 self.block: t.Optional[str] = None
194
195 else:
196 self.symbols = Symbols(parent.symbols, level=level)
197 self.require_output_check = parent.require_output_check
198 self.buffer = parent.buffer
199 self.block = parent.block
200
201 # a toplevel frame is the root + soft frames such as if conditions.
202 self.toplevel = False
203
204 # the root frame is basically just the outermost frame, so no if
205 # conditions. This information is used to optimize inheritance
206 # situations.
207 self.rootlevel = False
208
209 # variables set inside of loops and blocks should not affect outer frames,
210 # but they still needs to be kept track of as part of the active context.
211 self.loop_frame = False
212 self.block_frame = False
213
214 # track whether the frame is being used in an if-statement or conditional
215 # expression as it determines which errors should be raised during runtime
216 # or compile time.
217 self.soft_frame = False
218
219 def copy(self) -> "te.Self":
220 """Create a copy of the current one."""
221 rv = object.__new__(self.__class__)
222 rv.__dict__.update(self.__dict__)
223 rv.symbols = self.symbols.copy()
224 return rv
225
226 def inner(self, isolated: bool = False) -> "Frame":
227 """Return an inner frame."""
228 if isolated:
229 return Frame(self.eval_ctx, level=self.symbols.level + 1)
230 return Frame(self.eval_ctx, self)
231
232 def soft(self) -> "te.Self":
233 """Return a soft frame. A soft frame may not be modified as
234 standalone thing as it shares the resources with the frame it
235 was created of, but it's not a rootlevel frame any longer.
236
237 This is only used to implement if-statements and conditional
238 expressions.
239 """
240 rv = self.copy()
241 rv.rootlevel = False
242 rv.soft_frame = True
243 return rv
244
245 __copy__ = copy
246
247
248class VisitorExit(RuntimeError):
249 """Exception used by the `UndeclaredNameVisitor` to signal a stop."""
250
251
252class DependencyFinderVisitor(NodeVisitor):
253 """A visitor that collects filter and test calls."""
254
255 def __init__(self) -> None:
256 self.filters: t.Set[str] = set()
257 self.tests: t.Set[str] = set()
258
259 def visit_Filter(self, node: nodes.Filter) -> None:
260 self.generic_visit(node)
261 self.filters.add(node.name)
262
263 def visit_Test(self, node: nodes.Test) -> None:
264 self.generic_visit(node)
265 self.tests.add(node.name)
266
267 def visit_Block(self, node: nodes.Block) -> None:
268 """Stop visiting at blocks."""
269
270
271class UndeclaredNameVisitor(NodeVisitor):
272 """A visitor that checks if a name is accessed without being
273 declared. This is different from the frame visitor as it will
274 not stop at closure frames.
275 """
276
277 def __init__(self, names: t.Iterable[str]) -> None:
278 self.names = set(names)
279 self.undeclared: t.Set[str] = set()
280
281 def visit_Name(self, node: nodes.Name) -> None:
282 if node.ctx == "load" and node.name in self.names:
283 self.undeclared.add(node.name)
284 if self.undeclared == self.names:
285 raise VisitorExit()
286 else:
287 self.names.discard(node.name)
288
289 def visit_Block(self, node: nodes.Block) -> None:
290 """Stop visiting a blocks."""
291
292
293class CompilerExit(Exception):
294 """Raised if the compiler encountered a situation where it just
295 doesn't make sense to further process the code. Any block that
296 raises such an exception is not further processed.
297 """
298
299
300class CodeGenerator(NodeVisitor):
301 def __init__(
302 self,
303 environment: "Environment",
304 name: t.Optional[str],
305 filename: t.Optional[str],
306 stream: t.Optional[t.TextIO] = None,
307 defer_init: bool = False,
308 optimized: bool = True,
309 ) -> None:
310 if stream is None:
311 stream = StringIO()
312 self.environment = environment
313 self.name = name
314 self.filename = filename
315 self.stream = stream
316 self.created_block_context = False
317 self.defer_init = defer_init
318 self.optimizer: t.Optional[Optimizer] = None
319
320 if optimized:
321 self.optimizer = Optimizer(environment)
322
323 # aliases for imports
324 self.import_aliases: t.Dict[str, str] = {}
325
326 # a registry for all blocks. Because blocks are moved out
327 # into the global python scope they are registered here
328 self.blocks: t.Dict[str, nodes.Block] = {}
329
330 # the number of extends statements so far
331 self.extends_so_far = 0
332
333 # some templates have a rootlevel extends. In this case we
334 # can safely assume that we're a child template and do some
335 # more optimizations.
336 self.has_known_extends = False
337
338 # the current line number
339 self.code_lineno = 1
340
341 # registry of all filters and tests (global, not block local)
342 self.tests: t.Dict[str, str] = {}
343 self.filters: t.Dict[str, str] = {}
344
345 # the debug information
346 self.debug_info: t.List[t.Tuple[int, int]] = []
347 self._write_debug_info: t.Optional[int] = None
348
349 # the number of new lines before the next write()
350 self._new_lines = 0
351
352 # the line number of the last written statement
353 self._last_line = 0
354
355 # true if nothing was written so far.
356 self._first_write = True
357
358 # used by the `temporary_identifier` method to get new
359 # unique, temporary identifier
360 self._last_identifier = 0
361
362 # the current indentation
363 self._indentation = 0
364
365 # Tracks toplevel assignments
366 self._assign_stack: t.List[t.Set[str]] = []
367
368 # Tracks parameter definition blocks
369 self._param_def_block: t.List[t.Set[str]] = []
370
371 # Tracks the current context.
372 self._context_reference_stack = ["context"]
373
374 @property
375 def optimized(self) -> bool:
376 return self.optimizer is not None
377
378 # -- Various compilation helpers
379
380 def fail(self, msg: str, lineno: int) -> "te.NoReturn":
381 """Fail with a :exc:`TemplateAssertionError`."""
382 raise TemplateAssertionError(msg, lineno, self.name, self.filename)
383
384 def temporary_identifier(self) -> str:
385 """Get a new unique identifier."""
386 self._last_identifier += 1
387 return f"t_{self._last_identifier}"
388
389 def buffer(self, frame: Frame) -> None:
390 """Enable buffering for the frame from that point onwards."""
391 frame.buffer = self.temporary_identifier()
392 self.writeline(f"{frame.buffer} = []")
393
394 def return_buffer_contents(
395 self, frame: Frame, force_unescaped: bool = False
396 ) -> None:
397 """Return the buffer contents of the frame."""
398 if not force_unescaped:
399 if frame.eval_ctx.volatile:
400 self.writeline("if context.eval_ctx.autoescape:")
401 self.indent()
402 self.writeline(f"return Markup(concat({frame.buffer}))")
403 self.outdent()
404 self.writeline("else:")
405 self.indent()
406 self.writeline(f"return concat({frame.buffer})")
407 self.outdent()
408 return
409 elif frame.eval_ctx.autoescape:
410 self.writeline(f"return Markup(concat({frame.buffer}))")
411 return
412 self.writeline(f"return concat({frame.buffer})")
413
414 def indent(self) -> None:
415 """Indent by one."""
416 self._indentation += 1
417
418 def outdent(self, step: int = 1) -> None:
419 """Outdent by step."""
420 self._indentation -= step
421
422 def start_write(self, frame: Frame, node: t.Optional[nodes.Node] = None) -> None:
423 """Yield or write into the frame buffer."""
424 if frame.buffer is None:
425 self.writeline("yield ", node)
426 else:
427 self.writeline(f"{frame.buffer}.append(", node)
428
429 def end_write(self, frame: Frame) -> None:
430 """End the writing process started by `start_write`."""
431 if frame.buffer is not None:
432 self.write(")")
433
434 def simple_write(
435 self, s: str, frame: Frame, node: t.Optional[nodes.Node] = None
436 ) -> None:
437 """Simple shortcut for start_write + write + end_write."""
438 self.start_write(frame, node)
439 self.write(s)
440 self.end_write(frame)
441
442 def blockvisit(self, nodes: t.Iterable[nodes.Node], frame: Frame) -> None:
443 """Visit a list of nodes as block in a frame. If the current frame
444 is no buffer a dummy ``if 0: yield None`` is written automatically.
445 """
446 try:
447 self.writeline("pass")
448 for node in nodes:
449 self.visit(node, frame)
450 except CompilerExit:
451 pass
452
453 def write(self, x: str) -> None:
454 """Write a string into the output stream."""
455 if self._new_lines:
456 if not self._first_write:
457 self.stream.write("\n" * self._new_lines)
458 self.code_lineno += self._new_lines
459 if self._write_debug_info is not None:
460 self.debug_info.append((self._write_debug_info, self.code_lineno))
461 self._write_debug_info = None
462 self._first_write = False
463 self.stream.write(" " * self._indentation)
464 self._new_lines = 0
465 self.stream.write(x)
466
467 def writeline(
468 self, x: str, node: t.Optional[nodes.Node] = None, extra: int = 0
469 ) -> None:
470 """Combination of newline and write."""
471 self.newline(node, extra)
472 self.write(x)
473
474 def newline(self, node: t.Optional[nodes.Node] = None, extra: int = 0) -> None:
475 """Add one or more newlines before the next write."""
476 self._new_lines = max(self._new_lines, 1 + extra)
477 if node is not None and node.lineno != self._last_line:
478 self._write_debug_info = node.lineno
479 self._last_line = node.lineno
480
481 def signature(
482 self,
483 node: t.Union[nodes.Call, nodes.Filter, nodes.Test],
484 frame: Frame,
485 extra_kwargs: t.Optional[t.Mapping[str, t.Any]] = None,
486 ) -> None:
487 """Writes a function call to the stream for the current node.
488 A leading comma is added automatically. The extra keyword
489 arguments may not include python keywords otherwise a syntax
490 error could occur. The extra keyword arguments should be given
491 as python dict.
492 """
493 # if any of the given keyword arguments is a python keyword
494 # we have to make sure that no invalid call is created.
495 kwarg_workaround = any(
496 is_python_keyword(t.cast(str, k))
497 for k in chain((x.key for x in node.kwargs), extra_kwargs or ())
498 )
499
500 for arg in node.args:
501 self.write(", ")
502 self.visit(arg, frame)
503
504 if not kwarg_workaround:
505 for kwarg in node.kwargs:
506 self.write(", ")
507 self.visit(kwarg, frame)
508 if extra_kwargs is not None:
509 for key, value in extra_kwargs.items():
510 self.write(f", {key}={value}")
511 if node.dyn_args:
512 self.write(", *")
513 self.visit(node.dyn_args, frame)
514
515 if kwarg_workaround:
516 if node.dyn_kwargs is not None:
517 self.write(", **dict({")
518 else:
519 self.write(", **{")
520 for kwarg in node.kwargs:
521 self.write(f"{kwarg.key!r}: ")
522 self.visit(kwarg.value, frame)
523 self.write(", ")
524 if extra_kwargs is not None:
525 for key, value in extra_kwargs.items():
526 self.write(f"{key!r}: {value}, ")
527 if node.dyn_kwargs is not None:
528 self.write("}, **")
529 self.visit(node.dyn_kwargs, frame)
530 self.write(")")
531 else:
532 self.write("}")
533
534 elif node.dyn_kwargs is not None:
535 self.write(", **")
536 self.visit(node.dyn_kwargs, frame)
537
538 def pull_dependencies(self, nodes: t.Iterable[nodes.Node]) -> None:
539 """Find all filter and test names used in the template and
540 assign them to variables in the compiled namespace. Checking
541 that the names are registered with the environment is done when
542 compiling the Filter and Test nodes. If the node is in an If or
543 CondExpr node, the check is done at runtime instead.
544
545 .. versionchanged:: 3.0
546 Filters and tests in If and CondExpr nodes are checked at
547 runtime instead of compile time.
548 """
549 visitor = DependencyFinderVisitor()
550
551 for node in nodes:
552 visitor.visit(node)
553
554 for id_map, names, dependency in (
555 (self.filters, visitor.filters, "filters"),
556 (
557 self.tests,
558 visitor.tests,
559 "tests",
560 ),
561 ):
562 for name in sorted(names):
563 if name not in id_map:
564 id_map[name] = self.temporary_identifier()
565
566 # add check during runtime that dependencies used inside of executed
567 # blocks are defined, as this step may be skipped during compile time
568 self.writeline("try:")
569 self.indent()
570 self.writeline(f"{id_map[name]} = environment.{dependency}[{name!r}]")
571 self.outdent()
572 self.writeline("except KeyError:")
573 self.indent()
574 self.writeline("@internalcode")
575 self.writeline(f"def {id_map[name]}(*unused):")
576 self.indent()
577 self.writeline(
578 f'raise TemplateRuntimeError("No {dependency[:-1]}'
579 f' named {name!r} found.")'
580 )
581 self.outdent()
582 self.outdent()
583
584 def enter_frame(self, frame: Frame) -> None:
585 undefs = []
586 for target, (action, param) in frame.symbols.loads.items():
587 if action == VAR_LOAD_PARAMETER:
588 pass
589 elif action == VAR_LOAD_RESOLVE:
590 self.writeline(f"{target} = {self.get_resolve_func()}({param!r})")
591 elif action == VAR_LOAD_ALIAS:
592 self.writeline(f"{target} = {param}")
593 elif action == VAR_LOAD_UNDEFINED:
594 undefs.append(target)
595 else:
596 raise NotImplementedError("unknown load instruction")
597 if undefs:
598 self.writeline(f"{' = '.join(undefs)} = missing")
599
600 def leave_frame(self, frame: Frame, with_python_scope: bool = False) -> None:
601 if not with_python_scope:
602 undefs = []
603 for target in frame.symbols.loads:
604 undefs.append(target)
605 if undefs:
606 self.writeline(f"{' = '.join(undefs)} = missing")
607
608 def choose_async(self, async_value: str = "async ", sync_value: str = "") -> str:
609 return async_value if self.environment.is_async else sync_value
610
611 def func(self, name: str) -> str:
612 return f"{self.choose_async()}def {name}"
613
614 def macro_body(
615 self, node: t.Union[nodes.Macro, nodes.CallBlock], frame: Frame
616 ) -> t.Tuple[Frame, MacroRef]:
617 """Dump the function def of a macro or call block."""
618 frame = frame.inner()
619 frame.symbols.analyze_node(node)
620 macro_ref = MacroRef(node)
621
622 explicit_caller = None
623 skip_special_params = set()
624 args = []
625
626 for idx, arg in enumerate(node.args):
627 if arg.name == "caller":
628 explicit_caller = idx
629 if arg.name in ("kwargs", "varargs"):
630 skip_special_params.add(arg.name)
631 args.append(frame.symbols.ref(arg.name))
632
633 undeclared = find_undeclared(node.body, ("caller", "kwargs", "varargs"))
634
635 if "caller" in undeclared:
636 # In older Jinja versions there was a bug that allowed caller
637 # to retain the special behavior even if it was mentioned in
638 # the argument list. However thankfully this was only really
639 # working if it was the last argument. So we are explicitly
640 # checking this now and error out if it is anywhere else in
641 # the argument list.
642 if explicit_caller is not None:
643 try:
644 node.defaults[explicit_caller - len(node.args)]
645 except IndexError:
646 self.fail(
647 "When defining macros or call blocks the "
648 'special "caller" argument must be omitted '
649 "or be given a default.",
650 node.lineno,
651 )
652 else:
653 args.append(frame.symbols.declare_parameter("caller"))
654 macro_ref.accesses_caller = True
655 if "kwargs" in undeclared and "kwargs" not in skip_special_params:
656 args.append(frame.symbols.declare_parameter("kwargs"))
657 macro_ref.accesses_kwargs = True
658 if "varargs" in undeclared and "varargs" not in skip_special_params:
659 args.append(frame.symbols.declare_parameter("varargs"))
660 macro_ref.accesses_varargs = True
661
662 # macros are delayed, they never require output checks
663 frame.require_output_check = False
664 frame.symbols.analyze_node(node)
665 self.writeline(f"{self.func('macro')}({', '.join(args)}):", node)
666 self.indent()
667
668 self.buffer(frame)
669 self.enter_frame(frame)
670
671 self.push_parameter_definitions(frame)
672 for idx, arg in enumerate(node.args):
673 ref = frame.symbols.ref(arg.name)
674 self.writeline(f"if {ref} is missing:")
675 self.indent()
676 try:
677 default = node.defaults[idx - len(node.args)]
678 except IndexError:
679 self.writeline(
680 f'{ref} = undefined("parameter {arg.name!r} was not provided",'
681 f" name={arg.name!r})"
682 )
683 else:
684 self.writeline(f"{ref} = ")
685 self.visit(default, frame)
686 self.mark_parameter_stored(ref)
687 self.outdent()
688 self.pop_parameter_definitions()
689
690 self.blockvisit(node.body, frame)
691 self.return_buffer_contents(frame, force_unescaped=True)
692 self.leave_frame(frame, with_python_scope=True)
693 self.outdent()
694
695 return frame, macro_ref
696
697 def macro_def(self, macro_ref: MacroRef, frame: Frame) -> None:
698 """Dump the macro definition for the def created by macro_body."""
699 arg_tuple = ", ".join(repr(x.name) for x in macro_ref.node.args)
700 name = getattr(macro_ref.node, "name", None)
701 if len(macro_ref.node.args) == 1:
702 arg_tuple += ","
703 self.write(
704 f"Macro(environment, macro, {name!r}, ({arg_tuple}),"
705 f" {macro_ref.accesses_kwargs!r}, {macro_ref.accesses_varargs!r},"
706 f" {macro_ref.accesses_caller!r}, context.eval_ctx.autoescape)"
707 )
708
709 def position(self, node: nodes.Node) -> str:
710 """Return a human readable position for the node."""
711 rv = f"line {node.lineno}"
712 if self.name is not None:
713 rv = f"{rv} in {self.name!r}"
714 return rv
715
716 def dump_local_context(self, frame: Frame) -> str:
717 items_kv = ", ".join(
718 f"{name!r}: {target}"
719 for name, target in frame.symbols.dump_stores().items()
720 )
721 return f"{{{items_kv}}}"
722
723 def write_commons(self) -> None:
724 """Writes a common preamble that is used by root and block functions.
725 Primarily this sets up common local helpers and enforces a generator
726 through a dead branch.
727 """
728 self.writeline("resolve = context.resolve_or_missing")
729 self.writeline("undefined = environment.undefined")
730 self.writeline("concat = environment.concat")
731 # always use the standard Undefined class for the implicit else of
732 # conditional expressions
733 self.writeline("cond_expr_undefined = Undefined")
734 self.writeline("if 0: yield None")
735
736 def push_parameter_definitions(self, frame: Frame) -> None:
737 """Pushes all parameter targets from the given frame into a local
738 stack that permits tracking of yet to be assigned parameters. In
739 particular this enables the optimization from `visit_Name` to skip
740 undefined expressions for parameters in macros as macros can reference
741 otherwise unbound parameters.
742 """
743 self._param_def_block.append(frame.symbols.dump_param_targets())
744
745 def pop_parameter_definitions(self) -> None:
746 """Pops the current parameter definitions set."""
747 self._param_def_block.pop()
748
749 def mark_parameter_stored(self, target: str) -> None:
750 """Marks a parameter in the current parameter definitions as stored.
751 This will skip the enforced undefined checks.
752 """
753 if self._param_def_block:
754 self._param_def_block[-1].discard(target)
755
756 def push_context_reference(self, target: str) -> None:
757 self._context_reference_stack.append(target)
758
759 def pop_context_reference(self) -> None:
760 self._context_reference_stack.pop()
761
762 def get_context_ref(self) -> str:
763 return self._context_reference_stack[-1]
764
765 def get_resolve_func(self) -> str:
766 target = self._context_reference_stack[-1]
767 if target == "context":
768 return "resolve"
769 return f"{target}.resolve"
770
771 def derive_context(self, frame: Frame) -> str:
772 return f"{self.get_context_ref()}.derived({self.dump_local_context(frame)})"
773
774 def parameter_is_undeclared(self, target: str) -> bool:
775 """Checks if a given target is an undeclared parameter."""
776 if not self._param_def_block:
777 return False
778 return target in self._param_def_block[-1]
779
780 def push_assign_tracking(self) -> None:
781 """Pushes a new layer for assignment tracking."""
782 self._assign_stack.append(set())
783
784 def pop_assign_tracking(self, frame: Frame) -> None:
785 """Pops the topmost level for assignment tracking and updates the
786 context variables if necessary.
787 """
788 vars = self._assign_stack.pop()
789 if (
790 not frame.block_frame
791 and not frame.loop_frame
792 and not frame.toplevel
793 or not vars
794 ):
795 return
796 public_names = [x for x in vars if x[:1] != "_"]
797 if len(vars) == 1:
798 name = next(iter(vars))
799 ref = frame.symbols.ref(name)
800 if frame.loop_frame:
801 self.writeline(f"_loop_vars[{name!r}] = {ref}")
802 return
803 if frame.block_frame:
804 self.writeline(f"_block_vars[{name!r}] = {ref}")
805 return
806 self.writeline(f"context.vars[{name!r}] = {ref}")
807 else:
808 if frame.loop_frame:
809 self.writeline("_loop_vars.update({")
810 elif frame.block_frame:
811 self.writeline("_block_vars.update({")
812 else:
813 self.writeline("context.vars.update({")
814 for idx, name in enumerate(sorted(vars)):
815 if idx:
816 self.write(", ")
817 ref = frame.symbols.ref(name)
818 self.write(f"{name!r}: {ref}")
819 self.write("})")
820 if not frame.block_frame and not frame.loop_frame and public_names:
821 if len(public_names) == 1:
822 self.writeline(f"context.exported_vars.add({public_names[0]!r})")
823 else:
824 names_str = ", ".join(map(repr, sorted(public_names)))
825 self.writeline(f"context.exported_vars.update(({names_str}))")
826
827 # -- Statement Visitors
828
829 def visit_Template(
830 self, node: nodes.Template, frame: t.Optional[Frame] = None
831 ) -> None:
832 assert frame is None, "no root frame allowed"
833 eval_ctx = EvalContext(self.environment, self.name)
834
835 from .runtime import async_exported
836 from .runtime import exported
837
838 if self.environment.is_async:
839 exported_names = sorted(exported + async_exported)
840 else:
841 exported_names = sorted(exported)
842
843 self.writeline("from jinja2.runtime import " + ", ".join(exported_names))
844
845 # if we want a deferred initialization we cannot move the
846 # environment into a local name
847 envenv = "" if self.defer_init else ", environment=environment"
848
849 # do we have an extends tag at all? If not, we can save some
850 # overhead by just not processing any inheritance code.
851 have_extends = node.find(nodes.Extends) is not None
852
853 # find all blocks
854 for block in node.find_all(nodes.Block):
855 if block.name in self.blocks:
856 self.fail(f"block {block.name!r} defined twice", block.lineno)
857 self.blocks[block.name] = block
858
859 # find all imports and import them
860 for import_ in node.find_all(nodes.ImportedName):
861 if import_.importname not in self.import_aliases:
862 imp = import_.importname
863 self.import_aliases[imp] = alias = self.temporary_identifier()
864 if "." in imp:
865 module, obj = imp.rsplit(".", 1)
866 self.writeline(f"from {module} import {obj} as {alias}")
867 else:
868 self.writeline(f"import {imp} as {alias}")
869
870 # add the load name
871 self.writeline(f"name = {self.name!r}")
872
873 # generate the root render function.
874 self.writeline(
875 f"{self.func('root')}(context, missing=missing{envenv}):", extra=1
876 )
877 self.indent()
878 self.write_commons()
879
880 # process the root
881 frame = Frame(eval_ctx)
882 if "self" in find_undeclared(node.body, ("self",)):
883 ref = frame.symbols.declare_parameter("self")
884 self.writeline(f"{ref} = TemplateReference(context)")
885 frame.symbols.analyze_node(node)
886 frame.toplevel = frame.rootlevel = True
887 frame.require_output_check = have_extends and not self.has_known_extends
888 if have_extends:
889 self.writeline("parent_template = None")
890 self.enter_frame(frame)
891 self.pull_dependencies(node.body)
892 self.blockvisit(node.body, frame)
893 self.leave_frame(frame, with_python_scope=True)
894 self.outdent()
895
896 # make sure that the parent root is called.
897 if have_extends:
898 if not self.has_known_extends:
899 self.indent()
900 self.writeline("if parent_template is not None:")
901 self.indent()
902 if not self.environment.is_async:
903 self.writeline("yield from parent_template.root_render_func(context)")
904 else:
905 self.writeline("agen = parent_template.root_render_func(context)")
906 self.writeline("try:")
907 self.indent()
908 self.writeline("async for event in agen:")
909 self.indent()
910 self.writeline("yield event")
911 self.outdent()
912 self.outdent()
913 self.writeline("finally: await agen.aclose()")
914 self.outdent(1 + (not self.has_known_extends))
915
916 # at this point we now have the blocks collected and can visit them too.
917 for name, block in self.blocks.items():
918 self.writeline(
919 f"{self.func('block_' + name)}(context, missing=missing{envenv}):",
920 block,
921 1,
922 )
923 self.indent()
924 self.write_commons()
925 # It's important that we do not make this frame a child of the
926 # toplevel template. This would cause a variety of
927 # interesting issues with identifier tracking.
928 block_frame = Frame(eval_ctx)
929 block_frame.block_frame = True
930 undeclared = find_undeclared(block.body, ("self", "super"))
931 if "self" in undeclared:
932 ref = block_frame.symbols.declare_parameter("self")
933 self.writeline(f"{ref} = TemplateReference(context)")
934 if "super" in undeclared:
935 ref = block_frame.symbols.declare_parameter("super")
936 self.writeline(f"{ref} = context.super({name!r}, block_{name})")
937 block_frame.symbols.analyze_node(block)
938 block_frame.block = name
939 self.writeline("_block_vars = {}")
940 self.enter_frame(block_frame)
941 self.pull_dependencies(block.body)
942 self.blockvisit(block.body, block_frame)
943 self.leave_frame(block_frame, with_python_scope=True)
944 self.outdent()
945
946 blocks_kv_str = ", ".join(f"{x!r}: block_{x}" for x in self.blocks)
947 self.writeline(f"blocks = {{{blocks_kv_str}}}", extra=1)
948 debug_kv_str = "&".join(f"{k}={v}" for k, v in self.debug_info)
949 self.writeline(f"debug_info = {debug_kv_str!r}")
950
951 def visit_Block(self, node: nodes.Block, frame: Frame) -> None:
952 """Call a block and register it for the template."""
953 level = 0
954 if frame.toplevel:
955 # if we know that we are a child template, there is no need to
956 # check if we are one
957 if self.has_known_extends:
958 return
959 if self.extends_so_far > 0:
960 self.writeline("if parent_template is None:")
961 self.indent()
962 level += 1
963
964 if node.scoped:
965 context = self.derive_context(frame)
966 else:
967 context = self.get_context_ref()
968
969 if node.required:
970 self.writeline(f"if len(context.blocks[{node.name!r}]) <= 1:", node)
971 self.indent()
972 self.writeline(
973 f'raise TemplateRuntimeError("Required block {node.name!r} not found")',
974 node,
975 )
976 self.outdent()
977
978 if not self.environment.is_async and frame.buffer is None:
979 self.writeline(
980 f"yield from context.blocks[{node.name!r}][0]({context})", node
981 )
982 else:
983 self.writeline(f"gen = context.blocks[{node.name!r}][0]({context})")
984 self.writeline("try:")
985 self.indent()
986 self.writeline(
987 f"{self.choose_async()}for event in gen:",
988 node,
989 )
990 self.indent()
991 self.simple_write("event", frame)
992 self.outdent()
993 self.outdent()
994 self.writeline(
995 f"finally: {self.choose_async('await gen.aclose()', 'gen.close()')}"
996 )
997
998 self.outdent(level)
999
1000 def visit_Extends(self, node: nodes.Extends, frame: Frame) -> None:
1001 """Calls the extender."""
1002 if not frame.toplevel:
1003 self.fail("cannot use extend from a non top-level scope", node.lineno)
1004
1005 # if the number of extends statements in general is zero so
1006 # far, we don't have to add a check if something extended
1007 # the template before this one.
1008 if self.extends_so_far > 0:
1009 # if we have a known extends we just add a template runtime
1010 # error into the generated code. We could catch that at compile
1011 # time too, but i welcome it not to confuse users by throwing the
1012 # same error at different times just "because we can".
1013 if not self.has_known_extends:
1014 self.writeline("if parent_template is not None:")
1015 self.indent()
1016 self.writeline('raise TemplateRuntimeError("extended multiple times")')
1017
1018 # if we have a known extends already we don't need that code here
1019 # as we know that the template execution will end here.
1020 if self.has_known_extends:
1021 raise CompilerExit()
1022 else:
1023 self.outdent()
1024
1025 self.writeline("parent_template = environment.get_template(", node)
1026 self.visit(node.template, frame)
1027 self.write(f", {self.name!r})")
1028 self.writeline("for name, parent_block in parent_template.blocks.items():")
1029 self.indent()
1030 self.writeline("context.blocks.setdefault(name, []).append(parent_block)")
1031 self.outdent()
1032
1033 # if this extends statement was in the root level we can take
1034 # advantage of that information and simplify the generated code
1035 # in the top level from this point onwards
1036 if frame.rootlevel:
1037 self.has_known_extends = True
1038
1039 # and now we have one more
1040 self.extends_so_far += 1
1041
1042 def visit_Include(self, node: nodes.Include, frame: Frame) -> None:
1043 """Handles includes."""
1044 if node.ignore_missing:
1045 self.writeline("try:")
1046 self.indent()
1047
1048 func_name = "get_or_select_template"
1049 if isinstance(node.template, nodes.Const):
1050 if isinstance(node.template.value, str):
1051 func_name = "get_template"
1052 elif isinstance(node.template.value, (tuple, list)):
1053 func_name = "select_template"
1054 elif isinstance(node.template, (nodes.Tuple, nodes.List)):
1055 func_name = "select_template"
1056
1057 self.writeline(f"template = environment.{func_name}(", node)
1058 self.visit(node.template, frame)
1059 self.write(f", {self.name!r})")
1060 if node.ignore_missing:
1061 self.outdent()
1062 self.writeline("except TemplateNotFound:")
1063 self.indent()
1064 self.writeline("pass")
1065 self.outdent()
1066 self.writeline("else:")
1067 self.indent()
1068
1069 def loop_body() -> None:
1070 self.indent()
1071 self.simple_write("event", frame)
1072 self.outdent()
1073
1074 if node.with_context:
1075 self.writeline(
1076 f"gen = template.root_render_func("
1077 "template.new_context(context.get_all(), True,"
1078 f" {self.dump_local_context(frame)}))"
1079 )
1080 self.writeline("try:")
1081 self.indent()
1082 self.writeline(f"{self.choose_async()}for event in gen:")
1083 loop_body()
1084 self.outdent()
1085 self.writeline(
1086 f"finally: {self.choose_async('await gen.aclose()', 'gen.close()')}"
1087 )
1088 elif self.environment.is_async:
1089 self.writeline(
1090 "for event in (await template._get_default_module_async())"
1091 "._body_stream:"
1092 )
1093 loop_body()
1094 else:
1095 self.writeline("yield from template._get_default_module()._body_stream")
1096
1097 if node.ignore_missing:
1098 self.outdent()
1099
1100 def _import_common(
1101 self, node: t.Union[nodes.Import, nodes.FromImport], frame: Frame
1102 ) -> None:
1103 self.write(f"{self.choose_async('await ')}environment.get_template(")
1104 self.visit(node.template, frame)
1105 self.write(f", {self.name!r}).")
1106
1107 if node.with_context:
1108 f_name = f"make_module{self.choose_async('_async')}"
1109 self.write(
1110 f"{f_name}(context.get_all(), True, {self.dump_local_context(frame)})"
1111 )
1112 else:
1113 self.write(f"_get_default_module{self.choose_async('_async')}(context)")
1114
1115 def visit_Import(self, node: nodes.Import, frame: Frame) -> None:
1116 """Visit regular imports."""
1117 self.writeline(f"{frame.symbols.ref(node.target)} = ", node)
1118 if frame.toplevel:
1119 self.write(f"context.vars[{node.target!r}] = ")
1120
1121 self._import_common(node, frame)
1122
1123 if frame.toplevel and not node.target.startswith("_"):
1124 self.writeline(f"context.exported_vars.discard({node.target!r})")
1125
1126 def visit_FromImport(self, node: nodes.FromImport, frame: Frame) -> None:
1127 """Visit named imports."""
1128 self.newline(node)
1129 self.write("included_template = ")
1130 self._import_common(node, frame)
1131 var_names = []
1132 discarded_names = []
1133 for name in node.names:
1134 if isinstance(name, tuple):
1135 name, alias = name
1136 else:
1137 alias = name
1138 self.writeline(
1139 f"{frame.symbols.ref(alias)} ="
1140 f" getattr(included_template, {name!r}, missing)"
1141 )
1142 self.writeline(f"if {frame.symbols.ref(alias)} is missing:")
1143 self.indent()
1144 # The position will contain the template name, and will be formatted
1145 # into a string that will be compiled into an f-string. Curly braces
1146 # in the name must be replaced with escapes so that they will not be
1147 # executed as part of the f-string.
1148 position = self.position(node).replace("{", "{{").replace("}", "}}")
1149 message = (
1150 "the template {included_template.__name__!r}"
1151 f" (imported on {position})"
1152 f" does not export the requested name {name!r}"
1153 )
1154 self.writeline(
1155 f"{frame.symbols.ref(alias)} = undefined(f{message!r}, name={name!r})"
1156 )
1157 self.outdent()
1158 if frame.toplevel:
1159 var_names.append(alias)
1160 if not alias.startswith("_"):
1161 discarded_names.append(alias)
1162
1163 if var_names:
1164 if len(var_names) == 1:
1165 name = var_names[0]
1166 self.writeline(f"context.vars[{name!r}] = {frame.symbols.ref(name)}")
1167 else:
1168 names_kv = ", ".join(
1169 f"{name!r}: {frame.symbols.ref(name)}" for name in var_names
1170 )
1171 self.writeline(f"context.vars.update({{{names_kv}}})")
1172 if discarded_names:
1173 if len(discarded_names) == 1:
1174 self.writeline(f"context.exported_vars.discard({discarded_names[0]!r})")
1175 else:
1176 names_str = ", ".join(map(repr, discarded_names))
1177 self.writeline(
1178 f"context.exported_vars.difference_update(({names_str}))"
1179 )
1180
1181 def visit_For(self, node: nodes.For, frame: Frame) -> None:
1182 loop_frame = frame.inner()
1183 loop_frame.loop_frame = True
1184 test_frame = frame.inner()
1185 else_frame = frame.inner()
1186
1187 # try to figure out if we have an extended loop. An extended loop
1188 # is necessary if the loop is in recursive mode if the special loop
1189 # variable is accessed in the body if the body is a scoped block.
1190 extended_loop = (
1191 node.recursive
1192 or "loop"
1193 in find_undeclared(node.iter_child_nodes(only=("body",)), ("loop",))
1194 or any(block.scoped for block in node.find_all(nodes.Block))
1195 )
1196
1197 loop_ref = None
1198 if extended_loop:
1199 loop_ref = loop_frame.symbols.declare_parameter("loop")
1200
1201 loop_frame.symbols.analyze_node(node, for_branch="body")
1202 if node.else_:
1203 else_frame.symbols.analyze_node(node, for_branch="else")
1204
1205 if node.test:
1206 loop_filter_func = self.temporary_identifier()
1207 test_frame.symbols.analyze_node(node, for_branch="test")
1208 self.writeline(f"{self.func(loop_filter_func)}(fiter):", node.test)
1209 self.indent()
1210 self.enter_frame(test_frame)
1211 self.writeline(self.choose_async("async for ", "for "))
1212 self.visit(node.target, loop_frame)
1213 self.write(" in ")
1214 self.write(self.choose_async("auto_aiter(fiter)", "fiter"))
1215 self.write(":")
1216 self.indent()
1217 self.writeline("if ", node.test)
1218 self.visit(node.test, test_frame)
1219 self.write(":")
1220 self.indent()
1221 self.writeline("yield ")
1222 self.visit(node.target, loop_frame)
1223 self.outdent(3)
1224 self.leave_frame(test_frame, with_python_scope=True)
1225
1226 # if we don't have an recursive loop we have to find the shadowed
1227 # variables at that point. Because loops can be nested but the loop
1228 # variable is a special one we have to enforce aliasing for it.
1229 if node.recursive:
1230 self.writeline(
1231 f"{self.func('loop')}(reciter, loop_render_func, depth=0):", node
1232 )
1233 self.indent()
1234 self.buffer(loop_frame)
1235
1236 # Use the same buffer for the else frame
1237 else_frame.buffer = loop_frame.buffer
1238
1239 # make sure the loop variable is a special one and raise a template
1240 # assertion error if a loop tries to write to loop
1241 if extended_loop:
1242 self.writeline(f"{loop_ref} = missing")
1243
1244 for name in node.find_all(nodes.Name):
1245 if name.ctx == "store" and name.name == "loop":
1246 self.fail(
1247 "Can't assign to special loop variable in for-loop target",
1248 name.lineno,
1249 )
1250
1251 if node.else_:
1252 iteration_indicator = self.temporary_identifier()
1253 self.writeline(f"{iteration_indicator} = 1")
1254
1255 self.writeline(self.choose_async("async for ", "for "), node)
1256 self.visit(node.target, loop_frame)
1257 if extended_loop:
1258 self.write(f", {loop_ref} in {self.choose_async('Async')}LoopContext(")
1259 else:
1260 self.write(" in ")
1261
1262 if node.test:
1263 self.write(f"{loop_filter_func}(")
1264 if node.recursive:
1265 self.write("reciter")
1266 else:
1267 if self.environment.is_async and not extended_loop:
1268 self.write("auto_aiter(")
1269 self.visit(node.iter, frame)
1270 if self.environment.is_async and not extended_loop:
1271 self.write(")")
1272 if node.test:
1273 self.write(")")
1274
1275 if node.recursive:
1276 self.write(", undefined, loop_render_func, depth):")
1277 else:
1278 self.write(", undefined):" if extended_loop else ":")
1279
1280 self.indent()
1281 self.enter_frame(loop_frame)
1282
1283 self.writeline("_loop_vars = {}")
1284 self.blockvisit(node.body, loop_frame)
1285 if node.else_:
1286 self.writeline(f"{iteration_indicator} = 0")
1287 self.outdent()
1288 self.leave_frame(
1289 loop_frame, with_python_scope=node.recursive and not node.else_
1290 )
1291
1292 if node.else_:
1293 self.writeline(f"if {iteration_indicator}:")
1294 self.indent()
1295 self.enter_frame(else_frame)
1296 self.blockvisit(node.else_, else_frame)
1297 self.leave_frame(else_frame)
1298 self.outdent()
1299
1300 # if the node was recursive we have to return the buffer contents
1301 # and start the iteration code
1302 if node.recursive:
1303 self.return_buffer_contents(loop_frame)
1304 self.outdent()
1305 self.start_write(frame, node)
1306 self.write(f"{self.choose_async('await ')}loop(")
1307 if self.environment.is_async:
1308 self.write("auto_aiter(")
1309 self.visit(node.iter, frame)
1310 if self.environment.is_async:
1311 self.write(")")
1312 self.write(", loop)")
1313 self.end_write(frame)
1314
1315 # at the end of the iteration, clear any assignments made in the
1316 # loop from the top level
1317 if self._assign_stack:
1318 self._assign_stack[-1].difference_update(loop_frame.symbols.stores)
1319
1320 def visit_If(self, node: nodes.If, frame: Frame) -> None:
1321 if_frame = frame.soft()
1322 self.writeline("if ", node)
1323 self.visit(node.test, if_frame)
1324 self.write(":")
1325 self.indent()
1326 self.blockvisit(node.body, if_frame)
1327 self.outdent()
1328 for elif_ in node.elif_:
1329 self.writeline("elif ", elif_)
1330 self.visit(elif_.test, if_frame)
1331 self.write(":")
1332 self.indent()
1333 self.blockvisit(elif_.body, if_frame)
1334 self.outdent()
1335 if node.else_:
1336 self.writeline("else:")
1337 self.indent()
1338 self.blockvisit(node.else_, if_frame)
1339 self.outdent()
1340
1341 def visit_Macro(self, node: nodes.Macro, frame: Frame) -> None:
1342 macro_frame, macro_ref = self.macro_body(node, frame)
1343 self.newline()
1344 if frame.toplevel:
1345 if not node.name.startswith("_"):
1346 self.write(f"context.exported_vars.add({node.name!r})")
1347 self.writeline(f"context.vars[{node.name!r}] = ")
1348 self.write(f"{frame.symbols.ref(node.name)} = ")
1349 self.macro_def(macro_ref, macro_frame)
1350
1351 def visit_CallBlock(self, node: nodes.CallBlock, frame: Frame) -> None:
1352 call_frame, macro_ref = self.macro_body(node, frame)
1353 self.writeline("caller = ")
1354 self.macro_def(macro_ref, call_frame)
1355 self.start_write(frame, node)
1356 self.visit_Call(node.call, frame, forward_caller=True)
1357 self.end_write(frame)
1358
1359 def visit_FilterBlock(self, node: nodes.FilterBlock, frame: Frame) -> None:
1360 filter_frame = frame.inner()
1361 filter_frame.symbols.analyze_node(node)
1362 self.enter_frame(filter_frame)
1363 self.buffer(filter_frame)
1364 self.blockvisit(node.body, filter_frame)
1365 self.start_write(frame, node)
1366 self.visit_Filter(node.filter, filter_frame)
1367 self.end_write(frame)
1368 self.leave_frame(filter_frame)
1369
1370 def visit_With(self, node: nodes.With, frame: Frame) -> None:
1371 with_frame = frame.inner()
1372 with_frame.symbols.analyze_node(node)
1373 self.enter_frame(with_frame)
1374 for target, expr in zip(node.targets, node.values):
1375 self.newline()
1376 self.visit(target, with_frame)
1377 self.write(" = ")
1378 self.visit(expr, frame)
1379 self.blockvisit(node.body, with_frame)
1380 self.leave_frame(with_frame)
1381
1382 def visit_ExprStmt(self, node: nodes.ExprStmt, frame: Frame) -> None:
1383 self.newline(node)
1384 self.visit(node.node, frame)
1385
1386 class _FinalizeInfo(t.NamedTuple):
1387 const: t.Optional[t.Callable[..., str]]
1388 src: t.Optional[str]
1389
1390 @staticmethod
1391 def _default_finalize(value: t.Any) -> t.Any:
1392 """The default finalize function if the environment isn't
1393 configured with one. Or, if the environment has one, this is
1394 called on that function's output for constants.
1395 """
1396 return str(value)
1397
1398 _finalize: t.Optional[_FinalizeInfo] = None
1399
1400 def _make_finalize(self) -> _FinalizeInfo:
1401 """Build the finalize function to be used on constants and at
1402 runtime. Cached so it's only created once for all output nodes.
1403
1404 Returns a ``namedtuple`` with the following attributes:
1405
1406 ``const``
1407 A function to finalize constant data at compile time.
1408
1409 ``src``
1410 Source code to output around nodes to be evaluated at
1411 runtime.
1412 """
1413 if self._finalize is not None:
1414 return self._finalize
1415
1416 finalize: t.Optional[t.Callable[..., t.Any]]
1417 finalize = default = self._default_finalize
1418 src = None
1419
1420 if self.environment.finalize:
1421 src = "environment.finalize("
1422 env_finalize = self.environment.finalize
1423 pass_arg = {
1424 _PassArg.context: "context",
1425 _PassArg.eval_context: "context.eval_ctx",
1426 _PassArg.environment: "environment",
1427 }.get(
1428 _PassArg.from_obj(env_finalize) # type: ignore
1429 )
1430 finalize = None
1431
1432 if pass_arg is None:
1433
1434 def finalize(value: t.Any) -> t.Any: # noqa: F811
1435 return default(env_finalize(value))
1436
1437 else:
1438 src = f"{src}{pass_arg}, "
1439
1440 if pass_arg == "environment":
1441
1442 def finalize(value: t.Any) -> t.Any: # noqa: F811
1443 return default(env_finalize(self.environment, value))
1444
1445 self._finalize = self._FinalizeInfo(finalize, src)
1446 return self._finalize
1447
1448 def _output_const_repr(self, group: t.Iterable[t.Any]) -> str:
1449 """Given a group of constant values converted from ``Output``
1450 child nodes, produce a string to write to the template module
1451 source.
1452 """
1453 return repr(concat(group))
1454
1455 def _output_child_to_const(
1456 self, node: nodes.Expr, frame: Frame, finalize: _FinalizeInfo
1457 ) -> str:
1458 """Try to optimize a child of an ``Output`` node by trying to
1459 convert it to constant, finalized data at compile time.
1460
1461 If :exc:`Impossible` is raised, the node is not constant and
1462 will be evaluated at runtime. Any other exception will also be
1463 evaluated at runtime for easier debugging.
1464 """
1465 const = node.as_const(frame.eval_ctx)
1466
1467 if frame.eval_ctx.autoescape:
1468 const = escape(const)
1469
1470 # Template data doesn't go through finalize.
1471 if isinstance(node, nodes.TemplateData):
1472 return str(const)
1473
1474 return finalize.const(const) # type: ignore
1475
1476 def _output_child_pre(
1477 self, node: nodes.Expr, frame: Frame, finalize: _FinalizeInfo
1478 ) -> None:
1479 """Output extra source code before visiting a child of an
1480 ``Output`` node.
1481 """
1482 if frame.eval_ctx.volatile:
1483 self.write("(escape if context.eval_ctx.autoescape else str)(")
1484 elif frame.eval_ctx.autoescape:
1485 self.write("escape(")
1486 else:
1487 self.write("str(")
1488
1489 if finalize.src is not None:
1490 self.write(finalize.src)
1491
1492 def _output_child_post(
1493 self, node: nodes.Expr, frame: Frame, finalize: _FinalizeInfo
1494 ) -> None:
1495 """Output extra source code after visiting a child of an
1496 ``Output`` node.
1497 """
1498 self.write(")")
1499
1500 if finalize.src is not None:
1501 self.write(")")
1502
1503 def visit_Output(self, node: nodes.Output, frame: Frame) -> None:
1504 # If an extends is active, don't render outside a block.
1505 if frame.require_output_check:
1506 # A top-level extends is known to exist at compile time.
1507 if self.has_known_extends:
1508 return
1509
1510 self.writeline("if parent_template is None:")
1511 self.indent()
1512
1513 finalize = self._make_finalize()
1514 body: t.List[t.Union[t.List[t.Any], nodes.Expr]] = []
1515
1516 # Evaluate constants at compile time if possible. Each item in
1517 # body will be either a list of static data or a node to be
1518 # evaluated at runtime.
1519 for child in node.nodes:
1520 try:
1521 if not (
1522 # If the finalize function requires runtime context,
1523 # constants can't be evaluated at compile time.
1524 finalize.const
1525 # Unless it's basic template data that won't be
1526 # finalized anyway.
1527 or isinstance(child, nodes.TemplateData)
1528 ):
1529 raise nodes.Impossible()
1530
1531 const = self._output_child_to_const(child, frame, finalize)
1532 except (nodes.Impossible, Exception):
1533 # The node was not constant and needs to be evaluated at
1534 # runtime. Or another error was raised, which is easier
1535 # to debug at runtime.
1536 body.append(child)
1537 continue
1538
1539 if body and isinstance(body[-1], list):
1540 body[-1].append(const)
1541 else:
1542 body.append([const])
1543
1544 if frame.buffer is not None:
1545 if len(body) == 1:
1546 self.writeline(f"{frame.buffer}.append(")
1547 else:
1548 self.writeline(f"{frame.buffer}.extend((")
1549
1550 self.indent()
1551
1552 for item in body:
1553 if isinstance(item, list):
1554 # A group of constant data to join and output.
1555 val = self._output_const_repr(item)
1556
1557 if frame.buffer is None:
1558 self.writeline("yield " + val)
1559 else:
1560 self.writeline(val + ",")
1561 else:
1562 if frame.buffer is None:
1563 self.writeline("yield ", item)
1564 else:
1565 self.newline(item)
1566
1567 # A node to be evaluated at runtime.
1568 self._output_child_pre(item, frame, finalize)
1569 self.visit(item, frame)
1570 self._output_child_post(item, frame, finalize)
1571
1572 if frame.buffer is not None:
1573 self.write(",")
1574
1575 if frame.buffer is not None:
1576 self.outdent()
1577 self.writeline(")" if len(body) == 1 else "))")
1578
1579 if frame.require_output_check:
1580 self.outdent()
1581
1582 def visit_Assign(self, node: nodes.Assign, frame: Frame) -> None:
1583 self.push_assign_tracking()
1584
1585 # ``a.b`` is allowed for assignment, and is parsed as an NSRef. However,
1586 # it is only valid if it references a Namespace object. Emit a check for
1587 # that for each ref here, before assignment code is emitted. This can't
1588 # be done in visit_NSRef as the ref could be in the middle of a tuple.
1589 seen_refs: t.Set[str] = set()
1590
1591 for nsref in node.find_all(nodes.NSRef):
1592 if nsref.name in seen_refs:
1593 # Only emit the check for each reference once, in case the same
1594 # ref is used multiple times in a tuple, `ns.a, ns.b = c, d`.
1595 continue
1596
1597 seen_refs.add(nsref.name)
1598 ref = frame.symbols.ref(nsref.name)
1599 self.writeline(f"if not isinstance({ref}, Namespace):")
1600 self.indent()
1601 self.writeline(
1602 "raise TemplateRuntimeError"
1603 '("cannot assign attribute on non-namespace object")'
1604 )
1605 self.outdent()
1606
1607 self.newline(node)
1608 self.visit(node.target, frame)
1609 self.write(" = ")
1610 self.visit(node.node, frame)
1611 self.pop_assign_tracking(frame)
1612
1613 def visit_AssignBlock(self, node: nodes.AssignBlock, frame: Frame) -> None:
1614 self.push_assign_tracking()
1615 block_frame = frame.inner()
1616 # This is a special case. Since a set block always captures we
1617 # will disable output checks. This way one can use set blocks
1618 # toplevel even in extended templates.
1619 block_frame.require_output_check = False
1620 block_frame.symbols.analyze_node(node)
1621 self.enter_frame(block_frame)
1622 self.buffer(block_frame)
1623 self.blockvisit(node.body, block_frame)
1624 self.newline(node)
1625 self.visit(node.target, frame)
1626 self.write(" = (Markup if context.eval_ctx.autoescape else identity)(")
1627 if node.filter is not None:
1628 self.visit_Filter(node.filter, block_frame)
1629 else:
1630 self.write(f"concat({block_frame.buffer})")
1631 self.write(")")
1632 self.pop_assign_tracking(frame)
1633 self.leave_frame(block_frame)
1634
1635 # -- Expression Visitors
1636
1637 def visit_Name(self, node: nodes.Name, frame: Frame) -> None:
1638 if node.ctx == "store" and (
1639 frame.toplevel or frame.loop_frame or frame.block_frame
1640 ):
1641 if self._assign_stack:
1642 self._assign_stack[-1].add(node.name)
1643 ref = frame.symbols.ref(node.name)
1644
1645 # If we are looking up a variable we might have to deal with the
1646 # case where it's undefined. We can skip that case if the load
1647 # instruction indicates a parameter which are always defined.
1648 if node.ctx == "load":
1649 load = frame.symbols.find_load(ref)
1650 if not (
1651 load is not None
1652 and load[0] == VAR_LOAD_PARAMETER
1653 and not self.parameter_is_undeclared(ref)
1654 ):
1655 self.write(
1656 f"(undefined(name={node.name!r}) if {ref} is missing else {ref})"
1657 )
1658 return
1659
1660 self.write(ref)
1661
1662 def visit_NSRef(self, node: nodes.NSRef, frame: Frame) -> None:
1663 # NSRef is a dotted assignment target a.b=c, but uses a[b]=c internally.
1664 # visit_Assign emits code to validate that each ref is to a Namespace
1665 # object only. That can't be emitted here as the ref could be in the
1666 # middle of a tuple assignment.
1667 ref = frame.symbols.ref(node.name)
1668 self.writeline(f"{ref}[{node.attr!r}]")
1669
1670 def visit_Const(self, node: nodes.Const, frame: Frame) -> None:
1671 val = node.as_const(frame.eval_ctx)
1672 if isinstance(val, float):
1673 self.write(str(val))
1674 else:
1675 self.write(repr(val))
1676
1677 def visit_TemplateData(self, node: nodes.TemplateData, frame: Frame) -> None:
1678 try:
1679 self.write(repr(node.as_const(frame.eval_ctx)))
1680 except nodes.Impossible:
1681 self.write(
1682 f"(Markup if context.eval_ctx.autoescape else identity)({node.data!r})"
1683 )
1684
1685 def visit_Tuple(self, node: nodes.Tuple, frame: Frame) -> None:
1686 self.write("(")
1687 idx = -1
1688 for idx, item in enumerate(node.items):
1689 if idx:
1690 self.write(", ")
1691 self.visit(item, frame)
1692 self.write(",)" if idx == 0 else ")")
1693
1694 def visit_List(self, node: nodes.List, frame: Frame) -> None:
1695 self.write("[")
1696 for idx, item in enumerate(node.items):
1697 if idx:
1698 self.write(", ")
1699 self.visit(item, frame)
1700 self.write("]")
1701
1702 def visit_Dict(self, node: nodes.Dict, frame: Frame) -> None:
1703 self.write("{")
1704 for idx, item in enumerate(node.items):
1705 if idx:
1706 self.write(", ")
1707 self.visit(item.key, frame)
1708 self.write(": ")
1709 self.visit(item.value, frame)
1710 self.write("}")
1711
1712 visit_Add = _make_binop("+")
1713 visit_Sub = _make_binop("-")
1714 visit_Mul = _make_binop("*")
1715 visit_Div = _make_binop("/")
1716 visit_FloorDiv = _make_binop("//")
1717 visit_Pow = _make_binop("**")
1718 visit_Mod = _make_binop("%")
1719 visit_And = _make_binop("and")
1720 visit_Or = _make_binop("or")
1721 visit_Pos = _make_unop("+")
1722 visit_Neg = _make_unop("-")
1723 visit_Not = _make_unop("not ")
1724
1725 @optimizeconst
1726 def visit_Concat(self, node: nodes.Concat, frame: Frame) -> None:
1727 if frame.eval_ctx.volatile:
1728 func_name = "(markup_join if context.eval_ctx.volatile else str_join)"
1729 elif frame.eval_ctx.autoescape:
1730 func_name = "markup_join"
1731 else:
1732 func_name = "str_join"
1733 self.write(f"{func_name}((")
1734 for arg in node.nodes:
1735 self.visit(arg, frame)
1736 self.write(", ")
1737 self.write("))")
1738
1739 @optimizeconst
1740 def visit_Compare(self, node: nodes.Compare, frame: Frame) -> None:
1741 self.write("(")
1742 self.visit(node.expr, frame)
1743 for op in node.ops:
1744 self.visit(op, frame)
1745 self.write(")")
1746
1747 def visit_Operand(self, node: nodes.Operand, frame: Frame) -> None:
1748 self.write(f" {operators[node.op]} ")
1749 self.visit(node.expr, frame)
1750
1751 @optimizeconst
1752 def visit_Getattr(self, node: nodes.Getattr, frame: Frame) -> None:
1753 if self.environment.is_async:
1754 self.write("(await auto_await(")
1755
1756 self.write("environment.getattr(")
1757 self.visit(node.node, frame)
1758 self.write(f", {node.attr!r})")
1759
1760 if self.environment.is_async:
1761 self.write("))")
1762
1763 @optimizeconst
1764 def visit_Getitem(self, node: nodes.Getitem, frame: Frame) -> None:
1765 # slices bypass the environment getitem method.
1766 if isinstance(node.arg, nodes.Slice):
1767 self.visit(node.node, frame)
1768 self.write("[")
1769 self.visit(node.arg, frame)
1770 self.write("]")
1771 else:
1772 if self.environment.is_async:
1773 self.write("(await auto_await(")
1774
1775 self.write("environment.getitem(")
1776 self.visit(node.node, frame)
1777 self.write(", ")
1778 self.visit(node.arg, frame)
1779 self.write(")")
1780
1781 if self.environment.is_async:
1782 self.write("))")
1783
1784 def visit_Slice(self, node: nodes.Slice, frame: Frame) -> None:
1785 if node.start is not None:
1786 self.visit(node.start, frame)
1787 self.write(":")
1788 if node.stop is not None:
1789 self.visit(node.stop, frame)
1790 if node.step is not None:
1791 self.write(":")
1792 self.visit(node.step, frame)
1793
1794 @contextmanager
1795 def _filter_test_common(
1796 self, node: t.Union[nodes.Filter, nodes.Test], frame: Frame, is_filter: bool
1797 ) -> t.Iterator[None]:
1798 if self.environment.is_async:
1799 self.write("(await auto_await(")
1800
1801 if is_filter:
1802 self.write(f"{self.filters[node.name]}(")
1803 func = self.environment.filters.get(node.name)
1804 else:
1805 self.write(f"{self.tests[node.name]}(")
1806 func = self.environment.tests.get(node.name)
1807
1808 # When inside an If or CondExpr frame, allow the filter to be
1809 # undefined at compile time and only raise an error if it's
1810 # actually called at runtime. See pull_dependencies.
1811 if func is None and not frame.soft_frame:
1812 type_name = "filter" if is_filter else "test"
1813 self.fail(f"No {type_name} named {node.name!r}.", node.lineno)
1814
1815 pass_arg = {
1816 _PassArg.context: "context",
1817 _PassArg.eval_context: "context.eval_ctx",
1818 _PassArg.environment: "environment",
1819 }.get(
1820 _PassArg.from_obj(func) # type: ignore
1821 )
1822
1823 if pass_arg is not None:
1824 self.write(f"{pass_arg}, ")
1825
1826 # Back to the visitor function to handle visiting the target of
1827 # the filter or test.
1828 yield
1829
1830 self.signature(node, frame)
1831 self.write(")")
1832
1833 if self.environment.is_async:
1834 self.write("))")
1835
1836 @optimizeconst
1837 def visit_Filter(self, node: nodes.Filter, frame: Frame) -> None:
1838 with self._filter_test_common(node, frame, True):
1839 # if the filter node is None we are inside a filter block
1840 # and want to write to the current buffer
1841 if node.node is not None:
1842 self.visit(node.node, frame)
1843 elif frame.eval_ctx.volatile:
1844 self.write(
1845 f"(Markup(concat({frame.buffer}))"
1846 f" if context.eval_ctx.autoescape else concat({frame.buffer}))"
1847 )
1848 elif frame.eval_ctx.autoescape:
1849 self.write(f"Markup(concat({frame.buffer}))")
1850 else:
1851 self.write(f"concat({frame.buffer})")
1852
1853 @optimizeconst
1854 def visit_Test(self, node: nodes.Test, frame: Frame) -> None:
1855 with self._filter_test_common(node, frame, False):
1856 self.visit(node.node, frame)
1857
1858 @optimizeconst
1859 def visit_CondExpr(self, node: nodes.CondExpr, frame: Frame) -> None:
1860 frame = frame.soft()
1861
1862 def write_expr2() -> None:
1863 if node.expr2 is not None:
1864 self.visit(node.expr2, frame)
1865 return
1866
1867 self.write(
1868 f'cond_expr_undefined("the inline if-expression on'
1869 f" {self.position(node)} evaluated to false and no else"
1870 f' section was defined.")'
1871 )
1872
1873 self.write("(")
1874 self.visit(node.expr1, frame)
1875 self.write(" if ")
1876 self.visit(node.test, frame)
1877 self.write(" else ")
1878 write_expr2()
1879 self.write(")")
1880
1881 @optimizeconst
1882 def visit_Call(
1883 self, node: nodes.Call, frame: Frame, forward_caller: bool = False
1884 ) -> None:
1885 if self.environment.is_async:
1886 self.write("(await auto_await(")
1887 if self.environment.sandboxed:
1888 self.write("environment.call(context, ")
1889 else:
1890 self.write("context.call(")
1891 self.visit(node.node, frame)
1892 extra_kwargs = {"caller": "caller"} if forward_caller else None
1893 loop_kwargs = {"_loop_vars": "_loop_vars"} if frame.loop_frame else {}
1894 block_kwargs = {"_block_vars": "_block_vars"} if frame.block_frame else {}
1895 if extra_kwargs:
1896 extra_kwargs.update(loop_kwargs, **block_kwargs)
1897 elif loop_kwargs or block_kwargs:
1898 extra_kwargs = dict(loop_kwargs, **block_kwargs)
1899 self.signature(node, frame, extra_kwargs)
1900 self.write(")")
1901 if self.environment.is_async:
1902 self.write("))")
1903
1904 def visit_Keyword(self, node: nodes.Keyword, frame: Frame) -> None:
1905 self.write(node.key + "=")
1906 self.visit(node.value, frame)
1907
1908 # -- Unused nodes for extensions
1909
1910 def visit_MarkSafe(self, node: nodes.MarkSafe, frame: Frame) -> None:
1911 self.write("Markup(")
1912 self.visit(node.expr, frame)
1913 self.write(")")
1914
1915 def visit_MarkSafeIfAutoescape(
1916 self, node: nodes.MarkSafeIfAutoescape, frame: Frame
1917 ) -> None:
1918 self.write("(Markup if context.eval_ctx.autoescape else identity)(")
1919 self.visit(node.expr, frame)
1920 self.write(")")
1921
1922 def visit_EnvironmentAttribute(
1923 self, node: nodes.EnvironmentAttribute, frame: Frame
1924 ) -> None:
1925 self.write("environment." + node.name)
1926
1927 def visit_ExtensionAttribute(
1928 self, node: nodes.ExtensionAttribute, frame: Frame
1929 ) -> None:
1930 self.write(f"environment.extensions[{node.identifier!r}].{node.name}")
1931
1932 def visit_ImportedName(self, node: nodes.ImportedName, frame: Frame) -> None:
1933 self.write(self.import_aliases[node.importname])
1934
1935 def visit_InternalName(self, node: nodes.InternalName, frame: Frame) -> None:
1936 self.write(node.name)
1937
1938 def visit_ContextReference(
1939 self, node: nodes.ContextReference, frame: Frame
1940 ) -> None:
1941 self.write("context")
1942
1943 def visit_DerivedContextReference(
1944 self, node: nodes.DerivedContextReference, frame: Frame
1945 ) -> None:
1946 self.write(self.derive_context(frame))
1947
1948 def visit_Continue(self, node: nodes.Continue, frame: Frame) -> None:
1949 self.writeline("continue", node)
1950
1951 def visit_Break(self, node: nodes.Break, frame: Frame) -> None:
1952 self.writeline("break", node)
1953
1954 def visit_Scope(self, node: nodes.Scope, frame: Frame) -> None:
1955 scope_frame = frame.inner()
1956 scope_frame.symbols.analyze_node(node)
1957 self.enter_frame(scope_frame)
1958 self.blockvisit(node.body, scope_frame)
1959 self.leave_frame(scope_frame)
1960
1961 def visit_OverlayScope(self, node: nodes.OverlayScope, frame: Frame) -> None:
1962 ctx = self.temporary_identifier()
1963 self.writeline(f"{ctx} = {self.derive_context(frame)}")
1964 self.writeline(f"{ctx}.vars = ")
1965 self.visit(node.context, frame)
1966 self.push_context_reference(ctx)
1967
1968 scope_frame = frame.inner(isolated=True)
1969 scope_frame.symbols.analyze_node(node)
1970 self.enter_frame(scope_frame)
1971 self.blockvisit(node.body, scope_frame)
1972 self.leave_frame(scope_frame)
1973 self.pop_context_reference()
1974
1975 def visit_EvalContextModifier(
1976 self, node: nodes.EvalContextModifier, frame: Frame
1977 ) -> None:
1978 for keyword in node.options:
1979 self.writeline(f"context.eval_ctx.{keyword.key} = ")
1980 self.visit(keyword.value, frame)
1981 try:
1982 val = keyword.value.as_const(frame.eval_ctx)
1983 except nodes.Impossible:
1984 frame.eval_ctx.volatile = True
1985 else:
1986 setattr(frame.eval_ctx, keyword.key, val)
1987
1988 def visit_ScopedEvalContextModifier(
1989 self, node: nodes.ScopedEvalContextModifier, frame: Frame
1990 ) -> None:
1991 old_ctx_name = self.temporary_identifier()
1992 saved_ctx = frame.eval_ctx.save()
1993 self.writeline(f"{old_ctx_name} = context.eval_ctx.save()")
1994 self.visit_EvalContextModifier(node, frame)
1995 for child in node.body:
1996 self.visit(child, frame)
1997 frame.eval_ctx.revert(saved_ctx)
1998 self.writeline(f"context.eval_ctx.revert({old_ctx_name})")