Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dns/tokenizer.py: 51%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
3# Copyright (C) 2003-2017 Nominum, Inc.
4#
5# Permission to use, copy, modify, and distribute this software and its
6# documentation for any purpose with or without fee is hereby granted,
7# provided that the above copyright notice and this permission notice
8# appear in all copies.
9#
10# THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES
11# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR
13# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
16# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
18"""Tokenize DNS zone file format"""
20import io
21import sys
22from typing import Any
24import dns.exception
25import dns.name
26import dns.ttl
28_DELIMITERS = {" ", "\t", "\n", ";", "(", ")", '"'}
29_QUOTING_DELIMITERS = {'"'}
31EOF = 0
32EOL = 1
33WHITESPACE = 2
34IDENTIFIER = 3
35QUOTED_STRING = 4
36COMMENT = 5
37DELIMITER = 6
40class UngetBufferFull(dns.exception.DNSException):
41 """An attempt was made to unget a token when the unget buffer was full."""
44class Token:
45 """A DNS zone file format token.
47 ttype: The token type
48 value: The token value
49 has_escape: Does the token value contain escapes?
50 """
52 def __init__(
53 self,
54 ttype: int,
55 value: Any = "",
56 has_escape: bool = False,
57 comment: str | None = None,
58 ):
59 """Initialize a token instance."""
61 self.ttype = ttype
62 self.value = value
63 self.has_escape = has_escape
64 self.comment = comment
66 def is_eof(self) -> bool:
67 return self.ttype == EOF
69 def is_eol(self) -> bool:
70 return self.ttype == EOL
72 def is_whitespace(self) -> bool:
73 return self.ttype == WHITESPACE
75 def is_identifier(self) -> bool:
76 return self.ttype == IDENTIFIER
78 def is_quoted_string(self) -> bool:
79 return self.ttype == QUOTED_STRING
81 def is_comment(self) -> bool:
82 return self.ttype == COMMENT
84 def is_delimiter(self) -> bool: # pragma: no cover (we don't return delimiters yet)
85 return self.ttype == DELIMITER
87 def is_eol_or_eof(self) -> bool:
88 return self.ttype == EOL or self.ttype == EOF
90 def __eq__(self, other):
91 if not isinstance(other, Token):
92 return False
93 return self.ttype == other.ttype and self.value == other.value
95 def __ne__(self, other):
96 if not isinstance(other, Token):
97 return True
98 return self.ttype != other.ttype or self.value != other.value
100 def __str__(self):
101 return f'{self.ttype} "{self.value}"'
103 def unescape(self) -> "Token":
104 if not self.has_escape:
105 return self
106 unescaped = ""
107 l = len(self.value)
108 i = 0
109 while i < l:
110 c = self.value[i]
111 i += 1
112 if c == "\\":
113 if i >= l: # pragma: no cover (can't happen via get())
114 raise dns.exception.UnexpectedEnd
115 c = self.value[i]
116 i += 1
117 if c.isdigit():
118 if i >= l:
119 raise dns.exception.UnexpectedEnd
120 c2 = self.value[i]
121 i += 1
122 if i >= l:
123 raise dns.exception.UnexpectedEnd
124 c3 = self.value[i]
125 i += 1
126 if not (c2.isdigit() and c3.isdigit()):
127 raise dns.exception.SyntaxError
128 codepoint = int(c) * 100 + int(c2) * 10 + int(c3)
129 if codepoint > 255:
130 raise dns.exception.SyntaxError
131 c = chr(codepoint)
132 unescaped += c
133 return Token(self.ttype, unescaped)
135 def unescape_to_bytes(self) -> "Token":
136 # We used to use unescape() for TXT-like records, but this
137 # caused problems as we'd process DNS escapes into Unicode code
138 # points instead of byte values, and then a to_text() of the
139 # processed data would not equal the original input. For
140 # example, \226 in the TXT record would have a to_text() of
141 # \195\162 because we applied UTF-8 encoding to Unicode code
142 # point 226.
143 #
144 # We now apply escapes while converting directly to bytes,
145 # avoiding this double encoding.
146 #
147 # This code also handles cases where the unicode input has
148 # non-ASCII code-points in it by converting it to UTF-8. TXT
149 # records aren't defined for Unicode, but this is the best we
150 # can do to preserve meaning. For example,
151 #
152 # foo\u200bbar
153 #
154 # (where \u200b is Unicode code point 0x200b) will be treated
155 # as if the input had been the UTF-8 encoding of that string,
156 # namely:
157 #
158 # foo\226\128\139bar
159 #
160 unescaped = b""
161 l = len(self.value)
162 i = 0
163 while i < l:
164 c = self.value[i]
165 i += 1
166 if c == "\\":
167 if i >= l: # pragma: no cover (can't happen via get())
168 raise dns.exception.UnexpectedEnd
169 c = self.value[i]
170 i += 1
171 if c.isdigit():
172 if i >= l:
173 raise dns.exception.UnexpectedEnd
174 c2 = self.value[i]
175 i += 1
176 if i >= l:
177 raise dns.exception.UnexpectedEnd
178 c3 = self.value[i]
179 i += 1
180 if not (c2.isdigit() and c3.isdigit()):
181 raise dns.exception.SyntaxError
182 codepoint = int(c) * 100 + int(c2) * 10 + int(c3)
183 if codepoint > 255:
184 raise dns.exception.SyntaxError
185 unescaped += b"%c" % (codepoint)
186 else:
187 # Note that as mentioned above, if c is a Unicode
188 # code point outside of the ASCII range, then this
189 # += is converting that code point to its UTF-8
190 # encoding and appending multiple bytes to
191 # unescaped.
192 unescaped += c.encode()
193 else:
194 unescaped += c.encode()
195 return Token(self.ttype, bytes(unescaped))
198class Tokenizer:
199 """A DNS zone file format tokenizer.
201 A token object is basically a (type, value) tuple. The valid
202 types are EOF, EOL, WHITESPACE, IDENTIFIER, QUOTED_STRING,
203 COMMENT, and DELIMITER.
205 file: The file to tokenize
207 ungotten_char: The most recently ungotten character, or None.
209 ungotten_token: The most recently ungotten token, or None.
211 multiline: The current multiline level. This value is increased
212 by one every time a '(' delimiter is read, and decreased by one every time
213 a ')' delimiter is read.
215 quoting: This variable is true if the tokenizer is currently
216 reading a quoted string.
218 eof: This variable is true if the tokenizer has encountered EOF.
220 delimiters: The current delimiter dictionary.
222 line_number: The current line number
224 filename: A filename that will be returned by the where() method.
226 idna_codec: A dns.name.IDNACodec, specifies the IDNA
227 encoder/decoder. If None, the default IDNA 2003
228 encoder/decoder is used.
229 """
231 def __init__(
232 self,
233 f: Any = sys.stdin,
234 filename: str | None = None,
235 idna_codec: dns.name.IDNACodec | None = None,
236 ):
237 """Initialize a tokenizer instance.
239 f: The file to tokenize. The default is sys.stdin.
240 This parameter may also be a string, in which case the tokenizer
241 will take its input from the contents of the string.
243 filename: the name of the filename that the where() method
244 will return.
246 idna_codec: A dns.name.IDNACodec, specifies the IDNA
247 encoder/decoder. If None, the default IDNA 2003
248 encoder/decoder is used.
249 """
251 if isinstance(f, str):
252 f = io.StringIO(f)
253 if filename is None:
254 filename = "<string>"
255 elif isinstance(f, bytes):
256 f = io.StringIO(f.decode())
257 if filename is None:
258 filename = "<string>"
259 else:
260 if filename is None:
261 if f is sys.stdin:
262 filename = "<stdin>"
263 else:
264 filename = "<file>"
265 self.file = f
266 self.ungotten_char: str | None = None
267 self.ungotten_token: Token | None = None
268 self.multiline = 0
269 self.quoting = False
270 self.eof = False
271 self.delimiters = _DELIMITERS
272 self.line_number = 1
273 assert filename is not None
274 self.filename = filename
275 if idna_codec is None:
276 self.idna_codec: dns.name.IDNACodec = dns.name.IDNA_2003
277 else:
278 self.idna_codec = idna_codec
280 def _get_char(self) -> str:
281 """Read a character from input."""
283 if self.ungotten_char is None:
284 if self.eof:
285 c = ""
286 else:
287 c = self.file.read(1)
288 if c == "":
289 self.eof = True
290 elif c == "\n":
291 self.line_number += 1
292 else:
293 c = self.ungotten_char
294 self.ungotten_char = None
295 return c
297 def where(self) -> tuple[str, int]:
298 """Return the current location in the input.
300 Returns a (string, int) tuple. The first item is the filename of
301 the input, the second is the current line number.
302 """
304 return (self.filename, self.line_number)
306 def _unget_char(self, c: str) -> None:
307 """Unget a character.
309 The unget buffer for characters is only one character large; it is
310 an error to try to unget a character when the unget buffer is not
311 empty.
313 c: the character to unget
314 raises UngetBufferFull: there is already an ungotten char
315 """
317 if self.ungotten_char is not None:
318 # this should never happen!
319 raise UngetBufferFull # pragma: no cover
320 self.ungotten_char = c
322 def skip_whitespace(self) -> int:
323 """Consume input until a non-whitespace character is encountered.
325 The non-whitespace character is then ungotten, and the number of
326 whitespace characters consumed is returned.
328 If the tokenizer is in multiline mode, then newlines are whitespace.
330 Returns the number of characters skipped.
331 """
333 skipped = 0
334 while True:
335 c = self._get_char()
336 if c != " " and c != "\t":
337 if (c != "\n") or not self.multiline:
338 self._unget_char(c)
339 return skipped
340 skipped += 1
342 def get(self, want_leading: bool = False, want_comment: bool = False) -> Token:
343 """Get the next token.
345 want_leading: If True, return a WHITESPACE token if the
346 first character read is whitespace. The default is False.
348 want_comment: If True, return a COMMENT token if the
349 first token read is a comment. The default is False.
351 Raises dns.exception.UnexpectedEnd: input ended prematurely
353 Raises dns.exception.SyntaxError: input was badly formed
355 Returns a Token.
356 """
358 if self.ungotten_token is not None:
359 utoken = self.ungotten_token
360 self.ungotten_token = None
361 if utoken.is_whitespace():
362 if want_leading:
363 return utoken
364 elif utoken.is_comment():
365 if want_comment:
366 return utoken
367 else:
368 return utoken
369 skipped = self.skip_whitespace()
370 if want_leading and skipped > 0:
371 return Token(WHITESPACE, " ")
372 token = ""
373 ttype = IDENTIFIER
374 has_escape = False
375 while True:
376 c = self._get_char()
377 if c == "" or c in self.delimiters:
378 if c == "" and self.quoting:
379 raise dns.exception.UnexpectedEnd
380 if token == "" and ttype != QUOTED_STRING:
381 if c == "(":
382 self.multiline += 1
383 self.skip_whitespace()
384 continue
385 elif c == ")":
386 if self.multiline <= 0:
387 raise dns.exception.SyntaxError
388 self.multiline -= 1
389 self.skip_whitespace()
390 continue
391 elif c == '"':
392 if not self.quoting:
393 self.quoting = True
394 self.delimiters = _QUOTING_DELIMITERS
395 ttype = QUOTED_STRING
396 continue
397 else:
398 self.quoting = False
399 self.delimiters = _DELIMITERS
400 self.skip_whitespace()
401 continue
402 elif c == "\n":
403 return Token(EOL, "\n")
404 elif c == ";":
405 while 1:
406 c = self._get_char()
407 if c == "\n" or c == "":
408 break
409 token += c
410 if want_comment:
411 self._unget_char(c)
412 return Token(COMMENT, token)
413 elif c == "":
414 if self.multiline:
415 raise dns.exception.SyntaxError(
416 "unbalanced parentheses"
417 )
418 return Token(EOF, comment=token)
419 elif self.multiline:
420 self.skip_whitespace()
421 token = ""
422 continue
423 else:
424 return Token(EOL, "\n", comment=token)
425 else:
426 # This code exists in case we ever want a
427 # delimiter to be returned. It never produces
428 # a token currently.
429 token = c
430 ttype = DELIMITER
431 else:
432 self._unget_char(c)
433 break
434 elif self.quoting and c == "\n":
435 raise dns.exception.SyntaxError("newline in quoted string")
436 elif c == "\\":
437 #
438 # It's an escape. Put it and the next character into
439 # the token; it will be checked later for goodness.
440 #
441 token += c
442 has_escape = True
443 c = self._get_char()
444 if c == "" or (c == "\n" and not self.quoting):
445 raise dns.exception.UnexpectedEnd
446 token += c
447 if token == "" and ttype != QUOTED_STRING:
448 if self.multiline:
449 raise dns.exception.SyntaxError("unbalanced parentheses")
450 ttype = EOF
451 return Token(ttype, token, has_escape)
453 def unget(self, token: Token) -> None:
454 """Unget a token.
456 The unget buffer for tokens is only one token large; it is
457 an error to try to unget a token when the unget buffer is not
458 empty.
460 token: the token to unget
462 Raises UngetBufferFull: there is already an ungotten token
463 """
465 if self.ungotten_token is not None:
466 raise UngetBufferFull
467 self.ungotten_token = token
469 def next(self):
470 """Return the next item in an iteration.
472 Returns a Token.
473 """
475 token = self.get()
476 if token.is_eof():
477 raise StopIteration
478 return token
480 __next__ = next
482 def __iter__(self):
483 return self
485 # Helpers
487 def get_int(self, base: int = 10) -> int:
488 """Read the next token and interpret it as an unsigned integer.
490 Raises dns.exception.SyntaxError if not an unsigned integer.
492 Returns an int.
493 """
494 return self.as_int(self.get().unescape(), base)
496 def get_uint8(self) -> int:
497 """Read the next token and interpret it as an 8-bit unsigned
498 integer.
500 Raises dns.exception.SyntaxError if not an 8-bit unsigned integer.
502 Returns an int.
503 """
505 return self.as_uint8(self.get().unescape())
507 def get_uint16(self, base: int = 10) -> int:
508 """Read the next token and interpret it as a 16-bit unsigned
509 integer.
511 Raises dns.exception.SyntaxError if not a 16-bit unsigned integer.
513 Returns an int.
514 """
516 return self.as_uint16(self.get().unescape(), base)
518 def get_uint32(self, base: int = 10) -> int:
519 """Read the next token and interpret it as a 32-bit unsigned
520 integer.
522 Raises dns.exception.SyntaxError if not a 32-bit unsigned integer.
524 Returns an int.
525 """
527 return self.as_uint32(self.get().unescape(), base)
529 def get_uint48(self, base: int = 10) -> int:
530 """Read the next token and interpret it as a 48-bit unsigned
531 integer.
533 Raises dns.exception.SyntaxError if not a 48-bit unsigned integer.
535 Returns an int.
536 """
538 return self.as_uint48(self.get().unescape(), base)
540 def get_string(self, max_length: int | None = None) -> str:
541 """Read the next token and interpret it as a string.
543 Raises dns.exception.SyntaxError if not a string.
544 Raises dns.exception.SyntaxError if token value length
545 exceeds max_length (if specified).
547 Returns a string.
548 """
550 return self.as_string(self.get().unescape(), max_length)
552 def get_identifier(self) -> str:
553 """Read the next token, which should be an identifier.
555 Raises dns.exception.SyntaxError if not an identifier.
557 Returns a string.
558 """
560 return self.as_identifier(self.get().unescape())
562 def get_remaining(self, max_tokens: int | None = None) -> list[Token]:
563 """Return the remaining tokens on the line, until an EOL or EOF is seen.
565 max_tokens: If not None, stop after this number of tokens.
567 Returns a list of tokens.
568 """
570 tokens = []
571 while True:
572 token = self.get()
573 if token.is_eol_or_eof():
574 self.unget(token)
575 break
576 tokens.append(token)
577 if len(tokens) == max_tokens:
578 break
579 return tokens
581 def concatenate_remaining_identifiers(self, allow_empty: bool = False) -> str:
582 """Read the remaining tokens on the line, which should be identifiers.
584 Raises dns.exception.SyntaxError if there are no remaining tokens,
585 unless `allow_empty=True` is given.
587 Raises dns.exception.SyntaxError if a token is seen that is not an
588 identifier.
590 Returns a string containing a concatenation of the remaining
591 identifiers.
592 """
593 s = ""
594 while True:
595 token = self.get().unescape()
596 if token.is_eol_or_eof():
597 self.unget(token)
598 break
599 if not token.is_identifier():
600 raise dns.exception.SyntaxError
601 s += token.value
602 if not (allow_empty or s):
603 raise dns.exception.SyntaxError("expecting another identifier")
604 return s
606 def as_name(
607 self,
608 token: Token,
609 origin: dns.name.Name | None = None,
610 relativize: bool = False,
611 relativize_to: dns.name.Name | None = None,
612 ) -> dns.name.Name:
613 """Try to interpret the token as a DNS name.
615 Raises dns.exception.SyntaxError if not a name.
617 Returns a dns.name.Name.
618 """
619 if not token.is_identifier():
620 raise dns.exception.SyntaxError("expecting an identifier")
621 name = dns.name.from_text(token.value, origin, self.idna_codec)
622 return name.choose_relativity(relativize_to or origin, relativize)
624 def as_int(self, token: Token, base: int = 10) -> int:
625 """Try to interpret the token as an unsigned integer.
627 Raises dns.exception.SyntaxError if not an unsigned integer.
629 Returns an int.
630 """
632 if not token.is_identifier():
633 raise dns.exception.SyntaxError("expecting an identifier")
634 if not token.value.isdigit():
635 raise dns.exception.SyntaxError("expecting an integer")
636 return int(token.value, base)
638 def as_uint8(self, token: Token) -> int:
639 """Try to interpret the token as an unsigned 8-bit integer.
641 Raises dns.exception.SyntaxError if not 8-bit unsigned integer.
643 Returns an int.
644 """
646 value = self.as_int(token=token)
647 if value < 0 or value > 255:
648 raise dns.exception.SyntaxError(f"{value} is not an unsigned 8-bit integer")
649 return value
651 def as_uint16(self, token: Token, base: int = 10) -> int:
652 """Try to interpret the token as an unsigned 16-bit integer.
654 Raises dns.exception.SyntaxError if not a 16-bit unsigned integer.
656 Returns an int.
657 """
659 value = self.as_int(token=token, base=base)
660 if value < 0 or value > 65535:
661 if base == 8:
662 raise dns.exception.SyntaxError(
663 f"{value:o} is not an octal unsigned 16-bit integer"
664 )
665 else:
666 raise dns.exception.SyntaxError(
667 f"{value} is not an unsigned 16-bit integer"
668 )
669 return value
671 def as_uint32(self, token: Token, base: int = 10) -> int:
672 """Try to interpret the token as an unsigned 32-bit integer.
674 Raises dns.exception.SyntaxError if not a 32-bit unsigned integer.
676 Returns an int.
677 """
679 value = self.as_int(token=token, base=base)
680 if value < 0 or value > 4294967295:
681 raise dns.exception.SyntaxError(
682 f"{value} is not an unsigned 32-bit integer"
683 )
684 return value
686 def as_uint48(self, token: Token, base: int = 10) -> int:
687 """Try to interpret the token as an unsigned 48-bit integer.
689 Raises dns.exception.SyntaxError if not a 48-bit unsigned integer.
691 Returns an int.
692 """
694 value = self.as_int(token=token, base=base)
695 if value < 0 or value > 281474976710655:
696 raise dns.exception.SyntaxError(
697 f"{value} is not an unsigned 48-bit integer"
698 )
699 return value
701 def as_string(self, token: Token, max_length: int | None = None) -> str:
702 """Try to interpret the token as a string.
704 Raises dns.exception.SyntaxError if not a string.
705 Raises dns.exception.SyntaxError if token value length
706 exceeds max_length (if specified).
708 Returns a string.
709 """
711 if not (token.is_identifier() or token.is_quoted_string()):
712 raise dns.exception.SyntaxError("expecting a string")
713 if max_length and len(token.value) > max_length:
714 raise dns.exception.SyntaxError("string too long")
715 return token.value
717 def as_identifier(self, token: Token) -> str:
718 """Try to interpret the token as an identifier.
720 Raises dns.exception.SyntaxError if not an identifier.
722 Returns a string.
723 """
725 if not token.is_identifier():
726 raise dns.exception.SyntaxError("expecting an identifier")
727 return token.value
729 def get_name(
730 self,
731 origin: dns.name.Name | None = None,
732 relativize: bool = False,
733 relativize_to: dns.name.Name | None = None,
734 ) -> dns.name.Name:
735 """Read the next token and interpret it as a DNS name.
737 Raises dns.exception.SyntaxError if not a name.
739 Returns a dns.name.Name.
740 """
742 token = self.get()
743 return self.as_name(token, origin, relativize, relativize_to)
745 def get_eol_as_token(self) -> Token:
746 """Read the next token and raise an exception if it isn't EOL or
747 EOF.
749 Returns a string.
750 """
752 token = self.get()
753 if not token.is_eol_or_eof():
754 raise dns.exception.SyntaxError(
755 f'expected EOL or EOF, got {token.ttype} "{token.value}"'
756 )
757 return token
759 def get_eol(self) -> str:
760 return self.get_eol_as_token().value
762 def get_ttl(self) -> int:
763 """Read the next token and interpret it as a DNS TTL.
765 Raises dns.exception.SyntaxError or dns.ttl.BadTTL if not an
766 identifier or badly formed.
768 Returns an int.
769 """
771 token = self.get().unescape()
772 if not token.is_identifier():
773 raise dns.exception.SyntaxError("expecting an identifier")
774 return dns.ttl.from_text(token.value)