Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/dns/rdataset.py: 39%
193 statements
« prev ^ index » next coverage.py v7.4.1, created at 2024-02-02 06:07 +0000
« prev ^ index » next coverage.py v7.4.1, created at 2024-02-02 06:07 +0000
1# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
3# Copyright (C) 2001-2017 Nominum, Inc.
4#
5# Permission to use, copy, modify, and distribute this software and its
6# documentation for any purpose with or without fee is hereby granted,
7# provided that the above copyright notice and this permission notice
8# appear in all copies.
9#
10# THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES
11# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR
13# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
16# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
18"""DNS rdatasets (an rdataset is a set of rdatas of a given type and class)"""
20import io
21import random
22import struct
23from typing import Any, Collection, Dict, List, Optional, Union, cast
25import dns.exception
26import dns.immutable
27import dns.name
28import dns.rdata
29import dns.rdataclass
30import dns.rdatatype
31import dns.renderer
32import dns.set
33import dns.ttl
# Backwards-compatibility alias: ``SimpleSet`` is kept defined in this module
# and is the very same object as ``dns.set.Set``.
SimpleSet = dns.set.Set
# Raised by ``Rdataset.add()`` when a SIG/RRSIG rdata covers a different type
# than the one already recorded for the rdataset.
# NOTE(review): dns.exception.DNSException presumably uses the class docstring
# as the default exception message -- confirm before rewording the docstring.
class DifferingCovers(dns.exception.DNSException):
    """An attempt was made to add a DNS SIG/RRSIG whose covered type
    is not the same as that of the other rdatas in the rdataset."""
# Raised by ``Rdataset.add()`` when the rdata's class or type differs from the
# rdataset's class or type.
# NOTE(review): dns.exception.DNSException presumably uses the class docstring
# as the default exception message -- confirm before rewording the docstring.
class IncompatibleTypes(dns.exception.DNSException):
    """An attempt was made to add DNS RR data of an incompatible type."""
class Rdataset(dns.set.Set):
    """A DNS rdataset.

    An rdataset is a set of rdatas that all share one class and one type
    (and, for SIG/RRSIG, one covered type), together with a single TTL.
    """

    __slots__ = ["rdclass", "rdtype", "covers", "ttl"]

    def __init__(
        self,
        rdclass: dns.rdataclass.RdataClass,
        rdtype: dns.rdatatype.RdataType,
        covers: dns.rdatatype.RdataType = dns.rdatatype.NONE,
        ttl: int = 0,
    ):
        """Create a new rdataset of the specified class and type.

        *rdclass*, a ``dns.rdataclass.RdataClass``, the rdataclass.

        *rdtype*, an ``dns.rdatatype.RdataType``, the rdatatype.

        *covers*, an ``dns.rdatatype.RdataType``, the covered rdatatype.

        *ttl*, an ``int``, the TTL.
        """

        super().__init__()
        self.rdclass = rdclass
        self.rdtype: dns.rdatatype.RdataType = rdtype
        self.covers: dns.rdatatype.RdataType = covers
        self.ttl = ttl

    def _clone(self):
        """Return the base class's clone with this rdataset's class, type,
        covered type and TTL copied onto it."""
        obj = super()._clone()
        obj.rdclass = self.rdclass
        obj.rdtype = self.rdtype
        obj.covers = self.covers
        obj.ttl = self.ttl
        return obj

    def update_ttl(self, ttl: int) -> None:
        """Perform TTL minimization.

        Set the TTL of the rdataset to be the lesser of the set's current
        TTL or the specified TTL.  If the set contains no rdatas, set the TTL
        to the specified TTL.

        *ttl*, an ``int`` or ``str``.
        """
        ttl = dns.ttl.make(ttl)
        # An empty set has no meaningful TTL yet, so the new value always wins.
        if len(self) == 0 or ttl < self.ttl:
            self.ttl = ttl

    def add(  # pylint: disable=arguments-differ,arguments-renamed
        self, rd: dns.rdata.Rdata, ttl: Optional[int] = None
    ) -> None:
        """Add the specified rdata to the rdataset.

        If the optional *ttl* parameter is supplied, then
        ``self.update_ttl(ttl)`` will be called prior to adding the rdata.

        If the rdata's type is a singleton type and the rdataset is not
        empty, the rdataset is cleared first, so the new rdata replaces the
        existing one.

        *rd*, a ``dns.rdata.Rdata``, the rdata

        *ttl*, an ``int``, the TTL.

        Raises ``dns.rdataset.IncompatibleTypes`` if the type and class
        do not match the type and class of the rdataset.

        Raises ``dns.rdataset.DifferingCovers`` if the type is a signature
        type and the covered type does not match that of the rdataset.
        """

        if self.rdclass != rd.rdclass or self.rdtype != rd.rdtype:
            raise IncompatibleTypes
        # Note: the TTL is updated before the covers check below, so it may
        # already have changed when DifferingCovers is raised.
        if ttl is not None:
            self.update_ttl(ttl)
        #
        # If we're adding a signature, do some special handling to
        # check that the signature covers the same type as the
        # other rdatas in this rdataset.  If this is the first rdata
        # in the set, initialize the covers field.
        #
        if self.rdtype == dns.rdatatype.RRSIG or self.rdtype == dns.rdatatype.SIG:
            covers = rd.covers()
            if len(self) == 0 and self.covers == dns.rdatatype.NONE:
                self.covers = covers
            elif self.covers != covers:
                raise DifferingCovers
        if dns.rdatatype.is_singleton(rd.rdtype) and len(self) > 0:
            self.clear()
        super().add(rd)

    def union_update(self, other):
        """TTL-minimize against ``other.ttl``, then delegate to the base
        class's ``union_update()``."""
        self.update_ttl(other.ttl)
        super().union_update(other)

    def intersection_update(self, other):
        """TTL-minimize against ``other.ttl``, then delegate to the base
        class's ``intersection_update()``."""
        self.update_ttl(other.ttl)
        super().intersection_update(other)

    def update(self, other):
        """Add all rdatas in other to self.

        The TTL is minimized against ``other.ttl`` first.

        *other*, a ``dns.rdataset.Rdataset``, the rdataset from which
        to update.
        """

        self.update_ttl(other.ttl)
        super().update(other)

    def _rdata_repr(self):
        """Return ``[<rdata>, <rdata>, ...]`` for use in ``repr()``; each
        rdata's text is cut to 100 characters plus ``...`` when longer."""

        def maybe_truncate(s):
            if len(s) > 100:
                return s[:100] + "..."
            return s

        return "[" + ", ".join(f"<{maybe_truncate(str(rr))}>" for rr in self) + "]"

    def __repr__(self):
        """Return a debugging representation showing class, type, covered
        type (only when one is set) and the rdatas."""
        # Use the same "no covered type" sentinel as __init__() and add().
        if self.covers == dns.rdatatype.NONE:
            ctext = ""
        else:
            ctext = "(" + dns.rdatatype.to_text(self.covers) + ")"
        return (
            f"<DNS {dns.rdataclass.to_text(self.rdclass)} "
            f"{dns.rdatatype.to_text(self.rdtype)}{ctext} "
            f"rdataset: {self._rdata_repr()}>"
        )

    def __str__(self):
        """Return ``self.to_text()``."""
        return self.to_text()

    def __eq__(self, other):
        """Rdatasets are equal when *other* is an ``Rdataset`` with the same
        class, type and covered type and the base class considers the
        contents equal.  The TTL is not compared here."""
        if not isinstance(other, Rdataset):
            return False
        if (
            self.rdclass != other.rdclass
            or self.rdtype != other.rdtype
            or self.covers != other.covers
        ):
            return False
        return super().__eq__(other)

    def __ne__(self, other):
        """Return the negation of ``__eq__()``."""
        return not self.__eq__(other)

    def to_text(
        self,
        name: Optional[dns.name.Name] = None,
        origin: Optional[dns.name.Name] = None,
        relativize: bool = True,
        override_rdclass: Optional[dns.rdataclass.RdataClass] = None,
        want_comments: bool = False,
        **kw: Any,
    ) -> str:
        """Convert the rdataset into DNS zone file format.

        See ``dns.name.Name.choose_relativity`` for more information
        on how *origin* and *relativize* determine the way names
        are emitted.

        Any additional keyword arguments are passed on to the rdata
        ``to_text()`` method.

        *name*, a ``dns.name.Name``.  If name is not ``None``, emit RRs with
        *name* as the owner name.

        *origin*, a ``dns.name.Name`` or ``None``, the origin for relative
        names.

        *relativize*, a ``bool``.  If ``True``, names will be relativized
        to *origin*.

        *override_rdclass*, a ``dns.rdataclass.RdataClass`` or ``None``.
        If not ``None``, use this class instead of the Rdataset's class.

        *want_comments*, a ``bool``.  If ``True``, emit comments for rdata
        which have them.  The default is ``False``.

        Returns a ``str`` with one line per rdata (or a single line for an
        empty rdataset) and no trailing newline.
        """

        if name is not None:
            name = name.choose_relativity(origin, relativize)
            ntext = str(name)
            pad = " "
        else:
            ntext = ""
            pad = ""
        if override_rdclass is not None:
            rdclass = override_rdclass
        else:
            rdclass = self.rdclass
        # The class and type text is the same for every emitted line, so
        # compute it once instead of once per rdata.
        class_text = dns.rdataclass.to_text(rdclass)
        type_text = dns.rdatatype.to_text(self.rdtype)
        s = io.StringIO()
        if len(self) == 0:
            #
            # Empty rdatasets are used for the question section, and in
            # some dynamic updates, so we don't need to print out the TTL
            # (which is meaningless anyway).
            #
            s.write(f"{ntext}{pad}{class_text} {type_text}\n")
        else:
            for rd in self:
                extra = ""
                if want_comments and rd.rdcomment:
                    extra = f" ;{rd.rdcomment}"
                s.write(
                    "%s%s%d %s %s %s%s\n"
                    % (
                        ntext,
                        pad,
                        self.ttl,
                        class_text,
                        type_text,
                        rd.to_text(origin=origin, relativize=relativize, **kw),
                        extra,
                    )
                )
        #
        # We strip off the final \n for the caller's convenience in printing
        #
        return s.getvalue()[:-1]

    def to_wire(
        self,
        name: dns.name.Name,
        file: Any,
        compress: Optional[dns.name.CompressType] = None,
        origin: Optional[dns.name.Name] = None,
        override_rdclass: Optional[dns.rdataclass.RdataClass] = None,
        want_shuffle: bool = True,
    ) -> int:
        """Convert the rdataset to wire format.

        *name*, a ``dns.name.Name`` is the owner name to use.

        *file* is the file where the name is emitted (typically a
        BytesIO file).

        *compress*, a ``dict``, is the compression table to use.  If
        ``None`` (the default), names will not be compressed.

        *origin* is a ``dns.name.Name`` or ``None``.  If the name is
        relative and origin is not ``None``, then *origin* will be appended
        to it.

        *override_rdclass*, an ``int``, is used as the class instead of the
        class of the rdataset.  This is useful when rendering rdatasets
        associated with dynamic updates.  Supplying it also disables
        shuffling.

        *want_shuffle*, a ``bool``.  If ``True``, then the order of the
        Rdatas within the Rdataset will be shuffled before rendering.

        Returns an ``int``, the number of records emitted.
        """

        if override_rdclass is not None:
            rdclass = override_rdclass
            want_shuffle = False
        else:
            rdclass = self.rdclass
        if len(self) == 0:
            # An empty rdataset is still emitted as one record, with a zero
            # TTL and a zero-length rdata.
            name.to_wire(file, compress, origin)
            file.write(struct.pack("!HHIH", self.rdtype, rdclass, 0, 0))
            return 1
        rdatas: Union[Rdataset, List[dns.rdata.Rdata]]
        if want_shuffle:
            # Shuffle a copy so the rdataset itself is left untouched.
            rdatas = list(self)
            random.shuffle(rdatas)
        else:
            rdatas = self
        for rd in rdatas:
            name.to_wire(file, compress, origin)
            file.write(struct.pack("!HHI", self.rdtype, rdclass, self.ttl))
            # The rdata is written behind a 2-byte length prefix.
            with dns.renderer.prefixed_length(file, 2):
                rd.to_wire(file, compress, origin)
        return len(self)

    def match(
        self,
        rdclass: dns.rdataclass.RdataClass,
        rdtype: dns.rdatatype.RdataType,
        covers: dns.rdatatype.RdataType,
    ) -> bool:
        """Returns ``True`` if this rdataset matches the specified class,
        type, and covers.
        """
        return (self.rdclass, self.rdtype, self.covers) == (rdclass, rdtype, covers)

    def processing_order(self) -> List[dns.rdata.Rdata]:
        """Return rdatas in a valid processing order according to the type's
        specification.  For example, MX records are in preference order from
        lowest to highest preferences, with items of the same preference
        shuffled.

        For types that do not define a processing order, the rdatas are
        simply shuffled.

        An empty rdataset yields an empty list.
        """
        if len(self) == 0:
            return []
        # The ordering rule is supplied by the rdata type; any member will do
        # as the dispatch target, so the first one is used.
        return self[0]._processing_order(iter(self))
@dns.immutable.immutable
class ImmutableRdataset(Rdataset):  # lgtm[py/missing-equals]
    """An immutable DNS rdataset.

    Every in-place mutator raises ``TypeError("immutable")``; operations
    that produce a new set return a new ``ImmutableRdataset``.
    """

    _clone_class = Rdataset

    def __init__(self, rdataset: Rdataset):
        """Create an immutable rdataset from the specified rdataset."""

        rdclass, rdtype = rdataset.rdclass, rdataset.rdtype
        covers, ttl = rdataset.covers, rdataset.ttl
        super().__init__(rdclass, rdtype, covers, ttl)
        # Wrap the source's items in an immutable dict.
        self.items = dns.immutable.Dict(rdataset.items)

    # --- in-place mutators: all rejected -----------------------------------

    def update_ttl(self, ttl):
        raise TypeError("immutable")

    def add(self, rd, ttl=None):
        raise TypeError("immutable")

    def union_update(self, other):
        raise TypeError("immutable")

    def intersection_update(self, other):
        raise TypeError("immutable")

    def update(self, other):
        raise TypeError("immutable")

    def __delitem__(self, i):
        raise TypeError("immutable")

    # lgtm complains about these not raising ArithmeticError, but there is
    # precedent for overrides of these methods in other classes to raise
    # TypeError, and it seems like the better exception.

    def __ior__(self, other):  # lgtm[py/unexpected-raise-in-special-method]
        raise TypeError("immutable")

    def __iand__(self, other):  # lgtm[py/unexpected-raise-in-special-method]
        raise TypeError("immutable")

    def __iadd__(self, other):  # lgtm[py/unexpected-raise-in-special-method]
        raise TypeError("immutable")

    def __isub__(self, other):  # lgtm[py/unexpected-raise-in-special-method]
        raise TypeError("immutable")

    def clear(self):
        raise TypeError("immutable")

    # --- operations producing a new set: result re-wrapped as immutable ----

    def __copy__(self):
        duplicate = super().copy()
        return ImmutableRdataset(duplicate)

    def copy(self):
        duplicate = super().copy()
        return ImmutableRdataset(duplicate)

    def union(self, other):
        combined = super().union(other)
        return ImmutableRdataset(combined)

    def intersection(self, other):
        common = super().intersection(other)
        return ImmutableRdataset(common)

    def difference(self, other):
        remaining = super().difference(other)
        return ImmutableRdataset(remaining)

    def symmetric_difference(self, other):
        exclusive = super().symmetric_difference(other)
        return ImmutableRdataset(exclusive)
def from_text_list(
    rdclass: Union[dns.rdataclass.RdataClass, str],
    rdtype: Union[dns.rdatatype.RdataType, str],
    ttl: int,
    text_rdatas: Collection[str],
    idna_codec: Optional[dns.name.IDNACodec] = None,
    origin: Optional[dns.name.Name] = None,
    relativize: bool = True,
    relativize_to: Optional[dns.name.Name] = None,
) -> Rdataset:
    """Build an rdataset of the given class, type, and TTL from rdatas
    supplied in text format.

    *rdclass* and *rdtype* may be given as enum values or as text; they are
    converted with ``RdataClass.make()`` / ``RdataType.make()``.

    *text_rdatas*, a collection of ``str``, one entry per rdata.

    *idna_codec*, a ``dns.name.IDNACodec``, specifies the IDNA
    encoder/decoder to use; if ``None``, the default IDNA 2003
    encoder/decoder is used.

    *origin*, a ``dns.name.Name`` (or ``None``), the
    origin to use for relative names.

    *relativize*, a ``bool``.  If true, name will be relativized.

    *relativize_to*, a ``dns.name.Name`` (or ``None``), the origin to use
    when relativizing names.  If not set, the *origin* value will be used.

    Returns a ``dns.rdataset.Rdataset`` object.
    """

    rdataset = Rdataset(
        dns.rdataclass.RdataClass.make(rdclass),
        dns.rdatatype.RdataType.make(rdtype),
    )
    rdataset.update_ttl(ttl)
    for text in text_rdatas:
        parsed = dns.rdata.from_text(
            rdataset.rdclass,
            rdataset.rdtype,
            text,
            origin,
            relativize,
            relativize_to,
            idna_codec,
        )
        rdataset.add(parsed)
    return rdataset
def from_text(
    rdclass: Union[dns.rdataclass.RdataClass, str],
    rdtype: Union[dns.rdatatype.RdataType, str],
    ttl: int,
    *text_rdatas: Any,
) -> Rdataset:
    """Build an rdataset of the given class, type, and TTL from rdatas
    passed as individual text arguments.

    This is the varargs form of ``from_text_list()``.

    Returns a ``dns.rdataset.Rdataset`` object.
    """

    # cast() is for the type checker only; the tuple is passed unchanged.
    texts = cast(Collection[str], text_rdatas)
    return from_text_list(rdclass, rdtype, ttl, texts)
def from_rdata_list(ttl: int, rdatas: Collection[dns.rdata.Rdata]) -> Rdataset:
    """Create an rdataset with the specified TTL, and with
    the specified list of rdata objects.

    The class and type of the rdataset are taken from the first rdata.
    *rdatas* is consumed in a single pass, so a one-shot iterable is
    accepted as well as a sized collection.

    Raises ``ValueError`` if *rdatas* yields no rdata.

    Returns a ``dns.rdataset.Rdataset`` object.
    """

    r: Optional[Rdataset] = None
    for rd in rdatas:
        if r is None:
            # The first rdata decides the class and type; the TTL is applied
            # once, while the set is still empty.
            r = Rdataset(rd.rdclass, rd.rdtype)
            r.update_ttl(ttl)
        r.add(rd)
    if r is None:
        # Nothing was iterated: there is no rdata to take class/type from.
        raise ValueError("rdata list must not be empty")
    return r
def from_rdata(ttl: int, *rdatas: Any) -> Rdataset:
    """Build an rdataset with the given TTL from rdata objects passed as
    individual arguments.

    This is the varargs form of ``from_rdata_list()``.

    Returns a ``dns.rdataset.Rdataset`` object.
    """

    # cast() is for the type checker only; the tuple is passed unchanged.
    rdata_collection = cast(Collection[dns.rdata.Rdata], rdatas)
    return from_rdata_list(ttl, rdata_collection)