Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.9/dist-packages/pyparsing/util.py: 53%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# util.py
2import contextlib
3from functools import lru_cache, wraps
4import inspect
5import itertools
6import types
7from typing import Callable, Union, Iterable, TypeVar, cast
8import warnings
# A single backslash, spelled via its code point (0x5C == 92) so the source
# does not need an escaped string literal.
_bslash = chr(0x5C)

# Callable-bound type variable; lets a decorator declare that it returns the
# same callable type it was given.
C = TypeVar("C", bound=Callable)
class __config_flags:
    """Internal class for defining compatibility and debugging flags.

    ``_all_names`` holds the flag names that may be set through
    :meth:`enable` / :meth:`disable`; any name that is also listed in
    ``_fixed_names`` is locked, and trying to change it only emits a warning.
    ``_type_desc`` is the noun used in warning and error messages.
    """

    _all_names: list[str] = []
    _fixed_names: list[str] = []
    _type_desc = "configuration"

    @classmethod
    def _set(cls, dname, value):
        """Set flag ``dname`` to ``value``.

        Warns and leaves the flag untouched when ``dname`` is a fixed name;
        raises ``ValueError`` when ``dname`` is not a known flag name.
        """
        if dname in cls._fixed_names:
            current = str(getattr(cls, dname)).upper()
            # stacklevel=3 skips this method and the enable/disable lambda so
            # the warning is attributed to the code that called them.
            warnings.warn(
                f"{cls.__name__}.{dname} {cls._type_desc} is {current}"
                f" and cannot be overridden",
                stacklevel=3,
            )
            return

        if dname not in cls._all_names:
            raise ValueError(f"no such {cls._type_desc} {dname!r}")

        setattr(cls, dname, value)

    # Public switches: thin wrappers that set the named flag to True / False.
    enable = classmethod(lambda cls, name: cls._set(name, True))
    disable = classmethod(lambda cls, name: cls._set(name, False))
@lru_cache(maxsize=128)
def col(loc: int, strg: str) -> int:
    """
    Return the 1-based column of position ``loc`` within ``strg``, treating
    newlines as line separators.

    Note: the default parsing behavior is to expand tabs in the input string
    before starting the parsing process. See
    :class:`ParserElement.parse_string` for more
    information on parsing strings containing ``<TAB>`` s, and suggested
    methods to maintain a consistent view of the parsed string, the parse
    location, and line and column positions within the parsed string.
    """
    # A position strictly inside the string that immediately follows a
    # newline is the first column of its line.
    if 0 < loc < len(strg) and strg[loc - 1] == "\n":
        return 1

    # Otherwise count from the closest newline before loc. rfind() yields -1
    # when there is none, which makes index 0 come out as column 1.
    return loc - strg.rfind("\n", 0, loc)
@lru_cache(maxsize=128)
def lineno(loc: int, strg: str) -> int:
    """Return the 1-based line number of position ``loc`` within ``strg``,
    treating newlines as line separators.

    Note - the default parsing behavior is to expand tabs in the input string
    before starting the parsing process. See :class:`ParserElement.parse_string`
    for more information on parsing strings containing ``<TAB>`` s, and
    suggested methods to maintain a consistent view of the parsed string, the
    parse location, and line and column positions within the parsed string.
    """
    # Every newline found before loc pushes the position one line further down.
    newlines_before = strg.count("\n", 0, loc)
    return 1 + newlines_before
@lru_cache(maxsize=128)
def line(loc: int, strg: str) -> str:
    """
    Return the line of text within ``strg`` that contains position ``loc``,
    treating newlines as line separators. The newline itself is not included.
    """
    # rfind() gives -1 when no newline precedes loc, so +1 lands on index 0.
    start = strg.rfind("\n", 0, loc) + 1
    end = strg.find("\n", loc)
    if end < 0:
        # No newline at or after loc: the line runs to the end of the string.
        return strg[start:]
    return strg[start:end]
class _UnboundedCache:
    """Key/value cache with no size limit.

    The backing dict is held only by the closures built in ``__init__`` and
    is reachable solely through the ``get``, ``set`` and ``clear`` methods
    attached to the instance. ``get`` returns the ``not_in_cache`` sentinel
    for a missing key, so ``None`` can be stored as a real value.
    """

    def __init__(self):
        store = {}
        lookup = store.get
        missing = object()

        def get(_, key):
            return lookup(key, missing)

        def set_(_, key, value):
            store[key] = value

        def clear(_):
            store.clear()

        self.not_in_cache = missing
        # size of None marks this cache as unbounded
        self.size = None
        self.get = types.MethodType(get, self)
        self.set = types.MethodType(set_, self)
        self.clear = types.MethodType(clear, self)
class _FifoCache:
    """Key/value cache that holds at most ``size`` entries, evicting the
    oldest-inserted entry first.

    The backing dict is held only by the closures built in ``__init__``.
    ``get`` returns the ``not_in_cache`` sentinel for a missing key.
    Re-setting an existing key updates its value without changing its
    position in the eviction order.
    """

    def __init__(self, size):
        store = {}
        lookup = store.get
        evict = store.pop
        missing = object()

        def get(_, key):
            return lookup(key, missing)

        def set_(_, key, value):
            store[key] = value
            # dicts iterate in insertion order, so the first key is the oldest
            while len(store) > size:
                evict(next(iter(store)))

        def clear(_):
            store.clear()

        self.size = size
        self.not_in_cache = missing
        self.get = types.MethodType(get, self)
        self.set = types.MethodType(set_, self)
        self.clear = types.MethodType(clear, self)
class LRUMemo:
    """
    A memoizing mapping that retains at most `capacity` deleted items

    Items that are set live in the "active" table. Deleting an item moves it
    to the "memory" table instead of discarding it, so it can still be looked
    up. The memory table tracks retained items by their access order; once
    more than `capacity` items are retained, the least recently used ones are
    discarded. A `capacity` of zero (or less) retains nothing.
    """

    def __init__(self, capacity):
        self._capacity = capacity
        self._active = {}
        self._memory = {}

    def __getitem__(self, key):
        """Return the value for ``key`` from the active or retained items.

        Reading a retained item marks it as most recently used. Raises
        ``KeyError`` if the key is neither active nor retained.
        """
        try:
            return self._active[key]
        except KeyError:
            # pop + re-insert moves the key to the end of the dict's
            # insertion order, i.e. makes it the most recently used
            value = self._memory.pop(key)
            self._memory[key] = value
            return value

    def __setitem__(self, key, value):
        """Make ``key`` active with ``value``, dropping any retained copy."""
        self._memory.pop(key, None)
        self._active[key] = value

    def __delitem__(self, key):
        """Move an active item into the retained memory.

        Deleting a key that is not active is silently ignored.
        """
        try:
            value = self._active.pop(key)
        except KeyError:
            return

        memory = self._memory
        # make sure the key lands at the most-recently-used end
        memory.pop(key, None)
        memory[key] = value

        # Evict from the least-recently-used end (the front of the dict's
        # insertion order) until no more than `capacity` items remain.
        limit = max(self._capacity, 0)
        while len(memory) > limit:
            del memory[next(iter(memory))]

    def clear(self):
        """Forget all active and retained items."""
        self._active.clear()
        self._memory.clear()
class UnboundedMemo(dict):
    """
    A dict-based memoizing mapping in which ``del`` never removes anything,
    so every item stays retained
    """

    def __delitem__(self, key):
        # Deliberately a no-op: the entry (if any) is left in place, and a
        # missing key is not an error.
        return None
# Characters that need rewriting before being placed inside a regex
# character set "[...]": the backslash itself, the set metacharacters
# ^ - [ ], and newline/tab (written out as the two-character escapes \n, \t).
_REGEX_RANGE_ESCAPES = str.maketrans(
    {
        "\\": r"\\",
        "^": r"\^",
        "-": r"\-",
        "[": r"\[",
        "]": r"\]",
        "\n": r"\n",
        "\t": r"\t",
    }
)


def _escape_regex_range_chars(s: str) -> str:
    """Return ``s`` escaped for use inside a regex character set.

    Backslash, ``^``, ``-``, ``[`` and ``]`` are each prefixed with a
    backslash; newline and tab become the escapes ``\\n`` and ``\\t``.
    All other characters are passed through unchanged.
    """
    # One translate() pass handles every character exactly once, so the
    # backslashes introduced by escaping can never be escaped a second time.
    return str(s.translate(_REGEX_RANGE_ESCAPES))
class _GroupConsecutive:
    """
    Stateful callable for use as the `key` of itertools.groupby, putting
    characters whose code points are consecutive into the same group:
        itertools.groupby("abcdejkmpqrs", key=_GroupConsecutive())
    yields:
        (0, iter(['a', 'b', 'c', 'd', 'e']))
        (1, iter(['j', 'k']))
        (2, iter(['m']))
        (3, iter(['p', 'q', 'r', 's']))

    A new group number is issued whenever a character's code point is more
    than 1 above the previous character's, so the characters should be
    supplied in ascending order.
    """

    def __init__(self):
        # code point of the previously seen character
        self.prev = 0
        # source of successive group numbers
        self.counter = itertools.count()
        # group number returned for the current run
        self.value = -1

    def __call__(self, char: str) -> int:
        code = ord(char)
        gap = code - self.prev
        self.prev = code
        if gap > 1:
            # not adjacent to the previous character: start a new group
            self.value = next(self.counter)
        return self.value
def _collapse_string_to_ranges(
    s: Union[str, Iterable[str]], re_escape: bool = True
) -> str:
    r"""
    Build the body of a regular expression character set (the part between
    the brackets of '[a-z]') from a string or iterable of single-character
    strings, collapsing runs of consecutive characters into ranges:
        'a'         -> 'a'         -> '[a]'
        'bc'        -> 'bc'        -> '[bc]'
        'defgh'     -> 'd-h'       -> '[d-h]'
        'fdgeh'     -> 'd-h'       -> '[d-h]'
        'jklnpqrtu' -> 'j-lnp-rtu' -> '[j-lnp-rtu]'
    Duplicates get collapsed out:
        'aaa'          -> 'a'         -> '[a]'
        'bcbccb'       -> 'bc'        -> '[bc]'
        'defghhgf'     -> 'd-h'       -> '[d-h]'
        'jklnpqrjjjtu' -> 'j-lnp-rtu' -> '[j-lnp-rtu]'
    Spaces are preserved:
        'ab c' -> ' a-c' -> '[ a-c]'
    Characters that are significant when defining regex ranges
    get escaped (unless re_escape is False):
        'acde[]-' -> r'\-\[\]ac-e' -> r'[\-\[\]ac-e]'
    """

    # Developer notes:
    # - Do not assume the input or any intermediate sequence is short; this
    #   function also builds regex ranges for the character sets in the
    #   pyparsing.unicode classes, which can be _very_ long. Each run of
    #   consecutive characters is therefore walked without copying it.

    def escape_re_range_char(c: str) -> str:
        return "\\" + c if c in r"\^-][" else c

    def no_escape_re_range_char(c: str) -> str:
        return c

    emit = escape_re_range_char if re_escape else no_escape_re_range_char

    # drop duplicates and put the characters in ascending order
    unique_chars = sorted(set(s))

    if len(unique_chars) <= 2:
        # With only 1 or 2 characters there is nothing to collapse with "-";
        # just list them (after escaping):
        #   'a' -> 'a', 'bc' -> 'bc', 'dg' -> 'dg'
        return "".join(emit(c) for c in unique_chars)

    pieces = []
    # The groupby key only serves to split the sorted characters into runs
    # of consecutive characters; its value is not otherwise used.
    for _, run in itertools.groupby(unique_chars, key=_GroupConsecutive()):
        first = last = next(run)
        # advance to the final character of the run, if there is more than one
        for last in run:
            pass

        if first == last:
            # single character
            pieces.append(emit(first))
        elif last == chr(ord(first) + 1):
            # exactly 2 characters: 'a','b' -> 'ab'
            pieces.append(emit(first) + emit(last))
        else:
            # more than 2 characters: 'c','d','e' -> 'c-e'
            pieces.append(emit(first) + "-" + emit(last))

    return "".join(pieces)
def _flatten(ll: Iterable) -> list:
    """Return a flat list of the non-iterable leaves of ``ll``, in order.

    Any element that is itself iterable is expanded in place, to any depth.
    Strings are the exception: they are kept whole as leaves and are never
    split into characters.
    """
    ret = []
    # Depth-first walk using an explicit stack of iterators. Each element is
    # visited once, avoiding the repeated front-of-list insert/pop that makes
    # a list-as-queue approach quadratic on large inputs.
    pending = [iter(ll)]
    while pending:
        for item in pending[-1]:
            if isinstance(item, Iterable) and not isinstance(item, str):
                # descend into the nested iterable, then resume this level
                pending.append(iter(item))
                break
            ret.append(item)
        else:
            # current level exhausted
            pending.pop()
    return ret
def make_compressed_re(
    word_list: Iterable[str], max_level: int = 2, _level: int = 1
) -> str:
    """
    Create a regular expression string from a list of words, collapsing by common
    prefixes and optional suffixes.

    Words are grouped by their first character; the remainders of each group
    are emitted as a character set (all single characters), as a nested
    compressed expression (while fewer than ``max_level`` levels deep), or as
    a plain alternation. A group in which the prefix is itself one of the
    words gets its remainder marked optional with ``?``.

    Calls itself recursively to build nested sublists for each group of suffixes
    that have a shared prefix.

    Duplicate words are ignored, and all literal text is passed through
    ``re.escape`` so words containing regex metacharacters match literally.

    :param word_list: the words to match; may be any iterable of strings
    :param max_level: maximum nesting depth of grouped suffixes; 0 produces a
        flat alternation of the words, longest first
    :param _level: current recursion depth (internal use)
    :raises ValueError: if ``word_list`` contains no words
    """
    # local import: the module's top-level imports do not include re
    import re

    # Materialize once (word_list may be a one-shot iterator) and drop
    # duplicates; a repeated word would otherwise be mistaken for a
    # "prefix is also a word" case and emit a spurious "?".
    words = list(dict.fromkeys(word_list))
    if not words:
        raise ValueError("no words given to make_compressed_re()")

    if max_level == 0:
        # longest first, so that a word cannot shadow a longer word that
        # starts with it
        return "|".join(re.escape(w) for w in sorted(words, key=len, reverse=True))

    parts = []
    for initial, group in itertools.groupby(sorted(words), key=lambda w: w[:1]):
        suffixes = sorted((w[1:] for w in group), key=len, reverse=True)
        head = re.escape(initial)

        # An empty suffix means the prefix alone is a word, so whatever
        # follows the prefix must be optional.
        trailing = ""
        if "" in suffixes:
            trailing = "?"
            suffixes.remove("")

        if len(suffixes) > 1:
            if all(len(s) == 1 for s in suffixes):
                # single-character alternatives collapse to a character set
                char_set = "".join(re.escape(s) for s in suffixes)
                parts.append(f"{head}[{char_set}]{trailing}")
            elif _level < max_level:
                suffix_re = make_compressed_re(suffixes, max_level, _level + 1)
                parts.append(f"{head}({suffix_re}){trailing}")
            else:
                # nesting limit reached: plain alternation (suffixes are
                # already ordered longest first)
                alternation = "|".join(re.escape(s) for s in suffixes)
                parts.append(f"{head}({alternation}){trailing}")
        elif suffixes:
            suffix = re.escape(suffixes[0])
            if len(suffixes[0]) > 1 and trailing:
                # group a multi-character suffix so "?" applies to all of it
                parts.append(f"{head}({suffix}){trailing}")
            else:
                parts.append(f"{head}{suffix}{trailing}")
        else:
            parts.append(head)

    return "|".join(parts)
def replaced_by_pep8(compat_name: str, fn: C) -> C:
    """Return a wrapper around ``fn`` that is published under ``compat_name``.

    The wrapper forwards all arguments to ``fn`` unchanged. Its metadata is
    copied from ``fn`` except for ``__name__`` (set to ``compat_name``) and
    ``__doc__`` (set to a deprecation note naming ``fn``).

    :param compat_name: the name to give the wrapper
    :param fn: the callable to wrap; a staticmethod/classmethod object is
        unwrapped to its underlying function first
    """
    # In a future version, uncomment the code in the internal _inner() functions
    # to begin emitting DeprecationWarnings.

    # Unwrap staticmethod/classmethod
    fn = getattr(fn, "__func__", fn)

    # (Presence of 'self' arg in signature is used by explain_exception() methods, so we take
    # some extra steps to add it if present in decorated function.)
    if ["self"] == list(inspect.signature(fn).parameters)[:1]:

        @wraps(fn)
        def _inner(self, *args, **kwargs):
            # warnings.warn(
            #     f"Deprecated - use {fn.__name__}", DeprecationWarning, stacklevel=2
            # )
            return fn(self, *args, **kwargs)

    else:

        @wraps(fn)
        def _inner(*args, **kwargs):
            # warnings.warn(
            #     f"Deprecated - use {fn.__name__}", DeprecationWarning, stacklevel=2
            # )
            return fn(*args, **kwargs)

    _inner.__doc__ = f"""Deprecated - use :class:`{fn.__name__}`"""
    _inner.__name__ = compat_name
    # a class may have no __annotations__ attribute at all on older Pythons
    _inner.__annotations__ = getattr(fn, "__annotations__", {})

    if isinstance(fn, types.FunctionType):
        kwdefaults = fn.__kwdefaults__
    elif isinstance(fn, type):
        # A class that does not define __init__ in Python inherits a C-level
        # slot wrapper, which has no __kwdefaults__ attribute.
        kwdefaults = getattr(fn.__init__, "__kwdefaults__", None)
    else:
        kwdefaults = None
    _inner.__kwdefaults__ = kwdefaults  # type: ignore [attr-defined]

    _inner.__qualname__ = fn.__qualname__
    return cast(C, _inner)