Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/dns/tokenizer.py: 74%

330 statements  

« prev     ^ index     » next       coverage.py v7.4.1, created at 2024-02-02 06:07 +0000

1# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license 

2 

3# Copyright (C) 2003-2017 Nominum, Inc. 

4# 

5# Permission to use, copy, modify, and distribute this software and its 

6# documentation for any purpose with or without fee is hereby granted, 

7# provided that the above copyright notice and this permission notice 

8# appear in all copies. 

9# 

10# THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES 

11# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 

12# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR 

13# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 

14# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 

15# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT 

16# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 

17 

18"""Tokenize DNS zone file format""" 

19 

20import io 

21import sys 

22from typing import Any, List, Optional, Tuple 

23 

24import dns.exception 

25import dns.name 

26import dns.ttl 

27 

# Characters that terminate an unquoted token.
_DELIMITERS = set(' \t\n;()"')
# While inside a quoted string, only the closing quote terminates the token.
_QUOTING_DELIMITERS = set('"')

# Token type codes stored in Token.ttype.
(
    EOF,
    EOL,
    WHITESPACE,
    IDENTIFIER,
    QUOTED_STRING,
    COMMENT,
    DELIMITER,
) = range(7)

38 

39 

class UngetBufferFull(dns.exception.DNSException):
    """Raised when a token or character is ungotten while the
    single-slot unget buffer is already occupied."""

42 

43 

class Token:
    """A DNS zone file format token.

    ttype: The token type
    value: The token value
    has_escape: Does the token value contain escapes?
    comment: The comment text associated with the token, or None.
    """

    def __init__(
        self,
        ttype: int,
        value: Any = "",
        has_escape: bool = False,
        comment: Optional[str] = None,
    ):
        """Initialize a token instance."""

        self.ttype = ttype
        self.value = value
        self.has_escape = has_escape
        self.comment = comment

    def is_eof(self) -> bool:
        return self.ttype == EOF

    def is_eol(self) -> bool:
        return self.ttype == EOL

    def is_whitespace(self) -> bool:
        return self.ttype == WHITESPACE

    def is_identifier(self) -> bool:
        return self.ttype == IDENTIFIER

    def is_quoted_string(self) -> bool:
        return self.ttype == QUOTED_STRING

    def is_comment(self) -> bool:
        return self.ttype == COMMENT

    def is_delimiter(self) -> bool:  # pragma: no cover (we don't return delimiters yet)
        return self.ttype == DELIMITER

    def is_eol_or_eof(self) -> bool:
        return self.ttype == EOL or self.ttype == EOF

    def __eq__(self, other):
        # Tokens compare by type and value only; has_escape and comment
        # are ignored.
        if not isinstance(other, Token):
            return False
        return self.ttype == other.ttype and self.value == other.value

    def __ne__(self, other):
        if not isinstance(other, Token):
            return True
        return self.ttype != other.ttype or self.value != other.value

    def __str__(self):
        return '%d "%s"' % (self.ttype, self.value)

    def _iter_unescaped(self):
        """Walk the token value, resolving backslash escapes.

        Yields an int (0-255) for each ``\\DDD`` decimal escape and a
        one-character str for every other character (with the backslash
        of a ``\\X`` escape removed).

        Raises dns.exception.UnexpectedEnd if the value ends in the
        middle of an escape.

        Raises dns.exception.SyntaxError if a decimal escape is malformed
        or greater than 255.
        """
        text = self.value
        length = len(text)
        i = 0
        while i < length:
            c = text[i]
            i += 1
            if c != "\\":
                yield c
                continue
            if i >= length:  # pragma: no cover (can't happen via get())
                raise dns.exception.UnexpectedEnd
            c = text[i]
            i += 1
            # Only ASCII digits start a \DDD escape.  str.isdigit() alone
            # is also true for characters such as superscript two, which
            # int() rejects with ValueError, so require ASCII explicitly.
            if not (c.isascii() and c.isdigit()):
                yield c
                continue
            if i + 2 > length:
                raise dns.exception.UnexpectedEnd
            c2 = text[i]
            c3 = text[i + 1]
            i += 2
            if not (c2.isascii() and c2.isdigit() and c3.isascii() and c3.isdigit()):
                raise dns.exception.SyntaxError
            codepoint = int(c) * 100 + int(c2) * 10 + int(c3)
            if codepoint > 255:
                raise dns.exception.SyntaxError
            yield codepoint

    def unescape(self) -> "Token":
        """Return a token whose str value has all escapes resolved.

        If the token has no escapes, the token itself is returned.  A
        ``\\DDD`` escape becomes the character with that code point.

        Raises dns.exception.UnexpectedEnd or dns.exception.SyntaxError
        if an escape is truncated or malformed.
        """
        if not self.has_escape:
            return self
        pieces = [
            chr(piece) if isinstance(piece, int) else piece
            for piece in self._iter_unescaped()
        ]
        return Token(self.ttype, "".join(pieces))

    def unescape_to_bytes(self) -> "Token":
        """Return a token whose value is bytes with all escapes resolved.

        Raises dns.exception.UnexpectedEnd or dns.exception.SyntaxError
        if an escape is truncated or malformed.
        """
        # We used to use unescape() for TXT-like records, but this
        # caused problems as we'd process DNS escapes into Unicode code
        # points instead of byte values, and then a to_text() of the
        # processed data would not equal the original input.  For
        # example, \226 in the TXT record would have a to_text() of
        # \195\162 because we applied UTF-8 encoding to Unicode code
        # point 226.
        #
        # We now apply escapes while converting directly to bytes,
        # avoiding this double encoding.
        #
        # This code also handles cases where the unicode input has
        # non-ASCII code-points in it by converting it to UTF-8.  TXT
        # records aren't defined for Unicode, but this is the best we
        # can do to preserve meaning.  For example,
        #
        #     foo\u200bbar
        #
        # (where \u200b is Unicode code point 0x200b) will be treated
        # as if the input had been the UTF-8 encoding of that string,
        # namely:
        #
        #     foo\226\128\139bar
        #
        pieces = [
            # A decimal escape is exactly one byte; any other character
            # contributes its UTF-8 encoding (possibly several bytes).
            bytes((piece,)) if isinstance(piece, int) else piece.encode()
            for piece in self._iter_unescaped()
        ]
        return Token(self.ttype, b"".join(pieces))

196 

197 

class Tokenizer:
    """A DNS zone file format tokenizer.

    A token object is basically a (type, value) tuple.  The valid
    types are EOF, EOL, WHITESPACE, IDENTIFIER, QUOTED_STRING,
    COMMENT, and DELIMITER.

    file: The file to tokenize

    ungotten_char: The most recently ungotten character, or None.

    ungotten_token: The most recently ungotten token, or None.

    multiline: The current multiline level.  This value is increased
    by one every time a '(' delimiter is read, and decreased by one every time
    a ')' delimiter is read.

    quoting: This variable is true if the tokenizer is currently
    reading a quoted string.

    eof: This variable is true if the tokenizer has encountered EOF.

    delimiters: The current set of delimiter characters.

    line_number: The current line number

    filename: A filename that will be returned by the where() method.

    idna_codec: A dns.name.IDNACodec, specifies the IDNA
    encoder/decoder.  If None, the default IDNA 2003
    encoder/decoder is used.
    """

    def __init__(
        self,
        f: Any = sys.stdin,
        filename: Optional[str] = None,
        idna_codec: Optional[dns.name.IDNACodec] = None,
    ):
        """Initialize a tokenizer instance.

        f: The file to tokenize.  The default is sys.stdin.
        This parameter may also be a string, in which case the tokenizer
        will take its input from the contents of the string.

        filename: the name of the filename that the where() method
        will return.

        idna_codec: A dns.name.IDNACodec, specifies the IDNA
        encoder/decoder.  If None, the default IDNA 2003
        encoder/decoder is used.
        """

        if isinstance(f, str):
            f = io.StringIO(f)
            if filename is None:
                filename = "<string>"
        elif isinstance(f, bytes):
            f = io.StringIO(f.decode())
            if filename is None:
                filename = "<string>"
        else:
            if filename is None:
                if f is sys.stdin:
                    filename = "<stdin>"
                else:
                    filename = "<file>"
        self.file = f
        self.ungotten_char: Optional[str] = None
        self.ungotten_token: Optional[Token] = None
        self.multiline = 0
        self.quoting = False
        self.eof = False
        self.delimiters = _DELIMITERS
        self.line_number = 1
        assert filename is not None
        self.filename = filename
        if idna_codec is None:
            self.idna_codec: dns.name.IDNACodec = dns.name.IDNA_2003
        else:
            self.idna_codec = idna_codec

    def _get_char(self) -> str:
        """Read a character from input.

        Returns the ungotten character if there is one, otherwise the
        next character from the file.  Returns "" once EOF has been
        reached.
        """

        if self.ungotten_char is None:
            if self.eof:
                c = ""
            else:
                c = self.file.read(1)
                if c == "":
                    self.eof = True
                elif c == "\n":
                    self.line_number += 1
        else:
            c = self.ungotten_char
            self.ungotten_char = None
        return c

    def where(self) -> Tuple[str, int]:
        """Return the current location in the input.

        Returns a (string, int) tuple.  The first item is the filename of
        the input, the second is the current line number.
        """

        return (self.filename, self.line_number)

    def _unget_char(self, c: str) -> None:
        """Unget a character.

        The unget buffer for characters is only one character large; it is
        an error to try to unget a character when the unget buffer is not
        empty.

        c: the character to unget
        raises UngetBufferFull: there is already an ungotten char
        """

        if self.ungotten_char is not None:
            # this should never happen!
            raise UngetBufferFull  # pragma: no cover
        self.ungotten_char = c

    def skip_whitespace(self) -> int:
        """Consume input until a non-whitespace character is encountered.

        The non-whitespace character is then ungotten, and the number of
        whitespace characters consumed is returned.

        If the tokenizer is in multiline mode, then newlines are whitespace.

        Returns the number of characters skipped.
        """

        skipped = 0
        while True:
            c = self._get_char()
            if c != " " and c != "\t":
                # A newline only counts as whitespace inside parentheses.
                if (c != "\n") or not self.multiline:
                    self._unget_char(c)
                    return skipped
            skipped += 1

    def get(self, want_leading: bool = False, want_comment: bool = False) -> Token:
        """Get the next token.

        want_leading: If True, return a WHITESPACE token if the
        first character read is whitespace.  The default is False.

        want_comment: If True, return a COMMENT token if the
        first token read is a comment.  The default is False.

        Raises dns.exception.UnexpectedEnd: input ended prematurely

        Raises dns.exception.SyntaxError: input was badly formed

        Returns a Token.
        """

        if self.ungotten_token is not None:
            utoken = self.ungotten_token
            self.ungotten_token = None
            # An ungotten whitespace or comment token is only handed back
            # if the caller asked for that kind; otherwise it is dropped
            # and tokenizing continues from the input.
            if utoken.is_whitespace():
                if want_leading:
                    return utoken
            elif utoken.is_comment():
                if want_comment:
                    return utoken
            else:
                return utoken
        skipped = self.skip_whitespace()
        if want_leading and skipped > 0:
            return Token(WHITESPACE, " ")
        token = ""
        ttype = IDENTIFIER
        has_escape = False
        while True:
            c = self._get_char()
            if c == "" or c in self.delimiters:
                if c == "" and self.quoting:
                    raise dns.exception.UnexpectedEnd
                if token == "" and ttype != QUOTED_STRING:
                    # Nothing accumulated yet, so the delimiter itself
                    # decides what happens next.
                    if c == "(":
                        self.multiline += 1
                        self.skip_whitespace()
                        continue
                    elif c == ")":
                        if self.multiline <= 0:
                            raise dns.exception.SyntaxError
                        self.multiline -= 1
                        self.skip_whitespace()
                        continue
                    elif c == '"':
                        if not self.quoting:
                            # Opening quote: only '"' ends the token now.
                            self.quoting = True
                            self.delimiters = _QUOTING_DELIMITERS
                            ttype = QUOTED_STRING
                            continue
                        else:
                            # Closing quote left over from the previous
                            # QUOTED_STRING token.
                            self.quoting = False
                            self.delimiters = _DELIMITERS
                            self.skip_whitespace()
                            continue
                    elif c == "\n":
                        return Token(EOL, "\n")
                    elif c == ";":
                        # Collect the comment text up to end of line/input.
                        while 1:
                            c = self._get_char()
                            if c == "\n" or c == "":
                                break
                            token += c
                        if want_comment:
                            # Put the terminator back so the following
                            # get() sees the EOL/EOF.
                            self._unget_char(c)
                            return Token(COMMENT, token)
                        elif c == "":
                            if self.multiline:
                                raise dns.exception.SyntaxError(
                                    "unbalanced parentheses"
                                )
                            return Token(EOF, comment=token)
                        elif self.multiline:
                            # Inside parentheses the comment and its
                            # newline are just whitespace.
                            self.skip_whitespace()
                            token = ""
                            continue
                        else:
                            return Token(EOL, "\n", comment=token)
                    else:
                        # This code exists in case we ever want a
                        # delimiter to be returned.  It never produces
                        # a token currently.
                        token = c
                        ttype = DELIMITER
                else:
                    # The delimiter ends the current token; leave it for
                    # the next call.
                    self._unget_char(c)
                break
            elif self.quoting and c == "\n":
                raise dns.exception.SyntaxError("newline in quoted string")
            elif c == "\\":
                #
                # It's an escape.  Put it and the next character into
                # the token; it will be checked later for goodness.
                #
                token += c
                has_escape = True
                c = self._get_char()
                if c == "" or (c == "\n" and not self.quoting):
                    raise dns.exception.UnexpectedEnd
            token += c
        if token == "" and ttype != QUOTED_STRING:
            if self.multiline:
                raise dns.exception.SyntaxError("unbalanced parentheses")
            ttype = EOF
        return Token(ttype, token, has_escape)

    def unget(self, token: Token) -> None:
        """Unget a token.

        The unget buffer for tokens is only one token large; it is
        an error to try to unget a token when the unget buffer is not
        empty.

        token: the token to unget

        Raises UngetBufferFull: there is already an ungotten token
        """

        if self.ungotten_token is not None:
            raise UngetBufferFull
        self.ungotten_token = token

    def next(self):
        """Return the next item in an iteration.

        Raises StopIteration when the EOF token is reached.

        Returns a Token.
        """

        token = self.get()
        if token.is_eof():
            raise StopIteration
        return token

    __next__ = next

    def __iter__(self):
        return self

    # Helpers

    def get_int(self, base: int = 10) -> int:
        """Read the next token and interpret it as an unsigned integer.

        base: the numeric base used to interpret the digits.  The default
        is 10.

        Raises dns.exception.SyntaxError if not an unsigned integer, or if
        the digits are not valid in the requested base.

        Returns an int.
        """

        token = self.get().unescape()
        if not token.is_identifier():
            raise dns.exception.SyntaxError("expecting an identifier")
        text = token.value
        # str.isdigit() alone also accepts non-ASCII digit characters,
        # some of which int() rejects; only ASCII digits are valid here.
        if not (text.isascii() and text.isdigit()):
            raise dns.exception.SyntaxError("expecting an integer")
        try:
            return int(text, base)
        except ValueError as e:
            # e.g. the digit 9 when base is 8
            raise dns.exception.SyntaxError("expecting an integer") from e

    def get_uint8(self) -> int:
        """Read the next token and interpret it as an 8-bit unsigned
        integer.

        Raises dns.exception.SyntaxError if not an 8-bit unsigned integer.

        Returns an int.
        """

        value = self.get_int()
        if value < 0 or value > 255:
            raise dns.exception.SyntaxError(
                "%d is not an unsigned 8-bit integer" % value
            )
        return value

    def get_uint16(self, base: int = 10) -> int:
        """Read the next token and interpret it as a 16-bit unsigned
        integer.

        Raises dns.exception.SyntaxError if not a 16-bit unsigned integer.

        Returns an int.
        """

        value = self.get_int(base=base)
        if value < 0 or value > 65535:
            if base == 8:
                raise dns.exception.SyntaxError(
                    "%o is not an octal unsigned 16-bit integer" % value
                )
            else:
                raise dns.exception.SyntaxError(
                    "%d is not an unsigned 16-bit integer" % value
                )
        return value

    def get_uint32(self, base: int = 10) -> int:
        """Read the next token and interpret it as a 32-bit unsigned
        integer.

        Raises dns.exception.SyntaxError if not a 32-bit unsigned integer.

        Returns an int.
        """

        value = self.get_int(base=base)
        if value < 0 or value > 4294967295:
            raise dns.exception.SyntaxError(
                "%d is not an unsigned 32-bit integer" % value
            )
        return value

    def get_uint48(self, base: int = 10) -> int:
        """Read the next token and interpret it as a 48-bit unsigned
        integer.

        Raises dns.exception.SyntaxError if not a 48-bit unsigned integer.

        Returns an int.
        """

        value = self.get_int(base=base)
        if value < 0 or value > 281474976710655:
            raise dns.exception.SyntaxError(
                "%d is not an unsigned 48-bit integer" % value
            )
        return value

    def get_string(self, max_length: Optional[int] = None) -> str:
        """Read the next token and interpret it as a string.

        Raises dns.exception.SyntaxError if not a string.
        Raises dns.exception.SyntaxError if token value length
        exceeds max_length (if specified).

        Returns a string.
        """

        token = self.get().unescape()
        if not (token.is_identifier() or token.is_quoted_string()):
            raise dns.exception.SyntaxError("expecting a string")
        if max_length and len(token.value) > max_length:
            raise dns.exception.SyntaxError("string too long")
        return token.value

    def get_identifier(self) -> str:
        """Read the next token, which should be an identifier.

        Raises dns.exception.SyntaxError if not an identifier.

        Returns a string.
        """

        token = self.get().unescape()
        if not token.is_identifier():
            raise dns.exception.SyntaxError("expecting an identifier")
        return token.value

    def get_remaining(self, max_tokens: Optional[int] = None) -> List[Token]:
        """Return the remaining tokens on the line, until an EOL or EOF is seen.

        The EOL or EOF token is ungotten, not consumed.

        max_tokens: If not None, stop after this number of tokens.

        Returns a list of tokens.
        """

        tokens = []
        while True:
            token = self.get()
            if token.is_eol_or_eof():
                self.unget(token)
                break
            tokens.append(token)
            if len(tokens) == max_tokens:
                break
        return tokens

    def concatenate_remaining_identifiers(self, allow_empty: bool = False) -> str:
        """Read the remaining tokens on the line, which should be identifiers.

        The EOL or EOF token that ends the line is ungotten, not consumed.

        Raises dns.exception.SyntaxError if there are no remaining tokens,
        unless `allow_empty=True` is given.

        Raises dns.exception.SyntaxError if a token is seen that is not an
        identifier.

        Returns a string containing a concatenation of the remaining
        identifiers.
        """
        s = ""
        while True:
            token = self.get().unescape()
            if token.is_eol_or_eof():
                self.unget(token)
                break
            if not token.is_identifier():
                raise dns.exception.SyntaxError
            s += token.value
        if not (allow_empty or s):
            raise dns.exception.SyntaxError("expecting another identifier")
        return s

    def as_name(
        self,
        token: Token,
        origin: Optional[dns.name.Name] = None,
        relativize: bool = False,
        relativize_to: Optional[dns.name.Name] = None,
    ) -> dns.name.Name:
        """Try to interpret the token as a DNS name.

        Raises dns.exception.SyntaxError if not a name.

        Returns a dns.name.Name.
        """
        if not token.is_identifier():
            raise dns.exception.SyntaxError("expecting an identifier")
        name = dns.name.from_text(token.value, origin, self.idna_codec)
        return name.choose_relativity(relativize_to or origin, relativize)

    def get_name(
        self,
        origin: Optional[dns.name.Name] = None,
        relativize: bool = False,
        relativize_to: Optional[dns.name.Name] = None,
    ) -> dns.name.Name:
        """Read the next token and interpret it as a DNS name.

        Raises dns.exception.SyntaxError if not a name.

        Returns a dns.name.Name.
        """

        token = self.get()
        return self.as_name(token, origin, relativize, relativize_to)

    def get_eol_as_token(self) -> Token:
        """Read the next token and raise an exception if it isn't EOL or
        EOF.

        Raises dns.exception.SyntaxError if the token is not EOL or EOF.

        Returns a Token.
        """

        token = self.get()
        if not token.is_eol_or_eof():
            raise dns.exception.SyntaxError(
                'expected EOL or EOF, got %d "%s"' % (token.ttype, token.value)
            )
        return token

    def get_eol(self) -> str:
        """Read the next token, which must be EOL or EOF, and return its
        value.

        Raises dns.exception.SyntaxError if the token is not EOL or EOF.

        Returns a string.
        """
        return self.get_eol_as_token().value

    def get_ttl(self) -> int:
        """Read the next token and interpret it as a DNS TTL.

        Raises dns.exception.SyntaxError or dns.ttl.BadTTL if not an
        identifier or badly formed.

        Returns an int.
        """

        token = self.get().unescape()
        if not token.is_identifier():
            raise dns.exception.SyntaxError("expecting an identifier")
        return dns.ttl.from_text(token.value)