1import enum
2import json
3import os
4import re
5import typing as t
6from collections import abc
7from collections import deque
8from random import choice
9from random import randrange
10from threading import Lock
11from types import CodeType
12from urllib.parse import quote_from_bytes
13
14import markupsafe
15
16if t.TYPE_CHECKING:
17 import typing_extensions as te
18
19F = t.TypeVar("F", bound=t.Callable[..., t.Any])
20
21
22class _MissingType:
23 def __repr__(self) -> str:
24 return "missing"
25
26 def __reduce__(self) -> str:
27 return "missing"
28
29
30missing: t.Any = _MissingType()
31"""Special singleton representing missing values for the runtime."""
32
33internal_code: t.MutableSet[CodeType] = set()
34
35concat = "".join
36
37
38def pass_context(f: F) -> F:
39 """Pass the :class:`~jinja2.runtime.Context` as the first argument
40 to the decorated function when called while rendering a template.
41
42 Can be used on functions, filters, and tests.
43
44 If only ``Context.eval_context`` is needed, use
45 :func:`pass_eval_context`. If only ``Context.environment`` is
46 needed, use :func:`pass_environment`.
47
48 .. versionadded:: 3.0.0
49 Replaces ``contextfunction`` and ``contextfilter``.
50 """
51 f.jinja_pass_arg = _PassArg.context # type: ignore
52 return f
53
54
55def pass_eval_context(f: F) -> F:
56 """Pass the :class:`~jinja2.nodes.EvalContext` as the first argument
57 to the decorated function when called while rendering a template.
58 See :ref:`eval-context`.
59
60 Can be used on functions, filters, and tests.
61
62 If only ``EvalContext.environment`` is needed, use
63 :func:`pass_environment`.
64
65 .. versionadded:: 3.0.0
66 Replaces ``evalcontextfunction`` and ``evalcontextfilter``.
67 """
68 f.jinja_pass_arg = _PassArg.eval_context # type: ignore
69 return f
70
71
72def pass_environment(f: F) -> F:
73 """Pass the :class:`~jinja2.Environment` as the first argument to
74 the decorated function when called while rendering a template.
75
76 Can be used on functions, filters, and tests.
77
78 .. versionadded:: 3.0.0
79 Replaces ``environmentfunction`` and ``environmentfilter``.
80 """
81 f.jinja_pass_arg = _PassArg.environment # type: ignore
82 return f
83
84
85class _PassArg(enum.Enum):
86 context = enum.auto()
87 eval_context = enum.auto()
88 environment = enum.auto()
89
90 @classmethod
91 def from_obj(cls, obj: F) -> t.Optional["_PassArg"]:
92 if hasattr(obj, "jinja_pass_arg"):
93 return obj.jinja_pass_arg # type: ignore
94
95 return None
96
97
98def internalcode(f: F) -> F:
99 """Marks the function as internally used"""
100 internal_code.add(f.__code__)
101 return f
102
103
104def is_undefined(obj: t.Any) -> bool:
105 """Check if the object passed is undefined. This does nothing more than
106 performing an instance check against :class:`Undefined` but looks nicer.
107 This can be used for custom filters or tests that want to react to
108 undefined variables. For example a custom default filter can look like
109 this::
110
111 def default(var, default=''):
112 if is_undefined(var):
113 return default
114 return var
115 """
116 from .runtime import Undefined
117
118 return isinstance(obj, Undefined)
119
120
121def consume(iterable: t.Iterable[t.Any]) -> None:
122 """Consumes an iterable without doing anything with it."""
123 for _ in iterable:
124 pass
125
126
127def clear_caches() -> None:
128 """Jinja keeps internal caches for environments and lexers. These are
129 used so that Jinja doesn't have to recreate environments and lexers all
130 the time. Normally you don't have to care about that but if you are
131 measuring memory consumption you may want to clean the caches.
132 """
133 from .environment import get_spontaneous_environment
134 from .lexer import _lexer_cache
135
136 get_spontaneous_environment.cache_clear()
137 _lexer_cache.clear()
138
139
140def import_string(import_name: str, silent: bool = False) -> t.Any:
141 """Imports an object based on a string. This is useful if you want to
142 use import paths as endpoints or something similar. An import path can
143 be specified either in dotted notation (``xml.sax.saxutils.escape``)
144 or with a colon as object delimiter (``xml.sax.saxutils:escape``).
145
146 If the `silent` is True the return value will be `None` if the import
147 fails.
148
149 :return: imported object
150 """
151 try:
152 if ":" in import_name:
153 module, obj = import_name.split(":", 1)
154 elif "." in import_name:
155 module, _, obj = import_name.rpartition(".")
156 else:
157 return __import__(import_name)
158 return getattr(__import__(module, None, None, [obj]), obj)
159 except (ImportError, AttributeError):
160 if not silent:
161 raise
162
163
164def open_if_exists(filename: str, mode: str = "rb") -> t.Optional[t.IO[t.Any]]:
165 """Returns a file descriptor for the filename if that file exists,
166 otherwise ``None``.
167 """
168 if not os.path.isfile(filename):
169 return None
170
171 return open(filename, mode)
172
173
174def object_type_repr(obj: t.Any) -> str:
175 """Returns the name of the object's type. For some recognized
176 singletons the name of the object is returned instead. (For
177 example for `None` and `Ellipsis`).
178 """
179 if obj is None:
180 return "None"
181 elif obj is Ellipsis:
182 return "Ellipsis"
183
184 cls = type(obj)
185
186 if cls.__module__ == "builtins":
187 return f"{cls.__name__} object"
188
189 return f"{cls.__module__}.{cls.__name__} object"
190
191
192def pformat(obj: t.Any) -> str:
193 """Format an object using :func:`pprint.pformat`."""
194 from pprint import pformat
195
196 return pformat(obj)
197
198
199_http_re = re.compile(
200 r"""
201 ^
202 (
203 (https?://|www\.) # scheme or www
204 (([\w%-]+\.)+)? # subdomain
205 (
206 [a-z]{2,63} # basic tld
207 |
208 xn--[\w%]{2,59} # idna tld
209 )
210 |
211 ([\w%-]{2,63}\.)+ # basic domain
212 (com|net|int|edu|gov|org|info|mil) # basic tld
213 |
214 (https?://) # scheme
215 (
216 (([\d]{1,3})(\.[\d]{1,3}){3}) # IPv4
217 |
218 (\[([\da-f]{0,4}:){2}([\da-f]{0,4}:?){1,6}]) # IPv6
219 )
220 )
221 (?::[\d]{1,5})? # port
222 (?:[/?#]\S*)? # path, query, and fragment
223 $
224 """,
225 re.IGNORECASE | re.VERBOSE,
226)
227_email_re = re.compile(r"^\S+@\w[\w.-]*\.\w+$")
228
229
230def urlize(
231 text: str,
232 trim_url_limit: t.Optional[int] = None,
233 rel: t.Optional[str] = None,
234 target: t.Optional[str] = None,
235 extra_schemes: t.Optional[t.Iterable[str]] = None,
236) -> str:
237 """Convert URLs in text into clickable links.
238
239 This may not recognize links in some situations. Usually, a more
240 comprehensive formatter, such as a Markdown library, is a better
241 choice.
242
243 Works on ``http://``, ``https://``, ``www.``, ``mailto:``, and email
244 addresses. Links with trailing punctuation (periods, commas, closing
245 parentheses) and leading punctuation (opening parentheses) are
246 recognized excluding the punctuation. Email addresses that include
247 header fields are not recognized (for example,
248 ``mailto:address@example.com?cc=copy@example.com``).
249
250 :param text: Original text containing URLs to link.
251 :param trim_url_limit: Shorten displayed URL values to this length.
252 :param target: Add the ``target`` attribute to links.
253 :param rel: Add the ``rel`` attribute to links.
254 :param extra_schemes: Recognize URLs that start with these schemes
255 in addition to the default behavior.
256
257 .. versionchanged:: 3.0
258 The ``extra_schemes`` parameter was added.
259
260 .. versionchanged:: 3.0
261 Generate ``https://`` links for URLs without a scheme.
262
263 .. versionchanged:: 3.0
264 The parsing rules were updated. Recognize email addresses with
265 or without the ``mailto:`` scheme. Validate IP addresses. Ignore
266 parentheses and brackets in more cases.
267 """
268 if trim_url_limit is not None:
269
270 def trim_url(x: str) -> str:
271 if len(x) > trim_url_limit:
272 return f"{x[:trim_url_limit]}..."
273
274 return x
275
276 else:
277
278 def trim_url(x: str) -> str:
279 return x
280
281 words = re.split(r"(\s+)", str(markupsafe.escape(text)))
282 rel_attr = f' rel="{markupsafe.escape(rel)}"' if rel else ""
283 target_attr = f' target="{markupsafe.escape(target)}"' if target else ""
284
285 for i, word in enumerate(words):
286 head, middle, tail = "", word, ""
287 match = re.match(r"^([(<]|<)+", middle)
288
289 if match:
290 head = match.group()
291 middle = middle[match.end() :]
292
293 # Unlike lead, which is anchored to the start of the string,
294 # need to check that the string ends with any of the characters
295 # before trying to match all of them, to avoid backtracking.
296 if middle.endswith((")", ">", ".", ",", "\n", ">")):
297 match = re.search(r"([)>.,\n]|>)+$", middle)
298
299 if match:
300 tail = match.group()
301 middle = middle[: match.start()]
302
303 # Prefer balancing parentheses in URLs instead of ignoring a
304 # trailing character.
305 for start_char, end_char in ("(", ")"), ("<", ">"), ("<", ">"):
306 start_count = middle.count(start_char)
307
308 if start_count <= middle.count(end_char):
309 # Balanced, or lighter on the left
310 continue
311
312 # Move as many as possible from the tail to balance
313 for _ in range(min(start_count, tail.count(end_char))):
314 end_index = tail.index(end_char) + len(end_char)
315 # Move anything in the tail before the end char too
316 middle += tail[:end_index]
317 tail = tail[end_index:]
318
319 if _http_re.match(middle):
320 if middle.startswith("https://") or middle.startswith("http://"):
321 middle = (
322 f'<a href="{middle}"{rel_attr}{target_attr}>{trim_url(middle)}</a>'
323 )
324 else:
325 middle = (
326 f'<a href="https://{middle}"{rel_attr}{target_attr}>'
327 f"{trim_url(middle)}</a>"
328 )
329
330 elif middle.startswith("mailto:") and _email_re.match(middle[7:]):
331 middle = f'<a href="{middle}">{middle[7:]}</a>'
332
333 elif (
334 "@" in middle
335 and not middle.startswith("www.")
336 # ignore values like `@a@b`
337 and not middle.startswith("@")
338 and ":" not in middle
339 and _email_re.match(middle)
340 ):
341 middle = f'<a href="mailto:{middle}">{middle}</a>'
342
343 elif extra_schemes is not None:
344 for scheme in extra_schemes:
345 if middle != scheme and middle.startswith(scheme):
346 middle = f'<a href="{middle}"{rel_attr}{target_attr}>{middle}</a>'
347
348 words[i] = f"{head}{middle}{tail}"
349
350 return "".join(words)
351
352
353def generate_lorem_ipsum(
354 n: int = 5, html: bool = True, min: int = 20, max: int = 100
355) -> str:
356 """Generate some lorem ipsum for the template."""
357 from .constants import LOREM_IPSUM_WORDS
358
359 words = LOREM_IPSUM_WORDS.split()
360 result = []
361
362 for _ in range(n):
363 next_capitalized = True
364 last_comma = last_fullstop = 0
365 word = None
366 last = None
367 p = []
368
369 # each paragraph contains out of 20 to 100 words.
370 for idx, _ in enumerate(range(randrange(min, max))):
371 while True:
372 word = choice(words)
373 if word != last:
374 last = word
375 break
376 if next_capitalized:
377 word = word.capitalize()
378 next_capitalized = False
379 # add commas
380 if idx - randrange(3, 8) > last_comma:
381 last_comma = idx
382 last_fullstop += 2
383 word += ","
384 # add end of sentences
385 if idx - randrange(10, 20) > last_fullstop:
386 last_comma = last_fullstop = idx
387 word += "."
388 next_capitalized = True
389 p.append(word)
390
391 # ensure that the paragraph ends with a dot.
392 p_str = " ".join(p)
393
394 if p_str.endswith(","):
395 p_str = p_str[:-1] + "."
396 elif not p_str.endswith("."):
397 p_str += "."
398
399 result.append(p_str)
400
401 if not html:
402 return "\n\n".join(result)
403 return markupsafe.Markup(
404 "\n".join(f"<p>{markupsafe.escape(x)}</p>" for x in result)
405 )
406
407
408def url_quote(obj: t.Any, charset: str = "utf-8", for_qs: bool = False) -> str:
409 """Quote a string for use in a URL using the given charset.
410
411 :param obj: String or bytes to quote. Other types are converted to
412 string then encoded to bytes using the given charset.
413 :param charset: Encode text to bytes using this charset.
414 :param for_qs: Quote "/" and use "+" for spaces.
415 """
416 if not isinstance(obj, bytes):
417 if not isinstance(obj, str):
418 obj = str(obj)
419
420 obj = obj.encode(charset)
421
422 safe = b"" if for_qs else b"/"
423 rv = quote_from_bytes(obj, safe)
424
425 if for_qs:
426 rv = rv.replace("%20", "+")
427
428 return rv
429
430
431@abc.MutableMapping.register
432class LRUCache:
433 """A simple LRU Cache implementation."""
434
435 # this is fast for small capacities (something below 1000) but doesn't
436 # scale. But as long as it's only used as storage for templates this
437 # won't do any harm.
438
439 def __init__(self, capacity: int) -> None:
440 self.capacity = capacity
441 self._mapping: t.Dict[t.Any, t.Any] = {}
442 self._queue: te.Deque[t.Any] = deque()
443 self._postinit()
444
445 def _postinit(self) -> None:
446 # alias all queue methods for faster lookup
447 self._popleft = self._queue.popleft
448 self._pop = self._queue.pop
449 self._remove = self._queue.remove
450 self._wlock = Lock()
451 self._append = self._queue.append
452
453 def __getstate__(self) -> t.Mapping[str, t.Any]:
454 return {
455 "capacity": self.capacity,
456 "_mapping": self._mapping,
457 "_queue": self._queue,
458 }
459
460 def __setstate__(self, d: t.Mapping[str, t.Any]) -> None:
461 self.__dict__.update(d)
462 self._postinit()
463
464 def __getnewargs__(self) -> t.Tuple[t.Any, ...]:
465 return (self.capacity,)
466
467 def copy(self) -> "te.Self":
468 """Return a shallow copy of the instance."""
469 rv = self.__class__(self.capacity)
470 rv._mapping.update(self._mapping)
471 rv._queue.extend(self._queue)
472 return rv
473
474 def get(self, key: t.Any, default: t.Any = None) -> t.Any:
475 """Return an item from the cache dict or `default`"""
476 try:
477 return self[key]
478 except KeyError:
479 return default
480
481 def setdefault(self, key: t.Any, default: t.Any = None) -> t.Any:
482 """Set `default` if the key is not in the cache otherwise
483 leave unchanged. Return the value of this key.
484 """
485 try:
486 return self[key]
487 except KeyError:
488 self[key] = default
489 return default
490
491 def clear(self) -> None:
492 """Clear the cache."""
493 with self._wlock:
494 self._mapping.clear()
495 self._queue.clear()
496
497 def __contains__(self, key: t.Any) -> bool:
498 """Check if a key exists in this cache."""
499 return key in self._mapping
500
501 def __len__(self) -> int:
502 """Return the current size of the cache."""
503 return len(self._mapping)
504
505 def __repr__(self) -> str:
506 return f"<{type(self).__name__} {self._mapping!r}>"
507
508 def __getitem__(self, key: t.Any) -> t.Any:
509 """Get an item from the cache. Moves the item up so that it has the
510 highest priority then.
511
512 Raise a `KeyError` if it does not exist.
513 """
514 with self._wlock:
515 rv = self._mapping[key]
516
517 if self._queue[-1] != key:
518 try:
519 self._remove(key)
520 except ValueError:
521 # if something removed the key from the container
522 # when we read, ignore the ValueError that we would
523 # get otherwise.
524 pass
525
526 self._append(key)
527
528 return rv
529
530 def __setitem__(self, key: t.Any, value: t.Any) -> None:
531 """Sets the value for an item. Moves the item up so that it
532 has the highest priority then.
533 """
534 with self._wlock:
535 if key in self._mapping:
536 self._remove(key)
537 elif len(self._mapping) == self.capacity:
538 del self._mapping[self._popleft()]
539
540 self._append(key)
541 self._mapping[key] = value
542
543 def __delitem__(self, key: t.Any) -> None:
544 """Remove an item from the cache dict.
545 Raise a `KeyError` if it does not exist.
546 """
547 with self._wlock:
548 del self._mapping[key]
549
550 try:
551 self._remove(key)
552 except ValueError:
553 pass
554
555 def items(self) -> t.Iterable[t.Tuple[t.Any, t.Any]]:
556 """Return a list of items."""
557 result = [(key, self._mapping[key]) for key in list(self._queue)]
558 result.reverse()
559 return result
560
561 def values(self) -> t.Iterable[t.Any]:
562 """Return a list of all values."""
563 return [x[1] for x in self.items()]
564
565 def keys(self) -> t.Iterable[t.Any]:
566 """Return a list of all keys ordered by most recent usage."""
567 return list(self)
568
569 def __iter__(self) -> t.Iterator[t.Any]:
570 return reversed(tuple(self._queue))
571
572 def __reversed__(self) -> t.Iterator[t.Any]:
573 """Iterate over the keys in the cache dict, oldest items
574 coming first.
575 """
576 return iter(tuple(self._queue))
577
578 __copy__ = copy
579
580
581def select_autoescape(
582 enabled_extensions: t.Collection[str] = ("html", "htm", "xml"),
583 disabled_extensions: t.Collection[str] = (),
584 default_for_string: bool = True,
585 default: bool = False,
586) -> t.Callable[[t.Optional[str]], bool]:
587 """Intelligently sets the initial value of autoescaping based on the
588 filename of the template. This is the recommended way to configure
589 autoescaping if you do not want to write a custom function yourself.
590
591 If you want to enable it for all templates created from strings or
592 for all templates with `.html` and `.xml` extensions::
593
594 from jinja2 import Environment, select_autoescape
595 env = Environment(autoescape=select_autoescape(
596 enabled_extensions=('html', 'xml'),
597 default_for_string=True,
598 ))
599
600 Example configuration to turn it on at all times except if the template
601 ends with `.txt`::
602
603 from jinja2 import Environment, select_autoescape
604 env = Environment(autoescape=select_autoescape(
605 disabled_extensions=('txt',),
606 default_for_string=True,
607 default=True,
608 ))
609
610 The `enabled_extensions` is an iterable of all the extensions that
611 autoescaping should be enabled for. Likewise `disabled_extensions` is
612 a list of all templates it should be disabled for. If a template is
613 loaded from a string then the default from `default_for_string` is used.
614 If nothing matches then the initial value of autoescaping is set to the
615 value of `default`.
616
617 For security reasons this function operates case insensitive.
618
619 .. versionadded:: 2.9
620 """
621 enabled_patterns = tuple(f".{x.lstrip('.').lower()}" for x in enabled_extensions)
622 disabled_patterns = tuple(f".{x.lstrip('.').lower()}" for x in disabled_extensions)
623
624 def autoescape(template_name: t.Optional[str]) -> bool:
625 if template_name is None:
626 return default_for_string
627 template_name = template_name.lower()
628 if template_name.endswith(enabled_patterns):
629 return True
630 if template_name.endswith(disabled_patterns):
631 return False
632 return default
633
634 return autoescape
635
636
637def htmlsafe_json_dumps(
638 obj: t.Any, dumps: t.Optional[t.Callable[..., str]] = None, **kwargs: t.Any
639) -> markupsafe.Markup:
640 """Serialize an object to a string of JSON with :func:`json.dumps`,
641 then replace HTML-unsafe characters with Unicode escapes and mark
642 the result safe with :class:`~markupsafe.Markup`.
643
644 This is available in templates as the ``|tojson`` filter.
645
646 The following characters are escaped: ``<``, ``>``, ``&``, ``'``.
647
648 The returned string is safe to render in HTML documents and
649 ``<script>`` tags. The exception is in HTML attributes that are
650 double quoted; either use single quotes or the ``|forceescape``
651 filter.
652
653 :param obj: The object to serialize to JSON.
654 :param dumps: The ``dumps`` function to use. Defaults to
655 ``env.policies["json.dumps_function"]``, which defaults to
656 :func:`json.dumps`.
657 :param kwargs: Extra arguments to pass to ``dumps``. Merged onto
658 ``env.policies["json.dumps_kwargs"]``.
659
660 .. versionchanged:: 3.0
661 The ``dumper`` parameter is renamed to ``dumps``.
662
663 .. versionadded:: 2.9
664 """
665 if dumps is None:
666 dumps = json.dumps
667
668 return markupsafe.Markup(
669 dumps(obj, **kwargs)
670 .replace("<", "\\u003c")
671 .replace(">", "\\u003e")
672 .replace("&", "\\u0026")
673 .replace("'", "\\u0027")
674 )
675
676
677class Cycler:
678 """Cycle through values by yield them one at a time, then restarting
679 once the end is reached. Available as ``cycler`` in templates.
680
681 Similar to ``loop.cycle``, but can be used outside loops or across
682 multiple loops. For example, render a list of folders and files in a
683 list, alternating giving them "odd" and "even" classes.
684
685 .. code-block:: html+jinja
686
687 {% set row_class = cycler("odd", "even") %}
688 <ul class="browser">
689 {% for folder in folders %}
690 <li class="folder {{ row_class.next() }}">{{ folder }}
691 {% endfor %}
692 {% for file in files %}
693 <li class="file {{ row_class.next() }}">{{ file }}
694 {% endfor %}
695 </ul>
696
697 :param items: Each positional argument will be yielded in the order
698 given for each cycle.
699
700 .. versionadded:: 2.1
701 """
702
703 def __init__(self, *items: t.Any) -> None:
704 if not items:
705 raise RuntimeError("at least one item has to be provided")
706 self.items = items
707 self.pos = 0
708
709 def reset(self) -> None:
710 """Resets the current item to the first item."""
711 self.pos = 0
712
713 @property
714 def current(self) -> t.Any:
715 """Return the current item. Equivalent to the item that will be
716 returned next time :meth:`next` is called.
717 """
718 return self.items[self.pos]
719
720 def next(self) -> t.Any:
721 """Return the current item, then advance :attr:`current` to the
722 next item.
723 """
724 rv = self.current
725 self.pos = (self.pos + 1) % len(self.items)
726 return rv
727
728 __next__ = next
729
730
731class Joiner:
732 """A joining helper for templates."""
733
734 def __init__(self, sep: str = ", ") -> None:
735 self.sep = sep
736 self.used = False
737
738 def __call__(self) -> str:
739 if not self.used:
740 self.used = True
741 return ""
742 return self.sep
743
744
745class Namespace:
746 """A namespace object that can hold arbitrary attributes. It may be
747 initialized from a dictionary or with keyword arguments."""
748
749 def __init__(*args: t.Any, **kwargs: t.Any) -> None: # noqa: B902
750 self, args = args[0], args[1:]
751 self.__attrs = dict(*args, **kwargs)
752
753 def __getattribute__(self, name: str) -> t.Any:
754 # __class__ is needed for the awaitable check in async mode
755 if name in {"_Namespace__attrs", "__class__"}:
756 return object.__getattribute__(self, name)
757 try:
758 return self.__attrs[name]
759 except KeyError:
760 raise AttributeError(name) from None
761
762 def __setitem__(self, name: str, value: t.Any) -> None:
763 self.__attrs[name] = value
764
765 def __repr__(self) -> str:
766 return f"<Namespace {self.__attrs!r}>"