Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/dns/renderer.py: 22%

152 statements  

« prev     ^ index     » next       coverage.py v7.4.1, created at 2024-02-02 06:07 +0000

1# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license 

2 

3# Copyright (C) 2001-2017 Nominum, Inc. 

4# 

5# Permission to use, copy, modify, and distribute this software and its 

6# documentation for any purpose with or without fee is hereby granted, 

7# provided that the above copyright notice and this permission notice 

8# appear in all copies. 

9# 

10# THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES 

11# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 

12# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR 

13# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 

14# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 

15# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT 

16# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 

17 

18"""Help for building DNS wire format messages""" 

19 

20import contextlib 

21import io 

22import random 

23import struct 

24import time 

25 

26import dns.exception 

27import dns.tsig 

28 

# Section identifiers.  Each value doubles as the index of that section's
# record count in Renderer.counts, and matches the order in which the four
# counts are packed into the message header by Renderer.write_header().
QUESTION, ANSWER, AUTHORITY, ADDITIONAL = 0, 1, 2, 3

33 

34 

@contextlib.contextmanager
def prefixed_length(output, length_length):
    """Write a big-endian length prefix in front of whatever the body writes.

    On entry, *length_length* zero bytes are written to *output* as a
    placeholder.  When the ``with`` body finishes normally and wrote at
    least one byte, the placeholder is overwritten with the number of bytes
    the body wrote, and the stream position is put back at the end of the
    body.  If the body wrote nothing, the zero placeholder is left as is.

    Raises ``dns.exception.FormError`` if the body length does not fit in
    *length_length* bytes.
    """
    placeholder = b"\x00" * length_length
    output.write(placeholder)
    body_start = output.tell()
    yield
    body_end = output.tell()
    body_size = body_end - body_start
    if body_size <= 0:
        # Nothing was written; the zero placeholder already says "length 0".
        return
    try:
        # Step back over the placeholder and patch in the real length.
        output.seek(body_start - length_length)
        try:
            encoded = body_size.to_bytes(length_length, "big")
            output.write(encoded)
        except OverflowError:
            raise dns.exception.FormError
    finally:
        # Always leave the stream positioned after the body.
        output.seek(body_end)

51 

52 

class Renderer:
    """Helper class for building DNS wire-format messages.

    Most applications can use the higher-level dns.message.Message class
    and its to_wire() method to generate wire-format messages.  This class
    is for applications that need finer control over message generation.

    Typical use::

        r = dns.renderer.Renderer(id=1, flags=0x80, max_size=512)
        r.add_question(qname, qtype, qclass)
        r.add_rrset(dns.renderer.ANSWER, rrset_1)
        r.add_rrset(dns.renderer.ANSWER, rrset_2)
        r.add_rrset(dns.renderer.AUTHORITY, ns_rrset)
        r.add_rrset(dns.renderer.ADDITIONAL, ad_rrset_1)
        r.add_rrset(dns.renderer.ADDITIONAL, ad_rrset_2)
        r.add_edns(0, 0, 4096)
        r.write_header()
        r.add_tsig(keyname, secret, 300, 1, 0, '', request_mac)
        wire = r.get_wire()

    If padding is going to be used, then the OPT record MUST be written
    after everything else in the additional section except for the TSIG
    (if any).

    Attributes:

    output: an io.BytesIO, where the rendering is written

    id: the message id

    flags: the message flags

    max_size: the maximum size of the message (reduced by reserve() and
    restored by release_reserved())

    origin: the origin to use when rendering relative names

    compress: the compression table

    section: an int, the section currently being rendered

    counts: list of the number of RRs in each section

    mac: initialized to an empty string; intended to hold the MAC of the
    rendered message if TSIG was used (nothing in this class updates it)

    reserved: the number of bytes currently held back via reserve()

    was_padded: True once add_opt() has added a padding option
    """

    def __init__(self, id=None, flags=0, max_size=65535, origin=None):
        """Initialize a new renderer.

        If *id* is None, a random message id in the range 0..65535 is
        chosen.
        """

        self.id = random.randint(0, 65535) if id is None else id
        self.flags = flags
        self.max_size = max_size
        self.origin = origin
        self.compress = {}
        self.section = QUESTION
        self.counts = [0, 0, 0, 0]
        self.mac = ""
        self.reserved = 0
        self.was_padded = False
        self.output = io.BytesIO()
        # Leave room for the 12-byte header; write_header() fills it in
        # once the section counts are known.
        self.output.write(b"\x00" * 12)

    def _rollback(self, where):
        """Truncate the output buffer at offset *where*, and drop every
        compression table entry that points at or beyond that offset.
        """

        self.output.seek(where)
        self.output.truncate()
        stale = [name for name, offset in self.compress.items() if offset >= where]
        for name in stale:
            del self.compress[name]

    def _set_section(self, section):
        """Set the renderer's current section.

        Sections must be rendered in order: QUESTION, ANSWER, AUTHORITY,
        ADDITIONAL.  Sections may be empty.

        Raises dns.exception.FormError if an attempt was made to set
        a section value less than the current section.
        """

        if self.section == section:
            return
        if self.section > section:
            raise dns.exception.FormError
        self.section = section

    @contextlib.contextmanager
    def _track_size(self):
        """Context manager enforcing max_size on whatever the body writes.

        Yields the output offset at entry.  If, after the body completes,
        the output has grown past max_size, everything written by the body
        is rolled back and dns.exception.TooBig is raised.
        """

        start = self.output.tell()
        yield start
        if self.output.tell() <= self.max_size:
            return
        self._rollback(start)
        raise dns.exception.TooBig

    @contextlib.contextmanager
    def _temporarily_seek_to(self, where):
        """Context manager that moves the output position to *where* for
        the duration of the body and then restores the previous position.
        """

        saved_position = self.output.tell()
        try:
            self.output.seek(where)
            yield
        finally:
            self.output.seek(saved_position)

    def add_question(self, qname, rdtype, rdclass=dns.rdataclass.IN):
        """Add a question to the message."""

        self._set_section(QUESTION)
        with self._track_size():
            qname.to_wire(self.output, self.compress, self.origin)
            type_and_class = struct.pack("!HH", rdtype, rdclass)
            self.output.write(type_and_class)
        self.counts[QUESTION] += 1

    def add_rrset(self, section, rrset, **kw):
        """Add the rrset to the specified section.

        Any keyword arguments are passed on to the rrset's to_wire()
        routine.  The value to_wire() returns is added to the section's
        record count.
        """

        self._set_section(section)
        with self._track_size():
            rr_count = rrset.to_wire(self.output, self.compress, self.origin, **kw)
        self.counts[section] += rr_count

    def add_rdataset(self, section, name, rdataset, **kw):
        """Add the rdataset to the specified section, using the specified
        name as the owner name.

        Any keyword arguments are passed on to the rdataset's to_wire()
        routine.  The value to_wire() returns is added to the section's
        record count.
        """

        self._set_section(section)
        with self._track_size():
            rr_count = rdataset.to_wire(
                name, self.output, self.compress, self.origin, **kw
            )
        self.counts[section] += rr_count

    def add_opt(self, opt, pad=0, opt_size=0, tsig_size=0):
        """Add *opt* to the additional section, applying padding if desired.  The
        padding will take the specified precomputed OPT size and TSIG size into
        account.

        Note that we don't have a reliable way of knowing how big a GSS-TSIG
        digest might be, so we might not get an even multiple of the pad in
        that case."""
        if pad:
            ttl = opt.ttl
            # NOTE(review): 11 is presumably the smallest possible OPT RR
            # on the wire -- confirm against the caller that computes it.
            assert opt_size >= 11
            opt_rdata = opt[0]
            unpadded_size = self.output.tell() + opt_size + tsig_size
            remainder = unpadded_size % pad
            # Bring the total up to the next multiple of *pad*; add no
            # bytes if it is already a multiple.
            padding = b"\x00" * (pad - remainder) if remainder else b""
            options = list(opt_rdata.options)
            options.append(dns.edns.GenericOption(dns.edns.OptionType.PADDING, padding))
            opt = dns.message.Message._make_opt(ttl, opt_rdata.rdclass, options)
            self.was_padded = True
        self.add_rrset(ADDITIONAL, opt)

    def add_edns(self, edns, ednsflags, payload, options=None):
        """Add an EDNS OPT record to the message."""

        # Make sure the EDNS version in ednsflags agrees with edns: clear
        # bits 16-23 and put the version there.
        ednsflags = (ednsflags & 0xFF00FFFF) | (edns << 16)
        opt = dns.message.Message._make_opt(ednsflags, payload, options)
        self.add_opt(opt)

    def add_tsig(
        self,
        keyname,
        secret,
        fudge,
        id,
        tsig_error,
        other_data,
        request_mac,
        algorithm=dns.tsig.default_algorithm,
    ):
        """Add a TSIG signature to the message.

        *secret* may be a dns.tsig.Key, which is used as is; otherwise a
        key is built from *keyname*, *secret* and *algorithm*.
        """

        # Sign everything rendered so far.
        wire = self.output.getvalue()

        key = (
            secret
            if isinstance(secret, dns.tsig.Key)
            else dns.tsig.Key(keyname, secret, algorithm)
        )
        unsigned = dns.message.Message._make_tsig(
            keyname, algorithm, 0, fudge, b"", id, tsig_error, other_data
        )
        now = int(time.time())
        (signed, _) = dns.tsig.sign(wire, key, unsigned[0], now, request_mac)
        self._write_tsig(signed, keyname)

    def add_multi_tsig(
        self,
        ctx,
        keyname,
        secret,
        fudge,
        id,
        tsig_error,
        other_data,
        request_mac,
        algorithm=dns.tsig.default_algorithm,
    ):
        """Add a TSIG signature to the message. Unlike add_tsig(), this can be
        used for a series of consecutive DNS envelopes, e.g. for a zone
        transfer over TCP [RFC2845, 4.4].

        For the first message in the sequence, give ctx=None. For each
        subsequent message, give the ctx that was returned from the
        add_multi_tsig() call for the previous message."""

        # Sign everything rendered so far.
        wire = self.output.getvalue()

        key = (
            secret
            if isinstance(secret, dns.tsig.Key)
            else dns.tsig.Key(keyname, secret, algorithm)
        )
        unsigned = dns.message.Message._make_tsig(
            keyname, algorithm, 0, fudge, b"", id, tsig_error, other_data
        )
        now = int(time.time())
        (signed, next_ctx) = dns.tsig.sign(
            wire, key, unsigned[0], now, request_mac, ctx, True
        )
        self._write_tsig(signed, keyname)
        return next_ctx

    def _write_tsig(self, tsig, keyname):
        """Append the TSIG record for *keyname* to the additional section
        and update the additional-record count stored in the header.
        """

        # NOTE(review): name compression is skipped once padding has been
        # applied -- presumably so the TSIG size stays what the padding
        # calculation assumed; confirm.
        compress = None if self.was_padded else self.compress
        self._set_section(ADDITIONAL)
        with self._track_size():
            keyname.to_wire(self.output, compress, self.origin)
            # Type TSIG, class ANY, followed by a zero 32-bit field.
            fixed_fields = struct.pack(
                "!HHI", dns.rdatatype.TSIG, dns.rdataclass.ANY, 0
            )
            self.output.write(fixed_fields)
            with prefixed_length(self.output, 2):
                tsig.to_wire(self.output)

        self.counts[ADDITIONAL] += 1
        # Offset 10 is where write_header() puts the ADDITIONAL count; patch
        # it in place, since the header is written before the TSIG is added.
        with self._temporarily_seek_to(10):
            self.output.write(struct.pack("!H", self.counts[ADDITIONAL]))

    def write_header(self):
        """Write the DNS message header.

        Writing the DNS message header is done after all sections
        have been rendered, but before the optional TSIG signature
        is added.
        """

        with self._temporarily_seek_to(0):
            header = struct.pack(
                "!HHHHHH",
                self.id,
                self.flags,
                self.counts[0],
                self.counts[1],
                self.counts[2],
                self.counts[3],
            )
            self.output.write(header)

    def get_wire(self):
        """Return the wire format message."""

        wire = self.output.getvalue()
        return wire

    def reserve(self, size: int) -> None:
        """Reserve *size* bytes by lowering max_size by that amount.

        Raises ValueError if *size* is negative or larger than the current
        max_size.
        """
        if size < 0:
            raise ValueError("reserved amount must be non-negative")
        if size > self.max_size:
            raise ValueError("cannot reserve more than the maximum size")
        self.max_size -= size
        self.reserved += size

    def release_reserved(self) -> None:
        """Release the reserved bytes, giving them back to max_size."""
        released, self.reserved = self.reserved, 0
        self.max_size += released