Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/pyparsing/util.py: 52%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# util.py
2import inspect
3import warnings
4import types
5import collections
6import itertools
7from functools import lru_cache, wraps
8from typing import Callable, List, Union, Iterable, TypeVar, cast
10_bslash = chr(92)
11C = TypeVar("C", bound=Callable)
14class __config_flags:
15 """Internal class for defining compatibility and debugging flags"""
17 _all_names: List[str] = []
18 _fixed_names: List[str] = []
19 _type_desc = "configuration"
21 @classmethod
22 def _set(cls, dname, value):
23 if dname in cls._fixed_names:
24 warnings.warn(
25 f"{cls.__name__}.{dname} {cls._type_desc} is {str(getattr(cls, dname)).upper()}"
26 f" and cannot be overridden",
27 stacklevel=3,
28 )
29 return
30 if dname in cls._all_names:
31 setattr(cls, dname, value)
32 else:
33 raise ValueError(f"no such {cls._type_desc} {dname!r}")
35 enable = classmethod(lambda cls, name: cls._set(name, True))
36 disable = classmethod(lambda cls, name: cls._set(name, False))
39@lru_cache(maxsize=128)
40def col(loc: int, strg: str) -> int:
41 """
42 Returns current column within a string, counting newlines as line separators.
43 The first column is number 1.
45 Note: the default parsing behavior is to expand tabs in the input string
46 before starting the parsing process. See
47 :class:`ParserElement.parse_string` for more
48 information on parsing strings containing ``<TAB>`` s, and suggested
49 methods to maintain a consistent view of the parsed string, the parse
50 location, and line and column positions within the parsed string.
51 """
52 s = strg
53 return 1 if 0 < loc < len(s) and s[loc - 1] == "\n" else loc - s.rfind("\n", 0, loc)
56@lru_cache(maxsize=128)
57def lineno(loc: int, strg: str) -> int:
58 """Returns current line number within a string, counting newlines as line separators.
59 The first line is number 1.
61 Note - the default parsing behavior is to expand tabs in the input string
62 before starting the parsing process. See :class:`ParserElement.parse_string`
63 for more information on parsing strings containing ``<TAB>`` s, and
64 suggested methods to maintain a consistent view of the parsed string, the
65 parse location, and line and column positions within the parsed string.
66 """
67 return strg.count("\n", 0, loc) + 1
70@lru_cache(maxsize=128)
71def line(loc: int, strg: str) -> str:
72 """
73 Returns the line of text containing loc within a string, counting newlines as line separators.
74 """
75 last_cr = strg.rfind("\n", 0, loc)
76 next_cr = strg.find("\n", loc)
77 return strg[last_cr + 1 : next_cr] if next_cr >= 0 else strg[last_cr + 1 :]
80class _UnboundedCache:
81 def __init__(self):
82 cache = {}
83 cache_get = cache.get
84 self.not_in_cache = not_in_cache = object()
86 def get(_, key):
87 return cache_get(key, not_in_cache)
89 def set_(_, key, value):
90 cache[key] = value
92 def clear(_):
93 cache.clear()
95 self.size = None
96 self.get = types.MethodType(get, self)
97 self.set = types.MethodType(set_, self)
98 self.clear = types.MethodType(clear, self)
101class _FifoCache:
102 def __init__(self, size):
103 self.not_in_cache = not_in_cache = object()
104 cache = {}
105 keyring = [object()] * size
106 cache_get = cache.get
107 cache_pop = cache.pop
108 keyiter = itertools.cycle(range(size))
110 def get(_, key):
111 return cache_get(key, not_in_cache)
113 def set_(_, key, value):
114 cache[key] = value
115 i = next(keyiter)
116 cache_pop(keyring[i], None)
117 keyring[i] = key
119 def clear(_):
120 cache.clear()
121 keyring[:] = [object()] * size
123 self.size = size
124 self.get = types.MethodType(get, self)
125 self.set = types.MethodType(set_, self)
126 self.clear = types.MethodType(clear, self)
129class LRUMemo:
130 """
131 A memoizing mapping that retains `capacity` deleted items
133 The memo tracks retained items by their access order; once `capacity` items
134 are retained, the least recently used item is discarded.
135 """
137 def __init__(self, capacity):
138 self._capacity = capacity
139 self._active = {}
140 self._memory = collections.OrderedDict()
142 def __getitem__(self, key):
143 try:
144 return self._active[key]
145 except KeyError:
146 self._memory.move_to_end(key)
147 return self._memory[key]
149 def __setitem__(self, key, value):
150 self._memory.pop(key, None)
151 self._active[key] = value
153 def __delitem__(self, key):
154 try:
155 value = self._active.pop(key)
156 except KeyError:
157 pass
158 else:
159 while len(self._memory) >= self._capacity:
160 self._memory.popitem(last=False)
161 self._memory[key] = value
163 def clear(self):
164 self._active.clear()
165 self._memory.clear()
168class UnboundedMemo(dict):
169 """
170 A memoizing mapping that retains all deleted items
171 """
173 def __delitem__(self, key):
174 pass
177def _escape_regex_range_chars(s: str) -> str:
178 # escape these chars: ^-[]
179 for c in r"\^-[]":
180 s = s.replace(c, _bslash + c)
181 s = s.replace("\n", r"\n")
182 s = s.replace("\t", r"\t")
183 return str(s)
186def _collapse_string_to_ranges(
187 s: Union[str, Iterable[str]], re_escape: bool = True
188) -> str:
189 def is_consecutive(c):
190 c_int = ord(c)
191 is_consecutive.prev, prev = c_int, is_consecutive.prev
192 if c_int - prev > 1:
193 is_consecutive.value = next(is_consecutive.counter)
194 return is_consecutive.value
196 is_consecutive.prev = 0 # type: ignore [attr-defined]
197 is_consecutive.counter = itertools.count() # type: ignore [attr-defined]
198 is_consecutive.value = -1 # type: ignore [attr-defined]
200 def escape_re_range_char(c):
201 return "\\" + c if c in r"\^-][" else c
203 def no_escape_re_range_char(c):
204 return c
206 if not re_escape:
207 escape_re_range_char = no_escape_re_range_char
209 ret = []
210 s = "".join(sorted(set(s)))
211 if len(s) > 3:
212 for _, chars in itertools.groupby(s, key=is_consecutive):
213 first = last = next(chars)
214 last = collections.deque(
215 itertools.chain(iter([last]), chars), maxlen=1
216 ).pop()
217 if first == last:
218 ret.append(escape_re_range_char(first))
219 else:
220 sep = "" if ord(last) == ord(first) + 1 else "-"
221 ret.append(
222 f"{escape_re_range_char(first)}{sep}{escape_re_range_char(last)}"
223 )
224 else:
225 ret = [escape_re_range_char(c) for c in s]
227 return "".join(ret)
230def _flatten(ll: list) -> list:
231 ret = []
232 for i in ll:
233 if isinstance(i, list):
234 ret.extend(_flatten(i))
235 else:
236 ret.append(i)
237 return ret
240def replaced_by_pep8(compat_name: str, fn: C) -> C:
241 # In a future version, uncomment the code in the internal _inner() functions
242 # to begin emitting DeprecationWarnings.
244 # Unwrap staticmethod/classmethod
245 fn = getattr(fn, "__func__", fn)
247 # (Presence of 'self' arg in signature is used by explain_exception() methods, so we take
248 # some extra steps to add it if present in decorated function.)
249 if ["self"] == list(inspect.signature(fn).parameters)[:1]:
251 @wraps(fn)
252 def _inner(self, *args, **kwargs):
253 # warnings.warn(
254 # f"Deprecated - use {fn.__name__}", DeprecationWarning, stacklevel=2
255 # )
256 return fn(self, *args, **kwargs)
258 else:
260 @wraps(fn)
261 def _inner(*args, **kwargs):
262 # warnings.warn(
263 # f"Deprecated - use {fn.__name__}", DeprecationWarning, stacklevel=2
264 # )
265 return fn(*args, **kwargs)
267 _inner.__doc__ = f"""Deprecated - use :class:`{fn.__name__}`"""
268 _inner.__name__ = compat_name
269 _inner.__annotations__ = fn.__annotations__
270 if isinstance(fn, types.FunctionType):
271 _inner.__kwdefaults__ = fn.__kwdefaults__
272 elif isinstance(fn, type) and hasattr(fn, "__init__"):
273 _inner.__kwdefaults__ = fn.__init__.__kwdefaults__
274 else:
275 _inner.__kwdefaults__ = None
276 _inner.__qualname__ = fn.__qualname__
277 return cast(C, _inner)