Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/pyparsing/util.py: 42%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# util.py
2import contextlib
3import re
4from functools import lru_cache, wraps
5import inspect
6import itertools
7import types
8from typing import Callable, Union, Iterable, TypeVar, cast
9import warnings
# A single backslash character; chr(92) avoids writing an escape sequence.
_bslash = chr(92)
# Type variable bound to Callable, for annotating helpers that accept a
# callable and return one of the same type (see replaced_by_pep8 below).
C = TypeVar("C", bound=Callable)
class __config_flags:
    """Internal base class for groups of named compatibility and debugging flags.

    Subclasses list the flag names they support in ``_all_names`` and the
    names that may not be changed in ``_fixed_names``; each flag is stored
    as a class attribute of the same name.
    """

    _all_names: list[str] = []
    _fixed_names: list[str] = []
    _type_desc = "configuration"

    @classmethod
    def _set(cls, dname, value):
        """Assign ``value`` to the flag ``dname``.

        A fixed flag is left untouched and a warning is issued instead.
        Raises ``ValueError`` if ``dname`` is not a known flag name.
        """
        if dname in cls._fixed_names:
            current = str(getattr(cls, dname)).upper()
            # stacklevel=3 attributes the warning to the caller of
            # enable()/disable() rather than to this helper
            warnings.warn(
                f"{cls.__name__}.{dname} {cls._type_desc} is {current}"
                f" and cannot be overridden",
                stacklevel=3,
            )
            return

        if dname not in cls._all_names:
            raise ValueError(f"no such {cls._type_desc} {dname!r}")

        setattr(cls, dname, value)

    @classmethod
    def enable(cls, name):
        """Set the flag ``name`` to True."""
        return cls._set(name, True)

    @classmethod
    def disable(cls, name):
        """Set the flag ``name`` to False."""
        return cls._set(name, False)
@lru_cache(maxsize=128)
def col(loc: int, strg: str) -> int:
    """
    Return the 1-based column of position ``loc`` within ``strg``, treating
    newlines as line separators.

    Note: the default parsing behavior is to expand tabs in the input string
    before starting the parsing process. See
    :class:`ParserElement.parse_string` for more
    information on parsing strings containing ``<TAB>`` s, and suggested
    methods to maintain a consistent view of the parsed string, the parse
    location, and line and column positions within the parsed string.
    """
    # a location just after a newline (and still inside the string) is column 1
    follows_newline = 0 < loc < len(strg) and strg[loc - 1] == "\n"
    if follows_newline:
        return 1

    # otherwise measure the distance back to the preceding newline
    # (rfind gives -1 when there is none, which yields a 1-based column)
    return loc - strg.rfind("\n", 0, loc)
@lru_cache(maxsize=128)
def lineno(loc: int, strg: str) -> int:
    """Return the 1-based line number of position ``loc`` within ``strg``,
    treating newlines as line separators.

    Note - the default parsing behavior is to expand tabs in the input string
    before starting the parsing process. See :class:`ParserElement.parse_string`
    for more information on parsing strings containing ``<TAB>`` s, and
    suggested methods to maintain a consistent view of the parsed string, the
    parse location, and line and column positions within the parsed string.
    """
    newlines_before_loc = strg.count("\n", 0, loc)
    return 1 + newlines_before_loc
@lru_cache(maxsize=128)
def line(loc: int, strg: str) -> str:
    """
    Return the text of the line that contains position ``loc`` in ``strg``,
    treating newlines as line separators. The newline itself is not included.
    """
    # the line begins just after the newline that precedes loc (or at 0)
    start = strg.rfind("\n", 0, loc) + 1
    # and runs up to the next newline at or after loc, if there is one
    end = strg.find("\n", loc)
    if end < 0:
        return strg[start:]
    return strg[start:end]
class _UnboundedCache:
    """Key/value cache with no size limit.

    ``get`` returns the ``not_in_cache`` sentinel for unknown keys, so that
    ``None`` can be stored as an ordinary value.
    """

    def __init__(self):
        store = {}
        lookup = store.get
        missing = object()

        # The accessors close over the local dict instead of reading
        # attributes from the instance on every call.
        def get(_, key):
            return lookup(key, missing)

        def set_(_, key, value):
            store[key] = value

        def clear(_):
            store.clear()

        self.not_in_cache = missing
        self.size = None
        self.get = types.MethodType(get, self)
        self.set = types.MethodType(set_, self)
        self.clear = types.MethodType(clear, self)
class _FifoCache:
    """Key/value cache that holds at most ``size`` entries.

    When the limit is exceeded, the entries that were inserted first are
    dropped. ``get`` returns the ``not_in_cache`` sentinel for unknown keys.
    """

    def __init__(self, size):
        store = {}
        lookup = store.get
        evict = store.pop
        missing = object()

        def get(_, key):
            return lookup(key, missing)

        def set_(_, key, value):
            store[key] = value
            # dicts iterate in insertion order, so the first key is the oldest
            while len(store) > size:
                oldest_key = next(iter(store))
                evict(oldest_key)

        def clear(_):
            store.clear()

        self.size = size
        self.not_in_cache = missing
        self.get = types.MethodType(get, self)
        self.set = types.MethodType(set_, self)
        self.clear = types.MethodType(clear, self)
class LRUMemo:
    """
    A memoizing mapping that retains up to `capacity` deleted items

    Items that are set are held as "active" entries until they are deleted
    or the memo is cleared. Deleting an active item does not drop it; it is
    moved to a bounded memory of retained items and can still be read.

    The memo tracks retained items by their access order; once more than
    `capacity` items are retained, the least recently used items are
    discarded.
    """

    def __init__(self, capacity):
        self._capacity = capacity
        # items that have been set and not yet deleted
        self._active = {}
        # deleted items, ordered from least to most recently used
        self._memory = {}

    def __getitem__(self, key):
        try:
            return self._active[key]
        except KeyError:
            # Re-insert the retained item so it becomes the most recently
            # used one; pop() raises KeyError if the key is unknown.
            self._memory[key] = self._memory.pop(key)
            return self._memory[key]

    def __setitem__(self, key, value):
        # an item is either active or retained, never both
        self._memory.pop(key, None)
        self._active[key] = value

    def __delitem__(self, key):
        try:
            value = self._active.pop(key)
        except KeyError:
            # deleting a key that is not active is a no-op
            return

        # Retain the deleted item as the most recently used entry, then
        # trim the oldest entries so no more than `capacity` are kept.
        self._memory[key] = value
        overflow = len(self._memory) - self._capacity
        if overflow > 0:
            for stale_key in list(self._memory)[:overflow]:
                del self._memory[stale_key]

    def clear(self):
        self._active.clear()
        self._memory.clear()
class UnboundedMemo(dict):
    """
    A dict-based memoizing mapping in which deleted items are never removed
    """

    def __delitem__(self, key):
        # Deletion is deliberately a no-op: every item stays available,
        # and deleting an unknown key raises no error.
        return None
def _escape_regex_range_chars(s: str) -> str:
    # Backslash-escape the characters that are significant inside a regex
    # character set (\ ^ - [ ]), and spell newline and tab as the two-character
    # sequences \n and \t. One translate() pass gives the same result as
    # replacing each character in turn with the backslash handled first.
    substitutions = {ord(c): _bslash + c for c in r"\^-[]"}
    substitutions[ord("\n")] = r"\n"
    substitutions[ord("\t")] = r"\t"
    return str(s).translate(substitutions)
class _GroupConsecutive:
    """
    Callable `key` for itertools.groupby that assigns the same group number
    to characters whose code points are consecutive:
        itertools.groupby("abcdejkmpqrs", key=_GroupConsecutive())
    yields:
        (0, iter(['a', 'b', 'c', 'd', 'e']))
        (1, iter(['j', 'k']))
        (2, iter(['m']))
        (3, iter(['p', 'q', 'r', 's']))
    """

    def __init__(self) -> None:
        self.counter = itertools.count()
        self.value = -1
        self.prev = 0

    def __call__(self, char: str) -> int:
        code_point = ord(char)
        gap = code_point - self.prev
        self.prev = code_point
        # a jump of more than one code point starts a new group
        if gap > 1:
            self.value = next(self.counter)
        return self.value
def _collapse_string_to_ranges(
    s: Union[str, Iterable[str]], re_escape: bool = True
) -> str:
    r"""
    Build the body of a regular expression character set (the part between
    '[' and ']') from a string or list of single-character strings, with
    runs of consecutive characters collapsed into ranges:
        'a'         -> 'a'         -> '[a]'
        'bc'        -> 'bc'        -> '[bc]'
        'defgh'     -> 'd-h'       -> '[d-h]'
        'fdgeh'     -> 'd-h'       -> '[d-h]'
        'jklnpqrtu' -> 'j-lnp-rtu' -> '[j-lnp-rtu]'
    Duplicates get collapsed out:
        'aaa'          -> 'a'         -> '[a]'
        'bcbccb'       -> 'bc'        -> '[bc]'
        'defghhgf'     -> 'd-h'       -> '[d-h]'
        'jklnpqrjjjtu' -> 'j-lnp-rtu' -> '[j-lnp-rtu]'
    Spaces are preserved:
        'ab c' -> ' a-c' -> '[ a-c]'
    Characters that are significant when defining regex ranges
    get escaped (unless re_escape is False):
        'acde[]-' -> r'\-\[\]ac-e' -> r'[\-\[\]ac-e]'
    """

    # Developer notes:
    # - Do not optimize this code assuming that the given input string
    #   or internal lists will be short (such as in loading generators into
    #   lists to make it easier to find the last element); this method is also
    #   used to generate regex ranges for character sets in the pyparsing.unicode
    #   classes, and these can be _very_ long lists of strings

    if re_escape:

        def render(c: str) -> str:
            return "\\" + c if c in r"\^-][" else c

    else:

        def render(c: str) -> str:
            return c

    # drop duplicates and put the characters in sorted order
    unique_chars: list[str] = sorted(set(s))

    if len(unique_chars) <= 2:
        # with only 1 or 2 characters there is nothing to collapse with "-",
        # just emit them (after escaping)
        #   'a'  -> 'a'
        #   'bc' -> 'bc'
        #   'dg' -> 'dg'
        return "".join([render(c) for c in unique_chars])

    pieces = []
    # each group is a run of one or more consecutive characters; the group
    # key only serves to separate the runs
    for _, run in itertools.groupby(unique_chars, key=_GroupConsecutive()):
        first = last = next(run)
        # advance to the final character of the run, if there is more than one
        for last in run:
            pass

        low, high = render(first), render(last)
        if first == last:
            # a single character on its own
            pieces.append(low)
        elif last == chr(ord(first) + 1):
            # exactly 2 characters: 'a','b' -> 'ab'
            pieces.append(f"{low}{high}")
        else:
            # more than 2 characters, make into a range: 'c','d','e' -> 'c-e'
            pieces.append(f"{low}-{high}")

    return "".join(pieces)
def _flatten(ll: Iterable) -> list:
    """Return a flat list of the non-iterable items and strings found in
    ``ll``, descending into nested iterables and keeping the original order.

    Strings are treated as single items and are not split into characters.
    """
    flat = []
    # Stack of iterators still being walked, innermost last. This avoids
    # list.pop(0) and front insertion, which each shift the whole list.
    pending = [iter(ll)]
    while pending:
        for item in pending[-1]:
            if isinstance(item, Iterable) and not isinstance(item, str):
                # descend into the nested iterable; the outer iterator
                # resumes where it left off once the inner one is exhausted
                pending.append(iter(item))
                break
            flat.append(item)
        else:
            # innermost iterator is exhausted
            pending.pop()
    return flat
def make_compressed_re(
    word_list: Iterable[str],
    max_level: int = 2,
    *,
    non_capturing_groups: bool = True,
    _level: int = 1,
) -> str:
    """
    Create a regular expression string from a list of words, collapsing by common
    prefixes and optional suffixes.

    Calls itself recursively to build nested sublists for each group of suffixes
    that have a shared prefix.

    Parameters:
    - word_list - the words to match; any iterable of strings, including
      one-shot iterators. Duplicates are ignored.
    - max_level - how many levels of prefix grouping to nest; 0 produces a
      flat alternation of the words (or a single character set if every word
      is one character long)
    - non_capturing_groups - use ``(?:...)`` groups if True, else ``(...)``
    - _level - current recursion depth; for internal use only

    Raises ValueError if called with no words, or with an empty string as
    one of the words.
    """

    def get_suffixes_from_common_prefixes(namelist: list[str]):
        # namelist must be sorted so that groupby sees each initial once
        if len(namelist) > 1:
            for prefix, suffixes in itertools.groupby(namelist, key=lambda s: s[:1]):
                yield prefix, sorted([s[1:] for s in suffixes], key=len, reverse=True)
        else:
            yield namelist[0][0], [namelist[0][1:]]

    # Materialize and dedupe (keeping first-seen order) before any checks, so
    # that a one-shot iterator is not consumed by the validation below.
    word_list = list(dict.fromkeys(word_list))

    if _level == 1:
        if not word_list:
            raise ValueError("no words given to make_compressed_re()")

        if "" in word_list:
            raise ValueError("word list cannot contain empty string")
    else:
        # internal recursive call, just return empty string if no words
        if not word_list:
            return ""

    if max_level == 0:
        if any(len(wd) > 1 for wd in word_list):
            # longest first, so a word is tried before any of its prefixes
            return "|".join(
                sorted([re.escape(wd) for wd in word_list], key=len, reverse=True)
            )
        else:
            return f"[{''.join(_escape_regex_range_chars(wd) for wd in word_list)}]"

    ret = []
    sep = ""
    ncgroup = "?:" if non_capturing_groups else ""

    for initial, suffixes in get_suffixes_from_common_prefixes(sorted(word_list)):
        ret.append(sep)
        sep = "|"

        initial = re.escape(initial)

        # an empty suffix means the prefix itself is a word, so whatever
        # follows the prefix must be optional
        trailing = ""
        if "" in suffixes:
            trailing = "?"
            suffixes.remove("")

        if len(suffixes) > 1:
            if all(len(s) == 1 for s in suffixes):
                # all single-character suffixes collapse to a character set
                ret.append(
                    f"{initial}[{''.join(_escape_regex_range_chars(s) for s in suffixes)}]{trailing}"
                )
            elif _level < max_level:
                # nest another level of prefix grouping
                suffix_re = make_compressed_re(
                    sorted(suffixes),
                    max_level,
                    non_capturing_groups=non_capturing_groups,
                    _level=_level + 1,
                )
                ret.append(f"{initial}({ncgroup}{suffix_re}){trailing}")
            else:
                # nesting limit reached, fall back to a flat alternation
                suffixes.sort(key=len, reverse=True)
                ret.append(
                    f"{initial}({ncgroup}{'|'.join(re.escape(s) for s in suffixes)}){trailing}"
                )
        elif suffixes:
            suffix = re.escape(suffixes[0])
            if len(suffix) > 1 and trailing:
                # group the suffix so that "?" applies to all of it
                ret.append(f"{initial}({ncgroup}{suffix}){trailing}")
            else:
                ret.append(f"{initial}{suffix}{trailing}")
        else:
            # the only word in this group is the initial character itself
            ret.append(initial)
    return "".join(ret)
def replaced_by_pep8(compat_name: str, fn: C) -> C:
    """Return a wrapper named ``compat_name`` that forwards all calls to ``fn``.

    The wrapper's docstring marks it as deprecated in favor of ``fn``, and
    its annotations, keyword-only defaults and qualified name are taken
    from ``fn``.
    """
    # In a future version, uncomment the warnings.warn() calls in the internal
    # _inner() functions to begin emitting DeprecationWarnings.

    # Unwrap staticmethod/classmethod
    target = getattr(fn, "__func__", fn)

    # (Presence of 'self' arg in signature is used by explain_exception() methods, so we take
    # some extra steps to add it if present in decorated function.)
    leading_param = list(inspect.signature(target).parameters)[:1]

    if leading_param == ["self"]:

        @wraps(target)
        def _inner(self, *args, **kwargs):
            # warnings.warn(
            #     f"Deprecated - use {fn.__name__}", DeprecationWarning, stacklevel=2
            # )
            return target(self, *args, **kwargs)

    else:

        @wraps(target)
        def _inner(*args, **kwargs):
            # warnings.warn(
            #     f"Deprecated - use {fn.__name__}", DeprecationWarning, stacklevel=2
            # )
            return target(*args, **kwargs)

    # override what wraps() copied so the wrapper presents the compat name
    _inner.__doc__ = f"""Deprecated - use :class:`{target.__name__}`"""
    _inner.__name__ = compat_name
    _inner.__annotations__ = target.__annotations__

    # keyword-only defaults come from the function itself, or from __init__
    # when the target is a class
    if isinstance(target, types.FunctionType):
        kw_defaults = target.__kwdefaults__
    elif isinstance(target, type) and hasattr(target, "__init__"):
        kw_defaults = target.__init__.__kwdefaults__  # type: ignore [misc,attr-defined]
    else:
        kw_defaults = None
    _inner.__kwdefaults__ = kw_defaults  # type: ignore [attr-defined]

    _inner.__qualname__ = target.__qualname__
    return cast(C, _inner)