Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dns/wirebase.py: 34%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
3# We have wirebase and wire to avoid circularity between name.py and wire.py
5import contextlib
6import struct
7from collections.abc import Iterator
9import dns.exception
12class Parser:
13 """Helper class for parsing DNS wire format."""
15 def __init__(self, wire: bytes, current: int = 0):
16 """Initialize a Parser
18 *wire*, a ``bytes`` contains the data to be parsed, and possibly other data.
19 Typically it is the whole message or a slice of it.
21 *current*, an `int`, the offset within *wire* where parsing should begin.
22 """
23 self.wire = wire
24 self.current = 0
25 self.end = len(self.wire)
26 if current:
27 self.seek(current)
28 self.furthest = current
30 def remaining(self) -> int:
31 return self.end - self.current
33 def get_bytes(self, size: int) -> bytes:
34 assert size >= 0
35 if size > self.remaining():
36 raise dns.exception.FormError
37 output = self.wire[self.current : self.current + size]
38 self.current += size
39 self.furthest = max(self.furthest, self.current)
40 return output
42 def get_counted_bytes(self, length_size: int = 1) -> bytes:
43 length = int.from_bytes(self.get_bytes(length_size), "big")
44 return self.get_bytes(length)
46 def get_remaining(self) -> bytes:
47 return self.get_bytes(self.remaining())
49 def get_uint8(self) -> int:
50 return struct.unpack("!B", self.get_bytes(1))[0]
52 def get_uint16(self) -> int:
53 return struct.unpack("!H", self.get_bytes(2))[0]
55 def get_uint32(self) -> int:
56 return struct.unpack("!I", self.get_bytes(4))[0]
58 def get_uint48(self) -> int:
59 return int.from_bytes(self.get_bytes(6), "big")
61 def get_struct(self, format: str) -> tuple:
62 return struct.unpack(format, self.get_bytes(struct.calcsize(format)))
64 def seek(self, where: int) -> None:
65 # Note that seeking to the end is OK! (If you try to read
66 # after such a seek, you'll get an exception as expected.)
67 if where < 0 or where > self.end:
68 raise dns.exception.FormError
69 self.current = where
71 @contextlib.contextmanager
72 def restrict_to(self, size: int) -> Iterator:
73 assert size >= 0
74 if size > self.remaining():
75 raise dns.exception.FormError
76 saved_end = self.end
77 try:
78 self.end = self.current + size
79 yield
80 # We make this check here and not in the finally as we
81 # don't want to raise if we're already raising for some
82 # other reason.
83 if self.current != self.end:
84 raise dns.exception.FormError
85 finally:
86 self.end = saved_end
88 @contextlib.contextmanager
89 def restore_furthest(self) -> Iterator:
90 try:
91 yield None
92 finally:
93 self.current = self.furthest