Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/dns/wire.py: 34%
64 statements
« prev ^ index » next coverage.py v7.4.1, created at 2024-02-02 06:07 +0000
« prev ^ index » next coverage.py v7.4.1, created at 2024-02-02 06:07 +0000
1# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
3import contextlib
4import struct
5from typing import Iterator, Optional, Tuple
7import dns.exception
8import dns.name
11class Parser:
12 def __init__(self, wire: bytes, current: int = 0):
13 self.wire = wire
14 self.current = 0
15 self.end = len(self.wire)
16 if current:
17 self.seek(current)
18 self.furthest = current
20 def remaining(self) -> int:
21 return self.end - self.current
23 def get_bytes(self, size: int) -> bytes:
24 assert size >= 0
25 if size > self.remaining():
26 raise dns.exception.FormError
27 output = self.wire[self.current : self.current + size]
28 self.current += size
29 self.furthest = max(self.furthest, self.current)
30 return output
32 def get_counted_bytes(self, length_size: int = 1) -> bytes:
33 length = int.from_bytes(self.get_bytes(length_size), "big")
34 return self.get_bytes(length)
36 def get_remaining(self) -> bytes:
37 return self.get_bytes(self.remaining())
39 def get_uint8(self) -> int:
40 return struct.unpack("!B", self.get_bytes(1))[0]
42 def get_uint16(self) -> int:
43 return struct.unpack("!H", self.get_bytes(2))[0]
45 def get_uint32(self) -> int:
46 return struct.unpack("!I", self.get_bytes(4))[0]
48 def get_uint48(self) -> int:
49 return int.from_bytes(self.get_bytes(6), "big")
51 def get_struct(self, format: str) -> Tuple:
52 return struct.unpack(format, self.get_bytes(struct.calcsize(format)))
54 def get_name(self, origin: Optional["dns.name.Name"] = None) -> "dns.name.Name":
55 name = dns.name.from_wire_parser(self)
56 if origin:
57 name = name.relativize(origin)
58 return name
60 def seek(self, where: int) -> None:
61 # Note that seeking to the end is OK! (If you try to read
62 # after such a seek, you'll get an exception as expected.)
63 if where < 0 or where > self.end:
64 raise dns.exception.FormError
65 self.current = where
67 @contextlib.contextmanager
68 def restrict_to(self, size: int) -> Iterator:
69 assert size >= 0
70 if size > self.remaining():
71 raise dns.exception.FormError
72 saved_end = self.end
73 try:
74 self.end = self.current + size
75 yield
76 # We make this check here and not in the finally as we
77 # don't want to raise if we're already raising for some
78 # other reason.
79 if self.current != self.end:
80 raise dns.exception.FormError
81 finally:
82 self.end = saved_end
84 @contextlib.contextmanager
85 def restore_furthest(self) -> Iterator:
86 try:
87 yield None
88 finally:
89 self.current = self.furthest