Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/markdown_it/ruler.py: 66%
120 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:07 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:07 +0000
1"""
2class Ruler
4Helper class, used by [[MarkdownIt#core]], [[MarkdownIt#block]] and
5[[MarkdownIt#inline]] to manage sequences of functions (rules):
7- keep rules in defined order
8- assign the name to each rule
9- enable/disable rules
10- add/replace rules
11- allow assigning rules to additional named chains (in the same ruler)
12- caching lists of active rules
14You will not need to use this class directly unless you write plugins. For
15simple rule control, use [[MarkdownIt.disable]], [[MarkdownIt.enable]] and
16[[MarkdownIt.use]].
17"""
18from __future__ import annotations
20from collections.abc import Callable, Iterable, MutableMapping
21from dataclasses import dataclass, field
22from typing import TYPE_CHECKING
24from markdown_it._compat import DATACLASS_KWARGS
26if TYPE_CHECKING:
27 from markdown_it import MarkdownIt
class StateBase:
    """Base state object holding the source text, environment and parser.

    Assigning to ``src`` (including from ``__init__``) also recomputes
    ``srcCharCode``, a tuple with the code point of every character in
    the source.
    """

    srcCharCode: tuple[int, ...]

    def __init__(self, src: str, md: MarkdownIt, env: MutableMapping):
        # Goes through the property setter, so `srcCharCode` is filled too.
        self.src = src
        self.env = env
        self.md = md

    @property
    def src(self) -> str:
        """The source text."""
        return self._src

    @src.setter
    def src(self, value: str) -> None:
        self._src = value
        # Keep the code-point tuple in sync with the text.
        self.srcCharCode = tuple(map(ord, value))
# Type alias for rule callables.
# The first positional argument is always a subtype of `StateBase`. Other
# arguments may or may not exist, depending on the rule's type (block,
# core, inline). The return type is either `None` or `bool`, again
# depending on the rule's type.
RuleFunc = Callable
@dataclass(**DATACLASS_KWARGS)
class Rule:
    """A named rule function with its enabled flag and alternate chain names."""

    # Name under which the rule is looked up.
    name: str
    # Whether the rule is currently enabled.
    enabled: bool
    # The rule callable; left out of the dataclass repr.
    fn: RuleFunc = field(repr=False)
    # Names of additional chains the rule is assigned to.
    alt: list[str]
class Ruler:
    """Ordered collection of named rules that can be enabled or disabled.

    Rules are kept in insertion order. Each rule always belongs to the
    default chain (``''``) and may also be assigned to extra named chains
    through its ``alt`` list. ``getRules`` returns the functions of the
    enabled rules for a chain, using a cache that is rebuilt lazily after
    any modification.
    """

    def __init__(self):
        # Rules in execution order.
        self.__rules__: list[Rule] = []
        # Cached rule chains: chain name ('' for the default chain) mapped
        # to the functions of the enabled rules in that chain.
        # `None` means the cache is stale and has to be rebuilt.
        self.__cache__: dict[str, list[RuleFunc]] | None = None

    def __find__(self, name: str) -> int:
        """Return the index of the first rule called ``name``, or -1."""
        for i, rule in enumerate(self.__rules__):
            if rule.name == name:
                return i
        return -1

    def __compile__(self) -> None:
        """Build the rules lookup cache from the currently enabled rules."""
        active = [rule for rule in self.__rules__ if rule.enabled]
        # The default chain always exists, even when no rule is enabled.
        chains = {""}
        for rule in active:
            chains.update(rule.alt)
        # The default chain ('') takes every enabled rule; a named chain
        # takes only the rules that list it in `alt`.
        self.__cache__ = {
            chain: [rule.fn for rule in active if not chain or chain in rule.alt]
            for chain in chains
        }

    def _set_enabled(
        self, names: str | Iterable[str], enabled: bool, ignoreInvalid: bool
    ) -> list[str]:
        """Set the ``enabled`` flag of the named rules; return the names found."""
        if isinstance(names, str):
            names = [names]
        # Invalidate up front: if an invalid name raises below, rules that
        # were already toggled must not be hidden behind a stale cache.
        self.__cache__ = None
        found = []
        for name in names:
            idx = self.__find__(name)
            if idx < 0:
                if ignoreInvalid:
                    continue
                raise KeyError(f"Rules manager: invalid rule name {name}")
            self.__rules__[idx].enabled = enabled
            found.append(name)
        return found

    def at(self, ruleName: str, fn: RuleFunc, options=None) -> None:
        """Replace rule by name with new function & options.

        :param ruleName: rule name to replace.
        :param fn: new rule function.
        :param options: new rule options (not mandatory); only the ``"alt"``
            key is read.
        :raises: KeyError if name not found
        """
        index = self.__find__(ruleName)
        if index == -1:
            raise KeyError(f"Parser rule not found: {ruleName}")
        rule = self.__rules__[index]
        rule.fn = fn
        rule.alt = (options or {}).get("alt", [])
        self.__cache__ = None

    def before(
        self, beforeName: str, ruleName: str, fn: RuleFunc, options=None
    ) -> None:
        """Add new rule to chain before one with given name.

        :param beforeName: new rule will be added before this one.
        :param ruleName: name of the added rule.
        :param fn: new rule function.
        :param options: new rule options (not mandatory); only the ``"alt"``
            key is read.
        :raises: KeyError if name not found
        """
        index = self.__find__(beforeName)
        if index == -1:
            raise KeyError(f"Parser rule not found: {beforeName}")
        alt = (options or {}).get("alt", [])
        self.__rules__.insert(index, Rule(ruleName, True, fn, alt))
        self.__cache__ = None

    def after(
        self, afterName: str, ruleName: str, fn: RuleFunc, options=None
    ) -> None:
        """Add new rule to chain after one with given name.

        :param afterName: new rule will be added after this one.
        :param ruleName: name of the added rule.
        :param fn: new rule function.
        :param options: new rule options (not mandatory); only the ``"alt"``
            key is read.
        :raises: KeyError if name not found
        """
        index = self.__find__(afterName)
        if index == -1:
            raise KeyError(f"Parser rule not found: {afterName}")
        alt = (options or {}).get("alt", [])
        self.__rules__.insert(index + 1, Rule(ruleName, True, fn, alt))
        self.__cache__ = None

    def push(self, ruleName: str, fn: RuleFunc, options=None) -> None:
        """Push new rule to the end of chain.

        :param ruleName: name of the added rule.
        :param fn: new rule function.
        :param options: new rule options (not mandatory); only the ``"alt"``
            key is read.
        """
        alt = (options or {}).get("alt", [])
        self.__rules__.append(Rule(ruleName, True, fn, alt))
        self.__cache__ = None

    def enable(
        self, names: str | Iterable[str], ignoreInvalid: bool = False
    ) -> list[str]:
        """Enable rules with given names.

        :param names: name or list of rule names to enable.
        :param ignoreInvalid: ignore errors when rule not found
        :raises: KeyError if name not found and not ignoreInvalid
        :return: list of found rule names
        """
        return self._set_enabled(names, True, ignoreInvalid)

    def enableOnly(
        self, names: str | Iterable[str], ignoreInvalid: bool = False
    ) -> list[str]:
        """Enable rules with given names, and disable everything else.

        :param names: name or list of rule names to enable.
        :param ignoreInvalid: ignore errors when rule not found
        :raises: KeyError if name not found and not ignoreInvalid
        :return: list of found rule names
        """
        for rule in self.__rules__:
            rule.enabled = False
        # Everything was just disabled, so the cache is stale even if
        # `enable` raises on an invalid name.
        self.__cache__ = None
        return self.enable(names, ignoreInvalid)

    def disable(
        self, names: str | Iterable[str], ignoreInvalid: bool = False
    ) -> list[str]:
        """Disable rules with given names.

        :param names: name or list of rule names to disable.
        :param ignoreInvalid: ignore errors when rule not found
        :raises: KeyError if name not found and not ignoreInvalid
        :return: list of found rule names
        """
        return self._set_enabled(names, False, ignoreInvalid)

    def getRules(self, chainName: str) -> list[RuleFunc]:
        """Return array of active functions (rules) for given chain name.

        Compiles the cache first if it does not exist yet.

        Default chain name is `''` (empty string). It can't be skipped.
        That's done intentionally, to keep signature monomorphic for high speed.
        """
        if self.__cache__ is None:
            self.__compile__()
            # For type checkers only: `__compile__` always sets the cache.
            assert self.__cache__ is not None
        # Chain can be empty, if rules disabled. But we still have to return Array.
        return self.__cache__.get(chainName, []) or []

    def get_all_rules(self) -> list[str]:
        """Return all available rule names."""
        return [r.name for r in self.__rules__]

    def get_active_rules(self) -> list[str]:
        """Return the active rule names."""
        return [r.name for r in self.__rules__ if r.enabled]