Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dns/tokenizer.py: 51%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
3# Copyright (C) 2003-2017 Nominum, Inc.
4#
5# Permission to use, copy, modify, and distribute this software and its
6# documentation for any purpose with or without fee is hereby granted,
7# provided that the above copyright notice and this permission notice
8# appear in all copies.
9#
10# THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES
11# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR
13# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
16# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
18"""Tokenize DNS zone file format"""
20import io
21import sys
22from typing import Any, List, Optional, Tuple
24import dns.exception
25import dns.name
26import dns.ttl
# Characters that end an unquoted token (and may start special handling
# in Tokenizer.get(): parens, quotes, comments, newlines).
_DELIMITERS = {" ", "\t", "\n", ";", "(", ")", '"'}
# Inside a quoted string only the closing quote terminates the token.
_QUOTING_DELIMITERS = {'"'}

# Token type codes.
EOF = 0
EOL = 1
WHITESPACE = 2
IDENTIFIER = 3
QUOTED_STRING = 4
COMMENT = 5
DELIMITER = 6
class UngetBufferFull(dns.exception.DNSException):
    """An attempt was made to unget a token when the unget buffer was full.

    Both the character and the token unget buffers are one item deep; a
    second unget without an intervening get raises this exception.
    """
class Token:
    """A DNS zone file format token.

    ttype: The token type
    value: The token value
    has_escape: Does the token value contain escapes?
    """

    def __init__(
        self,
        ttype: int,
        value: Any = "",
        has_escape: bool = False,
        comment: Optional[str] = None,
    ):
        """Initialize a token instance.

        ttype: The token type (EOF, EOL, WHITESPACE, IDENTIFIER,
        QUOTED_STRING, COMMENT, or DELIMITER).

        value: The token text.

        has_escape: True if the value contains unprocessed backslash
        escapes.

        comment: The text of any comment seen with the token, or None.
        """

        self.ttype = ttype
        self.value = value
        self.has_escape = has_escape
        self.comment = comment

    def is_eof(self) -> bool:
        return self.ttype == EOF

    def is_eol(self) -> bool:
        return self.ttype == EOL

    def is_whitespace(self) -> bool:
        return self.ttype == WHITESPACE

    def is_identifier(self) -> bool:
        return self.ttype == IDENTIFIER

    def is_quoted_string(self) -> bool:
        return self.ttype == QUOTED_STRING

    def is_comment(self) -> bool:
        return self.ttype == COMMENT

    def is_delimiter(self) -> bool:  # pragma: no cover (we don't return delimiters yet)
        return self.ttype == DELIMITER

    def is_eol_or_eof(self) -> bool:
        return self.ttype == EOL or self.ttype == EOF

    def __eq__(self, other):
        # Note: has_escape and comment are deliberately not compared.
        if not isinstance(other, Token):
            return False
        return self.ttype == other.ttype and self.value == other.value

    def __ne__(self, other):
        if not isinstance(other, Token):
            return True
        return self.ttype != other.ttype or self.value != other.value

    def __str__(self):
        return f'{self.ttype} "{self.value}"'

    def _unescaped_parts(self):
        """Iterate over the value, decoding backslash escapes.

        This is the single implementation of escape parsing shared by
        unescape() and unescape_to_bytes().  It yields each ordinary (or
        singly-escaped) character as a str of length one, and each
        decimal escape (\\DDD) as an int in 0..255.

        Raises dns.exception.UnexpectedEnd if the value ends in the
        middle of an escape.

        Raises dns.exception.SyntaxError if a decimal escape does not
        have three digits or exceeds 255.
        """
        l = len(self.value)
        i = 0
        while i < l:
            c = self.value[i]
            i += 1
            if c == "\\":
                if i >= l:  # pragma: no cover (can't happen via get())
                    raise dns.exception.UnexpectedEnd
                c = self.value[i]
                i += 1
                if c.isdigit():
                    # \DDD escape: exactly three decimal digits.
                    if i >= l:
                        raise dns.exception.UnexpectedEnd
                    c2 = self.value[i]
                    i += 1
                    if i >= l:
                        raise dns.exception.UnexpectedEnd
                    c3 = self.value[i]
                    i += 1
                    if not (c2.isdigit() and c3.isdigit()):
                        raise dns.exception.SyntaxError
                    codepoint = int(c) * 100 + int(c2) * 10 + int(c3)
                    if codepoint > 255:
                        raise dns.exception.SyntaxError
                    yield codepoint
                    continue
                # A single-character escape like \" or \\ yields the
                # escaped character itself (backslash dropped).
            yield c

    def unescape(self) -> "Token":
        """Return an equivalent token with escapes in the value replaced
        by the characters they denote (\\DDD becomes chr(DDD))."""
        if not self.has_escape:
            return self
        unescaped = "".join(
            chr(part) if isinstance(part, int) else part
            for part in self._unescaped_parts()
        )
        return Token(self.ttype, unescaped)

    def unescape_to_bytes(self) -> "Token":
        # We used to use unescape() for TXT-like records, but this
        # caused problems as we'd process DNS escapes into Unicode code
        # points instead of byte values, and then a to_text() of the
        # processed data would not equal the original input.  For
        # example, \226 in the TXT record would have a to_text() of
        # \195\162 because we applied UTF-8 encoding to Unicode code
        # point 226.
        #
        # We now apply escapes while converting directly to bytes,
        # avoiding this double encoding.
        #
        # This code also handles cases where the unicode input has
        # non-ASCII code-points in it by converting it to UTF-8.  TXT
        # records aren't defined for Unicode, but this is the best we
        # can do to preserve meaning.  For example,
        #
        #     foo\u200bbar
        #
        # (where \u200b is Unicode code point 0x200b) will be treated
        # as if the input had been the UTF-8 encoding of that string,
        # namely:
        #
        #     foo\226\128\139bar
        #
        # Note there is deliberately no has_escape fast path here: even
        # an escape-free value may contain non-ASCII characters that
        # must be UTF-8 encoded.
        unescaped = b"".join(
            bytes([part]) if isinstance(part, int) else part.encode()
            for part in self._unescaped_parts()
        )
        return Token(self.ttype, unescaped)
class Tokenizer:
    """A DNS zone file format tokenizer.

    A token object is basically a (type, value) tuple.  The valid
    types are EOF, EOL, WHITESPACE, IDENTIFIER, QUOTED_STRING,
    COMMENT, and DELIMITER.

    file: The file to tokenize

    ungotten_char: The most recently ungotten character, or None.

    ungotten_token: The most recently ungotten token, or None.

    multiline: The current multiline level.  This value is increased
    by one every time a '(' delimiter is read, and decreased by one every time
    a ')' delimiter is read.

    quoting: This variable is true if the tokenizer is currently
    reading a quoted string.

    eof: This variable is true if the tokenizer has encountered EOF.

    delimiters: The current delimiter dictionary.

    line_number: The current line number

    filename: A filename that will be returned by the where() method.

    idna_codec: A dns.name.IDNACodec, specifies the IDNA
    encoder/decoder.  If None, the default IDNA 2003
    encoder/decoder is used.
    """

    def __init__(
        self,
        f: Any = sys.stdin,
        filename: Optional[str] = None,
        idna_codec: Optional[dns.name.IDNACodec] = None,
    ):
        """Initialize a tokenizer instance.

        f: The file to tokenize.  The default is sys.stdin.
        This parameter may also be a string, in which case the tokenizer
        will take its input from the contents of the string.

        filename: the name of the filename that the where() method
        will return.

        idna_codec: A dns.name.IDNACodec, specifies the IDNA
        encoder/decoder.  If None, the default IDNA 2003
        encoder/decoder is used.
        """

        # Wrap str/bytes input in a StringIO so the rest of the class
        # only ever deals with a file-like object.
        if isinstance(f, str):
            f = io.StringIO(f)
            if filename is None:
                filename = "<string>"
        elif isinstance(f, bytes):
            f = io.StringIO(f.decode())
            if filename is None:
                filename = "<string>"
        else:
            if filename is None:
                if f is sys.stdin:
                    filename = "<stdin>"
                else:
                    filename = "<file>"
        self.file = f
        self.ungotten_char: Optional[str] = None
        self.ungotten_token: Optional[Token] = None
        self.multiline = 0
        self.quoting = False
        self.eof = False
        self.delimiters = _DELIMITERS
        self.line_number = 1
        assert filename is not None
        self.filename = filename
        if idna_codec is None:
            self.idna_codec: dns.name.IDNACodec = dns.name.IDNA_2003
        else:
            self.idna_codec = idna_codec

    def _get_char(self) -> str:
        """Read a character from input.

        Returns the next character, or "" at end of input.  Tracks
        line_number and the eof flag as a side effect.
        """

        if self.ungotten_char is None:
            if self.eof:
                # Once EOF has been seen, keep returning "" without
                # reading the file again.
                c = ""
            else:
                c = self.file.read(1)
                if c == "":
                    self.eof = True
                elif c == "\n":
                    self.line_number += 1
        else:
            c = self.ungotten_char
            self.ungotten_char = None
        return c

    def where(self) -> Tuple[str, int]:
        """Return the current location in the input.

        Returns a (string, int) tuple.  The first item is the filename of
        the input, the second is the current line number.
        """

        return (self.filename, self.line_number)

    def _unget_char(self, c: str) -> None:
        """Unget a character.

        The unget buffer for characters is only one character large; it is
        an error to try to unget a character when the unget buffer is not
        empty.

        c: the character to unget

        raises UngetBufferFull: there is already an ungotten char
        """

        if self.ungotten_char is not None:
            # this should never happen!
            raise UngetBufferFull  # pragma: no cover
        self.ungotten_char = c

    def skip_whitespace(self) -> int:
        """Consume input until a non-whitespace character is encountered.

        The non-whitespace character is then ungotten, and the number of
        whitespace characters consumed is returned.

        If the tokenizer is in multiline mode, then newlines are whitespace.

        Returns the number of characters skipped.
        """

        skipped = 0
        while True:
            c = self._get_char()
            if c != " " and c != "\t":
                # Newlines only count as whitespace inside parentheses.
                if (c != "\n") or not self.multiline:
                    self._unget_char(c)
                    return skipped
            skipped += 1

    def get(self, want_leading: bool = False, want_comment: bool = False) -> Token:
        """Get the next token.

        want_leading: If True, return a WHITESPACE token if the
        first character read is whitespace.  The default is False.

        want_comment: If True, return a COMMENT token if the
        first token read is a comment.  The default is False.

        Raises dns.exception.UnexpectedEnd: input ended prematurely

        Raises dns.exception.SyntaxError: input was badly formed

        Returns a Token.
        """

        # An ungotten token is returned directly unless it is a
        # whitespace or comment token the caller didn't ask for, in
        # which case it is discarded and scanning continues.
        if self.ungotten_token is not None:
            utoken = self.ungotten_token
            self.ungotten_token = None
            if utoken.is_whitespace():
                if want_leading:
                    return utoken
            elif utoken.is_comment():
                if want_comment:
                    return utoken
            else:
                return utoken
        skipped = self.skip_whitespace()
        if want_leading and skipped > 0:
            return Token(WHITESPACE, " ")
        token = ""
        ttype = IDENTIFIER
        has_escape = False
        while True:
            c = self._get_char()
            if c == "" or c in self.delimiters:
                if c == "" and self.quoting:
                    # EOF inside a quoted string.
                    raise dns.exception.UnexpectedEnd
                if token == "" and ttype != QUOTED_STRING:
                    # No token text accumulated yet, so the delimiter
                    # itself is significant.
                    if c == "(":
                        self.multiline += 1
                        self.skip_whitespace()
                        continue
                    elif c == ")":
                        if self.multiline <= 0:
                            raise dns.exception.SyntaxError
                        self.multiline -= 1
                        self.skip_whitespace()
                        continue
                    elif c == '"':
                        if not self.quoting:
                            # Opening quote: switch delimiter set so
                            # only '"' ends the token.
                            self.quoting = True
                            self.delimiters = _QUOTING_DELIMITERS
                            ttype = QUOTED_STRING
                            continue
                        else:
                            # Closing quote of an empty quoted string.
                            self.quoting = False
                            self.delimiters = _DELIMITERS
                            self.skip_whitespace()
                            continue
                    elif c == "\n":
                        return Token(EOL, "\n")
                    elif c == ";":
                        # Consume the comment text up to EOL/EOF.
                        while 1:
                            c = self._get_char()
                            if c == "\n" or c == "":
                                break
                            token += c
                        if want_comment:
                            self._unget_char(c)
                            return Token(COMMENT, token)
                        elif c == "":
                            if self.multiline:
                                raise dns.exception.SyntaxError(
                                    "unbalanced parentheses"
                                )
                            return Token(EOF, comment=token)
                        elif self.multiline:
                            # Inside parens a comment's newline is just
                            # whitespace; keep scanning for a token.
                            self.skip_whitespace()
                            token = ""
                            continue
                        else:
                            return Token(EOL, "\n", comment=token)
                    else:
                        # This code exists in case we ever want a
                        # delimiter to be returned.  It never produces
                        # a token currently.
                        token = c
                        ttype = DELIMITER
                else:
                    # Delimiter ends the accumulated token; push it back
                    # so the next get() sees it.
                    self._unget_char(c)
                break
            elif self.quoting and c == "\n":
                raise dns.exception.SyntaxError("newline in quoted string")
            elif c == "\\":
                #
                # It's an escape.  Put it and the next character into
                # the token; it will be checked later for goodness.
                #
                token += c
                has_escape = True
                c = self._get_char()
                if c == "" or (c == "\n" and not self.quoting):
                    raise dns.exception.UnexpectedEnd
            # Append the ordinary character (or the character following
            # a backslash escape).
            token += c
        if token == "" and ttype != QUOTED_STRING:
            if self.multiline:
                raise dns.exception.SyntaxError("unbalanced parentheses")
            ttype = EOF
        return Token(ttype, token, has_escape)

    def unget(self, token: Token) -> None:
        """Unget a token.

        The unget buffer for tokens is only one token large; it is
        an error to try to unget a token when the unget buffer is not
        empty.

        token: the token to unget

        Raises UngetBufferFull: there is already an ungotten token
        """

        if self.ungotten_token is not None:
            raise UngetBufferFull
        self.ungotten_token = token

    def next(self):
        """Return the next item in an iteration.

        Raises StopIteration at EOF, per the iterator protocol.

        Returns a Token.
        """

        token = self.get()
        if token.is_eof():
            raise StopIteration
        return token

    # Make instances usable directly as Python iterators.
    __next__ = next

    def __iter__(self):
        return self

    # Helpers

    def get_int(self, base: int = 10) -> int:
        """Read the next token and interpret it as an unsigned integer.

        Raises dns.exception.SyntaxError if not an unsigned integer.

        Returns an int.
        """

        token = self.get().unescape()
        if not token.is_identifier():
            raise dns.exception.SyntaxError("expecting an identifier")
        if not token.value.isdigit():
            raise dns.exception.SyntaxError("expecting an integer")
        return int(token.value, base)

    def get_uint8(self) -> int:
        """Read the next token and interpret it as an 8-bit unsigned
        integer.

        Raises dns.exception.SyntaxError if not an 8-bit unsigned integer.

        Returns an int.
        """

        value = self.get_int()
        if value < 0 or value > 255:
            raise dns.exception.SyntaxError(f"{value} is not an unsigned 8-bit integer")
        return value

    def get_uint16(self, base: int = 10) -> int:
        """Read the next token and interpret it as a 16-bit unsigned
        integer.

        Raises dns.exception.SyntaxError if not a 16-bit unsigned integer.

        Returns an int.
        """

        value = self.get_int(base=base)
        if value < 0 or value > 65535:
            if base == 8:
                raise dns.exception.SyntaxError(
                    f"{value:o} is not an octal unsigned 16-bit integer"
                )
            else:
                raise dns.exception.SyntaxError(
                    f"{value} is not an unsigned 16-bit integer"
                )
        return value

    def get_uint32(self, base: int = 10) -> int:
        """Read the next token and interpret it as a 32-bit unsigned
        integer.

        Raises dns.exception.SyntaxError if not a 32-bit unsigned integer.

        Returns an int.
        """

        value = self.get_int(base=base)
        if value < 0 or value > 4294967295:
            raise dns.exception.SyntaxError(
                f"{value} is not an unsigned 32-bit integer"
            )
        return value

    def get_uint48(self, base: int = 10) -> int:
        """Read the next token and interpret it as a 48-bit unsigned
        integer.

        Raises dns.exception.SyntaxError if not a 48-bit unsigned integer.

        Returns an int.
        """

        value = self.get_int(base=base)
        if value < 0 or value > 281474976710655:
            raise dns.exception.SyntaxError(
                f"{value} is not an unsigned 48-bit integer"
            )
        return value

    def get_string(self, max_length: Optional[int] = None) -> str:
        """Read the next token and interpret it as a string.

        Raises dns.exception.SyntaxError if not a string.
        Raises dns.exception.SyntaxError if token value length
        exceeds max_length (if specified).

        Returns a string.
        """

        token = self.get().unescape()
        if not (token.is_identifier() or token.is_quoted_string()):
            raise dns.exception.SyntaxError("expecting a string")
        if max_length and len(token.value) > max_length:
            raise dns.exception.SyntaxError("string too long")
        return token.value

    def get_identifier(self) -> str:
        """Read the next token, which should be an identifier.

        Raises dns.exception.SyntaxError if not an identifier.

        Returns a string.
        """

        token = self.get().unescape()
        if not token.is_identifier():
            raise dns.exception.SyntaxError("expecting an identifier")
        return token.value

    def get_remaining(self, max_tokens: Optional[int] = None) -> List[Token]:
        """Return the remaining tokens on the line, until an EOL or EOF is seen.

        max_tokens: If not None, stop after this number of tokens.

        Returns a list of tokens.
        """

        tokens = []
        while True:
            token = self.get()
            if token.is_eol_or_eof():
                # Leave the terminator for the caller to consume.
                self.unget(token)
                break
            tokens.append(token)
            if len(tokens) == max_tokens:
                break
        return tokens

    def concatenate_remaining_identifiers(self, allow_empty: bool = False) -> str:
        """Read the remaining tokens on the line, which should be identifiers.

        Raises dns.exception.SyntaxError if there are no remaining tokens,
        unless `allow_empty=True` is given.

        Raises dns.exception.SyntaxError if a token is seen that is not an
        identifier.

        Returns a string containing a concatenation of the remaining
        identifiers.
        """
        s = ""
        while True:
            token = self.get().unescape()
            if token.is_eol_or_eof():
                # Leave the terminator for the caller to consume.
                self.unget(token)
                break
            if not token.is_identifier():
                raise dns.exception.SyntaxError
            s += token.value
        if not (allow_empty or s):
            raise dns.exception.SyntaxError("expecting another identifier")
        return s

    def as_name(
        self,
        token: Token,
        origin: Optional[dns.name.Name] = None,
        relativize: bool = False,
        relativize_to: Optional[dns.name.Name] = None,
    ) -> dns.name.Name:
        """Try to interpret the token as a DNS name.

        Raises dns.exception.SyntaxError if not a name.

        Returns a dns.name.Name.
        """
        if not token.is_identifier():
            raise dns.exception.SyntaxError("expecting an identifier")
        name = dns.name.from_text(token.value, origin, self.idna_codec)
        return name.choose_relativity(relativize_to or origin, relativize)

    def get_name(
        self,
        origin: Optional[dns.name.Name] = None,
        relativize: bool = False,
        relativize_to: Optional[dns.name.Name] = None,
    ) -> dns.name.Name:
        """Read the next token and interpret it as a DNS name.

        Raises dns.exception.SyntaxError if not a name.

        Returns a dns.name.Name.
        """

        token = self.get()
        return self.as_name(token, origin, relativize, relativize_to)

    def get_eol_as_token(self) -> Token:
        """Read the next token and raise an exception if it isn't EOL or
        EOF.

        Returns a Token.
        """

        token = self.get()
        if not token.is_eol_or_eof():
            raise dns.exception.SyntaxError(
                f'expected EOL or EOF, got {token.ttype} "{token.value}"'
            )
        return token

    def get_eol(self) -> str:
        return self.get_eol_as_token().value

    def get_ttl(self) -> int:
        """Read the next token and interpret it as a DNS TTL.

        Raises dns.exception.SyntaxError or dns.ttl.BadTTL if not an
        identifier or badly formed.

        Returns an int.
        """

        token = self.get().unescape()
        if not token.is_identifier():
            raise dns.exception.SyntaxError("expecting an identifier")
        return dns.ttl.from_text(token.value)