Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/dns/renderer.py: 22%
152 statements
« prev ^ index » next coverage.py v7.4.1, created at 2024-02-02 06:07 +0000
« prev ^ index » next coverage.py v7.4.1, created at 2024-02-02 06:07 +0000
1# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
3# Copyright (C) 2001-2017 Nominum, Inc.
4#
5# Permission to use, copy, modify, and distribute this software and its
6# documentation for any purpose with or without fee is hereby granted,
7# provided that the above copyright notice and this permission notice
8# appear in all copies.
9#
10# THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES
11# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR
13# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
16# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
18"""Help for building DNS wire format messages"""
20import contextlib
21import io
22import random
23import struct
24import time
26import dns.exception
27import dns.tsig
29QUESTION = 0
30ANSWER = 1
31AUTHORITY = 2
32ADDITIONAL = 3
35@contextlib.contextmanager
36def prefixed_length(output, length_length):
37 output.write(b"\00" * length_length)
38 start = output.tell()
39 yield
40 end = output.tell()
41 length = end - start
42 if length > 0:
43 try:
44 output.seek(start - length_length)
45 try:
46 output.write(length.to_bytes(length_length, "big"))
47 except OverflowError:
48 raise dns.exception.FormError
49 finally:
50 output.seek(end)
53class Renderer:
54 """Helper class for building DNS wire-format messages.
56 Most applications can use the higher-level L{dns.message.Message}
57 class and its to_wire() method to generate wire-format messages.
58 This class is for those applications which need finer control
59 over the generation of messages.
61 Typical use::
63 r = dns.renderer.Renderer(id=1, flags=0x80, max_size=512)
64 r.add_question(qname, qtype, qclass)
65 r.add_rrset(dns.renderer.ANSWER, rrset_1)
66 r.add_rrset(dns.renderer.ANSWER, rrset_2)
67 r.add_rrset(dns.renderer.AUTHORITY, ns_rrset)
68 r.add_rrset(dns.renderer.ADDITIONAL, ad_rrset_1)
69 r.add_rrset(dns.renderer.ADDITIONAL, ad_rrset_2)
70 r.add_edns(0, 0, 4096)
71 r.write_header()
72 r.add_tsig(keyname, secret, 300, 1, 0, '', request_mac)
73 wire = r.get_wire()
75 If padding is going to be used, then the OPT record MUST be
76 written after everything else in the additional section except for
77 the TSIG (if any).
79 output, an io.BytesIO, where rendering is written
81 id: the message id
83 flags: the message flags
85 max_size: the maximum size of the message
87 origin: the origin to use when rendering relative names
89 compress: the compression table
91 section: an int, the section currently being rendered
93 counts: list of the number of RRs in each section
95 mac: the MAC of the rendered message (if TSIG was used)
96 """
98 def __init__(self, id=None, flags=0, max_size=65535, origin=None):
99 """Initialize a new renderer."""
101 self.output = io.BytesIO()
102 if id is None:
103 self.id = random.randint(0, 65535)
104 else:
105 self.id = id
106 self.flags = flags
107 self.max_size = max_size
108 self.origin = origin
109 self.compress = {}
110 self.section = QUESTION
111 self.counts = [0, 0, 0, 0]
112 self.output.write(b"\x00" * 12)
113 self.mac = ""
114 self.reserved = 0
115 self.was_padded = False
117 def _rollback(self, where):
118 """Truncate the output buffer at offset *where*, and remove any
119 compression table entries that pointed beyond the truncation
120 point.
121 """
123 self.output.seek(where)
124 self.output.truncate()
125 keys_to_delete = []
126 for k, v in self.compress.items():
127 if v >= where:
128 keys_to_delete.append(k)
129 for k in keys_to_delete:
130 del self.compress[k]
132 def _set_section(self, section):
133 """Set the renderer's current section.
135 Sections must be rendered order: QUESTION, ANSWER, AUTHORITY,
136 ADDITIONAL. Sections may be empty.
138 Raises dns.exception.FormError if an attempt was made to set
139 a section value less than the current section.
140 """
142 if self.section != section:
143 if self.section > section:
144 raise dns.exception.FormError
145 self.section = section
147 @contextlib.contextmanager
148 def _track_size(self):
149 start = self.output.tell()
150 yield start
151 if self.output.tell() > self.max_size:
152 self._rollback(start)
153 raise dns.exception.TooBig
155 @contextlib.contextmanager
156 def _temporarily_seek_to(self, where):
157 current = self.output.tell()
158 try:
159 self.output.seek(where)
160 yield
161 finally:
162 self.output.seek(current)
164 def add_question(self, qname, rdtype, rdclass=dns.rdataclass.IN):
165 """Add a question to the message."""
167 self._set_section(QUESTION)
168 with self._track_size():
169 qname.to_wire(self.output, self.compress, self.origin)
170 self.output.write(struct.pack("!HH", rdtype, rdclass))
171 self.counts[QUESTION] += 1
173 def add_rrset(self, section, rrset, **kw):
174 """Add the rrset to the specified section.
176 Any keyword arguments are passed on to the rdataset's to_wire()
177 routine.
178 """
180 self._set_section(section)
181 with self._track_size():
182 n = rrset.to_wire(self.output, self.compress, self.origin, **kw)
183 self.counts[section] += n
185 def add_rdataset(self, section, name, rdataset, **kw):
186 """Add the rdataset to the specified section, using the specified
187 name as the owner name.
189 Any keyword arguments are passed on to the rdataset's to_wire()
190 routine.
191 """
193 self._set_section(section)
194 with self._track_size():
195 n = rdataset.to_wire(name, self.output, self.compress, self.origin, **kw)
196 self.counts[section] += n
198 def add_opt(self, opt, pad=0, opt_size=0, tsig_size=0):
199 """Add *opt* to the additional section, applying padding if desired. The
200 padding will take the specified precomputed OPT size and TSIG size into
201 account.
203 Note that we don't have reliable way of knowing how big a GSS-TSIG digest
204 might be, so we we might not get an even multiple of the pad in that case."""
205 if pad:
206 ttl = opt.ttl
207 assert opt_size >= 11
208 opt_rdata = opt[0]
209 size_without_padding = self.output.tell() + opt_size + tsig_size
210 remainder = size_without_padding % pad
211 if remainder:
212 pad = b"\x00" * (pad - remainder)
213 else:
214 pad = b""
215 options = list(opt_rdata.options)
216 options.append(dns.edns.GenericOption(dns.edns.OptionType.PADDING, pad))
217 opt = dns.message.Message._make_opt(ttl, opt_rdata.rdclass, options)
218 self.was_padded = True
219 self.add_rrset(ADDITIONAL, opt)
221 def add_edns(self, edns, ednsflags, payload, options=None):
222 """Add an EDNS OPT record to the message."""
224 # make sure the EDNS version in ednsflags agrees with edns
225 ednsflags &= 0xFF00FFFF
226 ednsflags |= edns << 16
227 opt = dns.message.Message._make_opt(ednsflags, payload, options)
228 self.add_opt(opt)
230 def add_tsig(
231 self,
232 keyname,
233 secret,
234 fudge,
235 id,
236 tsig_error,
237 other_data,
238 request_mac,
239 algorithm=dns.tsig.default_algorithm,
240 ):
241 """Add a TSIG signature to the message."""
243 s = self.output.getvalue()
245 if isinstance(secret, dns.tsig.Key):
246 key = secret
247 else:
248 key = dns.tsig.Key(keyname, secret, algorithm)
249 tsig = dns.message.Message._make_tsig(
250 keyname, algorithm, 0, fudge, b"", id, tsig_error, other_data
251 )
252 (tsig, _) = dns.tsig.sign(s, key, tsig[0], int(time.time()), request_mac)
253 self._write_tsig(tsig, keyname)
255 def add_multi_tsig(
256 self,
257 ctx,
258 keyname,
259 secret,
260 fudge,
261 id,
262 tsig_error,
263 other_data,
264 request_mac,
265 algorithm=dns.tsig.default_algorithm,
266 ):
267 """Add a TSIG signature to the message. Unlike add_tsig(), this can be
268 used for a series of consecutive DNS envelopes, e.g. for a zone
269 transfer over TCP [RFC2845, 4.4].
271 For the first message in the sequence, give ctx=None. For each
272 subsequent message, give the ctx that was returned from the
273 add_multi_tsig() call for the previous message."""
275 s = self.output.getvalue()
277 if isinstance(secret, dns.tsig.Key):
278 key = secret
279 else:
280 key = dns.tsig.Key(keyname, secret, algorithm)
281 tsig = dns.message.Message._make_tsig(
282 keyname, algorithm, 0, fudge, b"", id, tsig_error, other_data
283 )
284 (tsig, ctx) = dns.tsig.sign(
285 s, key, tsig[0], int(time.time()), request_mac, ctx, True
286 )
287 self._write_tsig(tsig, keyname)
288 return ctx
290 def _write_tsig(self, tsig, keyname):
291 if self.was_padded:
292 compress = None
293 else:
294 compress = self.compress
295 self._set_section(ADDITIONAL)
296 with self._track_size():
297 keyname.to_wire(self.output, compress, self.origin)
298 self.output.write(
299 struct.pack("!HHI", dns.rdatatype.TSIG, dns.rdataclass.ANY, 0)
300 )
301 with prefixed_length(self.output, 2):
302 tsig.to_wire(self.output)
304 self.counts[ADDITIONAL] += 1
305 with self._temporarily_seek_to(10):
306 self.output.write(struct.pack("!H", self.counts[ADDITIONAL]))
308 def write_header(self):
309 """Write the DNS message header.
311 Writing the DNS message header is done after all sections
312 have been rendered, but before the optional TSIG signature
313 is added.
314 """
316 with self._temporarily_seek_to(0):
317 self.output.write(
318 struct.pack(
319 "!HHHHHH",
320 self.id,
321 self.flags,
322 self.counts[0],
323 self.counts[1],
324 self.counts[2],
325 self.counts[3],
326 )
327 )
329 def get_wire(self):
330 """Return the wire format message."""
332 return self.output.getvalue()
334 def reserve(self, size: int) -> None:
335 """Reserve *size* bytes."""
336 if size < 0:
337 raise ValueError("reserved amount must be non-negative")
338 if size > self.max_size:
339 raise ValueError("cannot reserve more than the maximum size")
340 self.reserved += size
341 self.max_size -= size
343 def release_reserved(self) -> None:
344 """Release the reserved bytes."""
345 self.max_size += self.reserved
346 self.reserved = 0