Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/dns/rdtypes/util.py: 28%
191 statements
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-25 07:09 +0000
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-25 07:09 +0000
1# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
3# Copyright (C) 2006, 2007, 2009-2011 Nominum, Inc.
4#
5# Permission to use, copy, modify, and distribute this software and its
6# documentation for any purpose with or without fee is hereby granted,
7# provided that the above copyright notice and this permission notice
8# appear in all copies.
9#
10# THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES
11# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR
13# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
16# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
18import collections
19import random
20import struct
21from typing import Any, List
23import dns.exception
24import dns.ipv4
25import dns.ipv6
26import dns.name
27import dns.rdata
30class Gateway:
31 """A helper class for the IPSECKEY gateway and AMTRELAY relay fields"""
33 name = ""
35 def __init__(self, type, gateway=None):
36 self.type = dns.rdata.Rdata._as_uint8(type)
37 self.gateway = gateway
38 self._check()
40 @classmethod
41 def _invalid_type(cls, gateway_type):
42 return f"invalid {cls.name} type: {gateway_type}"
44 def _check(self):
45 if self.type == 0:
46 if self.gateway not in (".", None):
47 raise SyntaxError(f"invalid {self.name} for type 0")
48 self.gateway = None
49 elif self.type == 1:
50 # check that it's OK
51 dns.ipv4.inet_aton(self.gateway)
52 elif self.type == 2:
53 # check that it's OK
54 dns.ipv6.inet_aton(self.gateway)
55 elif self.type == 3:
56 if not isinstance(self.gateway, dns.name.Name):
57 raise SyntaxError(f"invalid {self.name}; not a name")
58 else:
59 raise SyntaxError(self._invalid_type(self.type))
61 def to_text(self, origin=None, relativize=True):
62 if self.type == 0:
63 return "."
64 elif self.type in (1, 2):
65 return self.gateway
66 elif self.type == 3:
67 return str(self.gateway.choose_relativity(origin, relativize))
68 else:
69 raise ValueError(self._invalid_type(self.type)) # pragma: no cover
71 @classmethod
72 def from_text(
73 cls, gateway_type, tok, origin=None, relativize=True, relativize_to=None
74 ):
75 if gateway_type in (0, 1, 2):
76 gateway = tok.get_string()
77 elif gateway_type == 3:
78 gateway = tok.get_name(origin, relativize, relativize_to)
79 else:
80 raise dns.exception.SyntaxError(
81 cls._invalid_type(gateway_type)
82 ) # pragma: no cover
83 return cls(gateway_type, gateway)
85 # pylint: disable=unused-argument
86 def to_wire(self, file, compress=None, origin=None, canonicalize=False):
87 if self.type == 0:
88 pass
89 elif self.type == 1:
90 file.write(dns.ipv4.inet_aton(self.gateway))
91 elif self.type == 2:
92 file.write(dns.ipv6.inet_aton(self.gateway))
93 elif self.type == 3:
94 self.gateway.to_wire(file, None, origin, False)
95 else:
96 raise ValueError(self._invalid_type(self.type)) # pragma: no cover
98 # pylint: enable=unused-argument
100 @classmethod
101 def from_wire_parser(cls, gateway_type, parser, origin=None):
102 if gateway_type == 0:
103 gateway = None
104 elif gateway_type == 1:
105 gateway = dns.ipv4.inet_ntoa(parser.get_bytes(4))
106 elif gateway_type == 2:
107 gateway = dns.ipv6.inet_ntoa(parser.get_bytes(16))
108 elif gateway_type == 3:
109 gateway = parser.get_name(origin)
110 else:
111 raise dns.exception.FormError(cls._invalid_type(gateway_type))
112 return cls(gateway_type, gateway)
115class Bitmap:
116 """A helper class for the NSEC/NSEC3/CSYNC type bitmaps"""
118 type_name = ""
120 def __init__(self, windows=None):
121 last_window = -1
122 self.windows = windows
123 for window, bitmap in self.windows:
124 if not isinstance(window, int):
125 raise ValueError(f"bad {self.type_name} window type")
126 if window <= last_window:
127 raise ValueError(f"bad {self.type_name} window order")
128 if window > 256:
129 raise ValueError(f"bad {self.type_name} window number")
130 last_window = window
131 if not isinstance(bitmap, bytes):
132 raise ValueError(f"bad {self.type_name} octets type")
133 if len(bitmap) == 0 or len(bitmap) > 32:
134 raise ValueError(f"bad {self.type_name} octets")
136 def to_text(self) -> str:
137 text = ""
138 for window, bitmap in self.windows:
139 bits = []
140 for i, byte in enumerate(bitmap):
141 for j in range(0, 8):
142 if byte & (0x80 >> j):
143 rdtype = window * 256 + i * 8 + j
144 bits.append(dns.rdatatype.to_text(rdtype))
145 text += " " + " ".join(bits)
146 return text
148 @classmethod
149 def from_text(cls, tok: "dns.tokenizer.Tokenizer") -> "Bitmap":
150 rdtypes = []
151 for token in tok.get_remaining():
152 rdtype = dns.rdatatype.from_text(token.unescape().value)
153 if rdtype == 0:
154 raise dns.exception.SyntaxError(f"{cls.type_name} with bit 0")
155 rdtypes.append(rdtype)
156 return cls.from_rdtypes(rdtypes)
158 @classmethod
159 def from_rdtypes(cls, rdtypes: List[dns.rdatatype.RdataType]) -> "Bitmap":
160 rdtypes = sorted(rdtypes)
161 window = 0
162 octets = 0
163 prior_rdtype = 0
164 bitmap = bytearray(b"\0" * 32)
165 windows = []
166 for rdtype in rdtypes:
167 if rdtype == prior_rdtype:
168 continue
169 prior_rdtype = rdtype
170 new_window = rdtype // 256
171 if new_window != window:
172 if octets != 0:
173 windows.append((window, bytes(bitmap[0:octets])))
174 bitmap = bytearray(b"\0" * 32)
175 window = new_window
176 offset = rdtype % 256
177 byte = offset // 8
178 bit = offset % 8
179 octets = byte + 1
180 bitmap[byte] = bitmap[byte] | (0x80 >> bit)
181 if octets != 0:
182 windows.append((window, bytes(bitmap[0:octets])))
183 return cls(windows)
185 def to_wire(self, file: Any) -> None:
186 for window, bitmap in self.windows:
187 file.write(struct.pack("!BB", window, len(bitmap)))
188 file.write(bitmap)
190 @classmethod
191 def from_wire_parser(cls, parser: "dns.wire.Parser") -> "Bitmap":
192 windows = []
193 while parser.remaining() > 0:
194 window = parser.get_uint8()
195 bitmap = parser.get_counted_bytes()
196 windows.append((window, bitmap))
197 return cls(windows)
200def _priority_table(items):
201 by_priority = collections.defaultdict(list)
202 for rdata in items:
203 by_priority[rdata._processing_priority()].append(rdata)
204 return by_priority
207def priority_processing_order(iterable):
208 items = list(iterable)
209 if len(items) == 1:
210 return items
211 by_priority = _priority_table(items)
212 ordered = []
213 for k in sorted(by_priority.keys()):
214 rdatas = by_priority[k]
215 random.shuffle(rdatas)
216 ordered.extend(rdatas)
217 return ordered
220_no_weight = 0.1
223def weighted_processing_order(iterable):
224 items = list(iterable)
225 if len(items) == 1:
226 return items
227 by_priority = _priority_table(items)
228 ordered = []
229 for k in sorted(by_priority.keys()):
230 rdatas = by_priority[k]
231 total = sum(rdata._processing_weight() or _no_weight for rdata in rdatas)
232 while len(rdatas) > 1:
233 r = random.uniform(0, total)
234 for n, rdata in enumerate(rdatas):
235 weight = rdata._processing_weight() or _no_weight
236 if weight > r:
237 break
238 r -= weight
239 total -= weight
240 ordered.append(rdata) # pylint: disable=undefined-loop-variable
241 del rdatas[n] # pylint: disable=undefined-loop-variable
242 ordered.append(rdatas[0])
243 return ordered
246def parse_formatted_hex(formatted, num_chunks, chunk_size, separator):
247 if len(formatted) != num_chunks * (chunk_size + 1) - 1:
248 raise ValueError("invalid formatted hex string")
249 value = b""
250 for _ in range(num_chunks):
251 chunk = formatted[0:chunk_size]
252 value += int(chunk, 16).to_bytes(chunk_size // 2, "big")
253 formatted = formatted[chunk_size:]
254 if len(formatted) > 0 and formatted[0] != separator:
255 raise ValueError("invalid formatted hex string")
256 formatted = formatted[1:]
257 return value