Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/dns/update.py: 43%
141 statements
« prev ^ index » next coverage.py v7.4.1, created at 2024-02-02 06:07 +0000
« prev ^ index » next coverage.py v7.4.1, created at 2024-02-02 06:07 +0000
1# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
3# Copyright (C) 2003-2007, 2009-2011 Nominum, Inc.
4#
5# Permission to use, copy, modify, and distribute this software and its
6# documentation for any purpose with or without fee is hereby granted,
7# provided that the above copyright notice and this permission notice
8# appear in all copies.
9#
10# THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES
11# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR
13# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
16# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
18"""DNS Dynamic Update Support"""
20from typing import Any, List, Optional, Union
22import dns.message
23import dns.name
24import dns.opcode
25import dns.rdata
26import dns.rdataclass
27import dns.rdataset
28import dns.rdatatype
29import dns.tsig
class UpdateSection(dns.enum.IntEnum):
    """Section numbers of a DNS dynamic update message.

    The integer value of each member is the index of the corresponding
    section in an update message's ``sections`` list.
    """

    # The zone being updated.
    ZONE = 0
    # Prerequisites for the update.
    PREREQ = 1
    # The changes themselves.
    UPDATE = 2
    # Additional data.
    ADDITIONAL = 3

    @classmethod
    def _maximum(cls):
        """Return the largest valid section number (that of ``ADDITIONAL``)."""
        # NOTE(review): presumably consulted by the dns.enum.IntEnum base
        # class when range-checking values -- confirm against dns.enum.
        return 3
class UpdateMessage(dns.message.Message):  # lgtm[py/missing-equals]
    """A DNS message with the UPDATE opcode.

    The message's sections are interpreted through ``UpdateSection``
    (zone, prerequisite, update, additional), and convenience methods
    are provided for populating the prerequisite and update sections.
    """

    # ignore the mypy error here as we mean to use a different enum
    _section_enum = UpdateSection  # type: ignore

    def __init__(
        self,
        zone: Optional[Union[dns.name.Name, str]] = None,
        rdclass: dns.rdataclass.RdataClass = dns.rdataclass.IN,
        keyring: Optional[Any] = None,
        keyname: Optional[dns.name.Name] = None,
        keyalgorithm: Union[dns.name.Name, str] = dns.tsig.default_algorithm,
        id: Optional[int] = None,
    ):
        """Initialize a new DNS Update object.

        See the documentation of the Message class for a complete
        description of the keyring dictionary.

        *zone*, a ``dns.name.Name``, ``str``, or ``None``, the zone
        which is being updated.  ``None`` should only be used by dnspython's
        message constructors, as a zone is required for the convenience
        methods like ``add()``, ``replace()``, etc.  A ``str`` is converted
        with ``dns.name.from_text()``.

        *rdclass*, an ``int`` or ``str``, the class of the zone; it is
        normalized with ``dns.rdataclass.RdataClass.make()``.

        The *keyring*, *keyname*, and *keyalgorithm* parameters are passed to
        ``use_tsig()``; see its documentation for details.  ``use_tsig()`` is
        only called when *keyring* is not ``None``.

        *id*, an ``int`` or ``None``, is passed to the ``Message``
        constructor.
        """

        super().__init__(id=id)
        self.flags |= dns.opcode.to_flags(dns.opcode.UPDATE)
        self.origin = dns.name.from_text(zone) if isinstance(zone, str) else zone
        zone_class = dns.rdataclass.RdataClass.make(rdclass)
        self.zone_rdclass = zone_class
        if self.origin:
            # Record the zone being updated as an SOA rrset in the zone
            # section.
            self.find_rrset(
                self.zone,
                self.origin,
                zone_class,
                dns.rdatatype.SOA,
                create=True,
                force_unique=True,
            )
        if keyring is not None:
            self.use_tsig(keyring, keyname, algorithm=keyalgorithm)

    @property
    def zone(self) -> List[dns.rrset.RRset]:
        """The zone section."""
        return self.sections[UpdateSection.ZONE]

    @zone.setter
    def zone(self, v):
        self.sections[UpdateSection.ZONE] = v

    @property
    def prerequisite(self) -> List[dns.rrset.RRset]:
        """The prerequisite section."""
        return self.sections[UpdateSection.PREREQ]

    @prerequisite.setter
    def prerequisite(self, v):
        self.sections[UpdateSection.PREREQ] = v

    @property
    def update(self) -> List[dns.rrset.RRset]:
        """The update section."""
        return self.sections[UpdateSection.UPDATE]

    @update.setter
    def update(self, v):
        self.sections[UpdateSection.UPDATE] = v

    def _add_rr(self, name, ttl, rd, deleting=None, section=None):
        """Add the single rdata *rd* with *ttl* at *name*.

        The RR goes into *section*, or into the update section when
        *section* is ``None``.  *deleting* is passed through to
        ``find_rrset()``.
        """

        # Compare against None explicitly: a section is a list and may
        # legitimately be empty.
        target = self.update if section is None else section
        rrset = self.find_rrset(
            target,
            name,
            self.zone_rdclass,
            rd.rdtype,
            rd.covers(),
            deleting,
            create=True,
            force_unique=True,
        )
        rrset.add(rd, ttl)

    def _add(self, replace, section, name, *args):
        """Add records to *section*.

        *replace* is the replacement mode.  If ``False``, RRs are added
        to an existing RRset; if ``True``, ``delete()`` is first called
        for the affected name and type so that the RRset is replaced
        with the specified contents.

        *section* is the section to add to and *name* is the owner name
        (a ``str`` is converted with ``dns.name.from_text()``).  The
        remaining arguments can be:

                - rdataset...

                - ttl, rdata...

                - ttl, rdtype, string...

        Raises ``IndexError`` if the arguments after *name* are missing
        or incomplete.
        """

        if isinstance(name, str):
            name = dns.name.from_text(name, None)

        # Form 1: one or more rdatasets, each carrying its own TTL.
        if isinstance(args[0], dns.rdataset.Rdataset):
            for rdataset in args:
                if replace:
                    self.delete(name, rdataset.rdtype)
                for rdata in rdataset:
                    self._add_rr(name, rdataset.ttl, rdata, section=section)
            return

        # The remaining forms both start with a TTL.
        ttl = int(args[0])
        rest = list(args[1:])

        # Form 2: ttl followed by rdata objects.
        if isinstance(rest[0], dns.rdata.Rdata):
            if replace:
                self.delete(name, rest[0].rdtype)
            for rdata in rest:
                self._add_rr(name, ttl, rdata, section=section)
            return

        # Form 3: ttl, rdtype, then rdata in text form.
        rdtype = dns.rdatatype.RdataType.make(rest[0])
        if replace:
            self.delete(name, rdtype)
        for text in rest[1:]:
            rdata = dns.rdata.from_text(self.zone_rdclass, rdtype, text, self.origin)
            self._add_rr(name, ttl, rdata, section=section)

    def add(self, name: Union[dns.name.Name, str], *args: Any) -> None:
        """Add records to the update section.

        The first argument is always a name.  The other
        arguments can be:

                - rdataset...

                - ttl, rdata...

                - ttl, rdtype, string...
        """

        self._add(False, self.update, name, *args)

    def delete(self, name: Union[dns.name.Name, str], *args: Any) -> None:
        """Delete records.

        The first argument is always a name.  The other
        arguments can be:

                - *empty*

                - rdataset...

                - rdata...

                - rdtype, [string...]
        """

        if isinstance(name, str):
            name = dns.name.from_text(name, None)

        if not args:
            # Name only: an rrset of class ANY and type ANY, flagged as
            # deleting with class ANY.
            self.find_rrset(
                self.update,
                name,
                dns.rdataclass.ANY,
                dns.rdatatype.ANY,
                dns.rdatatype.NONE,
                dns.rdataclass.ANY,
                create=True,
                force_unique=True,
            )
            return

        first = args[0]

        if isinstance(first, dns.rdataset.Rdataset):
            # Specific RRs are removed with TTL 0 and deleting class NONE.
            for rdataset in args:
                for rdata in rdataset:
                    self._add_rr(name, 0, rdata, dns.rdataclass.NONE)
            return

        if isinstance(first, dns.rdata.Rdata):
            for rdata in args:
                self._add_rr(name, 0, rdata, dns.rdataclass.NONE)
            return

        rdtype = dns.rdatatype.RdataType.make(first)
        texts = args[1:]
        if not texts:
            # Type only: an rrset of that type, flagged as deleting with
            # class ANY.
            self.find_rrset(
                self.update,
                name,
                self.zone_rdclass,
                rdtype,
                dns.rdatatype.NONE,
                dns.rdataclass.ANY,
                create=True,
                force_unique=True,
            )
            return

        for text in texts:
            rdata = dns.rdata.from_text(
                self.zone_rdclass,
                rdtype,
                text,  # type: ignore[arg-type]
                self.origin,
            )
            self._add_rr(name, 0, rdata, dns.rdataclass.NONE)

    def replace(self, name: Union[dns.name.Name, str], *args: Any) -> None:
        """Replace records.

        The first argument is always a name.  The other
        arguments can be:

                - rdataset...

                - ttl, rdata...

                - ttl, rdtype, string...

        Note that if you want to replace the entire node, you should do
        a delete of the name followed by one or more calls to add.
        """

        self._add(True, self.update, name, *args)

    def present(self, name: Union[dns.name.Name, str], *args: Any) -> None:
        """Require that an owner name (and optionally an rdata type,
        or specific rdataset) exists as a prerequisite to the
        execution of the update.

        The first argument is always a name.
        The other arguments can be:

                - rdataset...

                - rdata...

                - rdtype, string...
        """

        if isinstance(name, str):
            name = dns.name.from_text(name, None)

        if not args:
            # Name only: prerequisite rrset of class ANY and type ANY.
            self.find_rrset(
                self.prerequisite,
                name,
                dns.rdataclass.ANY,
                dns.rdatatype.ANY,
                dns.rdatatype.NONE,
                None,
                create=True,
                force_unique=True,
            )
            return

        first = args[0]

        if isinstance(first, dns.rdataset.Rdataset):
            self._add(False, self.prerequisite, name, *args)
            return

        if isinstance(first, dns.rdata.Rdata) or len(args) > 1:
            # _add() expects a leading TTL for these forms; supply 0.
            self._add(False, self.prerequisite, name, 0, *args)
            return

        # A lone rdtype: prerequisite rrset of class ANY and that type.
        rdtype = dns.rdatatype.RdataType.make(first)
        self.find_rrset(
            self.prerequisite,
            name,
            dns.rdataclass.ANY,
            rdtype,
            dns.rdatatype.NONE,
            None,
            create=True,
            force_unique=True,
        )

    def absent(
        self,
        name: Union[dns.name.Name, str],
        rdtype: Optional[Union[dns.rdatatype.RdataType, str]] = None,
    ) -> None:
        """Require that an owner name (and optionally an rdata type) does
        not exist as a prerequisite to the execution of the update."""

        if isinstance(name, str):
            name = dns.name.from_text(name, None)
        # With no rdtype the prerequisite covers every type at the name.
        if rdtype is None:
            wanted = dns.rdatatype.ANY
        else:
            wanted = dns.rdatatype.RdataType.make(rdtype)
        self.find_rrset(
            self.prerequisite,
            name,
            dns.rdataclass.NONE,
            wanted,
            dns.rdatatype.NONE,
            None,
            create=True,
            force_unique=True,
        )

    def _get_one_rr_per_rrset(self, value):
        """Return ``True`` regardless of *value*."""
        # Updates are always one_rr_per_rrset
        return True

    def _parse_rr_header(self, section, name, rdclass, rdtype):
        """Validate and interpret an RR header read for *section*.

        Returns a ``(rdclass, rdtype, deleting, empty)`` tuple.

        Raises ``dns.exception.FormError`` if a zone section record has a
        metaclass, is not an SOA, or arrives when the zone section already
        has content; also if a record for any other section arrives while
        the zone section is still empty.
        """

        if section == UpdateSection.ZONE:
            if (
                dns.rdataclass.is_metaclass(rdclass)
                or rdtype != dns.rdatatype.SOA
                or self.zone
            ):
                raise dns.exception.FormError
            return (rdclass, rdtype, None, False)

        if not self.zone:
            raise dns.exception.FormError

        deleting = None
        empty = False
        if rdclass in (dns.rdataclass.ANY, dns.rdataclass.NONE):
            # The wire class is a marker, not the real class: remember it
            # as *deleting* and substitute the class of the zone record.
            deleting = rdclass
            rdclass = self.zone[0].rdclass
            empty = deleting == dns.rdataclass.ANY or section == UpdateSection.PREREQ
        return (rdclass, rdtype, deleting, empty)
# backwards compatibility: ``Update`` is kept as an alias of ``UpdateMessage``
Update = UpdateMessage

# The BEGIN/END markers below delimit generated content; keep them intact
# and avoid hand-editing the lines between them.

### BEGIN generated UpdateSection constants

ZONE = UpdateSection.ZONE
PREREQ = UpdateSection.PREREQ
UPDATE = UpdateSection.UPDATE
ADDITIONAL = UpdateSection.ADDITIONAL

### END generated UpdateSection constants