Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dns/wire.py: 34%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

65 statements  

1# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license 

2 

3import contextlib 

4import struct 

5from typing import Iterator, Optional, Tuple 

6 

7import dns.exception 

8import dns.name 

9 

10 

11class Parser: 

12 """Helper class for parsing DNS wire format.""" 

13 

14 def __init__(self, wire: bytes, current: int = 0): 

15 """Initialize a Parser 

16 

17 *wire*, a ``bytes`` contains the data to be parsed, and possibly other data. 

18 Typically it is the whole message or a slice of it. 

19 

20 *current*, an `int`, the offset within *wire* where parsing should begin. 

21 """ 

22 self.wire = wire 

23 self.current = 0 

24 self.end = len(self.wire) 

25 if current: 

26 self.seek(current) 

27 self.furthest = current 

28 

29 def remaining(self) -> int: 

30 return self.end - self.current 

31 

32 def get_bytes(self, size: int) -> bytes: 

33 assert size >= 0 

34 if size > self.remaining(): 

35 raise dns.exception.FormError 

36 output = self.wire[self.current : self.current + size] 

37 self.current += size 

38 self.furthest = max(self.furthest, self.current) 

39 return output 

40 

41 def get_counted_bytes(self, length_size: int = 1) -> bytes: 

42 length = int.from_bytes(self.get_bytes(length_size), "big") 

43 return self.get_bytes(length) 

44 

45 def get_remaining(self) -> bytes: 

46 return self.get_bytes(self.remaining()) 

47 

48 def get_uint8(self) -> int: 

49 return struct.unpack("!B", self.get_bytes(1))[0] 

50 

51 def get_uint16(self) -> int: 

52 return struct.unpack("!H", self.get_bytes(2))[0] 

53 

54 def get_uint32(self) -> int: 

55 return struct.unpack("!I", self.get_bytes(4))[0] 

56 

57 def get_uint48(self) -> int: 

58 return int.from_bytes(self.get_bytes(6), "big") 

59 

60 def get_struct(self, format: str) -> Tuple: 

61 return struct.unpack(format, self.get_bytes(struct.calcsize(format))) 

62 

63 def get_name(self, origin: Optional["dns.name.Name"] = None) -> "dns.name.Name": 

64 name = dns.name.from_wire_parser(self) 

65 if origin: 

66 name = name.relativize(origin) 

67 return name 

68 

69 def seek(self, where: int) -> None: 

70 # Note that seeking to the end is OK! (If you try to read 

71 # after such a seek, you'll get an exception as expected.) 

72 if where < 0 or where > self.end: 

73 raise dns.exception.FormError 

74 self.current = where 

75 

76 @contextlib.contextmanager 

77 def restrict_to(self, size: int) -> Iterator: 

78 assert size >= 0 

79 if size > self.remaining(): 

80 raise dns.exception.FormError 

81 saved_end = self.end 

82 try: 

83 self.end = self.current + size 

84 yield 

85 # We make this check here and not in the finally as we 

86 # don't want to raise if we're already raising for some 

87 # other reason. 

88 if self.current != self.end: 

89 raise dns.exception.FormError 

90 finally: 

91 self.end = saved_end 

92 

93 @contextlib.contextmanager 

94 def restore_furthest(self) -> Iterator: 

95 try: 

96 yield None 

97 finally: 

98 self.current = self.furthest