1"""Classes for managing templates and their runtime and compile time
2options.
3"""
4
5import os
6import typing
7import typing as t
8import weakref
9from collections import ChainMap
10from functools import lru_cache
11from functools import partial
12from functools import reduce
13from types import CodeType
14
15from markupsafe import Markup
16
17from . import nodes
18from .compiler import CodeGenerator
19from .compiler import generate
20from .defaults import BLOCK_END_STRING
21from .defaults import BLOCK_START_STRING
22from .defaults import COMMENT_END_STRING
23from .defaults import COMMENT_START_STRING
24from .defaults import DEFAULT_FILTERS # type: ignore[attr-defined]
25from .defaults import DEFAULT_NAMESPACE
26from .defaults import DEFAULT_POLICIES
27from .defaults import DEFAULT_TESTS # type: ignore[attr-defined]
28from .defaults import KEEP_TRAILING_NEWLINE
29from .defaults import LINE_COMMENT_PREFIX
30from .defaults import LINE_STATEMENT_PREFIX
31from .defaults import LSTRIP_BLOCKS
32from .defaults import NEWLINE_SEQUENCE
33from .defaults import TRIM_BLOCKS
34from .defaults import VARIABLE_END_STRING
35from .defaults import VARIABLE_START_STRING
36from .exceptions import TemplateNotFound
37from .exceptions import TemplateRuntimeError
38from .exceptions import TemplatesNotFound
39from .exceptions import TemplateSyntaxError
40from .exceptions import UndefinedError
41from .lexer import get_lexer
42from .lexer import Lexer
43from .lexer import TokenStream
44from .nodes import EvalContext
45from .parser import Parser
46from .runtime import Context
47from .runtime import new_context
48from .runtime import Undefined
49from .utils import _PassArg
50from .utils import concat
51from .utils import consume
52from .utils import import_string
53from .utils import internalcode
54from .utils import LRUCache
55from .utils import missing
56
57if t.TYPE_CHECKING:
58 import typing_extensions as te
59
60 from .bccache import BytecodeCache
61 from .ext import Extension
62 from .loaders import BaseLoader
63
64_env_bound = t.TypeVar("_env_bound", bound="Environment")
65
66
67# for direct template usage we have up to ten living environments
@lru_cache(maxsize=10)
def get_spontaneous_environment(cls: t.Type[_env_bound], *args: t.Any) -> _env_bound:
    """Create a spontaneous environment of type ``cls``.

    Spontaneous environments back templates that are instantiated
    directly instead of being obtained from an existing environment.
    The result is memoized by ``lru_cache`` on ``(cls, *args)``.

    :param cls: Environment class to instantiate.
    :param args: Positional arguments forwarded to ``cls``.
    :return: The new environment with ``shared`` set to ``True``.
    """
    spontaneous = cls(*args)
    # Flag the instance as shared before handing it out.
    spontaneous.shared = True
    return spontaneous
80
81
def create_cache(
    size: int,
) -> t.Optional[t.MutableMapping[t.Tuple["weakref.ref[t.Any]", str], "Template"]]:
    """Build the template cache matching ``size``.

    :param size: ``0`` disables caching, a negative value selects an
        unbounded plain ``dict``, a positive value an ``LRUCache`` of
        that capacity.
    :return: ``None``, an empty ``dict`` or a new ``LRUCache``.
    """
    if size == 0:
        return None

    # Negative sizes mean "never evict": a plain dict is enough.
    return {} if size < 0 else LRUCache(size)  # type: ignore
93
94
def copy_cache(
    cache: t.Optional[t.MutableMapping[t.Any, t.Any]],
) -> t.Optional[t.MutableMapping[t.Tuple["weakref.ref[t.Any]", str], "Template"]]:
    """Return a new, empty cache of the same kind as ``cache``.

    :param cache: ``None``, a plain ``dict`` or an object exposing
        ``capacity`` (an ``LRUCache``).
    :return: ``None`` for ``None``, ``{}`` for an exact ``dict``,
        otherwise a fresh ``LRUCache`` with the same capacity.
    """
    if cache is None:
        return None

    # Exact type check on purpose: dict subclasses take the LRUCache path.
    is_plain_dict = type(cache) is dict  # noqa E721
    return {} if is_plain_dict else LRUCache(cache.capacity)  # type: ignore
106
107
def load_extensions(
    environment: "Environment",
    extensions: t.Sequence[t.Union[str, t.Type["Extension"]]],
) -> t.Dict[str, "Extension"]:
    """Instantiate every extension in ``extensions`` for ``environment``.

    :param environment: Environment passed to each extension constructor.
    :param extensions: Extension classes, or import paths (strings) that
        are resolved with ``import_string``.
    :return: Mapping of each extension's ``identifier`` to its instance.
    """
    loaded = {}

    for entry in extensions:
        # Strings are import paths; resolve them to the extension class.
        ext_cls = (
            t.cast(t.Type["Extension"], import_string(entry))
            if isinstance(entry, str)
            else entry
        )
        instance = ext_cls(environment)
        loaded[ext_cls.identifier] = instance

    return loaded
124
125
def _environment_config_check(environment: "Environment") -> "Environment":
    """Perform a sanity check on the environment.

    Verifies that ``undefined`` is an :class:`Undefined` subclass, that the
    block, variable and comment start strings are pairwise distinct, and
    that ``newline_sequence`` is one of the supported values.

    :param environment: The environment to validate.
    :return: The same ``environment`` object, to allow call chaining.
    :raises AssertionError: If any of the checks fails.
    """
    assert issubclass(
        environment.undefined, Undefined
    ), "'undefined' must be a subclass of 'jinja2.Undefined'."

    block_start = environment.block_start_string
    variable_start = environment.variable_start_string
    comment_start = environment.comment_start_string
    # A chained ``a != b != c`` only compares neighbours and would accept
    # ``a == c``; compare every pair explicitly instead.
    assert (
        block_start != variable_start
        and variable_start != comment_start
        and block_start != comment_start
    ), "block, variable and comment start strings must be different."

    assert environment.newline_sequence in {
        "\r",
        "\r\n",
        "\n",
    }, "'newline_sequence' must be one of '\\n', '\\r\\n', or '\\r'."
    return environment
142
143
144class Environment:
145 r"""The core component of Jinja is the `Environment`. It contains
146 important shared variables like configuration, filters, tests,
147 globals and others. Instances of this class may be modified if
148 they are not shared and if no template was loaded so far.
149 Modifications on environments after the first template was loaded
150 will lead to surprising effects and undefined behavior.
151
152 Here are the possible initialization parameters:
153
154 `block_start_string`
155 The string marking the beginning of a block. Defaults to ``'{%'``.
156
157 `block_end_string`
158 The string marking the end of a block. Defaults to ``'%}'``.
159
160 `variable_start_string`
161 The string marking the beginning of a print statement.
162 Defaults to ``'{{'``.
163
164 `variable_end_string`
165 The string marking the end of a print statement. Defaults to
166 ``'}}'``.
167
168 `comment_start_string`
169 The string marking the beginning of a comment. Defaults to ``'{#'``.
170
171 `comment_end_string`
172 The string marking the end of a comment. Defaults to ``'#}'``.
173
174 `line_statement_prefix`
175 If given and a string, this will be used as prefix for line based
176 statements. See also :ref:`line-statements`.
177
178 `line_comment_prefix`
179 If given and a string, this will be used as prefix for line based
180 comments. See also :ref:`line-statements`.
181
182 .. versionadded:: 2.2
183
184 `trim_blocks`
185 If this is set to ``True`` the first newline after a block is
186 removed (block, not variable tag!). Defaults to `False`.
187
188 `lstrip_blocks`
189 If this is set to ``True`` leading spaces and tabs are stripped
190 from the start of a line to a block. Defaults to `False`.
191
192 `newline_sequence`
193 The sequence that starts a newline. Must be one of ``'\r'``,
194 ``'\n'`` or ``'\r\n'``. The default is ``'\n'`` which is a
195 useful default for Linux and OS X systems as well as web
196 applications.
197
198 `keep_trailing_newline`
199 Preserve the trailing newline when rendering templates.
200 The default is ``False``, which causes a single newline,
201 if present, to be stripped from the end of the template.
202
203 .. versionadded:: 2.7
204
205 `extensions`
206 List of Jinja extensions to use. This can either be import paths
207 as strings or extension classes. For more information have a
208 look at :ref:`the extensions documentation <jinja-extensions>`.
209
210 `optimized`
211 should the optimizer be enabled? Default is ``True``.
212
213 `undefined`
214 :class:`Undefined` or a subclass of it that is used to represent
215 undefined values in the template.
216
217 `finalize`
218 A callable that can be used to process the result of a variable
219 expression before it is output. For example one can convert
220 ``None`` implicitly into an empty string here.
221
222 `autoescape`
223 If set to ``True`` the XML/HTML autoescaping feature is enabled by
224 default. For more details about autoescaping see
225 :class:`~markupsafe.Markup`. As of Jinja 2.4 this can also
226 be a callable that is passed the template name and has to
        return ``True`` or ``False`` depending on whether autoescape should be
        enabled by default.
229
230 .. versionchanged:: 2.4
231 `autoescape` can now be a function
232
233 `loader`
234 The template loader for this environment.
235
236 `cache_size`
237 The size of the cache. Per default this is ``400`` which means
238 that if more than 400 templates are loaded the loader will clean
239 out the least recently used template. If the cache size is set to
240 ``0`` templates are recompiled all the time, if the cache size is
241 ``-1`` the cache will not be cleaned.
242
243 .. versionchanged:: 2.8
244 The cache size was increased to 400 from a low 50.
245
246 `auto_reload`
247 Some loaders load templates from locations where the template
248 sources may change (ie: file system or database). If
249 ``auto_reload`` is set to ``True`` (default) every time a template is
250 requested the loader checks if the source changed and if yes, it
251 will reload the template. For higher performance it's possible to
252 disable that.
253
254 `bytecode_cache`
255 If set to a bytecode cache object, this object will provide a
256 cache for the internal Jinja bytecode so that templates don't
257 have to be parsed if they were not changed.
258
259 See :ref:`bytecode-cache` for more information.
260
261 `enable_async`
262 If set to true this enables async template execution which
263 allows using async functions and generators.
264 """
265
266 #: if this environment is sandboxed. Modifying this variable won't make
267 #: the environment sandboxed though. For a real sandboxed environment
268 #: have a look at jinja2.sandbox. This flag alone controls the code
269 #: generation by the compiler.
270 sandboxed = False
271
272 #: True if the environment is just an overlay
273 overlayed = False
274
275 #: the environment this environment is linked to if it is an overlay
276 linked_to: t.Optional["Environment"] = None
277
278 #: shared environments have this set to `True`. A shared environment
279 #: must not be modified
280 shared = False
281
282 #: the class that is used for code generation. See
283 #: :class:`~jinja2.compiler.CodeGenerator` for more information.
284 code_generator_class: t.Type["CodeGenerator"] = CodeGenerator
285
286 concat = "".join
287
288 #: the context class that is used for templates. See
289 #: :class:`~jinja2.runtime.Context` for more information.
290 context_class: t.Type[Context] = Context
291
292 template_class: t.Type["Template"]
293
    def __init__(
        self,
        block_start_string: str = BLOCK_START_STRING,
        block_end_string: str = BLOCK_END_STRING,
        variable_start_string: str = VARIABLE_START_STRING,
        variable_end_string: str = VARIABLE_END_STRING,
        comment_start_string: str = COMMENT_START_STRING,
        comment_end_string: str = COMMENT_END_STRING,
        line_statement_prefix: t.Optional[str] = LINE_STATEMENT_PREFIX,
        line_comment_prefix: t.Optional[str] = LINE_COMMENT_PREFIX,
        trim_blocks: bool = TRIM_BLOCKS,
        lstrip_blocks: bool = LSTRIP_BLOCKS,
        newline_sequence: "te.Literal['\\n', '\\r\\n', '\\r']" = NEWLINE_SEQUENCE,
        keep_trailing_newline: bool = KEEP_TRAILING_NEWLINE,
        extensions: t.Sequence[t.Union[str, t.Type["Extension"]]] = (),
        optimized: bool = True,
        undefined: t.Type[Undefined] = Undefined,
        finalize: t.Optional[t.Callable[..., t.Any]] = None,
        autoescape: t.Union[bool, t.Callable[[t.Optional[str]], bool]] = False,
        loader: t.Optional["BaseLoader"] = None,
        cache_size: int = 400,
        auto_reload: bool = True,
        bytecode_cache: t.Optional["BytecodeCache"] = None,
        enable_async: bool = False,
    ):
        # Stores every argument on the instance, copies the default
        # filters/tests/globals/policies, builds the template cache, loads
        # the extensions and finally runs ``_environment_config_check``.
        # The parameters themselves are described in the class docstring.

        # !!Important notice!!
        # The constructor accepts quite a few arguments that should be
        # passed by keyword rather than position.  However it's important
        # not to change the order of the arguments, because they are passed
        # positionally at least in these internal cases:
        #   -   spontaneous environments (i18n extension and Template)
        #   -   unittests
        # If parameter changes are required, only add parameters at the end
        # and don't change the existing parameters (or their defaults!).

        # lexer / parser information
        self.block_start_string = block_start_string
        self.block_end_string = block_end_string
        self.variable_start_string = variable_start_string
        self.variable_end_string = variable_end_string
        self.comment_start_string = comment_start_string
        self.comment_end_string = comment_end_string
        self.line_statement_prefix = line_statement_prefix
        self.line_comment_prefix = line_comment_prefix
        self.trim_blocks = trim_blocks
        self.lstrip_blocks = lstrip_blocks
        self.newline_sequence = newline_sequence
        self.keep_trailing_newline = keep_trailing_newline

        # runtime information
        self.undefined: t.Type[Undefined] = undefined
        self.optimized = optimized
        self.finalize = finalize
        self.autoescape = autoescape

        # defaults: each environment gets its own copy so that changes to
        # one environment do not affect the module-level default mappings
        self.filters = DEFAULT_FILTERS.copy()
        self.tests = DEFAULT_TESTS.copy()
        self.globals = DEFAULT_NAMESPACE.copy()

        # set the loader provided
        self.loader = loader
        # ``create_cache`` returns None (size 0), a dict (negative size)
        # or an LRUCache (positive size)
        self.cache = create_cache(cache_size)
        self.bytecode_cache = bytecode_cache
        self.auto_reload = auto_reload

        # configurable policies
        self.policies = DEFAULT_POLICIES.copy()

        # load extensions; each extension is constructed with this
        # environment, so the attributes assigned above already exist
        self.extensions = load_extensions(self, extensions)

        # note: ``is_async`` is assigned only after the extensions were loaded
        self.is_async = enable_async
        _environment_config_check(self)
369
370 def add_extension(self, extension: t.Union[str, t.Type["Extension"]]) -> None:
371 """Adds an extension after the environment was created.
372
373 .. versionadded:: 2.5
374 """
375 self.extensions.update(load_extensions(self, [extension]))
376
377 def extend(self, **attributes: t.Any) -> None:
378 """Add the items to the instance of the environment if they do not exist
379 yet. This is used by :ref:`extensions <writing-extensions>` to register
380 callbacks and configuration values without breaking inheritance.
381 """
382 for key, value in attributes.items():
383 if not hasattr(self, key):
384 setattr(self, key, value)
385
386 def overlay(
387 self,
388 block_start_string: str = missing,
389 block_end_string: str = missing,
390 variable_start_string: str = missing,
391 variable_end_string: str = missing,
392 comment_start_string: str = missing,
393 comment_end_string: str = missing,
394 line_statement_prefix: t.Optional[str] = missing,
395 line_comment_prefix: t.Optional[str] = missing,
396 trim_blocks: bool = missing,
397 lstrip_blocks: bool = missing,
398 newline_sequence: "te.Literal['\\n', '\\r\\n', '\\r']" = missing,
399 keep_trailing_newline: bool = missing,
400 extensions: t.Sequence[t.Union[str, t.Type["Extension"]]] = missing,
401 optimized: bool = missing,
402 undefined: t.Type[Undefined] = missing,
403 finalize: t.Optional[t.Callable[..., t.Any]] = missing,
404 autoescape: t.Union[bool, t.Callable[[t.Optional[str]], bool]] = missing,
405 loader: t.Optional["BaseLoader"] = missing,
406 cache_size: int = missing,
407 auto_reload: bool = missing,
408 bytecode_cache: t.Optional["BytecodeCache"] = missing,
409 enable_async: bool = False,
410 ) -> "Environment":
411 """Create a new overlay environment that shares all the data with the
412 current environment except for cache and the overridden attributes.
413 Extensions cannot be removed for an overlayed environment. An overlayed
414 environment automatically gets all the extensions of the environment it
415 is linked to plus optional extra extensions.
416
417 Creating overlays should happen after the initial environment was set
418 up completely. Not all attributes are truly linked, some are just
419 copied over so modifications on the original environment may not shine
420 through.
421
422 .. versionchanged:: 3.1.2
423 Added the ``newline_sequence``,, ``keep_trailing_newline``,
424 and ``enable_async`` parameters to match ``__init__``.
425 """
426 args = dict(locals())
427 del args["self"], args["cache_size"], args["extensions"], args["enable_async"]
428
429 rv = object.__new__(self.__class__)
430 rv.__dict__.update(self.__dict__)
431 rv.overlayed = True
432 rv.linked_to = self
433
434 for key, value in args.items():
435 if value is not missing:
436 setattr(rv, key, value)
437
438 if cache_size is not missing:
439 rv.cache = create_cache(cache_size)
440 else:
441 rv.cache = copy_cache(self.cache)
442
443 rv.extensions = {}
444 for key, value in self.extensions.items():
445 rv.extensions[key] = value.bind(rv)
446 if extensions is not missing:
447 rv.extensions.update(load_extensions(rv, extensions))
448
449 if enable_async is not missing:
450 rv.is_async = enable_async
451
452 return _environment_config_check(rv)
453
454 @property
455 def lexer(self) -> Lexer:
456 """The lexer for this environment."""
457 return get_lexer(self)
458
459 def iter_extensions(self) -> t.Iterator["Extension"]:
460 """Iterates over the extensions by priority."""
461 return iter(sorted(self.extensions.values(), key=lambda x: x.priority))
462
463 def getitem(
464 self, obj: t.Any, argument: t.Union[str, t.Any]
465 ) -> t.Union[t.Any, Undefined]:
466 """Get an item or attribute of an object but prefer the item."""
467 try:
468 return obj[argument]
469 except (AttributeError, TypeError, LookupError):
470 if isinstance(argument, str):
471 try:
472 attr = str(argument)
473 except Exception:
474 pass
475 else:
476 try:
477 return getattr(obj, attr)
478 except AttributeError:
479 pass
480 return self.undefined(obj=obj, name=argument)
481
482 def getattr(self, obj: t.Any, attribute: str) -> t.Any:
483 """Get an item or attribute of an object but prefer the attribute.
484 Unlike :meth:`getitem` the attribute *must* be a string.
485 """
486 try:
487 return getattr(obj, attribute)
488 except AttributeError:
489 pass
490 try:
491 return obj[attribute]
492 except (TypeError, LookupError, AttributeError):
493 return self.undefined(obj=obj, name=attribute)
494
495 def _filter_test_common(
496 self,
497 name: t.Union[str, Undefined],
498 value: t.Any,
499 args: t.Optional[t.Sequence[t.Any]],
500 kwargs: t.Optional[t.Mapping[str, t.Any]],
501 context: t.Optional[Context],
502 eval_ctx: t.Optional[EvalContext],
503 is_filter: bool,
504 ) -> t.Any:
505 if is_filter:
506 env_map = self.filters
507 type_name = "filter"
508 else:
509 env_map = self.tests
510 type_name = "test"
511
512 func = env_map.get(name) # type: ignore
513
514 if func is None:
515 msg = f"No {type_name} named {name!r}."
516
517 if isinstance(name, Undefined):
518 try:
519 name._fail_with_undefined_error()
520 except Exception as e:
521 msg = f"{msg} ({e}; did you forget to quote the callable name?)"
522
523 raise TemplateRuntimeError(msg)
524
525 args = [value, *(args if args is not None else ())]
526 kwargs = kwargs if kwargs is not None else {}
527 pass_arg = _PassArg.from_obj(func)
528
529 if pass_arg is _PassArg.context:
530 if context is None:
531 raise TemplateRuntimeError(
532 f"Attempted to invoke a context {type_name} without context."
533 )
534
535 args.insert(0, context)
536 elif pass_arg is _PassArg.eval_context:
537 if eval_ctx is None:
538 if context is not None:
539 eval_ctx = context.eval_ctx
540 else:
541 eval_ctx = EvalContext(self)
542
543 args.insert(0, eval_ctx)
544 elif pass_arg is _PassArg.environment:
545 args.insert(0, self)
546
547 return func(*args, **kwargs)
548
549 def call_filter(
550 self,
551 name: str,
552 value: t.Any,
553 args: t.Optional[t.Sequence[t.Any]] = None,
554 kwargs: t.Optional[t.Mapping[str, t.Any]] = None,
555 context: t.Optional[Context] = None,
556 eval_ctx: t.Optional[EvalContext] = None,
557 ) -> t.Any:
558 """Invoke a filter on a value the same way the compiler does.
559
560 This might return a coroutine if the filter is running from an
561 environment in async mode and the filter supports async
562 execution. It's your responsibility to await this if needed.
563
564 .. versionadded:: 2.7
565 """
566 return self._filter_test_common(
567 name, value, args, kwargs, context, eval_ctx, True
568 )
569
570 def call_test(
571 self,
572 name: str,
573 value: t.Any,
574 args: t.Optional[t.Sequence[t.Any]] = None,
575 kwargs: t.Optional[t.Mapping[str, t.Any]] = None,
576 context: t.Optional[Context] = None,
577 eval_ctx: t.Optional[EvalContext] = None,
578 ) -> t.Any:
579 """Invoke a test on a value the same way the compiler does.
580
581 This might return a coroutine if the test is running from an
582 environment in async mode and the test supports async execution.
583 It's your responsibility to await this if needed.
584
585 .. versionchanged:: 3.0
586 Tests support ``@pass_context``, etc. decorators. Added
587 the ``context`` and ``eval_ctx`` parameters.
588
589 .. versionadded:: 2.7
590 """
591 return self._filter_test_common(
592 name, value, args, kwargs, context, eval_ctx, False
593 )
594
595 @internalcode
596 def parse(
597 self,
598 source: str,
599 name: t.Optional[str] = None,
600 filename: t.Optional[str] = None,
601 ) -> nodes.Template:
602 """Parse the sourcecode and return the abstract syntax tree. This
603 tree of nodes is used by the compiler to convert the template into
604 executable source- or bytecode. This is useful for debugging or to
605 extract information from templates.
606
607 If you are :ref:`developing Jinja extensions <writing-extensions>`
608 this gives you a good overview of the node tree generated.
609 """
610 try:
611 return self._parse(source, name, filename)
612 except TemplateSyntaxError:
613 self.handle_exception(source=source)
614
615 def _parse(
616 self, source: str, name: t.Optional[str], filename: t.Optional[str]
617 ) -> nodes.Template:
618 """Internal parsing function used by `parse` and `compile`."""
619 return Parser(self, source, name, filename).parse()
620
621 def lex(
622 self,
623 source: str,
624 name: t.Optional[str] = None,
625 filename: t.Optional[str] = None,
626 ) -> t.Iterator[t.Tuple[int, str, str]]:
627 """Lex the given sourcecode and return a generator that yields
628 tokens as tuples in the form ``(lineno, token_type, value)``.
629 This can be useful for :ref:`extension development <writing-extensions>`
630 and debugging templates.
631
632 This does not perform preprocessing. If you want the preprocessing
633 of the extensions to be applied you have to filter source through
634 the :meth:`preprocess` method.
635 """
636 source = str(source)
637 try:
638 return self.lexer.tokeniter(source, name, filename)
639 except TemplateSyntaxError:
640 self.handle_exception(source=source)
641
642 def preprocess(
643 self,
644 source: str,
645 name: t.Optional[str] = None,
646 filename: t.Optional[str] = None,
647 ) -> str:
648 """Preprocesses the source with all extensions. This is automatically
649 called for all parsing and compiling methods but *not* for :meth:`lex`
650 because there you usually only want the actual source tokenized.
651 """
652 return reduce(
653 lambda s, e: e.preprocess(s, name, filename),
654 self.iter_extensions(),
655 str(source),
656 )
657
658 def _tokenize(
659 self,
660 source: str,
661 name: t.Optional[str],
662 filename: t.Optional[str] = None,
663 state: t.Optional[str] = None,
664 ) -> TokenStream:
665 """Called by the parser to do the preprocessing and filtering
666 for all the extensions. Returns a :class:`~jinja2.lexer.TokenStream`.
667 """
668 source = self.preprocess(source, name, filename)
669 stream = self.lexer.tokenize(source, name, filename, state)
670
671 for ext in self.iter_extensions():
672 stream = ext.filter_stream(stream) # type: ignore
673
674 if not isinstance(stream, TokenStream):
675 stream = TokenStream(stream, name, filename)
676
677 return stream
678
679 def _generate(
680 self,
681 source: nodes.Template,
682 name: t.Optional[str],
683 filename: t.Optional[str],
684 defer_init: bool = False,
685 ) -> str:
686 """Internal hook that can be overridden to hook a different generate
687 method in.
688
689 .. versionadded:: 2.5
690 """
691 return generate( # type: ignore
692 source,
693 self,
694 name,
695 filename,
696 defer_init=defer_init,
697 optimized=self.optimized,
698 )
699
700 def _compile(self, source: str, filename: str) -> CodeType:
701 """Internal hook that can be overridden to hook a different compile
702 method in.
703
704 .. versionadded:: 2.5
705 """
706 return compile(source, filename, "exec")
707
708 @typing.overload
709 def compile( # type: ignore
710 self,
711 source: t.Union[str, nodes.Template],
712 name: t.Optional[str] = None,
713 filename: t.Optional[str] = None,
714 raw: "te.Literal[False]" = False,
715 defer_init: bool = False,
716 ) -> CodeType: ...
717
718 @typing.overload
719 def compile(
720 self,
721 source: t.Union[str, nodes.Template],
722 name: t.Optional[str] = None,
723 filename: t.Optional[str] = None,
724 raw: "te.Literal[True]" = ...,
725 defer_init: bool = False,
726 ) -> str: ...
727
728 @internalcode
729 def compile(
730 self,
731 source: t.Union[str, nodes.Template],
732 name: t.Optional[str] = None,
733 filename: t.Optional[str] = None,
734 raw: bool = False,
735 defer_init: bool = False,
736 ) -> t.Union[str, CodeType]:
737 """Compile a node or template source code. The `name` parameter is
738 the load name of the template after it was joined using
739 :meth:`join_path` if necessary, not the filename on the file system.
740 the `filename` parameter is the estimated filename of the template on
741 the file system. If the template came from a database or memory this
742 can be omitted.
743
744 The return value of this method is a python code object. If the `raw`
745 parameter is `True` the return value will be a string with python
746 code equivalent to the bytecode returned otherwise. This method is
747 mainly used internally.
748
749 `defer_init` is use internally to aid the module code generator. This
750 causes the generated code to be able to import without the global
751 environment variable to be set.
752
753 .. versionadded:: 2.4
754 `defer_init` parameter added.
755 """
756 source_hint = None
757 try:
758 if isinstance(source, str):
759 source_hint = source
760 source = self._parse(source, name, filename)
761 source = self._generate(source, name, filename, defer_init=defer_init)
762 if raw:
763 return source
764 if filename is None:
765 filename = "<template>"
766 return self._compile(source, filename)
767 except TemplateSyntaxError:
768 self.handle_exception(source=source_hint)
769
770 def compile_expression(
771 self, source: str, undefined_to_none: bool = True
772 ) -> "TemplateExpression":
773 """A handy helper method that returns a callable that accepts keyword
774 arguments that appear as variables in the expression. If called it
775 returns the result of the expression.
776
777 This is useful if applications want to use the same rules as Jinja
778 in template "configuration files" or similar situations.
779
780 Example usage:
781
782 >>> env = Environment()
783 >>> expr = env.compile_expression('foo == 42')
784 >>> expr(foo=23)
785 False
786 >>> expr(foo=42)
787 True
788
789 Per default the return value is converted to `None` if the
790 expression returns an undefined value. This can be changed
791 by setting `undefined_to_none` to `False`.
792
793 >>> env.compile_expression('var')() is None
794 True
795 >>> env.compile_expression('var', undefined_to_none=False)()
796 Undefined
797
798 .. versionadded:: 2.1
799 """
800 parser = Parser(self, source, state="variable")
801 try:
802 expr = parser.parse_expression()
803 if not parser.stream.eos:
804 raise TemplateSyntaxError(
805 "chunk after expression", parser.stream.current.lineno, None, None
806 )
807 expr.set_environment(self)
808 except TemplateSyntaxError:
809 self.handle_exception(source=source)
810
811 body = [nodes.Assign(nodes.Name("result", "store"), expr, lineno=1)]
812 template = self.from_string(nodes.Template(body, lineno=1))
813 return TemplateExpression(template, undefined_to_none)
814
815 def compile_templates(
816 self,
817 target: t.Union[str, "os.PathLike[str]"],
818 extensions: t.Optional[t.Collection[str]] = None,
819 filter_func: t.Optional[t.Callable[[str], bool]] = None,
820 zip: t.Optional[str] = "deflated",
821 log_function: t.Optional[t.Callable[[str], None]] = None,
822 ignore_errors: bool = True,
823 ) -> None:
824 """Finds all the templates the loader can find, compiles them
825 and stores them in `target`. If `zip` is `None`, instead of in a
826 zipfile, the templates will be stored in a directory.
827 By default a deflate zip algorithm is used. To switch to
828 the stored algorithm, `zip` can be set to ``'stored'``.
829
830 `extensions` and `filter_func` are passed to :meth:`list_templates`.
831 Each template returned will be compiled to the target folder or
832 zipfile.
833
834 By default template compilation errors are ignored. In case a
835 log function is provided, errors are logged. If you want template
836 syntax errors to abort the compilation you can set `ignore_errors`
837 to `False` and you will get an exception on syntax errors.
838
839 .. versionadded:: 2.4
840 """
841 from .loaders import ModuleLoader
842
843 if log_function is None:
844
845 def log_function(x: str) -> None:
846 pass
847
848 assert log_function is not None
849 assert self.loader is not None, "No loader configured."
850
851 def write_file(filename: str, data: str) -> None:
852 if zip:
853 info = ZipInfo(filename)
854 info.external_attr = 0o755 << 16
855 zip_file.writestr(info, data)
856 else:
857 with open(os.path.join(target, filename), "wb") as f:
858 f.write(data.encode("utf8"))
859
860 if zip is not None:
861 from zipfile import ZIP_DEFLATED
862 from zipfile import ZIP_STORED
863 from zipfile import ZipFile
864 from zipfile import ZipInfo
865
866 zip_file = ZipFile(
867 target, "w", dict(deflated=ZIP_DEFLATED, stored=ZIP_STORED)[zip]
868 )
869 log_function(f"Compiling into Zip archive {target!r}")
870 else:
871 if not os.path.isdir(target):
872 os.makedirs(target)
873 log_function(f"Compiling into folder {target!r}")
874
875 try:
876 for name in self.list_templates(extensions, filter_func):
877 source, filename, _ = self.loader.get_source(self, name)
878 try:
879 code = self.compile(source, name, filename, True, True)
880 except TemplateSyntaxError as e:
881 if not ignore_errors:
882 raise
883 log_function(f'Could not compile "{name}": {e}')
884 continue
885
886 filename = ModuleLoader.get_module_filename(name)
887
888 write_file(filename, code)
889 log_function(f'Compiled "{name}" as {filename}')
890 finally:
891 if zip:
892 zip_file.close()
893
894 log_function("Finished compiling templates")
895
896 def list_templates(
897 self,
898 extensions: t.Optional[t.Collection[str]] = None,
899 filter_func: t.Optional[t.Callable[[str], bool]] = None,
900 ) -> t.List[str]:
901 """Returns a list of templates for this environment. This requires
902 that the loader supports the loader's
903 :meth:`~BaseLoader.list_templates` method.
904
905 If there are other files in the template folder besides the
906 actual templates, the returned list can be filtered. There are two
907 ways: either `extensions` is set to a list of file extensions for
908 templates, or a `filter_func` can be provided which is a callable that
909 is passed a template name and should return `True` if it should end up
910 in the result list.
911
912 If the loader does not support that, a :exc:`TypeError` is raised.
913
914 .. versionadded:: 2.4
915 """
916 assert self.loader is not None, "No loader configured."
917 names = self.loader.list_templates()
918
919 if extensions is not None:
920 if filter_func is not None:
921 raise TypeError(
922 "either extensions or filter_func can be passed, but not both"
923 )
924
925 def filter_func(x: str) -> bool:
926 return "." in x and x.rsplit(".", 1)[1] in extensions
927
928 if filter_func is not None:
929 names = [name for name in names if filter_func(name)]
930
931 return names
932
933 def handle_exception(self, source: t.Optional[str] = None) -> "te.NoReturn":
934 """Exception handling helper. This is used internally to either raise
935 rewritten exceptions or return a rendered traceback for the template.
936 """
937 from .debug import rewrite_traceback_stack
938
939 raise rewrite_traceback_stack(source=source)
940
    def join_path(self, template: str, parent: str) -> str:
        """Join a template with the parent.  By default all the lookups are
        relative to the loader root so this method returns the `template`
        parameter unchanged, but if the paths should be relative to the
        parent template, this function can be used to calculate the real
        template name.

        Subclasses may override this method and implement template path
        joining here.

        :param template: Name of the template being looked up.
        :param parent: Name of the template that requested the lookup.
            The default implementation ignores it.
        :return: The name to load; here always ``template`` as given.
        """
        return template
952
953 @internalcode
954 def _load_template(
955 self, name: str, globals: t.Optional[t.MutableMapping[str, t.Any]]
956 ) -> "Template":
957 if self.loader is None:
958 raise TypeError("no loader for this environment specified")
959 cache_key = (weakref.ref(self.loader), name)
960 if self.cache is not None:
961 template = self.cache.get(cache_key)
962 if template is not None and (
963 not self.auto_reload or template.is_up_to_date
964 ):
965 # template.globals is a ChainMap, modifying it will only
966 # affect the template, not the environment globals.
967 if globals:
968 template.globals.update(globals)
969
970 return template
971
972 template = self.loader.load(self, name, self.make_globals(globals))
973
974 if self.cache is not None:
975 self.cache[cache_key] = template
976 return template
977
978 @internalcode
979 def get_template(
980 self,
981 name: t.Union[str, "Template"],
982 parent: t.Optional[str] = None,
983 globals: t.Optional[t.MutableMapping[str, t.Any]] = None,
984 ) -> "Template":
985 """Load a template by name with :attr:`loader` and return a
986 :class:`Template`. If the template does not exist a
987 :exc:`TemplateNotFound` exception is raised.
988
989 :param name: Name of the template to load. When loading
990 templates from the filesystem, "/" is used as the path
991 separator, even on Windows.
992 :param parent: The name of the parent template importing this
993 template. :meth:`join_path` can be used to implement name
994 transformations with this.
995 :param globals: Extend the environment :attr:`globals` with
996 these extra variables available for all renders of this
997 template. If the template has already been loaded and
998 cached, its globals are updated with any new items.
999
1000 .. versionchanged:: 3.0
1001 If a template is loaded from cache, ``globals`` will update
1002 the template's globals instead of ignoring the new values.
1003
1004 .. versionchanged:: 2.4
1005 If ``name`` is a :class:`Template` object it is returned
1006 unchanged.
1007 """
1008 if isinstance(name, Template):
1009 return name
1010 if parent is not None:
1011 name = self.join_path(name, parent)
1012
1013 return self._load_template(name, globals)
1014
1015 @internalcode
1016 def select_template(
1017 self,
1018 names: t.Iterable[t.Union[str, "Template"]],
1019 parent: t.Optional[str] = None,
1020 globals: t.Optional[t.MutableMapping[str, t.Any]] = None,
1021 ) -> "Template":
1022 """Like :meth:`get_template`, but tries loading multiple names.
1023 If none of the names can be loaded a :exc:`TemplatesNotFound`
1024 exception is raised.
1025
1026 :param names: List of template names to try loading in order.
1027 :param parent: The name of the parent template importing this
1028 template. :meth:`join_path` can be used to implement name
1029 transformations with this.
1030 :param globals: Extend the environment :attr:`globals` with
1031 these extra variables available for all renders of this
1032 template. If the template has already been loaded and
1033 cached, its globals are updated with any new items.
1034
1035 .. versionchanged:: 3.0
1036 If a template is loaded from cache, ``globals`` will update
1037 the template's globals instead of ignoring the new values.
1038
1039 .. versionchanged:: 2.11
1040 If ``names`` is :class:`Undefined`, an :exc:`UndefinedError`
1041 is raised instead. If no templates were found and ``names``
1042 contains :class:`Undefined`, the message is more helpful.
1043
1044 .. versionchanged:: 2.4
1045 If ``names`` contains a :class:`Template` object it is
1046 returned unchanged.
1047
1048 .. versionadded:: 2.3
1049 """
1050 if isinstance(names, Undefined):
1051 names._fail_with_undefined_error()
1052
1053 if not names:
1054 raise TemplatesNotFound(
1055 message="Tried to select from an empty list of templates."
1056 )
1057
1058 for name in names:
1059 if isinstance(name, Template):
1060 return name
1061 if parent is not None:
1062 name = self.join_path(name, parent)
1063 try:
1064 return self._load_template(name, globals)
1065 except (TemplateNotFound, UndefinedError):
1066 pass
1067 raise TemplatesNotFound(names) # type: ignore
1068
1069 @internalcode
1070 def get_or_select_template(
1071 self,
1072 template_name_or_list: t.Union[
1073 str, "Template", t.List[t.Union[str, "Template"]]
1074 ],
1075 parent: t.Optional[str] = None,
1076 globals: t.Optional[t.MutableMapping[str, t.Any]] = None,
1077 ) -> "Template":
1078 """Use :meth:`select_template` if an iterable of template names
1079 is given, or :meth:`get_template` if one name is given.
1080
1081 .. versionadded:: 2.3
1082 """
1083 if isinstance(template_name_or_list, (str, Undefined)):
1084 return self.get_template(template_name_or_list, parent, globals)
1085 elif isinstance(template_name_or_list, Template):
1086 return template_name_or_list
1087 return self.select_template(template_name_or_list, parent, globals)
1088
1089 def from_string(
1090 self,
1091 source: t.Union[str, nodes.Template],
1092 globals: t.Optional[t.MutableMapping[str, t.Any]] = None,
1093 template_class: t.Optional[t.Type["Template"]] = None,
1094 ) -> "Template":
1095 """Load a template from a source string without using
1096 :attr:`loader`.
1097
1098 :param source: Jinja source to compile into a template.
1099 :param globals: Extend the environment :attr:`globals` with
1100 these extra variables available for all renders of this
1101 template. If the template has already been loaded and
1102 cached, its globals are updated with any new items.
1103 :param template_class: Return an instance of this
1104 :class:`Template` class.
1105 """
1106 gs = self.make_globals(globals)
1107 cls = template_class or self.template_class
1108 return cls.from_code(self, self.compile(source), gs, None)
1109
1110 def make_globals(
1111 self, d: t.Optional[t.MutableMapping[str, t.Any]]
1112 ) -> t.MutableMapping[str, t.Any]:
1113 """Make the globals map for a template. Any given template
1114 globals overlay the environment :attr:`globals`.
1115
1116 Returns a :class:`collections.ChainMap`. This allows any changes
1117 to a template's globals to only affect that template, while
1118 changes to the environment's globals are still reflected.
1119 However, avoid modifying any globals after a template is loaded.
1120
1121 :param d: Dict of template-specific globals.
1122
1123 .. versionchanged:: 3.0
1124 Use :class:`collections.ChainMap` to always prevent mutating
1125 environment globals.
1126 """
1127 if d is None:
1128 d = {}
1129
1130 return ChainMap(d, self.globals)
1131
1132
class Template:
    """A compiled template that can be rendered.

    Use the methods on :class:`Environment` to create or load templates.
    The environment is used to configure how templates are compiled and
    behave.

    It is also possible to create a template object directly. This is
    not usually recommended. The constructor takes most of the same
    arguments as :class:`Environment`. All templates created with the
    same environment arguments share the same ephemeral ``Environment``
    instance behind the scenes.

    A template object should be considered immutable. Modifications on
    the object are not supported.
    """

    #: Type of environment to create when creating a template directly
    #: rather than through an existing environment.
    environment_class: t.Type[Environment] = Environment

    environment: Environment
    globals: t.MutableMapping[str, t.Any]
    name: t.Optional[str]
    filename: t.Optional[str]
    blocks: t.Dict[str, t.Callable[[Context], t.Iterator[str]]]
    root_render_func: t.Callable[[Context], t.Iterator[str]]
    _module: t.Optional["TemplateModule"]
    _debug_info: str
    _uptodate: t.Optional[t.Callable[[], bool]]

    def __new__(
        cls,
        source: t.Union[str, nodes.Template],
        block_start_string: str = BLOCK_START_STRING,
        block_end_string: str = BLOCK_END_STRING,
        variable_start_string: str = VARIABLE_START_STRING,
        variable_end_string: str = VARIABLE_END_STRING,
        comment_start_string: str = COMMENT_START_STRING,
        comment_end_string: str = COMMENT_END_STRING,
        line_statement_prefix: t.Optional[str] = LINE_STATEMENT_PREFIX,
        line_comment_prefix: t.Optional[str] = LINE_COMMENT_PREFIX,
        trim_blocks: bool = TRIM_BLOCKS,
        lstrip_blocks: bool = LSTRIP_BLOCKS,
        newline_sequence: "te.Literal['\\n', '\\r\\n', '\\r']" = NEWLINE_SEQUENCE,
        keep_trailing_newline: bool = KEEP_TRAILING_NEWLINE,
        extensions: t.Sequence[t.Union[str, t.Type["Extension"]]] = (),
        optimized: bool = True,
        undefined: t.Type[Undefined] = Undefined,
        finalize: t.Optional[t.Callable[..., t.Any]] = None,
        autoescape: t.Union[bool, t.Callable[[t.Optional[str]], bool]] = False,
        enable_async: bool = False,
    ) -> t.Any:  # it returns a `Template`, but this breaks the sphinx build...
        """Create a template directly from ``source``.

        The options are forwarded to ``get_spontaneous_environment`` to
        obtain an environment of type :attr:`environment_class`; the
        template is then built with that environment's ``from_string``
        using ``cls`` as the template class.
        """
        env = get_spontaneous_environment(
            cls.environment_class,  # type: ignore
            block_start_string,
            block_end_string,
            variable_start_string,
            variable_end_string,
            comment_start_string,
            comment_end_string,
            line_statement_prefix,
            line_comment_prefix,
            trim_blocks,
            lstrip_blocks,
            newline_sequence,
            keep_trailing_newline,
            # converted to a frozenset before being passed on
            frozenset(extensions),
            optimized,
            undefined,  # type: ignore
            finalize,
            autoescape,
            # NOTE(review): the next four positionals presumably fill the
            # loader, cache size, auto-reload and bytecode-cache slots of
            # the environment constructor -- confirm against its signature.
            None,
            0,
            False,
            None,
            enable_async,
        )
        return env.from_string(source, template_class=cls)

    @classmethod
    def from_code(
        cls,
        environment: Environment,
        code: CodeType,
        globals: t.MutableMapping[str, t.Any],
        uptodate: t.Optional[t.Callable[[], bool]] = None,
    ) -> "Template":
        """Creates a template object from compiled code and the globals.  This
        is used by the loaders and environment to create a template object.

        :param environment: Environment the template belongs to.
        :param code: Compiled module code; it is executed to populate the
            namespace the template is built from.
        :param globals: Globals mapping stored on the template.
        :param uptodate: Optional callable consulted by
            :attr:`is_up_to_date`.
        """
        namespace = {"environment": environment, "__file__": code.co_filename}
        # NOTE: this executes ``code``; it must come from a trusted
        # compilation step.
        exec(code, namespace)
        rv = cls._from_namespace(environment, namespace, globals)
        rv._uptodate = uptodate
        return rv

    @classmethod
    def from_module_dict(
        cls,
        environment: Environment,
        module_dict: t.MutableMapping[str, t.Any],
        globals: t.MutableMapping[str, t.Any],
    ) -> "Template":
        """Creates a template object from a module.  This is used by the
        module loader to create a template object.

        .. versionadded:: 2.4
        """
        return cls._from_namespace(environment, module_dict, globals)

    @classmethod
    def _from_namespace(
        cls,
        environment: Environment,
        namespace: t.MutableMapping[str, t.Any],
        globals: t.MutableMapping[str, t.Any],
    ) -> "Template":
        """Build a template from an already populated module namespace.

        Reads ``name``, ``__file__``, ``blocks``, ``root`` and
        ``debug_info`` from ``namespace`` and writes ``environment`` and
        ``__jinja_template__`` back into it.
        """
        # ``object.__new__`` is used so that ``Template.__new__`` (which
        # compiles source) is not invoked. The local is not called ``t``
        # to avoid shadowing the module-level ``typing`` alias.
        template: "Template" = object.__new__(cls)
        template.environment = environment
        template.globals = globals
        template.name = namespace["name"]
        template.filename = namespace["__file__"]
        template.blocks = namespace["blocks"]

        # render function and module
        template.root_render_func = namespace["root"]
        template._module = None

        # debug and loader helpers
        template._debug_info = namespace["debug_info"]
        template._uptodate = None

        # store the reference
        namespace["environment"] = environment
        namespace["__jinja_template__"] = template

        return template

    def render(self, *args: t.Any, **kwargs: t.Any) -> str:
        """This method accepts the same arguments as the `dict` constructor:
        A dict, a dict subclass or some keyword arguments.  If no arguments
        are given the context will be empty.  These two calls do the same::

            template.render(knights='that say nih')
            template.render({'knights': 'that say nih'})

        This will return the rendered template as a string.

        If the environment is in async mode, the coroutine returned by
        :meth:`render_async` is run to completion with
        :func:`asyncio.run`. That means this method cannot be called
        from a thread that already has a running event loop; await
        :meth:`render_async` there instead.
        """
        if self.environment.is_async:
            import asyncio

            # ``asyncio.run`` creates a fresh loop, finalizes async
            # generators and closes the loop afterwards. Reusing an
            # already running loop with ``run_until_complete`` cannot
            # work, since that call raises on a running loop.
            return asyncio.run(self.render_async(*args, **kwargs))

        ctx = self.new_context(dict(*args, **kwargs))

        try:
            return self.environment.concat(self.root_render_func(ctx))  # type: ignore
        except Exception:
            self.environment.handle_exception()

    async def render_async(self, *args: t.Any, **kwargs: t.Any) -> str:
        """This works similar to :meth:`render` but returns a coroutine
        that when awaited returns the entire rendered template string.  This
        requires the async feature to be enabled.

        Example usage::

            await template.render_async(knights='that say nih; asynchronously')

        :raises RuntimeError: If the environment is not in async mode.
        """
        if not self.environment.is_async:
            raise RuntimeError(
                "The environment was not created with async mode enabled."
            )

        ctx = self.new_context(dict(*args, **kwargs))

        try:
            return self.environment.concat(  # type: ignore
                [n async for n in self.root_render_func(ctx)]  # type: ignore
            )
        except Exception:
            return self.environment.handle_exception()

    def stream(self, *args: t.Any, **kwargs: t.Any) -> "TemplateStream":
        """Works exactly like :meth:`generate` but returns a
        :class:`TemplateStream`.
        """
        return TemplateStream(self.generate(*args, **kwargs))

    def generate(self, *args: t.Any, **kwargs: t.Any) -> t.Iterator[str]:
        """For very large templates it can be useful to not render the whole
        template at once but evaluate each statement after another and yield
        piece for piece.  This method basically does exactly that and returns
        a generator that yields one item after another as strings.

        It accepts the same arguments as :meth:`render`.

        In async mode all pieces are first collected into a list via
        :meth:`generate_async` and :func:`asyncio.run`, and only then
        yielded.
        """
        if self.environment.is_async:
            import asyncio

            async def to_list() -> t.List[str]:
                return [x async for x in self.generate_async(*args, **kwargs)]

            yield from asyncio.run(to_list())
            return

        ctx = self.new_context(dict(*args, **kwargs))

        try:
            yield from self.root_render_func(ctx)
        except Exception:
            yield self.environment.handle_exception()

    async def generate_async(
        self, *args: t.Any, **kwargs: t.Any
    ) -> t.AsyncIterator[str]:
        """An async version of :meth:`generate`.  Works very similarly but
        returns an async iterator instead.

        :raises RuntimeError: If the environment is not in async mode.
        """
        if not self.environment.is_async:
            raise RuntimeError(
                "The environment was not created with async mode enabled."
            )

        ctx = self.new_context(dict(*args, **kwargs))

        try:
            async for event in self.root_render_func(ctx):  # type: ignore
                yield event
        except Exception:
            yield self.environment.handle_exception()

    def new_context(
        self,
        vars: t.Optional[t.Dict[str, t.Any]] = None,
        shared: bool = False,
        locals: t.Optional[t.Mapping[str, t.Any]] = None,
    ) -> Context:
        """Create a new :class:`Context` for this template.  The vars
        provided will be passed to the template.  Per default the globals
        are added to the context.  If shared is set to `True` the data
        is passed as is to the context without adding the globals.

        `locals` can be a dict of local variables for internal usage.
        """
        return new_context(
            self.environment, self.name, self.blocks, vars, shared, self.globals, locals
        )

    def make_module(
        self,
        vars: t.Optional[t.Dict[str, t.Any]] = None,
        shared: bool = False,
        locals: t.Optional[t.Mapping[str, t.Any]] = None,
    ) -> "TemplateModule":
        """This method works like the :attr:`module` attribute when called
        without arguments but it will evaluate the template on every call
        rather than caching it.  It's also possible to provide
        a dict which is then used as context.  The arguments are the same
        as for the :meth:`new_context` method.
        """
        ctx = self.new_context(vars, shared, locals)
        return TemplateModule(self, ctx)

    async def make_module_async(
        self,
        vars: t.Optional[t.Dict[str, t.Any]] = None,
        shared: bool = False,
        locals: t.Optional[t.Mapping[str, t.Any]] = None,
    ) -> "TemplateModule":
        """As template module creation can invoke template code for
        asynchronous executions this method must be used instead of the
        normal :meth:`make_module` one.  Likewise the module attribute
        becomes unavailable in async mode.
        """
        ctx = self.new_context(vars, shared, locals)
        # The body is rendered here and handed over as the body stream.
        return TemplateModule(
            self,
            ctx,
            [x async for x in self.root_render_func(ctx)],  # type: ignore
        )

    @internalcode
    def _get_default_module(self, ctx: t.Optional[Context] = None) -> "TemplateModule":
        """If a context is passed in, this means that the template was
        imported. Imported templates have access to the current
        template's globals by default, but they can only be accessed via
        the context during runtime.

        If there are new globals, we need to create a new module because
        the cached module is already rendered and will not have access
        to globals from the current context. This new module is not
        cached because the template can be imported elsewhere, and it
        should have access to only the current template's globals.

        :raises RuntimeError: If the environment is in async mode.
        """
        if self.environment.is_async:
            raise RuntimeError("Module is not available in async mode.")

        if ctx is not None:
            # globals known to the context but not to this template
            keys = ctx.globals_keys - self.globals.keys()

            if keys:
                return self.make_module({k: ctx.parent[k] for k in keys})

        if self._module is None:
            self._module = self.make_module()

        return self._module

    async def _get_default_module_async(
        self, ctx: t.Optional[Context] = None
    ) -> "TemplateModule":
        """Async counterpart of :meth:`_get_default_module`.

        Follows the same steps but builds modules with
        :meth:`make_module_async`.
        """
        if ctx is not None:
            # globals known to the context but not to this template
            keys = ctx.globals_keys - self.globals.keys()

            if keys:
                return await self.make_module_async({k: ctx.parent[k] for k in keys})

        if self._module is None:
            self._module = await self.make_module_async()

        return self._module

    @property
    def module(self) -> "TemplateModule":
        """The template as module.  This is used for imports in the
        template runtime but is also useful if one wants to access
        exported template variables from the Python layer:

        >>> t = Template('{% macro foo() %}42{% endmacro %}23')
        >>> str(t.module)
        '23'
        >>> t.module.foo() == u'42'
        True

        This attribute is not available if async mode is enabled.
        """
        return self._get_default_module()

    def get_corresponding_lineno(self, lineno: int) -> int:
        """Return the source line number of a line number in the
        generated bytecode as they are not in sync.

        Scans :attr:`debug_info` from the end and returns the template
        line of the first pair whose code line is not greater than
        ``lineno``; returns ``1`` when there is no such pair.
        """
        for template_line, code_line in reversed(self.debug_info):
            if code_line <= lineno:
                return template_line
        return 1

    @property
    def is_up_to_date(self) -> bool:
        """If this variable is `False` there is a newer version available.

        Always ``True`` when no ``uptodate`` callable was provided.
        """
        if self._uptodate is None:
            return True
        return self._uptodate()

    @property
    def debug_info(self) -> t.List[t.Tuple[int, int]]:
        """The debug info mapping.

        Parsed on each access from the ``_debug_info`` string, which
        holds ``&``-separated ``a=b`` integer pairs.
        :meth:`get_corresponding_lineno` reads each pair as
        ``(template_line, code_line)``. Empty when the string is empty.
        """
        if self._debug_info:
            return [
                tuple(map(int, x.split("=")))  # type: ignore
                for x in self._debug_info.split("&")
            ]

        return []

    def __repr__(self) -> str:
        """Show the template name, or a ``memory:<id>`` tag if it has none."""
        if self.name is None:
            name = f"memory:{id(self):x}"
        else:
            name = repr(self.name)
        return f"<{type(self).__name__} {name}>"
1518
1519
class TemplateModule:
    """An imported template.

    Every name exported by the template's context becomes an attribute
    of this object, and converting the object to a string gives the
    rendered body.
    """

    def __init__(
        self,
        template: Template,
        context: Context,
        body_stream: t.Optional[t.Iterable[str]] = None,
    ) -> None:
        """Store the rendered body and copy the exported names.

        :param template: Template this module represents.
        :param context: Context whose exported names become attributes.
        :param body_stream: Already rendered pieces of the body. When
            omitted, the template's root render function is run here,
            which is refused in async mode.
        :raises RuntimeError: If ``body_stream`` is omitted while the
            context's environment is in async mode.
        """
        if body_stream is None:
            if context.environment.is_async:
                raise RuntimeError(
                    "Async mode requires a body stream to be passed to"
                    " a template module. Use the async methods of the"
                    " API you are using."
                )

            # Render eagerly so the pieces can be joined more than once.
            body_stream = [*template.root_render_func(context)]

        self._body_stream = body_stream
        vars(self).update(context.get_exported())
        self.__name__ = template.name

    def __html__(self) -> Markup:
        """Return the joined body wrapped in :class:`Markup`."""
        rendered = concat(self._body_stream)
        return Markup(rendered)

    def __str__(self) -> str:
        """Return the joined body."""
        return concat(self._body_stream)

    def __repr__(self) -> str:
        """Show the template name, or a ``memory:<id>`` tag if it has none."""
        label = (
            repr(self.__name__)
            if self.__name__ is not None
            else f"memory:{id(self):x}"
        )
        return f"<{type(self).__name__} {label}>"
1558
1559
class TemplateExpression:
    """Callable wrapper around a template that evaluates an expression.

    Instances are returned by
    :meth:`jinja2.Environment.compile_expression`. Calling one runs the
    wrapped template and hands back the value the template stored under
    ``result`` in its context variables.
    """

    def __init__(self, template: Template, undefined_to_none: bool) -> None:
        """Remember the template and whether undefined results become
        ``None``.
        """
        self._undefined_to_none = undefined_to_none
        self._template = template

    def __call__(self, *args: t.Any, **kwargs: t.Any) -> t.Optional[t.Any]:
        """Evaluate the expression.

        The arguments are the same as for the `dict` constructor and
        form the variables of the context the template runs in.
        """
        ctx = self._template.new_context(dict(*args, **kwargs))
        # Run the template for its side effect of setting ``result``.
        consume(self._template.root_render_func(ctx))
        result = ctx.vars["result"]

        if self._undefined_to_none and isinstance(result, Undefined):
            return None

        return result
1577
1578
class TemplateStream:
    """Iterator wrapper around a template generator with optional
    buffering.

    By default the stream is unbuffered: every item produced by the
    wrapped generator is passed on as is. With buffering enabled and a
    buffer size of 5, five non-empty items are joined into one string
    before being yielded. This is mainly useful when streaming big
    templates to a client via WSGI, which flushes after each iteration.
    """

    def __init__(self, gen: t.Iterator[str]) -> None:
        """Wrap ``gen``; the stream starts out unbuffered."""
        self._gen = gen
        self.disable_buffering()

    def dump(
        self,
        fp: t.Union[str, t.IO[bytes]],
        encoding: t.Optional[str] = None,
        errors: t.Optional[str] = "strict",
    ) -> None:
        """Write the whole stream to a file name or file-like object.

        Strings are written unless an `encoding` is given, in which
        case every item is encoded first. When ``fp`` is a file name the
        file is opened in binary mode, the encoding defaults to UTF-8
        and the file is closed again afterwards.

        Example usage::

            Template('Hello {{ name }}!').stream(name='foo').dump('hello.html')

        :param fp: File name or writable file-like object.
        :param encoding: Encoding applied to every item, if any.
        :param errors: Error handler passed to ``str.encode``.
        """
        opened_here = isinstance(fp, str)

        if opened_here:
            if encoding is None:
                encoding = "utf-8"

            target: t.IO[bytes] = open(fp, "wb")  # type: ignore
        else:
            target = fp  # type: ignore

        try:
            chunks = (
                self
                if encoding is None
                else (x.encode(encoding, errors) for x in self)  # type: ignore
            )

            if hasattr(target, "writelines"):
                target.writelines(chunks)  # type: ignore
            else:
                # fall back to item-wise writes for minimal file objects
                for chunk in chunks:
                    target.write(chunk)  # type: ignore
        finally:
            # only close handles that were opened by this method
            if opened_here:
                target.close()

    def disable_buffering(self) -> None:
        """Disable the output buffering."""
        self.buffered = False
        self._next = partial(next, self._gen)

    def _buffered_generator(self, size: int) -> t.Iterator[str]:
        """Yield the wrapped items joined in groups of ``size``.

        Only non-empty items count towards ``size``. The last group may
        be smaller; a trailing group without any non-empty item is not
        yielded.
        """
        pending: t.List[str] = []
        filled = 0

        while True:
            try:
                while filled < size:
                    chunk = next(self._gen)
                    pending.append(chunk)

                    if chunk:
                        filled += 1
            except StopIteration:
                # source exhausted: stop unless something is left to flush
                if not filled:
                    return

            yield concat(pending)
            pending.clear()
            filled = 0

    def enable_buffering(self, size: int = 5) -> None:
        """Enable buffering.  Buffer `size` items before yielding them.

        :raises ValueError: If ``size`` is 1 or smaller.
        """
        if size <= 1:
            raise ValueError("buffer size too small")

        buffered_source = self._buffered_generator(size)
        self.buffered = True
        self._next = partial(next, buffered_source)

    def __iter__(self) -> "TemplateStream":
        """The stream is its own iterator."""
        return self

    def __next__(self) -> str:
        """Return the next (possibly buffered) item."""
        return self._next()  # type: ignore
1671
1672
# Hook in the default template class. ``Template`` is defined after
# ``Environment`` in this module, so the default is assigned here, once
# both classes exist. Assigning a different class to ``template_class``
# makes ``Environment.from_string`` build templates of that class.
Environment.template_class = Template