Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dns/wire.py: 34%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
3import contextlib
4import struct
5from typing import Iterator, Optional, Tuple
7import dns.exception
8import dns.name
class Parser:
    """Cursor-based reader over a buffer of DNS wire-format data.

    The parser tracks three offsets into ``wire``: ``current`` (where the
    next read starts), ``end`` (the exclusive limit for reads, which
    ``restrict_to`` can temporarily lower) and ``furthest`` (the highest
    offset reached by any read so far).
    """

    def __init__(self, wire: bytes, current: int = 0):
        """Create a parser over *wire*.

        *wire*, a ``bytes``, holds the data to be parsed and possibly other
        data as well; typically it is a whole message or a slice of one.

        *current*, an ``int``, is the offset within *wire* at which parsing
        starts.  A non-zero value is validated through ``seek``.
        """
        self.wire = wire
        self.current = 0
        self.end = len(self.wire)
        if current:
            # Go through seek() so an out-of-range start offset is rejected.
            self.seek(current)
        self.furthest = current

    def remaining(self) -> int:
        """Return how many bytes lie between the cursor and ``end``."""
        return self.end - self.current

    def get_bytes(self, size: int) -> bytes:
        """Read *size* bytes at the cursor and advance past them.

        Raises ``dns.exception.FormError`` if fewer than *size* bytes
        remain.  ``furthest`` is raised to the new cursor if it was lower.
        """
        assert size >= 0
        if size > self.remaining():
            raise dns.exception.FormError
        start = self.current
        stop = start + size
        chunk = self.wire[start:stop]
        self.current = stop
        self.furthest = max(self.furthest, stop)
        return chunk

    def get_counted_bytes(self, length_size: int = 1) -> bytes:
        """Read a length-prefixed byte string.

        The prefix is *length_size* bytes wide and is decoded as a
        big-endian unsigned integer giving the number of bytes that follow.
        """
        prefix = self.get_bytes(length_size)
        length = int.from_bytes(prefix, "big")
        return self.get_bytes(length)

    def get_remaining(self) -> bytes:
        """Read everything from the cursor up to ``end``."""
        count = self.remaining()
        return self.get_bytes(count)

    def get_uint8(self) -> int:
        """Read one byte as an unsigned integer."""
        (value,) = struct.unpack("!B", self.get_bytes(1))
        return value

    def get_uint16(self) -> int:
        """Read two bytes as a network-order unsigned integer."""
        (value,) = struct.unpack("!H", self.get_bytes(2))
        return value

    def get_uint32(self) -> int:
        """Read four bytes as a network-order unsigned integer."""
        (value,) = struct.unpack("!I", self.get_bytes(4))
        return value

    def get_uint48(self) -> int:
        """Read six bytes as a big-endian unsigned integer."""
        raw = self.get_bytes(6)
        return int.from_bytes(raw, "big")

    def get_struct(self, format: str) -> Tuple:
        """Read and unpack the fields described by the ``struct`` *format*.

        Exactly ``struct.calcsize(format)`` bytes are consumed; the result
        is the tuple returned by ``struct.unpack``.
        """
        width = struct.calcsize(format)
        return struct.unpack(format, self.get_bytes(width))

    def get_name(self, origin: Optional["dns.name.Name"] = None) -> "dns.name.Name":
        """Read a name via ``dns.name.from_wire_parser``.

        If *origin* is truthy, the name is relativized to it before being
        returned.
        """
        name = dns.name.from_wire_parser(self)
        return name.relativize(origin) if origin else name

    def seek(self, where: int) -> None:
        """Move the cursor to offset *where*.

        Raises ``dns.exception.FormError`` if *where* is negative or beyond
        ``end``.  ``furthest`` is not changed by seeking.
        """
        # Seeking exactly to the end is allowed; a later read from there
        # fails in get_bytes() because nothing remains.
        out_of_range = where < 0 or where > self.end
        if out_of_range:
            raise dns.exception.FormError
        self.current = where

    @contextlib.contextmanager
    def restrict_to(self, size: int) -> Iterator:
        """Context manager limiting reads to the next *size* bytes.

        Raises ``dns.exception.FormError`` on entry if fewer than *size*
        bytes remain, and on normal exit if the body did not consume the
        restricted region exactly.  The previous ``end`` is always restored.
        """
        assert size >= 0
        if size > self.remaining():
            raise dns.exception.FormError
        outer_end = self.end
        self.end = self.current + size
        try:
            yield
            # Checked here rather than in the finally clause so that an
            # exception already propagating from the body is not replaced
            # by a FormError.
            if self.current != self.end:
                raise dns.exception.FormError
        finally:
            self.end = outer_end

    @contextlib.contextmanager
    def restore_furthest(self) -> Iterator:
        """Context manager that sets the cursor to ``furthest`` on exit.

        The reset happens whether the body completes or raises.
        """
        try:
            yield
        finally:
            self.current = self.furthest