Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dns/wirebase.py: 34%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

59 statements  

1# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license 

2 

3# We have wirebase and wire to avoid circularity between name.py and wire.py 

4 

5import contextlib 

6import struct 

7from collections.abc import Iterator 

8 

9import dns.exception 

10 

11 

class Parser:
    """Helper class for parsing DNS wire format.

    A parser keeps three offsets into *wire*: ``current`` (where the next read
    begins), ``end`` (the limit that reads may not cross), and ``furthest``
    (the highest offset reached by any read so far, starting at the initial
    offset).
    """

    def __init__(self, wire: bytes, current: int = 0):
        """Initialize a Parser

        *wire*, a ``bytes`` contains the data to be parsed, and possibly other data.
        Typically it is the whole message or a slice of it.

        *current*, an `int`, the offset within *wire* where parsing should begin.
        """
        self.wire = wire
        # ``end`` must exist before seek() runs, since seek() validates against it.
        self.end = len(self.wire)
        self.current = 0
        if current:
            self.seek(current)
        self.furthest = current

    def remaining(self) -> int:
        """Return the number of bytes between the current offset and the end."""
        return self.end - self.current

    def get_bytes(self, size: int) -> bytes:
        """Read *size* bytes at the current offset and advance past them.

        Raises ``dns.exception.FormError`` if fewer than *size* bytes remain.
        """
        assert size >= 0
        if size > self.remaining():
            raise dns.exception.FormError
        start = self.current
        stop = start + size
        chunk = self.wire[start:stop]
        self.current = stop
        # Record the high-water mark so restore_furthest() can return to it.
        if self.current > self.furthest:
            self.furthest = self.current
        return chunk

    def get_counted_bytes(self, length_size: int = 1) -> bytes:
        """Read a big-endian length of *length_size* bytes, then that many bytes."""
        prefix = self.get_bytes(length_size)
        length = int.from_bytes(prefix, "big")
        return self.get_bytes(length)

    def get_remaining(self) -> bytes:
        """Read everything from the current offset up to the end."""
        count = self.remaining()
        return self.get_bytes(count)

    def get_uint8(self) -> int:
        """Read one byte as an unsigned integer."""
        (value,) = struct.unpack("!B", self.get_bytes(1))
        return value

    def get_uint16(self) -> int:
        """Read two bytes as a network-order unsigned integer."""
        (value,) = struct.unpack("!H", self.get_bytes(2))
        return value

    def get_uint32(self) -> int:
        """Read four bytes as a network-order unsigned integer."""
        (value,) = struct.unpack("!I", self.get_bytes(4))
        return value

    def get_uint48(self) -> int:
        """Read six bytes as a big-endian unsigned integer."""
        raw = self.get_bytes(6)
        return int.from_bytes(raw, "big")

    def get_struct(self, format: str) -> tuple:
        """Read ``struct.calcsize(format)`` bytes and unpack them with *format*."""
        needed = struct.calcsize(format)
        return struct.unpack(format, self.get_bytes(needed))

    def seek(self, where: int) -> None:
        """Move the current offset to *where*.

        Raises ``dns.exception.FormError`` if *where* is negative or beyond the
        end.
        """
        # Note that seeking to the end is OK!  (If you try to read
        # after such a seek, you'll get an exception as expected.)
        out_of_range = where < 0 or where > self.end
        if out_of_range:
            raise dns.exception.FormError
        self.current = where

    @contextlib.contextmanager
    def restrict_to(self, size: int) -> Iterator:
        """Context manager limiting reads to the next *size* bytes.

        Raises ``dns.exception.FormError`` if *size* exceeds the remaining
        bytes, or if the body finishes normally without having consumed exactly
        *size* bytes.  The previous end is restored on exit in every case.
        """
        assert size >= 0
        if size > self.remaining():
            raise dns.exception.FormError
        outer_end = self.end
        self.end = self.current + size
        try:
            yield
            # This check lives here rather than in the finally clause
            # because we must not raise a second error when the body
            # is already propagating one.
            if self.current != self.end:
                raise dns.exception.FormError
        finally:
            self.end = outer_end

    @contextlib.contextmanager
    def restore_furthest(self) -> Iterator:
        """Context manager that sets the current offset to ``furthest`` on exit."""
        try:
            yield
        finally:
            self.current = self.furthest