Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/dns/tokenizer.py: 74%
330 statements
« prev ^ index » next coverage.py v7.4.1, created at 2024-02-02 06:07 +0000
1# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
3# Copyright (C) 2003-2017 Nominum, Inc.
4#
5# Permission to use, copy, modify, and distribute this software and its
6# documentation for any purpose with or without fee is hereby granted,
7# provided that the above copyright notice and this permission notice
8# appear in all copies.
9#
10# THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES
11# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR
13# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
16# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
18"""Tokenize DNS zone file format"""
20import io
21import sys
22from typing import Any, List, Optional, Tuple
24import dns.exception
25import dns.name
26import dns.ttl
# Characters that end an unquoted token.
_DELIMITERS = set(' \t\n;()"')
# While reading a quoted string, only the closing quote ends the token.
_QUOTING_DELIMITERS = set('"')

# Token type codes, numbered consecutively from zero.
(
    EOF,
    EOL,
    WHITESPACE,
    IDENTIFIER,
    QUOTED_STRING,
    COMMENT,
    DELIMITER,
) = range(7)
class UngetBufferFull(dns.exception.DNSException):
    """Raised when something is ungotten while the one-slot unget buffer
    is already occupied."""
class Token:
    """A DNS zone file format token.

    ttype: The token type
    value: The token value
    has_escape: Does the token value contain escapes?
    comment: Comment text associated with the token, or None.
    """

    def __init__(
        self,
        ttype: int,
        value: Any = "",
        has_escape: bool = False,
        comment: Optional[str] = None,
    ):
        """Initialize a token instance."""

        self.ttype = ttype
        self.value = value
        self.has_escape = has_escape
        self.comment = comment

    def is_eof(self) -> bool:
        """Is this an EOF token?"""
        return self.ttype == EOF

    def is_eol(self) -> bool:
        """Is this an EOL token?"""
        return self.ttype == EOL

    def is_whitespace(self) -> bool:
        """Is this a WHITESPACE token?"""
        return self.ttype == WHITESPACE

    def is_identifier(self) -> bool:
        """Is this an IDENTIFIER token?"""
        return self.ttype == IDENTIFIER

    def is_quoted_string(self) -> bool:
        """Is this a QUOTED_STRING token?"""
        return self.ttype == QUOTED_STRING

    def is_comment(self) -> bool:
        """Is this a COMMENT token?"""
        return self.ttype == COMMENT

    def is_delimiter(self) -> bool:  # pragma: no cover (we don't return delimiters yet)
        """Is this a DELIMITER token?"""
        return self.ttype == DELIMITER

    def is_eol_or_eof(self) -> bool:
        """Is this an EOL or an EOF token?"""
        return self.ttype == EOL or self.ttype == EOF

    def __eq__(self, other):
        # Only the type and value take part in comparisons; has_escape
        # and comment are ignored.
        if not isinstance(other, Token):
            return False
        return self.ttype == other.ttype and self.value == other.value

    def __ne__(self, other):
        if not isinstance(other, Token):
            return True
        return self.ttype != other.ttype or self.value != other.value

    def __str__(self):
        return '%d "%s"' % (self.ttype, self.value)

    def _unescaped_pieces(self) -> List[Any]:
        """Split the value into pieces with backslash escapes resolved.

        Each piece is either an int (the value, 0-255, of a three digit
        decimal escape) or a str (an ordinary character, or the single
        character that followed a backslash).

        Raises dns.exception.UnexpectedEnd if the value ends in the
        middle of an escape.

        Raises dns.exception.SyntaxError if a decimal escape is malformed
        or is greater than 255.
        """
        text = self.value
        length = len(text)
        pieces: List[Any] = []
        i = 0
        while i < length:
            c = text[i]
            i += 1
            if c != "\\":
                pieces.append(c)
                continue
            if i >= length:  # pragma: no cover (can't happen via get())
                raise dns.exception.UnexpectedEnd
            c = text[i]
            i += 1
            if not c.isdigit():
                # A backslash followed by a non-digit stands for that
                # character itself.
                pieces.append(c)
                continue
            # A decimal escape needs two more characters after the first
            # digit.
            if i + 1 >= length:
                raise dns.exception.UnexpectedEnd
            c2 = text[i]
            c3 = text[i + 1]
            i += 2
            if not (c2.isdigit() and c3.isdigit()):
                raise dns.exception.SyntaxError
            try:
                codepoint = int(c) * 100 + int(c2) * 10 + int(c3)
            except ValueError:
                # str.isdigit() is true for some characters (superscript
                # digits, for example) that int() cannot convert; report
                # those as bad syntax rather than leaking ValueError.
                raise dns.exception.SyntaxError from None
            if codepoint > 255:
                raise dns.exception.SyntaxError
            pieces.append(codepoint)
        return pieces

    def unescape(self) -> "Token":
        """Return a token with the escapes in the value processed.

        If the token has no escapes, the token itself is returned.
        Otherwise a new token of the same type is returned whose value is
        a str in which each decimal escape has become the character with
        that code point.

        Raises dns.exception.UnexpectedEnd or dns.exception.SyntaxError
        if an escape is incomplete or malformed.
        """
        if not self.has_escape:
            return self
        unescaped = "".join(
            chr(piece) if isinstance(piece, int) else piece
            for piece in self._unescaped_pieces()
        )
        return Token(self.ttype, unescaped)

    def unescape_to_bytes(self) -> "Token":
        """Return a new token whose value is the unescaped value as bytes.

        Raises dns.exception.UnexpectedEnd or dns.exception.SyntaxError
        if an escape is incomplete or malformed.
        """
        # We used to use unescape() for TXT-like records, but this
        # caused problems as we'd process DNS escapes into Unicode code
        # points instead of byte values, and then a to_text() of the
        # processed data would not equal the original input.  For
        # example, \226 in the TXT record would have a to_text() of
        # \195\162 because we applied UTF-8 encoding to Unicode code
        # point 226.
        #
        # We now apply escapes while converting directly to bytes,
        # avoiding this double encoding.
        #
        # This code also handles cases where the unicode input has
        # non-ASCII code-points in it by converting it to UTF-8.  TXT
        # records aren't defined for Unicode, but this is the best we
        # can do to preserve meaning.  For example,
        #
        #     foo\u200bbar
        #
        # (where \u200b is Unicode code point 0x200b) will be treated
        # as if the input had been the UTF-8 encoding of that string,
        # namely:
        #
        #     foo\226\128\139bar
        #
        unescaped = b"".join(
            # A decimal escape is exactly one byte; any other character
            # contributes its (possibly multi-byte) UTF-8 encoding.
            bytes((piece,)) if isinstance(piece, int) else piece.encode()
            for piece in self._unescaped_pieces()
        )
        return Token(self.ttype, unescaped)
class Tokenizer:
    """A DNS zone file format tokenizer.

    A token object is basically a (type, value) tuple.  The valid
    types are EOF, EOL, WHITESPACE, IDENTIFIER, QUOTED_STRING,
    COMMENT, and DELIMITER.

    file: The file to tokenize

    ungotten_char: The most recently ungotten character, or None.

    ungotten_token: The most recently ungotten token, or None.

    multiline: The current multiline level.  This value is increased
    by one every time a '(' delimiter is read, and decreased by one every time
    a ')' delimiter is read.

    quoting: This variable is true if the tokenizer is currently
    reading a quoted string.

    eof: This variable is true if the tokenizer has encountered EOF.

    delimiters: The current set of delimiter characters.

    line_number: The current line number

    filename: A filename that will be returned by the where() method.

    idna_codec: A dns.name.IDNACodec, specifies the IDNA
    encoder/decoder.  If None, the default IDNA 2003
    encoder/decoder is used.
    """

    def __init__(
        self,
        f: Any = sys.stdin,
        filename: Optional[str] = None,
        idna_codec: Optional[dns.name.IDNACodec] = None,
    ):
        """Initialize a tokenizer instance.

        f: The file to tokenize.  The default is sys.stdin.
        This parameter may also be a string or bytes, in which case the
        tokenizer will take its input from the contents of the string
        (bytes are decoded first).

        filename: the name of the filename that the where() method
        will return.  If None, a placeholder is chosen: "<string>" for
        string or bytes input, "<stdin>" for sys.stdin, and "<file>"
        otherwise.

        idna_codec: A dns.name.IDNACodec, specifies the IDNA
        encoder/decoder.  If None, the default IDNA 2003
        encoder/decoder is used.
        """

        if isinstance(f, str):
            f = io.StringIO(f)
            if filename is None:
                filename = "<string>"
        elif isinstance(f, bytes):
            f = io.StringIO(f.decode())
            if filename is None:
                filename = "<string>"
        else:
            if filename is None:
                if f is sys.stdin:
                    filename = "<stdin>"
                else:
                    filename = "<file>"
        self.file = f
        self.ungotten_char: Optional[str] = None
        self.ungotten_token: Optional[Token] = None
        self.multiline = 0
        self.quoting = False
        self.eof = False
        self.delimiters = _DELIMITERS
        self.line_number = 1
        assert filename is not None
        self.filename = filename
        if idna_codec is None:
            self.idna_codec: dns.name.IDNACodec = dns.name.IDNA_2003
        else:
            self.idna_codec = idna_codec

    def _get_char(self) -> str:
        """Read a character from input.

        Returns the ungotten character if there is one; otherwise reads
        one character from the file.  An empty string means end of input.
        """

        if self.ungotten_char is None:
            if self.eof:
                # Once EOF has been seen, do not read the file again.
                c = ""
            else:
                c = self.file.read(1)
                if c == "":
                    self.eof = True
                elif c == "\n":
                    self.line_number += 1
        else:
            c = self.ungotten_char
            self.ungotten_char = None
        return c

    def where(self) -> Tuple[str, int]:
        """Return the current location in the input.

        Returns a (string, int) tuple.  The first item is the filename of
        the input, the second is the current line number.
        """

        return (self.filename, self.line_number)

    def _unget_char(self, c: str) -> None:
        """Unget a character.

        The unget buffer for characters is only one character large; it is
        an error to try to unget a character when the unget buffer is not
        empty.

        c: the character to unget
        raises UngetBufferFull: there is already an ungotten char
        """

        if self.ungotten_char is not None:
            # this should never happen!
            raise UngetBufferFull  # pragma: no cover
        self.ungotten_char = c

    def skip_whitespace(self) -> int:
        """Consume input until a non-whitespace character is encountered.

        The non-whitespace character is then ungotten, and the number of
        whitespace characters consumed is returned.

        If the tokenizer is in multiline mode, then newlines are whitespace.

        Returns the number of characters skipped.
        """

        skipped = 0
        while True:
            c = self._get_char()
            if c != " " and c != "\t":
                if (c != "\n") or not self.multiline:
                    self._unget_char(c)
                    return skipped
            skipped += 1

    def get(self, want_leading: bool = False, want_comment: bool = False) -> Token:
        """Get the next token.

        want_leading: If True, return a WHITESPACE token if the
        first character read is whitespace.  The default is False.

        want_comment: If True, return a COMMENT token if the
        first token read is a comment.  The default is False.

        Raises dns.exception.UnexpectedEnd: input ended prematurely

        Raises dns.exception.SyntaxError: input was badly formed

        Returns a Token.
        """

        if self.ungotten_token is not None:
            utoken = self.ungotten_token
            self.ungotten_token = None
            # An ungotten whitespace or comment token is only handed back
            # if the caller asked for that kind of token; otherwise it is
            # dropped and tokenizing continues from the input.
            if utoken.is_whitespace():
                if want_leading:
                    return utoken
            elif utoken.is_comment():
                if want_comment:
                    return utoken
            else:
                return utoken
        skipped = self.skip_whitespace()
        if want_leading and skipped > 0:
            return Token(WHITESPACE, " ")
        token = ""
        ttype = IDENTIFIER
        has_escape = False
        while True:
            c = self._get_char()
            if c == "" or c in self.delimiters:
                if c == "" and self.quoting:
                    raise dns.exception.UnexpectedEnd
                if token == "" and ttype != QUOTED_STRING:
                    if c == "(":
                        self.multiline += 1
                        self.skip_whitespace()
                        continue
                    elif c == ")":
                        if self.multiline <= 0:
                            raise dns.exception.SyntaxError
                        self.multiline -= 1
                        self.skip_whitespace()
                        continue
                    elif c == '"':
                        if not self.quoting:
                            self.quoting = True
                            self.delimiters = _QUOTING_DELIMITERS
                            ttype = QUOTED_STRING
                            continue
                        else:
                            self.quoting = False
                            self.delimiters = _DELIMITERS
                            self.skip_whitespace()
                            continue
                    elif c == "\n":
                        return Token(EOL, "\n")
                    elif c == ";":
                        # Collect the comment text up to, but not
                        # including, the end of the line or input.
                        while 1:
                            c = self._get_char()
                            if c == "\n" or c == "":
                                break
                            token += c
                        if want_comment:
                            self._unget_char(c)
                            return Token(COMMENT, token)
                        elif c == "":
                            if self.multiline:
                                raise dns.exception.SyntaxError(
                                    "unbalanced parentheses"
                                )
                            return Token(EOF, comment=token)
                        elif self.multiline:
                            # Inside parentheses the comment and its
                            # newline are just whitespace.
                            self.skip_whitespace()
                            token = ""
                            continue
                        else:
                            return Token(EOL, "\n", comment=token)
                    else:
                        # This code exists in case we ever want a
                        # delimiter to be returned.  It never produces
                        # a token currently.
                        token = c
                        ttype = DELIMITER
                else:
                    # The delimiter ends the current token; leave it for
                    # the next call.
                    self._unget_char(c)
                break
            elif self.quoting and c == "\n":
                raise dns.exception.SyntaxError("newline in quoted string")
            elif c == "\\":
                #
                # It's an escape.  Put it and the next character into
                # the token; it will be checked later for goodness.
                #
                token += c
                has_escape = True
                c = self._get_char()
                if c == "" or (c == "\n" and not self.quoting):
                    raise dns.exception.UnexpectedEnd
            token += c
        if token == "" and ttype != QUOTED_STRING:
            if self.multiline:
                raise dns.exception.SyntaxError("unbalanced parentheses")
            ttype = EOF
        return Token(ttype, token, has_escape)

    def unget(self, token: Token) -> None:
        """Unget a token.

        The unget buffer for tokens is only one token large; it is
        an error to try to unget a token when the unget buffer is not
        empty.

        token: the token to unget

        Raises UngetBufferFull: there is already an ungotten token
        """

        if self.ungotten_token is not None:
            raise UngetBufferFull
        self.ungotten_token = token

    def next(self):
        """Return the next item in an iteration.

        Raises StopIteration when the EOF token is reached.

        Returns a Token.
        """

        token = self.get()
        if token.is_eof():
            raise StopIteration
        return token

    __next__ = next

    def __iter__(self):
        return self

    # Helpers

    def get_int(self, base: int = 10) -> int:
        """Read the next token and interpret it as an unsigned integer.

        base: the base used to convert the token text.  The default is 10.

        Raises dns.exception.SyntaxError if not an unsigned integer.

        Returns an int.
        """

        token = self.get().unescape()
        if not token.is_identifier():
            raise dns.exception.SyntaxError("expecting an identifier")
        if not token.value.isdigit():
            raise dns.exception.SyntaxError("expecting an integer")
        try:
            return int(token.value, base)
        except ValueError:
            # isdigit() does not guarantee that int() will succeed: the
            # digits may be invalid for the base (e.g. "9" in base 8), or
            # be digit characters int() does not accept (e.g. superscript
            # digits).  Report these as syntax errors, as documented.
            raise dns.exception.SyntaxError("expecting an integer") from None

    def get_uint8(self) -> int:
        """Read the next token and interpret it as an 8-bit unsigned
        integer.

        Raises dns.exception.SyntaxError if not an 8-bit unsigned integer.

        Returns an int.
        """

        value = self.get_int()
        if value < 0 or value > 255:
            raise dns.exception.SyntaxError(
                "%d is not an unsigned 8-bit integer" % value
            )
        return value

    def get_uint16(self, base: int = 10) -> int:
        """Read the next token and interpret it as a 16-bit unsigned
        integer.

        base: the base used to convert the token text.  The default is 10.

        Raises dns.exception.SyntaxError if not a 16-bit unsigned integer.

        Returns an int.
        """

        value = self.get_int(base=base)
        if value < 0 or value > 65535:
            if base == 8:
                raise dns.exception.SyntaxError(
                    "%o is not an octal unsigned 16-bit integer" % value
                )
            else:
                raise dns.exception.SyntaxError(
                    "%d is not an unsigned 16-bit integer" % value
                )
        return value

    def get_uint32(self, base: int = 10) -> int:
        """Read the next token and interpret it as a 32-bit unsigned
        integer.

        base: the base used to convert the token text.  The default is 10.

        Raises dns.exception.SyntaxError if not a 32-bit unsigned integer.

        Returns an int.
        """

        value = self.get_int(base=base)
        if value < 0 or value > 4294967295:
            raise dns.exception.SyntaxError(
                "%d is not an unsigned 32-bit integer" % value
            )
        return value

    def get_uint48(self, base: int = 10) -> int:
        """Read the next token and interpret it as a 48-bit unsigned
        integer.

        base: the base used to convert the token text.  The default is 10.

        Raises dns.exception.SyntaxError if not a 48-bit unsigned integer.

        Returns an int.
        """

        value = self.get_int(base=base)
        if value < 0 or value > 281474976710655:
            raise dns.exception.SyntaxError(
                "%d is not an unsigned 48-bit integer" % value
            )
        return value

    def get_string(self, max_length: Optional[int] = None) -> str:
        """Read the next token and interpret it as a string.

        max_length: the maximum allowed length of the string.  No limit
        is enforced if this is None (or 0).

        Raises dns.exception.SyntaxError if not a string.
        Raises dns.exception.SyntaxError if token value length
        exceeds max_length (if specified).

        Returns a string.
        """

        token = self.get().unescape()
        if not (token.is_identifier() or token.is_quoted_string()):
            raise dns.exception.SyntaxError("expecting a string")
        if max_length and len(token.value) > max_length:
            raise dns.exception.SyntaxError("string too long")
        return token.value

    def get_identifier(self) -> str:
        """Read the next token, which should be an identifier.

        Raises dns.exception.SyntaxError if not an identifier.

        Returns a string.
        """

        token = self.get().unescape()
        if not token.is_identifier():
            raise dns.exception.SyntaxError("expecting an identifier")
        return token.value

    def get_remaining(self, max_tokens: Optional[int] = None) -> List[Token]:
        """Return the remaining tokens on the line, until an EOL or EOF is seen.

        The EOL or EOF token is ungotten, not consumed.

        max_tokens: If not None, stop after this number of tokens.

        Returns a list of tokens.
        """

        tokens = []
        while True:
            token = self.get()
            if token.is_eol_or_eof():
                self.unget(token)
                break
            tokens.append(token)
            if len(tokens) == max_tokens:
                break
        return tokens

    def concatenate_remaining_identifiers(self, allow_empty: bool = False) -> str:
        """Read the remaining tokens on the line, which should be identifiers.

        The EOL or EOF token that ends the line is ungotten, not consumed.

        Raises dns.exception.SyntaxError if there are no remaining tokens,
        unless `allow_empty=True` is given.

        Raises dns.exception.SyntaxError if a token is seen that is not an
        identifier.

        Returns a string containing a concatenation of the remaining
        identifiers.
        """
        parts = []
        while True:
            token = self.get().unescape()
            if token.is_eol_or_eof():
                self.unget(token)
                break
            if not token.is_identifier():
                raise dns.exception.SyntaxError
            parts.append(token.value)
        s = "".join(parts)
        if not (allow_empty or s):
            raise dns.exception.SyntaxError("expecting another identifier")
        return s

    def as_name(
        self,
        token: Token,
        origin: Optional[dns.name.Name] = None,
        relativize: bool = False,
        relativize_to: Optional[dns.name.Name] = None,
    ) -> dns.name.Name:
        """Try to interpret the token as a DNS name.

        The name is built with dns.name.from_text() using origin and the
        tokenizer's IDNA codec, and its relativity is then chosen with
        respect to relativize_to if given, otherwise origin.

        Raises dns.exception.SyntaxError if not a name.

        Returns a dns.name.Name.
        """
        if not token.is_identifier():
            raise dns.exception.SyntaxError("expecting an identifier")
        name = dns.name.from_text(token.value, origin, self.idna_codec)
        return name.choose_relativity(relativize_to or origin, relativize)

    def get_name(
        self,
        origin: Optional[dns.name.Name] = None,
        relativize: bool = False,
        relativize_to: Optional[dns.name.Name] = None,
    ) -> dns.name.Name:
        """Read the next token and interpret it as a DNS name.

        Raises dns.exception.SyntaxError if not a name.

        Returns a dns.name.Name.
        """

        token = self.get()
        return self.as_name(token, origin, relativize, relativize_to)

    def get_eol_as_token(self) -> Token:
        """Read the next token and raise an exception if it isn't EOL or
        EOF.

        Raises dns.exception.SyntaxError if the token is neither.

        Returns a Token.
        """

        token = self.get()
        if not token.is_eol_or_eof():
            raise dns.exception.SyntaxError(
                'expected EOL or EOF, got %d "%s"' % (token.ttype, token.value)
            )
        return token

    def get_eol(self) -> str:
        """Read the next token, which must be EOL or EOF, and return its
        value.

        Raises dns.exception.SyntaxError if the token is neither.
        """
        return self.get_eol_as_token().value

    def get_ttl(self) -> int:
        """Read the next token and interpret it as a DNS TTL.

        Raises dns.exception.SyntaxError or dns.ttl.BadTTL if not an
        identifier or badly formed.

        Returns an int.
        """

        token = self.get().unescape()
        if not token.is_identifier():
            raise dns.exception.SyntaxError("expecting an identifier")
        return dns.ttl.from_text(token.value)