Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dissect/cstruct/parser.py: 55% (477 statements)


from __future__ import annotations

import ast
import re
from typing import TYPE_CHECKING, Any

from dissect.cstruct import compiler
from dissect.cstruct.exceptions import (
    ExpressionParserError,
    ExpressionTokenizerError,
    ParserError,
)
from dissect.cstruct.expression import Expression
from dissect.cstruct.types import BaseArray, BaseType, Field, Structure

if TYPE_CHECKING:
    from dissect.cstruct import cstruct



class Parser:
    """Base class for definition parsers.

    Args:
        cs: An instance of cstruct.
    """

    def __init__(self, cs: cstruct):
        self.cstruct = cs

    def parse(self, data: str) -> None:
        """This function should parse definitions to cstruct types.

        Args:
            data: Data to parse definitions from, usually a string.
        """
        raise NotImplementedError



class TokenParser(Parser):
    """Token-based definition parser.

    Args:
        cs: An instance of cstruct.
        compiled: Whether structs should be compiled or not.
        align: Whether structs should be aligned by default.
    """

    def __init__(self, cs: cstruct, compiled: bool = True, align: bool = False):
        super().__init__(cs)

        self.compiled = compiled
        self.align = align
        self.TOK = self._tokencollection()
        self._conditionals = []
        self._conditionals_depth = 0


    @staticmethod
    def _tokencollection() -> TokenCollection:
        TOK = TokenCollection()
        TOK.add(r"#\[(?P<values>[^\]]+)\](?=\s*)", "CONFIG_FLAG")
        TOK.add(r"#define\s+(?P<name>[^\s]+)(?P<value>[^\r\n]*)", "DEFINE")
        TOK.add(r"#undef\s+(?P<name>[^\s]+)\s*", "UNDEF")
        TOK.add(r"#ifdef\s+(?P<name>[^\s]+)\s*", "IFDEF")
        TOK.add(r"#ifndef\s+(?P<name>[^\s]+)\s*", "IFNDEF")
        TOK.add(r"#else\s*", "ELSE")
        TOK.add(r"#endif\s*", "ENDIF")
        TOK.add(r"typedef(?=\s)", "TYPEDEF")
        TOK.add(r"(?:struct|union)(?=\s|{)", "STRUCT")
        TOK.add(
            r"(?P<enumtype>enum|flag)\s+(?P<name>[^\s:{]+)?\s*(:\s"
            r"*(?P<type>[^{]+?)\s*)?\{(?P<values>[^}]+)\}\s*(?=;)",
            "ENUM",
        )
        TOK.add(r"(?<=})\s*(?P<defs>(?:[a-zA-Z0-9_]+\s*,\s*)+[a-zA-Z0-9_]+)\s*(?=;)", "DEFS")
        TOK.add(r"(?P<name>\**?\s*[a-zA-Z0-9_]+)(?:\s*:\s*(?P<bits>\d+))?(?:\[(?P<count>[^;]*)\])?\s*(?=;)", "NAME")
        TOK.add(r"#include\s+(?P<name>[^\s]+)\s*", "INCLUDE")
        TOK.add(r"[a-zA-Z_][a-zA-Z0-9_]*", "IDENTIFIER")
        TOK.add(r"[{}]", "BLOCK")
        TOK.add(r"\$(?P<name>[^\s]+) = (?P<value>{[^}]+})\w*[\r\n]+", "LOOKUP")
        TOK.add(r";", "EOL")
        TOK.add(r"\s+", None)
        TOK.add(r".", None)

        return TOK

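    # Illustrative sketch (hedged; assumes the public dissect.cstruct API that
    # drives this parser): the patterns above turn definition text into the
    # tokens consumed by parse() below.
    #
    #     from dissect.cstruct import cstruct
    #
    #     cs = cstruct()
    #     cs.load("""
    #     #define VERSION 1        // DEFINE
    #     struct point {           // STRUCT, BLOCK
    #         uint16 x;            // IDENTIFIER, NAME, EOL
    #         uint16 y;
    #     };
    #     """)
    #     assert cs.consts["VERSION"] == 1
    #     assert cs.point(b"\x01\x00\x02\x00").x == 1
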

    def _identifier(self, tokens: TokenConsumer) -> str:
        idents = []
        while tokens.next == self.TOK.IDENTIFIER:
            idents.append(tokens.consume())
        return " ".join([i.value for i in idents])


    def _conditional(self, tokens: TokenConsumer) -> None:
        token = tokens.consume()
        pattern = self.TOK.patterns[token.token]
        match = pattern.match(token.value).groupdict()

        value = match["name"]

        if token.token == self.TOK.IFDEF:
            self._conditionals.append(value in self.cstruct.consts)
        elif token.token == self.TOK.IFNDEF:
            self._conditionals.append(value not in self.cstruct.consts)


    def _check_conditional(self, tokens: TokenConsumer) -> bool:
        """Check and handle conditionals. Return a boolean indicating if we need to continue to the next token."""
        if self._conditionals and self._conditionals_depth == len(self._conditionals):
            # If we have a conditional and the depth matches, handle it accordingly
            if tokens.next == self.TOK.ELSE:
                # Flip the last conditional
                tokens.consume()
                self._conditionals[-1] = not self._conditionals[-1]
                return True

            if tokens.next == self.TOK.ENDIF:
                # Pop the last conditional
                tokens.consume()
                self._conditionals.pop()
                self._conditionals_depth -= 1
                return True

        if tokens.next in (self.TOK.IFDEF, self.TOK.IFNDEF):
            # If we encounter a new conditional, increase the depth
            self._conditionals_depth += 1

        if tokens.next == self.TOK.ENDIF:
            # Similarly, decrease the depth if needed
            self._conditionals_depth -= 1

        if self._conditionals and not self._conditionals[-1]:
            # If the last conditional evaluated to False, skip the next token
            tokens.consume()
            return True

        if tokens.next in (self.TOK.IFDEF, self.TOK.IFNDEF):
            # If the next token is a conditional, process it
            self._conditional(tokens)
            return True

        return False

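    # A minimal sketch of the conditional handling above: only the branch whose
    # condition holds contributes definitions, and nested #ifdef/#endif pairs
    # inside a skipped branch are tracked through _conditionals_depth
    # (continuing the cs = cstruct() sketch from earlier).
    #
    #     cs.load("""
    #     #define LEGACY 1
    #     #ifdef LEGACY
    #     typedef uint16 size_type;
    #     #else
    #     typedef uint64 size_type;
    #     #endif
    #     """)
    #     # size_type now resolves to uint16 because LEGACY is defined
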

    def _constant(self, tokens: TokenConsumer) -> None:
        const = tokens.consume()
        pattern = self.TOK.patterns[self.TOK.DEFINE]
        match = pattern.match(const.value).groupdict()

        value = match["value"].strip()
        try:
            value = ast.literal_eval(value)
        except (ValueError, SyntaxError):
            pass

        if isinstance(value, str):
            try:
                value = Expression(value).evaluate(self.cstruct)
            except (ExpressionParserError, ExpressionTokenizerError):
                pass

        self.cstruct.consts[match["name"]] = value

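    # Sketch of the two evaluation paths above: plain literals go through
    # ast.literal_eval, and anything left over is tried as an expression over
    # previously defined constants (with cs = cstruct() as before).
    #
    #     cs.load("#define BASE 0x10")
    #     cs.load("#define NEXT BASE + 4")
    #     assert cs.consts["NEXT"] == 0x14
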

    def _undef(self, tokens: TokenConsumer) -> None:
        const = tokens.consume()
        pattern = self.TOK.patterns[self.TOK.UNDEF]
        match = pattern.match(const.value).groupdict()

        if match["name"] in self.cstruct.consts:
            del self.cstruct.consts[match["name"]]
        else:
            raise ParserError(f"line {self._lineno(const)}: constant {match['name']!r} not defined")


    def _enum(self, tokens: TokenConsumer) -> None:
        # We cheat with enums because the entire enum is in the token
        etok = tokens.consume()

        pattern = self.TOK.patterns[self.TOK.ENUM]
        # Dirty trick because the regex expects a ; but we don't want it to be part of the value
        d = pattern.match(etok.value + ";").groupdict()
        enumtype = d["enumtype"]

        nextval = 0
        if enumtype == "flag":
            nextval = 1

        values = {}
        for line in d["values"].splitlines():
            for v in line.split(","):
                key, _, val = v.partition("=")
                key = key.strip()
                val = val.strip()
                if not key:
                    continue

                val = nextval if not val else Expression(val).evaluate(self.cstruct, values)

                if enumtype == "flag":
                    high_bit = val.bit_length() - 1
                    nextval = 2 ** (high_bit + 1)
                else:
                    nextval = val + 1

                values[key] = val

        if not d["type"]:
            d["type"] = "uint32"

        factory = self.cstruct._make_flag if enumtype == "flag" else self.cstruct._make_enum

        enum = factory(d["name"] or "", self.cstruct.resolve(d["type"]), values)
        if not enum.__name__:
            self.cstruct.consts.update(enum.__members__)
        else:
            self.cstruct.add_type(enum.__name__, enum)

        tokens.eol()

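    # Sketch of the auto-increment rules implemented above: enum members
    # continue from the previous value + 1, flag members jump to the next
    # power of two, and members of an anonymous enum become plain constants.
    #
    #     cs.load("""
    #     enum color : uint8 { RED = 1, GREEN, BLUE };  // 1, 2, 3
    #     flag perm { R = 1, W, X };                    // 1, 2, 4
    #     enum { ANSWER = 42 };                         // -> cs.consts["ANSWER"]
    #     """)
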

    def _typedef(self, tokens: TokenConsumer) -> None:
        tokens.consume()
        type_ = None

        names = []

        if tokens.next == self.TOK.IDENTIFIER:
            type_ = self.cstruct.resolve(self._identifier(tokens))
        elif tokens.next == self.TOK.STRUCT:
            type_ = self._struct(tokens)
            if not type_.__anonymous__:
                names.append(type_.__name__)

        names.extend(self._names(tokens))
        for name in names:
            if issubclass(type_, Structure) and type_.__anonymous__:
                type_.__anonymous__ = False
                type_.__name__ = name
                type_.__qualname__ = name

            type_, name, bits = self._parse_field_type(type_, name)
            if bits is not None:
                raise ParserError(f"line {self._lineno(tokens.previous)}: typedefs cannot have bitfields")
            self.cstruct.add_type(name, type_)

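    # Sketch of the typedef forms handled above: a plain alias, and a struct
    # typedef whose first name de-anonymizes the structure while every name
    # registers the same type.
    #
    #     cs.load("typedef uint32 DWORD;")
    #     cs.load("typedef struct { uint8 a; uint8 b; } pair, pair_t;")
    #     assert cs.pair is cs.pair_t
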

    def _struct(self, tokens: TokenConsumer, register: bool = False) -> type[Structure]:
        stype = tokens.consume()

        factory = self.cstruct._make_union if stype.value.startswith("union") else self.cstruct._make_struct

        st = None
        names = []
        registered = False

        if tokens.next == self.TOK.IDENTIFIER:
            ident = tokens.consume()
            if register:
                # Pre-register an empty struct for self-referencing
                # We update this instance later with the fields
                st = factory(ident.value, [], align=self.align)
                if self.compiled and "nocompile" not in tokens.flags:
                    st = compiler.compile(st)
                self.cstruct.add_type(ident.value, st)
                registered = True
            else:
                names.append(ident.value)

        if tokens.next == self.TOK.NAME:
            # As part of a struct field
            # struct type_name field_name;
            if not names:
                raise ParserError(f"line {self._lineno(tokens.next)}: unexpected anonymous struct")
            return self.cstruct.resolve(names[0])

        if tokens.next != self.TOK.BLOCK:
            raise ParserError(f"line {self._lineno(tokens.next)}: expected start of block '{tokens.next}'")

        fields = []
        tokens.consume()
        while len(tokens):
            if tokens.next == self.TOK.BLOCK and tokens.next.value == "}":
                tokens.consume()
                break

            if self._check_conditional(tokens):
                continue

            field = self._parse_field(tokens)
            fields.append(field)

        if register:
            names.extend(self._names(tokens))

        # If the next token is EOL, consume it
        # Otherwise we're part of a typedef or field definition
        if tokens.next == self.TOK.EOL:
            tokens.eol()

        name = names[0] if names else None

        if st is None:
            is_anonymous = False
            if not name:
                is_anonymous = True
                name = self.cstruct._next_anonymous()

            st = factory(name, fields, align=self.align, anonymous=is_anonymous)
            if self.compiled and "nocompile" not in tokens.flags:
                st = compiler.compile(st)
        else:
            st.__fields__.extend(fields)
            st.commit()

        # This is pretty dirty
        if register:
            if not names and not registered:
                raise ParserError(f"line {self._lineno(stype)}: struct has no name")

            for name in names:
                self.cstruct.add_type(name, st)

        tokens.reset_flags()
        return st

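    # Sketch of why the pre-registration above matters: when parsing a
    # top-level struct, an empty type is registered first so the fields can
    # refer back to it, e.g. for linked lists.
    #
    #     cs.load("""
    #     struct node {
    #         uint8 value;
    #         node  *next;
    #     };
    #     """)
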

    def _lookup(self, tokens: TokenConsumer) -> None:
        # Just like enums, we cheat and have the entire lookup in the token
        ltok = tokens.consume()

        pattern = self.TOK.patterns[self.TOK.LOOKUP]
        # Dirty trick because the regex expects a ; but we don't want it to be part of the value
        m = pattern.match(ltok.value + ";")
        d = ast.literal_eval(m.group(2))
        self.cstruct.lookups[m.group(1)] = {self.cstruct.consts[k]: v for k, v in d.items()}

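    # Sketch of the lookup syntax matched by the LOOKUP token: keys name
    # previously defined constants, and the stored mapping is keyed by their
    # values (note the LOOKUP regex requires a trailing newline).
    #
    #     cs.load('#define E_FAIL 0x80004005\n$errors = {"E_FAIL": "failure"}\n')
    #     assert cs.lookups["errors"] == {0x80004005: "failure"}
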

    def _parse_field(self, tokens: TokenConsumer) -> Field:
        type_ = None
        if tokens.next == self.TOK.IDENTIFIER:
            type_ = self.cstruct.resolve(self._identifier(tokens))
        elif tokens.next == self.TOK.STRUCT:
            type_ = self._struct(tokens)

            # Anonymous struct/union member without a field name
            if tokens.next != self.TOK.NAME:
                return Field(None, type_, None)

        if tokens.next != self.TOK.NAME:
            raise ParserError(f"line {self._lineno(tokens.next)}: expected name, got {tokens.next!r}")
        nametok = tokens.consume()

        type_, name, bits = self._parse_field_type(type_, nametok.value)

        tokens.eol()
        return Field(name.strip(), type_, bits)


    def _parse_field_type(self, type_: type[BaseType], name: str) -> tuple[type[BaseType], str, int | None]:
        pattern = self.TOK.patterns[self.TOK.NAME]
        # Dirty trick because the regex expects a ; but we don't want it to be part of the value
        d = pattern.match(name + ";").groupdict()

        name = d["name"]
        count_expression = d["count"]

        while name.startswith("*"):
            name = name[1:]
            type_ = self.cstruct._make_pointer(type_)

        if count_expression is not None:
            # Poor man's multi-dimensional array by abusing the eager regex match of count
            counts = count_expression.split("][") if "][" in count_expression else [count_expression]

            for count in reversed(counts):
                if count == "":
                    count = None
                else:
                    count = Expression(count)
                    try:
                        count = count.evaluate(self.cstruct)
                    except Exception:
                        pass

                if issubclass(type_, BaseArray) and count is None:
                    raise ParserError("Depth required for multi-dimensional array")

                type_ = self.cstruct._make_array(type_, count)

        return type_, name.strip(), int(d["bits"]) if d["bits"] else None

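    # Sketch of the declarator handling above: a leading '*' wraps the type in
    # a pointer, '[a][b]' nests arrays (counts are applied in reverse), and
    # ': n' produces an n-bit bitfield.
    #
    #     cs.load("""
    #     struct sample {
    #         uint32  *ptr;
    #         uint8   grid[2][4];   // 2 arrays of 4 bytes
    #         uint16  flags : 4;    // 4-bit bitfield
    #         uint8   data[];       // dynamic length
    #     };
    #     """)
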

    def _names(self, tokens: TokenConsumer) -> list[str]:
        names = []
        while True:
            if tokens.next == self.TOK.EOL:
                tokens.eol()
                break

            if tokens.next not in (self.TOK.NAME, self.TOK.DEFS, self.TOK.IDENTIFIER):
                break

            ntoken = tokens.consume()
            if ntoken in (self.TOK.NAME, self.TOK.IDENTIFIER):
                names.append(ntoken.value.strip())
            elif ntoken == self.TOK.DEFS:
                names.extend([name.strip() for name in ntoken.value.strip().split(",")])

        return names


    def _include(self, tokens: TokenConsumer) -> None:
        include = tokens.consume()
        pattern = self.TOK.patterns[self.TOK.INCLUDE]
        match = pattern.match(include.value).groupdict()

        self.cstruct.includes.append(match["name"].strip().strip("'\""))


    @staticmethod
    def _remove_comments(string: str) -> str:
        # https://stackoverflow.com/a/18381470
        pattern = r"(\".*?\"|\'.*?\')|(/\*.*?\*/|//[^\r\n]*$)"
        # first group captures quoted strings (double or single)
        # second group captures comments (//single-line or /* multi-line */)
        regex = re.compile(pattern, re.MULTILINE | re.DOTALL)

        def _replacer(match: re.Match) -> str:
            # if the 2nd group (capturing comments) is not None,
            # it means we have captured a non-quoted (real) comment string
            if comment := match.group(2):
                # replace the comment with an equal number of newlines to keep line numbers intact
                return "\n" * comment.count("\n")
            # otherwise, return the 1st group (the captured quoted string)
            return match.group(1)

        return regex.sub(_replacer, string)

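    # Behaviour sketch: multi-line comments collapse to their newlines, so
    # _lineno below keeps reporting accurate line numbers.
    #
    #     TokenParser._remove_comments("uint8 a; /* one\n   two */ uint8 b;")
    #     # -> 'uint8 a; \n uint8 b;'
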

    @staticmethod
    def _lineno(tok: Token) -> int:
        """Quick and dirty line number calculator."""

        match = tok.match
        return match.string.count("\n", 0, match.start()) + 1


    def _config_flag(self, tokens: TokenConsumer) -> None:
        flag_token = tokens.consume()
        pattern = self.TOK.patterns[self.TOK.CONFIG_FLAG]
        tok_dict = pattern.match(flag_token.value).groupdict()
        tokens.flags.extend(tok_dict["values"].split(","))


    def parse(self, data: str) -> None:
        scanner = re.Scanner(self.TOK.tokens)
        data = self._remove_comments(data)
        tokens, remaining = scanner.scan(data)

        if len(remaining):
            # +1 so the reported line number is 1-based, consistent with _lineno
            lineno = data.count("\n", 0, len(data) - len(remaining)) + 1
            raise ParserError(f"line {lineno}: invalid syntax in definition")

        tokens = TokenConsumer(tokens)
        while True:
            token = tokens.next
            if token is None:
                break

            if self._check_conditional(tokens):
                continue

            if token == self.TOK.CONFIG_FLAG:
                self._config_flag(tokens)
            elif token == self.TOK.DEFINE:
                self._constant(tokens)
            elif token == self.TOK.UNDEF:
                self._undef(tokens)
            elif token == self.TOK.TYPEDEF:
                self._typedef(tokens)
            elif token == self.TOK.STRUCT:
                self._struct(tokens, register=True)
            elif token == self.TOK.ENUM:
                self._enum(tokens)
            elif token == self.TOK.LOOKUP:
                self._lookup(tokens)
            elif token == self.TOK.INCLUDE:
                self._include(tokens)
            else:
                raise ParserError(f"line {self._lineno(token)}: unexpected token {token!r}")

        if self._conditionals:
            raise ParserError(f"line {self._lineno(tokens.previous)}: unclosed conditional statement")

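# End-to-end sketch of TokenParser (hedged; it is normally driven through the
# public dissect.cstruct API rather than instantiated directly):
#
#     from dissect.cstruct import cstruct
#
#     cs = cstruct()
#     cs.load("""
#     struct header {
#         char    magic[2];
#         uint16  size;
#     };
#     """)
#     hdr = cs.header(b"MZ\x00\x01")
#     assert hdr.magic == b"MZ" and hdr.size == 0x100
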


class CStyleParser(Parser):
    """Definition parser for C-like structure syntax.

    Args:
        cs: An instance of cstruct.
        compiled: Whether structs should be compiled or not.
    """

    def __init__(self, cs: cstruct, compiled: bool = True):
        self.compiled = compiled
        super().__init__(cs)


    def _constants(self, data: str) -> None:
        r = re.finditer(r"#define\s+(?P<name>[^\s]+)\s+(?P<value>[^\r\n]+)\s*\n", data)
        for t in r:
            d = t.groupdict()
            v = d["value"].rsplit("//")[0]

            try:
                v = ast.literal_eval(v)
            except (ValueError, SyntaxError):
                pass

            self.cstruct.consts[d["name"]] = v


    def _enums(self, data: str) -> None:
        r = re.finditer(
            r"(?P<enumtype>enum|flag)\s+(?P<name>[^\s:{]+)\s*(:\s*(?P<type>[^\s]+)\s*)?\{(?P<values>[^}]+)\}\s*;",
            data,
        )
        for t in r:
            d = t.groupdict()
            enumtype = d["enumtype"]

            nextval = 0
            if enumtype == "flag":
                nextval = 1

            values = {}
            for line in d["values"].split("\n"):
                line, _, _ = line.partition("//")
                for v in line.split(","):
                    key, _, val = v.partition("=")
                    key = key.strip()
                    val = val.strip()
                    if not key:
                        continue

                    val = nextval if not val else Expression(val).evaluate(self.cstruct)

                    if enumtype == "flag":
                        high_bit = val.bit_length() - 1
                        nextval = 2 ** (high_bit + 1)
                    else:
                        nextval = val + 1

                    values[key] = val

            if not d["type"]:
                d["type"] = "uint32"

            factory = self.cstruct._make_enum
            if enumtype == "flag":
                factory = self.cstruct._make_flag

            enum = factory(d["name"], self.cstruct.resolve(d["type"]), values)
            self.cstruct.add_type(enum.__name__, enum)


    def _structs(self, data: str) -> None:
        r = re.finditer(
            r"(#(?P<flags>(?:compile))\s+)?"
            r"((?P<typedef>typedef)\s+)?"
            r"(?P<type>[^\s]+)\s+"
            r"(?P<name>[^\s]+)?"
            r"(?P<fields>"
            r"\s*{[^}]+\}(?P<defs>\s+[^;\n]+)?"
            r")?\s*;",
            data,
        )
        for t in r:
            d = t.groupdict()

            if d["name"]:
                name = d["name"]
            elif d["defs"]:
                name = d["defs"].strip().split(",")[0].strip()
            else:
                raise ParserError("No name for struct")

            if d["type"] == "struct":
                data = self._parse_fields(d["fields"][1:-1].strip())
                st = self.cstruct._make_struct(name, data)
                if d["flags"] == "compile" or self.compiled:
                    st = compiler.compile(st)
            elif d["typedef"] == "typedef":
                st = d["type"]
            else:
                continue

            if d["name"]:
                self.cstruct.add_type(d["name"], st)

            if d["defs"]:
                for td in d["defs"].strip().split(","):
                    td = td.strip()
                    self.cstruct.add_type(td, st)


    def _parse_fields(self, data: str) -> list[Field]:
        fields = re.finditer(
            r"(?P<type>[^\s]+)\s+(?P<name>[^\s\[:]+)(:(?P<bits>\d+))?(\[(?P<count>[^;\n]*)\])?;",
            data,
        )

        result = []
        for f in fields:
            d = f.groupdict()
            if d["type"].startswith("//"):
                continue

            type_ = self.cstruct.resolve(d["type"])

            d["name"] = d["name"].replace("(", "").replace(")", "")

            # Maybe reimplement lazy type references later
            # _type = TypeReference(self, d['type'])
            if d["count"] is not None:
                if d["count"] == "":
                    count = None
                else:
                    count = Expression(d["count"])
                    try:
                        count = count.evaluate(self.cstruct)
                    except Exception:
                        pass

                type_ = self.cstruct._make_array(type_, count)

            if d["name"].startswith("*"):
                d["name"] = d["name"][1:]
                type_ = self.cstruct._make_pointer(type_)

            field = Field(d["name"], type_, int(d["bits"]) if d["bits"] else None)
            result.append(field)

        return result


    def _lookups(self, data: str, consts: dict[str, int]) -> None:
        r = re.finditer(r"\$(?P<name>[^\s]+) = ({[^}]+})\w*\n", data)

        for t in r:
            d = ast.literal_eval(t.group(2))
            self.cstruct.lookups[t.group(1)] = {self.cstruct.consts[k]: v for k, v in d.items()}


    def parse(self, data: str) -> None:
        self._constants(data)
        self._enums(data)
        self._structs(data)
        self._lookups(data, self.cstruct.consts)

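# CStyleParser is the older, purely regex-based parser. A sketch of the flat
# syntax it accepts, including the per-struct '#compile' flag matched by
# _structs above:
#
#     #define MAX 4
#     #compile typedef struct {
#         uint8 values[MAX];
#     } block;
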


class Token:
    __slots__ = ("match", "token", "value")

    def __init__(self, token: str, value: str, match: re.Match):
        self.token = token
        self.value = value
        self.match = match

    def __eq__(self, other: object) -> bool:
        if isinstance(other, Token):
            other = other.token

        return self.token == other

    def __ne__(self, other: object) -> bool:
        return not self == other

    def __repr__(self) -> str:
        return f"<Token.{self.token} value={self.value!r}>"

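# Note on the equality trick above: comparing a Token against a string
# compares the token *name*, which is what enables checks such as
# `tokens.next == self.TOK.NAME` throughout the parser (TokenCollection
# attributes are simply the name strings themselves).
#
#     t = Token("NAME", "foo", None)
#     assert t == "NAME" and t != "EOL"
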


class TokenCollection:
    def __init__(self):
        # (regex, callback) pairs in the format expected by re.Scanner
        self.tokens: list[tuple[str, Any]] = []
        self.lookup: dict[str, str] = {}
        self.patterns: dict[str, re.Pattern] = {}

    def __getattr__(self, attr: str) -> str | Any:
        try:
            return self.lookup[attr]
        except KeyError:  # missing dict keys raise KeyError, not AttributeError
            pass

        return object.__getattribute__(self, attr)

    def add(self, regex: str, name: str | None) -> None:
        if name is None:
            self.tokens.append((regex, None))
        else:
            self.lookup[name] = name
            self.patterns[name] = re.compile(regex)
            self.tokens.append((regex, lambda s, t: Token(name, t, s.match)))



class TokenConsumer:
    def __init__(self, tokens: list[Token]):
        self.tokens = tokens
        self.flags = []
        self.previous = None

    def __contains__(self, token: Token) -> bool:
        return token in self.tokens

    def __len__(self) -> int:
        return len(self.tokens)

    def __repr__(self) -> str:
        return f"<TokenConsumer next={self.next!r}>"

    @property
    def next(self) -> Token | None:
        try:
            return self.tokens[0]
        except IndexError:
            return None

    def consume(self) -> Token:
        self.previous = self.tokens.pop(0)
        return self.previous

    def reset_flags(self) -> None:
        self.flags = []

    def eol(self) -> None:
        token = self.consume()
        if token.token != "EOL":
            # TokenConsumer has no _lineno helper of its own, so compute the
            # line number from the token's match (mirrors TokenParser._lineno)
            lineno = token.match.string.count("\n", 0, token.match.start()) + 1
            raise ParserError(f"line {lineno}: expected EOL")