Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/pyparsing/util.py: 42%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

211 statements  

1# util.py 

2import contextlib 

3import re 

4from functools import lru_cache, wraps 

5import inspect 

6import itertools 

7import types 

8from typing import Callable, Union, Iterable, TypeVar, cast 

9import warnings 

10 

11_bslash = chr(92) 

12C = TypeVar("C", bound=Callable) 

13 

14 

class __config_flags:
    """Internal class for defining compatibility and debugging flags"""

    # Subclasses populate these with the names of their flag attributes.
    _all_names: list[str] = []
    # Names listed here may be read but can no longer be changed via _set().
    _fixed_names: list[str] = []
    _type_desc = "configuration"

    @classmethod
    def _set(cls, dname, value):
        """
        Set flag ``dname`` to ``value``.

        Emits a warning (and makes no change) if the flag is fixed;
        raises ValueError if the flag name is unknown.
        """
        if dname in cls._fixed_names:
            warnings.warn(
                f"{cls.__name__}.{dname} {cls._type_desc} is {str(getattr(cls, dname)).upper()}"
                f" and cannot be overridden",
                stacklevel=3,
            )
            return
        if dname in cls._all_names:
            setattr(cls, dname, value)
        else:
            raise ValueError(f"no such {cls._type_desc} {dname!r}")

    # proper classmethods instead of classmethod-wrapped lambdas (PEP 8 E731)
    @classmethod
    def enable(cls, name):
        """Turn on the named flag."""
        cls._set(name, True)

    @classmethod
    def disable(cls, name):
        """Turn off the named flag."""
        cls._set(name, False)

38 

39 

@lru_cache(maxsize=128)
def col(loc: int, strg: str) -> int:
    """
    Returns current column within a string, counting newlines as line separators.
    The first column is number 1.

    Note: the default parsing behavior is to expand tabs in the input string
    before starting the parsing process. See
    :class:`ParserElement.parse_string` for more
    information on parsing strings containing ``<TAB>`` s, and suggested
    methods to maintain a consistent view of the parsed string, the parse
    location, and line and column positions within the parsed string.
    """
    # a location just past a newline is column 1 of the following line
    if 0 < loc < len(strg) and strg[loc - 1] == "\n":
        return 1
    # otherwise, distance from the most recent newline before loc;
    # rfind returns -1 when there is none, which makes columns 1-based
    return loc - strg.rfind("\n", 0, loc)

55 

56 

@lru_cache(maxsize=128)
def lineno(loc: int, strg: str) -> int:
    """Returns current line number within a string, counting newlines as line separators.
    The first line is number 1.

    Note - the default parsing behavior is to expand tabs in the input string
    before starting the parsing process. See :class:`ParserElement.parse_string`
    for more information on parsing strings containing ``<TAB>`` s, and
    suggested methods to maintain a consistent view of the parsed string, the
    parse location, and line and column positions within the parsed string.
    """
    # each newline before loc pushes the location down one line;
    # adding 1 makes the numbering 1-based
    preceding_newlines = strg.count("\n", 0, loc)
    return preceding_newlines + 1

69 

70 

@lru_cache(maxsize=128)
def line(loc: int, strg: str) -> str:
    """
    Returns the line of text containing loc within a string, counting newlines as line separators.
    """
    # start just after the last newline before loc (0 if there is none,
    # since rfind returns -1 in that case)
    start = strg.rfind("\n", 0, loc) + 1
    end = strg.find("\n", loc)
    if end < 0:
        # loc is on the final line; take everything to the end of the string
        return strg[start:]
    return strg[start:end]

79 

80 

class _UnboundedCache:
    """Memo cache with no size limit; get() returns a sentinel on a miss."""

    def __init__(self):
        storage = {}
        # unique sentinel object: distinguishes "missing" from a stored None
        self.not_in_cache = miss = object()

        def get(_, key):
            # never raises - a miss yields the sentinel instead
            return storage.get(key, miss)

        def set_(_, key, value):
            storage[key] = value

        def clear(_):
            storage.clear()

        # no capacity bound (mirrors _FifoCache's interface, which has one)
        self.size = None
        self.get = types.MethodType(get, self)
        self.set = types.MethodType(set_, self)
        self.clear = types.MethodType(clear, self)

100 

101 

class _FifoCache:
    """Memo cache that evicts its oldest entries once `size` is exceeded."""

    def __init__(self, size):
        storage = {}
        self.size = size
        # unique sentinel object: distinguishes "missing" from a stored None
        self.not_in_cache = miss = object()

        def get(_, key):
            return storage.get(key, miss)

        def set_(_, key, value):
            storage[key] = value
            # dicts preserve insertion order, so the first key is the oldest
            while len(storage) > size:
                storage.pop(next(iter(storage)))

        def clear(_):
            storage.clear()

        self.get = types.MethodType(get, self)
        self.set = types.MethodType(set_, self)
        self.clear = types.MethodType(clear, self)

125 

126 

class LRUMemo:
    """
    A memoizing mapping that retains `capacity` deleted items

    The memo tracks retained items by their access order; once `capacity` items
    are retained, the least recently used item is discarded.
    """

    def __init__(self, capacity):
        self._capacity = capacity
        self._active = {}   # live entries
        self._memory = {}   # retained (deleted) entries, oldest first

    def __getitem__(self, key):
        if key in self._active:
            return self._active[key]
        # touch the retained entry: pop + reinsert moves it to the end of
        # the dict, marking it most recently used (KeyError if absent)
        value = self._memory.pop(key)
        self._memory[key] = value
        return value

    def __setitem__(self, key, value):
        # a live assignment supersedes any retained copy
        self._memory.pop(key, None)
        self._active[key] = value

    def __delitem__(self, key):
        _missing = object()
        value = self._active.pop(key, _missing)
        if value is not _missing:
            # drop the oldest retained entries, keeping only the newest ones,
            # then retain the freshly deleted value
            stale_keys = list(self._memory)[: -(self._capacity + 1)]
            for stale in stale_keys:
                self._memory.pop(stale)
            self._memory[key] = value

    def clear(self):
        self._active.clear()
        self._memory.clear()

165 

166 

class UnboundedMemo(dict):
    """
    A memoizing mapping that retains all deleted items
    """

    def __delitem__(self, key):
        # deletions are deliberately ignored, so every memoized entry
        # remains available for later lookups
        return

174 

175 

def _escape_regex_range_chars(s: str) -> str:
    """Escape characters that are significant inside a regex character class."""
    # backslash-escape \ ^ - [ ] themselves, and turn literal newline/tab
    # into their regex escape sequences - all in one translate() pass
    escape_map = {ord(c): "\\" + c for c in "\\^-[]"}
    escape_map[ord("\n")] = r"\n"
    escape_map[ord("\t")] = r"\t"
    return s.translate(escape_map)

183 

184 

class _GroupConsecutive:
    """
    Callable `key` for itertools.groupby that assigns the same group
    number to each run of consecutive characters:
        itertools.groupby("abcdejkmpqrs", key=_GroupConsecutive())
    yields:
        (0, iter(['a', 'b', 'c', 'd', 'e']))
        (1, iter(['j', 'k']))
        (2, iter(['m']))
        (3, iter(['p', 'q', 'r', 's']))
    """

    def __init__(self) -> None:
        self.prev = 0
        self.counter = itertools.count()
        self.value = -1

    def __call__(self, char: str) -> int:
        code = ord(char)
        previous = self.prev
        self.prev = code
        if code - previous > 1:
            # gap in the character sequence: start a new group number
            self.value = next(self.counter)
        return self.value

208 

209 

def _collapse_string_to_ranges(
    s: Union[str, Iterable[str]], re_escape: bool = True
) -> str:
    r"""
    Take a string or list of single-character strings, and return
    a string of the consecutive characters in that string collapsed
    into groups, as might be used in a regular expression '[a-z]'
    character set:
        'a' -> 'a' -> '[a]'
        'bc' -> 'bc' -> '[bc]'
        'defgh' -> 'd-h' -> '[d-h]'
        'fdgeh' -> 'd-h' -> '[d-h]'
        'jklnpqrtu' -> 'j-lnp-rtu' -> '[j-lnp-rtu]'
    Duplicates get collapsed out:
        'aaa' -> 'a' -> '[a]'
        'bcbccb' -> 'bc' -> '[bc]'
        'defghhgf' -> 'd-h' -> '[d-h]'
        'jklnpqrjjjtu' -> 'j-lnp-rtu' -> '[j-lnp-rtu]'
    Spaces are preserved:
        'ab c' -> ' a-c' -> '[ a-c]'
    Characters that are significant when defining regex ranges
    get escaped:
        'acde[]-' -> r'\-\[\]ac-e' -> r'[\-\[\]ac-e]'
    """

    # Developer notes:
    # - Do not optimize this code assuming that the given input string
    #   or internal lists will be short (such as in loading generators into
    #   lists to make it easier to find the last element); this method is also
    #   used to generate regex ranges for character sets in the pyparsing.unicode
    #   classes, and these can be _very_ long lists of strings

    def escape_char(c: str) -> str:
        # escape only when requested, and only for chars significant in ranges
        if re_escape and c in r"\^-][":
            return "\\" + c
        return c

    # dedupe the input characters and put them in sorted order
    unique_chars: list[str] = sorted(set(s))

    if len(unique_chars) <= 2:
        # 0, 1, or 2 chars can never form a "<first>-<last>" range;
        # just emit them individually (after escaping)
        return "".join(escape_char(c) for c in unique_chars)

    parts = []
    # group characters that are consecutive (collapsible to "<first>-<last>")
    for _, run in itertools.groupby(unique_chars, key=_GroupConsecutive()):
        # the group key is unimportant; run is an iterator of one or more
        # consecutive characters comprising the current group
        first = last = next(run)
        # drain the iterator, leaving `last` bound to the final character
        # (no intermediate list is built - see developer notes above)
        for last in run:
            pass

        if first == last:
            # single-character group
            parts.append(escape_char(first))
        elif ord(last) == ord(first) + 1:
            # two-character group: 'a','b' -> 'ab'
            parts.append(f"{escape_char(first)}{escape_char(last)}")
        else:
            # three or more characters: 'c','d','e' -> 'c-e'
            parts.append(f"{escape_char(first)}-{escape_char(last)}")

    return "".join(parts)

292 

293 

def _flatten(ll: Iterable) -> list:
    """
    Flatten an arbitrarily nested iterable into a single flat list,
    preserving left-to-right order.

    Strings are treated as atomic values, not as iterables of characters.
    (NOTE(review): bytes and other non-str iterables are still expanded,
    matching the original behavior.)
    """
    from collections import deque  # local import: O(1) pops from the left

    ret = []
    # list.pop(0) is O(n), which made the original loop quadratic on large
    # nested inputs; deque.popleft() keeps this linear
    to_visit = deque(ll)
    while to_visit:
        item = to_visit.popleft()
        if isinstance(item, Iterable) and not isinstance(item, str):
            # push the nested items back onto the front, preserving order
            for sub in reversed(list(item)):
                to_visit.appendleft(sub)
        else:
            ret.append(item)
    return ret

304 

305 

def make_compressed_re(
    word_list: Iterable[str],
    max_level: int = 2,
    *,
    non_capturing_groups: bool = True,
    _level: int = 1,
) -> str:
    """
    Create a regular expression string from a list of words, collapsing by common
    prefixes and optional suffixes.

    Calls itself recursively to build nested sublists for each group of suffixes
    that have a shared prefix.

    :param word_list: words to combine into a single alternation pattern
    :param max_level: maximum recursion depth for collapsing shared prefixes
    :param non_capturing_groups: if True (default), emitted groups are ``(?:...)``
    :param _level: internal recursion-depth counter - not for external callers
    :return: regex pattern string matching exactly the given words
    :raises ValueError: if ``word_list`` is empty or contains the empty string
        (top-level call only)
    """

    def get_suffixes_from_common_prefixes(namelist: list[str]):
        # Group the (sorted) names by their first character, yielding
        # (prefix_char, suffixes) pairs; suffixes are sorted longest-first
        # so that regex alternation prefers the longest match.
        if len(namelist) > 1:
            for prefix, suffixes in itertools.groupby(namelist, key=lambda s: s[:1]):
                yield prefix, sorted([s[1:] for s in suffixes], key=len, reverse=True)
        else:
            yield namelist[0][0], [namelist[0][1:]]

    # Validate only on the top-level call; recursive calls tolerate empties.
    # NOTE(review): a generator passed as word_list is always truthy and is
    # partially consumed by the `"" in word_list` test - callers appear to
    # pass lists; confirm before relying on generator input.
    if _level == 1:
        if not word_list:
            raise ValueError("no words given to make_compressed_re()")

        if "" in word_list:
            raise ValueError("word list cannot contain empty string")
    else:
        # internal recursive call, just return empty string if no words
        if not word_list:
            return ""

    # dedupe the word list (dict.fromkeys preserves first-seen order)
    word_list = list({}.fromkeys(word_list))

    if max_level == 0:
        # no prefix collapsing allowed: emit a flat alternation (longest
        # first), or a simple character class if all words are single chars
        if any(len(wd) > 1 for wd in word_list):
            return "|".join(
                sorted([re.escape(wd) for wd in word_list], key=len, reverse=True)
            )
        else:
            return f"[{''.join(_escape_regex_range_chars(wd) for wd in word_list)}]"

    ret = []
    sep = ""
    ncgroup = "?:" if non_capturing_groups else ""

    for initial, suffixes in get_suffixes_from_common_prefixes(sorted(word_list)):
        ret.append(sep)
        sep = "|"

        initial = re.escape(initial)

        # an empty suffix means the bare prefix is itself a word, so the
        # suffix group/class becomes optional
        trailing = ""
        if "" in suffixes:
            trailing = "?"
            suffixes.remove("")

        if len(suffixes) > 1:
            if all(len(s) == 1 for s in suffixes):
                # multiple single-char suffixes -> one character class
                ret.append(
                    f"{initial}[{''.join(_escape_regex_range_chars(s) for s in suffixes)}]{trailing}"
                )
            else:
                if _level < max_level:
                    # recurse to collapse the suffixes' own shared prefixes
                    suffix_re = make_compressed_re(
                        sorted(suffixes),
                        max_level,
                        non_capturing_groups=non_capturing_groups,
                        _level=_level + 1,
                    )
                    ret.append(f"{initial}({ncgroup}{suffix_re}){trailing}")
                else:
                    # NOTE(review): this condition was already False in the
                    # enclosing `else`, so this branch appears unreachable
                    if all(len(s) == 1 for s in suffixes):
                        ret.append(
                            f"{initial}[{''.join(_escape_regex_range_chars(s) for s in suffixes)}]{trailing}"
                        )
                    else:
                        # recursion budget exhausted: flat alternation,
                        # longest suffix first
                        suffixes.sort(key=len, reverse=True)
                        ret.append(
                            f"{initial}({ncgroup}{'|'.join(re.escape(s) for s in suffixes)}){trailing}"
                        )
        else:
            if suffixes:
                suffix = re.escape(suffixes[0])
                if len(suffix) > 1 and trailing:
                    # optional multi-char suffix needs a group so that '?'
                    # applies to the whole suffix, not its last char
                    ret.append(f"{initial}({ncgroup}{suffix}){trailing}")
                else:
                    ret.append(f"{initial}{suffix}{trailing}")
            else:
                # word was a single character with no suffixes
                ret.append(initial)
    return "".join(ret)

399 

400 

def replaced_by_pep8(compat_name: str, fn: C) -> C:
    """
    Build a deprecated-alias wrapper named `compat_name` that delegates to `fn`,
    copying over name, annotations, kw-defaults, and qualname metadata.
    """
    # In a future version, uncomment the code in the internal _inner() functions
    # to begin emitting DeprecationWarnings.

    # Unwrap staticmethod/classmethod
    fn = getattr(fn, "__func__", fn)

    # (Presence of 'self' arg in signature is used by explain_exception() methods, so we take
    # some extra steps to add it if present in decorated function.)
    param_names = list(inspect.signature(fn).parameters)
    takes_self = param_names[:1] == ["self"]

    if takes_self:

        @wraps(fn)
        def _inner(self, *args, **kwargs):
            # warnings.warn(
            #     f"Deprecated - use {fn.__name__}", DeprecationWarning, stacklevel=2
            # )
            return fn(self, *args, **kwargs)

    else:

        @wraps(fn)
        def _inner(*args, **kwargs):
            # warnings.warn(
            #     f"Deprecated - use {fn.__name__}", DeprecationWarning, stacklevel=2
            # )
            return fn(*args, **kwargs)

    _inner.__doc__ = f"""Deprecated - use :class:`{fn.__name__}`"""
    _inner.__name__ = compat_name
    _inner.__annotations__ = fn.__annotations__

    # pick up keyword-only defaults from a plain function or a class's __init__
    if isinstance(fn, types.FunctionType):
        kwdefaults = fn.__kwdefaults__  # type: ignore [attr-defined]
    elif isinstance(fn, type) and hasattr(fn, "__init__"):
        kwdefaults = fn.__init__.__kwdefaults__  # type: ignore [misc,attr-defined]
    else:
        kwdefaults = None
    _inner.__kwdefaults__ = kwdefaults  # type: ignore [attr-defined]

    _inner.__qualname__ = fn.__qualname__
    return cast(C, _inner)