Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/dns/message.py: 47%

805 statements  

« prev     ^ index     » next       coverage.py v7.4.1, created at 2024-02-02 06:07 +0000

1# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license 

2 

3# Copyright (C) 2001-2017 Nominum, Inc. 

4# 

5# Permission to use, copy, modify, and distribute this software and its 

6# documentation for any purpose with or without fee is hereby granted, 

7# provided that the above copyright notice and this permission notice 

8# appear in all copies. 

9# 

10# THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES 

11# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 

12# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR 

13# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 

14# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 

15# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT 

16# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 

17 

18"""DNS Messages""" 

19 

20import contextlib 

21import io 

22import time 

23from typing import Any, Dict, List, Optional, Tuple, Union 

24 

25import dns.edns 

26import dns.entropy 

27import dns.enum 

28import dns.exception 

29import dns.flags 

30import dns.name 

31import dns.opcode 

32import dns.rcode 

33import dns.rdata 

34import dns.rdataclass 

35import dns.rdatatype 

36import dns.rdtypes.ANY.OPT 

37import dns.rdtypes.ANY.TSIG 

38import dns.renderer 

39import dns.rrset 

40import dns.tsig 

41import dns.ttl 

42import dns.wire 

43 

44 

# Form error for wire input that is too short to hold a message header.
# NOTE(review): the docstring text is presumably reused as the default
# exception message by the base class -- keep its wording stable.
class ShortHeader(dns.exception.FormError):
    """The DNS packet passed to from_wire() is too short."""

47 

48 

# Form error for wire input that has bytes left over after the message.
# NOTE(review): the docstring text is presumably reused as the default
# exception message by the base class -- keep its wording stable.
class TrailingJunk(dns.exception.FormError):
    """The DNS packet passed to from_wire() has extra junk at the end of it."""

51 

52 

# Text-parsing error: derives from DNSException directly, not from FormError.
# NOTE(review): the docstring text is presumably reused as the default
# exception message by the base class -- keep its wording stable.
class UnknownHeaderField(dns.exception.DNSException):
    """The header field name was not recognized when converting from text
    into a message."""

56 

57 

# Raised by Message._parse_special_rr_header() when an OPT record is outside
# the additional section, when the message already has an OPT, or when the
# OPT owner name is not the root name.
class BadEDNS(dns.exception.FormError):
    """An OPT record occurred somewhere other than
    the additional data section."""

61 

62 

# Raised by Message._parse_special_rr_header() when a TSIG record is outside
# the additional section, does not have class ANY, or is not the last record
# of its section.
class BadTSIG(dns.exception.FormError):
    """A TSIG record occurred somewhere other than the end of
    the additional data section."""

66 

67 

# TSIG key lookup failure; a DNSException rather than a FormError.
# NOTE(review): the docstring text is presumably reused as the default
# exception message by the base class -- keep its wording stable.
class UnknownTSIGKey(dns.exception.DNSException):
    """A TSIG with an unknown key was received."""

70 

71 

class Truncated(dns.exception.DNSException):
    """The truncated flag is set."""

    # The only keyword argument this exception accepts.
    supp_kwargs = {"message"}

    # We do this as otherwise mypy complains about unexpected keyword argument
    # message
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def message(self):
        """As much of the message as could be processed.

        Reads the ``message`` keyword argument the exception was created
        with (presumably stored in ``self.kwargs`` by the base class).

        Returns a ``dns.message.Message``.
        """
        return self.kwargs["message"]

88 

89 

# Raised by QueryMessage.resolve_chaining() when the QR flag is not set.
class NotQueryResponse(dns.exception.DNSException):
    """Message is not a response to a query."""

92 

93 

# Raised by QueryMessage.resolve_chaining() once MAX_CHAIN CNAMEs have been
# followed without reaching an answer.
class ChainTooLong(dns.exception.DNSException):
    """The CNAME chain is too long."""

96 

97 

# Raised by QueryMessage.resolve_chaining() when the rcode is NXDOMAIN yet an
# answer RRset was located.
class AnswerForNXDOMAIN(dns.exception.DNSException):
    """The rcode is NXDOMAIN but an answer was found."""

100 

101 

# Syntax error (dns.exception.SyntaxError, not the builtin) for text input
# that refers to a previous owner name when none has been seen.
# NOTE(review): the docstring text is presumably reused as the default
# exception message by the base class -- keep its wording stable.
class NoPreviousName(dns.exception.SyntaxError):
    """No previous name was known."""

104 

105 

class MessageSection(dns.enum.IntEnum):
    """Message sections"""

    # The values double as indexes into ``Message.sections`` (see
    # ``Message.section_from_number()``), so they must stay 0..3 in this order.
    QUESTION = 0
    ANSWER = 1
    AUTHORITY = 2
    ADDITIONAL = 3

    @classmethod
    def _maximum(cls):
        # Largest defined section value (ADDITIONAL).
        # NOTE(review): presumably consulted by dns.enum.IntEnum for range
        # checking -- confirm against dns.enum.
        return 3

117 

118 

class MessageError:
    """An exception paired with the offset at which it was recorded.

    Instances are the element type of ``Message.errors``.
    """

    def __init__(self, exception: Exception, offset: int):
        # Plain record: both values are stored exactly as given.
        self.offset = offset
        self.exception = exception

123 

124 

# Default EDNS payload size, used by Message.use_edns() and Message._make_opt()
# when the caller does not supply one.
DEFAULT_EDNS_PAYLOAD = 1232
# Upper bound on the number of CNAMEs QueryMessage.resolve_chaining() follows
# before raising ChainTooLong.
MAX_CHAIN = 16

127 

# Key of Message.index.  The elements mirror the tuple built in
# Message.find_rrset(): (section number, name, rdclass, rdtype, covers,
# deleting).
IndexKeyType = Tuple[
    int,
    dns.name.Name,
    dns.rdataclass.RdataClass,
    dns.rdatatype.RdataType,
    Optional[dns.rdatatype.RdataType],
    Optional[dns.rdataclass.RdataClass],
]
# Maps an index key to the RRset stored under it.
IndexType = Dict[IndexKeyType, dns.rrset.RRset]
# A section can be designated by number, by text name, or by passing the
# section list object itself.
SectionType = Union[int, str, List[dns.rrset.RRset]]

138 

139 

class Message:
    """A DNS message.

    The four RRset sections are stored in ``self.sections`` and exposed as
    the ``question``, ``answer``, ``authority`` and ``additional``
    properties.  EDNS state is held in the ``opt`` RRset and TSIG state in
    the ``tsig`` RRset; both are kept apart from ``additional`` and are
    accounted for explicitly by ``section_count()`` and ``to_wire()``.
    """

    _section_enum = MessageSection

    def __init__(self, id: Optional[int] = None):
        """Create an empty message.

        *id*, an ``int`` or ``None``, the message id.  If ``None``, a random
        16-bit id is chosen.
        """
        if id is None:
            self.id = dns.entropy.random_16()
        else:
            self.id = id
        self.flags = 0
        # One list per section, indexed by the section number.
        self.sections: List[List[dns.rrset.RRset]] = [[], [], [], []]
        # OPT RRset carrying the EDNS settings, or None when EDNS is off.
        self.opt: Optional[dns.rrset.RRset] = None
        # Default maximum size used by to_wire() when max_size is 0.
        self.request_payload = 0
        # Padding block size passed to the renderer; 0 means no padding.
        self.pad = 0
        # Set by use_tsig() to the key used for signing.
        self.keyring: Any = None
        # TSIG RRset, or None when the message is not signed.
        self.tsig: Optional[dns.rrset.RRset] = None
        self.request_mac = b""
        self.xfr = False
        # Origin used by to_wire() when the caller does not pass one.
        self.origin: Optional[dns.name.Name] = None
        # Updated by to_wire() when signing a multi-message sequence.
        self.tsig_ctx: Optional[Any] = None
        # Lookup table used by find_rrset(); may be set to None to force a
        # linear search of the section instead.
        self.index: IndexType = {}
        self.errors: List[MessageError] = []
        self.time = 0.0

    @property
    def question(self) -> List[dns.rrset.RRset]:
        """The question section."""
        return self.sections[0]

    @question.setter
    def question(self, v):
        self.sections[0] = v

    @property
    def answer(self) -> List[dns.rrset.RRset]:
        """The answer section."""
        return self.sections[1]

    @answer.setter
    def answer(self, v):
        self.sections[1] = v

    @property
    def authority(self) -> List[dns.rrset.RRset]:
        """The authority section."""
        return self.sections[2]

    @authority.setter
    def authority(self, v):
        self.sections[2] = v

    @property
    def additional(self) -> List[dns.rrset.RRset]:
        """The additional data section."""
        return self.sections[3]

    @additional.setter
    def additional(self, v):
        self.sections[3] = v

    def __repr__(self):
        return "<DNS message, ID " + repr(self.id) + ">"

    def __str__(self):
        return self.to_text()

    def to_text(
        self,
        origin: Optional[dns.name.Name] = None,
        relativize: bool = True,
        **kw: Any,
    ) -> str:
        """Convert the message to text.

        The *origin*, *relativize*, and any other keyword
        arguments are passed to the RRset ``to_text()`` method.

        Returns a ``str``.
        """

        s = io.StringIO()
        s.write("id %d\n" % self.id)
        s.write("opcode %s\n" % dns.opcode.to_text(self.opcode()))
        s.write("rcode %s\n" % dns.rcode.to_text(self.rcode()))
        s.write("flags %s\n" % dns.flags.to_text(self.flags))
        if self.edns >= 0:
            s.write("edns %s\n" % self.edns)
            if self.ednsflags != 0:
                s.write("eflags %s\n" % dns.flags.edns_to_text(self.ednsflags))
            s.write("payload %d\n" % self.payload)
        for opt in self.options:
            s.write("option %s\n" % opt.to_text())
        for name, which in self._section_enum.__members__.items():
            s.write(f";{name}\n")
            for rrset in self.section_from_number(which):
                s.write(rrset.to_text(origin, relativize, **kw))
                s.write("\n")
        #
        # We strip off the final \n so the caller can print the result without
        # doing weird things to get around eccentricities in Python print
        # formatting
        #
        return s.getvalue()[:-1]

    def __eq__(self, other):
        """Two messages are equal if they have the same id and flags, and
        every section (question, answer, authority, and additional) contains
        the same RRsets.  The order of RRsets within a section is ignored.

        The ``opt`` and ``tsig`` RRsets are not compared.

        Returns a ``bool``.
        """

        if not isinstance(other, Message):
            return False
        if self.id != other.id:
            return False
        if self.flags != other.flags:
            return False
        for i, section in enumerate(self.sections):
            other_section = other.sections[i]
            # Membership is checked in both directions so that neither side
            # may hold an RRset the other lacks.
            for n in section:
                if n not in other_section:
                    return False
            for n in other_section:
                if n not in section:
                    return False
        return True

    def __ne__(self, other):
        return not self.__eq__(other)

    def is_response(self, other: "Message") -> bool:
        """Is *other*, also a ``dns.message.Message``, a response to this
        message?

        Returns a ``bool``.
        """

        if (
            other.flags & dns.flags.QR == 0
            or self.id != other.id
            or dns.opcode.from_flags(self.flags) != dns.opcode.from_flags(other.flags)
        ):
            return False
        if other.rcode() in {
            dns.rcode.FORMERR,
            dns.rcode.SERVFAIL,
            dns.rcode.NOTIMP,
            dns.rcode.REFUSED,
        }:
            # We don't check the question section in these cases if
            # the other question section is empty, even though they
            # still really ought to have a question section.
            if len(other.question) == 0:
                return True
        if dns.opcode.is_update(self.flags):
            # This is assuming the "sender doesn't include anything
            # from the update", but we don't care to check the other
            # case, which is that all the sections are returned and
            # identical.
            return True
        for n in self.question:
            if n not in other.question:
                return False
        for n in other.question:
            if n not in self.question:
                return False
        return True

    def section_number(self, section: List[dns.rrset.RRset]) -> int:
        """Return the "section number" of the specified section for use
        in indexing.

        *section* is one of the section attributes of this message.  It is
        matched by identity, so an equal but distinct list is not accepted.

        Raises ``ValueError`` if the section isn't known.

        Returns an ``int``.
        """

        for i, our_section in enumerate(self.sections):
            if section is our_section:
                return self._section_enum(i)
        raise ValueError("unknown section")

    def section_from_number(self, number: int) -> List[dns.rrset.RRset]:
        """Return the section list associated with the specified section
        number.

        *number* is a section number `int` or the text form of a section
        name.

        Raises ``ValueError`` if the section isn't known.

        Returns a ``list``.
        """

        section = self._section_enum.make(number)
        return self.sections[section]

    def find_rrset(
        self,
        section: SectionType,
        name: dns.name.Name,
        rdclass: dns.rdataclass.RdataClass,
        rdtype: dns.rdatatype.RdataType,
        covers: dns.rdatatype.RdataType = dns.rdatatype.NONE,
        deleting: Optional[dns.rdataclass.RdataClass] = None,
        create: bool = False,
        force_unique: bool = False,
        idna_codec: Optional[dns.name.IDNACodec] = None,
    ) -> dns.rrset.RRset:
        """Find the RRset with the given attributes in the specified section.

        *section*, an ``int`` section number, a ``str`` section name, or one of
        the section attributes of this message.  This specifies the
        section of the message to search.  For example::

            my_message.find_rrset(my_message.answer, name, rdclass, rdtype)
            my_message.find_rrset(dns.message.ANSWER, name, rdclass, rdtype)
            my_message.find_rrset("ANSWER", name, rdclass, rdtype)

        *name*, a ``dns.name.Name`` or ``str``, the name of the RRset.

        *rdclass*, an ``int`` or ``str``, the class of the RRset.

        *rdtype*, an ``int`` or ``str``, the type of the RRset.

        *covers*, an ``int`` or ``str``, the covers value of the RRset.
        The default is ``dns.rdatatype.NONE``.

        *deleting*, an ``int``, ``str``, or ``None``, the deleting value of the
        RRset.  The default is ``None``.

        *create*, a ``bool``.  If ``True``, create the RRset if it is not found.
        The created RRset is appended to *section*.

        *force_unique*, a ``bool``.  If ``True`` and *create* is also ``True``,
        create a new RRset regardless of whether a matching RRset exists
        already.  The default is ``False``.  This is useful when creating
        DDNS Update messages, as order matters for them.

        *idna_codec*, a ``dns.name.IDNACodec``, specifies the IDNA
        encoder/decoder.  If ``None``, the default IDNA 2003 encoder/decoder
        is used.

        Raises ``KeyError`` if the RRset was not found and create was
        ``False``.

        Returns a ``dns.rrset.RRset object``.
        """

        if isinstance(section, int):
            section_number = section
            section = self.section_from_number(section_number)
        elif isinstance(section, str):
            section_number = self._section_enum.from_text(section)
            section = self.section_from_number(section_number)
        else:
            section_number = self.section_number(section)
        if isinstance(name, str):
            name = dns.name.from_text(name, idna_codec=idna_codec)
        rdtype = dns.rdatatype.RdataType.make(rdtype)
        rdclass = dns.rdataclass.RdataClass.make(rdclass)
        covers = dns.rdatatype.RdataType.make(covers)
        if deleting is not None:
            deleting = dns.rdataclass.RdataClass.make(deleting)
        key = (section_number, name, rdclass, rdtype, covers, deleting)
        if not force_unique:
            if self.index is not None:
                rrset = self.index.get(key)
                if rrset is not None:
                    return rrset
            else:
                # No index available: fall back to scanning the section.
                for rrset in section:
                    if rrset.full_match(name, rdclass, rdtype, covers, deleting):
                        return rrset
        if not create:
            raise KeyError
        rrset = dns.rrset.RRset(name, rdclass, rdtype, covers, deleting)
        section.append(rrset)
        if self.index is not None:
            # With force_unique this replaces any earlier entry for the key,
            # so later lookups return the newest RRset.
            self.index[key] = rrset
        return rrset

    def get_rrset(
        self,
        section: SectionType,
        name: dns.name.Name,
        rdclass: dns.rdataclass.RdataClass,
        rdtype: dns.rdatatype.RdataType,
        covers: dns.rdatatype.RdataType = dns.rdatatype.NONE,
        deleting: Optional[dns.rdataclass.RdataClass] = None,
        create: bool = False,
        force_unique: bool = False,
        idna_codec: Optional[dns.name.IDNACodec] = None,
    ) -> Optional[dns.rrset.RRset]:
        """Get the RRset with the given attributes in the specified section.

        If the RRset is not found, None is returned.

        *section*, an ``int`` section number, a ``str`` section name, or one of
        the section attributes of this message.  This specifies the
        section of the message to search.  For example::

            my_message.get_rrset(my_message.answer, name, rdclass, rdtype)
            my_message.get_rrset(dns.message.ANSWER, name, rdclass, rdtype)
            my_message.get_rrset("ANSWER", name, rdclass, rdtype)

        *name*, a ``dns.name.Name`` or ``str``, the name of the RRset.

        *rdclass*, an ``int`` or ``str``, the class of the RRset.

        *rdtype*, an ``int`` or ``str``, the type of the RRset.

        *covers*, an ``int`` or ``str``, the covers value of the RRset.
        The default is ``dns.rdatatype.NONE``.

        *deleting*, an ``int``, ``str``, or ``None``, the deleting value of the
        RRset.  The default is ``None``.

        *create*, a ``bool``.  If ``True``, create the RRset if it is not found.
        The created RRset is appended to *section*.

        *force_unique*, a ``bool``.  If ``True`` and *create* is also ``True``,
        create a new RRset regardless of whether a matching RRset exists
        already.  The default is ``False``.  This is useful when creating
        DDNS Update messages, as order matters for them.

        *idna_codec*, a ``dns.name.IDNACodec``, specifies the IDNA
        encoder/decoder.  If ``None``, the default IDNA 2003 encoder/decoder
        is used.

        Returns a ``dns.rrset.RRset object`` or ``None``.
        """

        try:
            rrset = self.find_rrset(
                section,
                name,
                rdclass,
                rdtype,
                covers,
                deleting,
                create,
                force_unique,
                idna_codec,
            )
        except KeyError:
            rrset = None
        return rrset

    def section_count(self, section: SectionType) -> int:
        """Returns the number of records in the specified section.

        *section*, an ``int`` section number, a ``str`` section name, or one of
        the section attributes of this message.  This specifies the
        section of the message to count.  For example::

            my_message.section_count(my_message.answer)
            my_message.section_count(dns.message.ANSWER)
            my_message.section_count("ANSWER")
        """

        if isinstance(section, int):
            section_number = section
            section = self.section_from_number(section_number)
        elif isinstance(section, str):
            section_number = self._section_enum.from_text(section)
            section = self.section_from_number(section_number)
        else:
            section_number = self.section_number(section)
        # An empty RRset still counts as one record.
        count = sum(max(1, len(rrs)) for rrs in section)
        if section_number == MessageSection.ADDITIONAL:
            # OPT and TSIG are held outside the additional list but are part
            # of that section's record count.
            if self.opt is not None:
                count += 1
            if self.tsig is not None:
                count += 1
        return count

    def _compute_opt_reserve(self) -> int:
        """Compute the size required for the OPT RR, padding excluded"""
        if not self.opt:
            return 0
        # 1 byte for the root name, 10 for the standard RR fields
        size = 11
        # This would be more efficient if options had a size() method, but we won't
        # worry about that for now.  We also don't worry if there is an existing padding
        # option, as it is unlikely and probably harmless, as the worst case is that we
        # may add another, and this seems to be legal.
        for option in self.opt[0].options:
            wire = option.to_wire()
            # We add 4 here to account for the option type and length
            size += len(wire) + 4
        if self.pad:
            # Padding will be added, so again add the option type and length.
            size += 4
        return size

    def _compute_tsig_reserve(self) -> int:
        """Compute the size required for the TSIG RR"""
        # This would be more efficient if TSIGs had a size method, but we won't
        # worry about that for now.  Also, we can't really cope with the potential
        # compressibility of the TSIG owner name, so we estimate with the uncompressed
        # size.  We will disable compression when TSIG and padding are both active
        # so that the padding comes out right.
        if not self.tsig:
            return 0
        f = io.BytesIO()
        self.tsig.to_wire(f)
        return len(f.getvalue())

    def to_wire(
        self,
        origin: Optional[dns.name.Name] = None,
        max_size: int = 0,
        multi: bool = False,
        tsig_ctx: Optional[Any] = None,
        prepend_length: bool = False,
        prefer_truncation: bool = False,
        **kw: Any,
    ) -> bytes:
        """Return the message in DNS compressed wire format.

        Additional keyword arguments are passed to the RRset ``to_wire()``
        method.

        *origin*, a ``dns.name.Name`` or ``None``, the origin to be appended
        to any relative names.  If ``None``, and the message has an origin
        attribute that is not ``None``, then it will be used.

        *max_size*, an ``int``, the maximum size of the wire format
        output; default is 0, which means "the message's request
        payload, if nonzero, or 65535".  The effective value is clamped to
        the range 512..65535.

        *multi*, a ``bool``, should be set to ``True`` if this message is
        part of a multiple message sequence.

        *tsig_ctx*, a ``dns.tsig.HMACTSig`` or ``dns.tsig.GSSTSig`` object, the
        ongoing TSIG context, used when signing zone transfers.

        *prepend_length*, a ``bool``, should be set to ``True`` if the caller
        wants the message length prepended to the message itself.  This is
        useful for messages sent over TCP, TLS (DoT), or QUIC (DoQ).

        *prefer_truncation*, a ``bool``, should be set to ``True`` if the caller
        wants the message to be truncated if it would otherwise exceed the
        maximum length.  If the truncation occurs before the additional section,
        the TC bit will be set.

        Raises ``dns.exception.TooBig`` if *max_size* was exceeded.

        Returns a ``bytes``.
        """

        if origin is None and self.origin is not None:
            origin = self.origin
        if max_size == 0:
            if self.request_payload != 0:
                max_size = self.request_payload
            else:
                max_size = 65535
        if max_size < 512:
            max_size = 512
        elif max_size > 65535:
            max_size = 65535
        r = dns.renderer.Renderer(self.id, self.flags, max_size, origin)
        # Set aside room for OPT and TSIG before rendering the sections so
        # that they still fit once the sections have been written.
        opt_reserve = self._compute_opt_reserve()
        r.reserve(opt_reserve)
        tsig_reserve = self._compute_tsig_reserve()
        r.reserve(tsig_reserve)
        try:
            for rrset in self.question:
                r.add_question(rrset.name, rrset.rdtype, rrset.rdclass)
            for rrset in self.answer:
                r.add_rrset(dns.renderer.ANSWER, rrset, **kw)
            for rrset in self.authority:
                r.add_rrset(dns.renderer.AUTHORITY, rrset, **kw)
            for rrset in self.additional:
                r.add_rrset(dns.renderer.ADDITIONAL, rrset, **kw)
        except dns.exception.TooBig:
            if prefer_truncation:
                # Dropping additional-section data does not require TC.
                if r.section < dns.renderer.ADDITIONAL:
                    r.flags |= dns.flags.TC
            else:
                raise
        r.release_reserved()
        if self.opt is not None:
            r.add_opt(self.opt, self.pad, opt_reserve, tsig_reserve)
        r.write_header()
        if self.tsig is not None:
            (new_tsig, ctx) = dns.tsig.sign(
                r.get_wire(),
                self.keyring,
                self.tsig[0],
                int(time.time()),
                self.request_mac,
                tsig_ctx,
                multi,
            )
            # Replace the TSIG rdata with the freshly signed one.
            self.tsig.clear()
            self.tsig.add(new_tsig)
            r.add_rrset(dns.renderer.ADDITIONAL, self.tsig)
            # The header is written again because adding the TSIG changed
            # the additional-section count.
            r.write_header()
            if multi:
                self.tsig_ctx = ctx
        wire = r.get_wire()
        if prepend_length:
            wire = len(wire).to_bytes(2, "big") + wire
        return wire

    @staticmethod
    def _make_tsig(
        keyname, algorithm, time_signed, fudge, mac, original_id, error, other
    ):
        """Build a single-rdata TSIG RRset (class ANY, TTL 0) owned by
        *keyname*."""
        tsig = dns.rdtypes.ANY.TSIG.TSIG(
            dns.rdataclass.ANY,
            dns.rdatatype.TSIG,
            algorithm,
            time_signed,
            fudge,
            mac,
            original_id,
            error,
            other,
        )
        return dns.rrset.from_rdata(keyname, 0, tsig)

    def use_tsig(
        self,
        keyring: Any,
        keyname: Optional[Union[dns.name.Name, str]] = None,
        fudge: int = 300,
        original_id: Optional[int] = None,
        tsig_error: int = 0,
        other_data: bytes = b"",
        algorithm: Union[dns.name.Name, str] = dns.tsig.default_algorithm,
    ) -> None:
        """When sending, a TSIG signature using the specified key
        should be added.

        *keyring*, a ``dict``, ``callable`` or ``dns.tsig.Key``, is either
        the TSIG keyring or key to use.  If a ``dns.tsig.Key`` is given,
        the *keyname* and *algorithm* parameters are not used.

        The format of a keyring dict is a mapping from TSIG key name, as
        ``dns.name.Name`` to ``dns.tsig.Key`` or a TSIG secret, a ``bytes``.
        If a ``dict`` *keyring* is specified but a *keyname* is not, the key
        used will be the first key in the *keyring*.  Note that the order of
        keys in a dictionary is not defined, so applications should supply a
        keyname when a ``dict`` keyring is used, unless they know the keyring
        contains only one key.  If a ``callable`` keyring is specified, the
        callable will be called with the message and the keyname, and is
        expected to return a key.

        *keyname*, a ``dns.name.Name``, ``str`` or ``None``, the name of
        this TSIG key to use; defaults to ``None``.  If *keyring* is a
        ``dict``, the key must be defined in it.  If *keyring* is a
        ``dns.tsig.Key``, this is ignored.

        *fudge*, an ``int``, the TSIG time fudge.

        *original_id*, an ``int``, the TSIG original id.  If ``None``,
        the message's id is used.

        *tsig_error*, an ``int``, the TSIG error code.

        *other_data*, a ``bytes``, the TSIG other data.

        *algorithm*, a ``dns.name.Name`` or ``str``, the TSIG algorithm to use.  This is
        only used if *keyring* is a ``dict``, and the key entry is a ``bytes``.
        """

        if isinstance(keyring, dns.tsig.Key):
            key = keyring
            keyname = key.name
        elif callable(keyring):
            key = keyring(self, keyname)
        else:
            if isinstance(keyname, str):
                keyname = dns.name.from_text(keyname)
            if keyname is None:
                keyname = next(iter(keyring))
            key = keyring[keyname]
            if isinstance(key, bytes):
                # A bare secret: wrap it in a Key using the given algorithm.
                key = dns.tsig.Key(keyname, key, algorithm)
        self.keyring = key
        if original_id is None:
            original_id = self.id
        # The TSIG created here is a placeholder with a zeroed MAC of the
        # right size; to_wire() replaces it with the real signature.
        self.tsig = self._make_tsig(
            keyname,
            self.keyring.algorithm,
            0,
            fudge,
            b"\x00" * dns.tsig.mac_sizes[self.keyring.algorithm],
            original_id,
            tsig_error,
            other_data,
        )

    @property
    def keyname(self) -> Optional[dns.name.Name]:
        """The TSIG key name, or ``None`` if there is no TSIG."""
        if self.tsig:
            return self.tsig.name
        else:
            return None

    @property
    def keyalgorithm(self) -> Optional[dns.name.Name]:
        """The TSIG algorithm, or ``None`` if there is no TSIG."""
        if self.tsig:
            return self.tsig[0].algorithm
        else:
            return None

    @property
    def mac(self) -> Optional[bytes]:
        """The TSIG MAC, or ``None`` if there is no TSIG."""
        if self.tsig:
            return self.tsig[0].mac
        else:
            return None

    @property
    def tsig_error(self) -> Optional[int]:
        """The TSIG error code, or ``None`` if there is no TSIG."""
        if self.tsig:
            return self.tsig[0].error
        else:
            return None

    @property
    def had_tsig(self) -> bool:
        """``True`` if the message has a TSIG."""
        return bool(self.tsig)

    @staticmethod
    def _make_opt(flags=0, payload=DEFAULT_EDNS_PAYLOAD, options=None):
        """Build the OPT RRset: owner is the root name, the TTL field holds
        the EDNS flags, and the rdata holds *payload* and *options*."""
        opt = dns.rdtypes.ANY.OPT.OPT(payload, dns.rdatatype.OPT, options or ())
        return dns.rrset.from_rdata(dns.name.root, int(flags), opt)

    def use_edns(
        self,
        edns: Optional[Union[int, bool]] = 0,
        ednsflags: int = 0,
        payload: int = DEFAULT_EDNS_PAYLOAD,
        request_payload: Optional[int] = None,
        options: Optional[List[dns.edns.Option]] = None,
        pad: int = 0,
    ) -> None:
        """Configure EDNS behavior.

        *edns*, an ``int``, is the EDNS level to use.  Specifying ``None``, ``False``,
        or ``-1`` means "do not use EDNS", and in this case the other parameters,
        apart from *pad*, are ignored.  Specifying ``True`` is equivalent to
        specifying 0, i.e. "use EDNS0".

        *ednsflags*, an ``int``, the EDNS flag values.

        *payload*, an ``int``, is the EDNS sender's payload field, which is the maximum
        size of UDP datagram the sender can handle.  I.e. how big a response to this
        message can be.

        *request_payload*, an ``int``, is the EDNS payload size to use when sending this
        message.  If not specified, defaults to the value of *payload*.

        *options*, a list of ``dns.edns.Option`` objects or ``None``, the EDNS options.

        *pad*, a non-negative ``int``.  If 0, the default, do not pad; otherwise add
        padding bytes to make the message size a multiple of *pad*.  Note that if
        padding is non-zero, an EDNS PADDING option will always be added to the
        message.

        Raises ``ValueError`` if *pad* is negative; the message is left
        unchanged in that case.
        """

        # Validate before touching any state so that a rejected call does not
        # leave the message half-reconfigured.
        if pad < 0:
            raise ValueError("pad must be non-negative")
        if edns is None or edns is False:
            edns = -1
        elif edns is True:
            edns = 0
        if edns < 0:
            self.opt = None
            self.request_payload = 0
        else:
            # make sure the EDNS version in ednsflags agrees with edns
            ednsflags &= 0xFF00FFFF
            ednsflags |= edns << 16
            if options is None:
                options = []
            self.opt = self._make_opt(ednsflags, payload, options)
            if request_payload is None:
                request_payload = payload
            self.request_payload = request_payload
        self.pad = pad

    @property
    def edns(self) -> int:
        """The EDNS version, or -1 if EDNS is not in use."""
        if self.opt:
            return (self.ednsflags & 0xFF0000) >> 16
        else:
            return -1

    @property
    def ednsflags(self) -> int:
        """The EDNS flags (the OPT TTL field), or 0 if EDNS is not in use."""
        if self.opt:
            return self.opt.ttl
        else:
            return 0

    @ednsflags.setter
    def ednsflags(self, v):
        if self.opt:
            self.opt.ttl = v
        elif v:
            # Setting non-zero flags implicitly turns EDNS on.
            self.opt = self._make_opt(v)

    @property
    def payload(self) -> int:
        """The EDNS payload size, or 0 if EDNS is not in use."""
        if self.opt:
            return self.opt[0].payload
        else:
            return 0

    @property
    def options(self) -> Tuple:
        """The EDNS options, or an empty tuple if EDNS is not in use."""
        if self.opt:
            return self.opt[0].options
        else:
            return ()

    def want_dnssec(self, wanted: bool = True) -> None:
        """Enable or disable 'DNSSEC desired' flag in requests.

        *wanted*, a ``bool``.  If ``True``, then DNSSEC data is
        desired in the response, EDNS is enabled if required, and then
        the DO bit is set.  If ``False``, the DO bit is cleared if
        EDNS is enabled.
        """

        if wanted:
            self.ednsflags |= dns.flags.DO
        elif self.opt:
            self.ednsflags &= ~int(dns.flags.DO)

    def rcode(self) -> dns.rcode.Rcode:
        """Return the rcode.

        Returns a ``dns.rcode.Rcode``.
        """
        return dns.rcode.from_flags(int(self.flags), int(self.ednsflags))

    def set_rcode(self, rcode: dns.rcode.Rcode) -> None:
        """Set the rcode.

        *rcode*, a ``dns.rcode.Rcode``, is the rcode to set.
        """
        (value, evalue) = dns.rcode.to_flags(rcode)
        # The rcode is split between the low 4 bits of the header flags and
        # the top 8 bits of the EDNS flags.
        self.flags &= 0xFFF0
        self.flags |= value
        self.ednsflags &= 0x00FFFFFF
        self.ednsflags |= evalue

    def opcode(self) -> dns.opcode.Opcode:
        """Return the opcode.

        Returns a ``dns.opcode.Opcode``.
        """
        return dns.opcode.from_flags(int(self.flags))

    def set_opcode(self, opcode: dns.opcode.Opcode) -> None:
        """Set the opcode.

        *opcode*, a ``dns.opcode.Opcode``, is the opcode to set.
        """
        # 0x87FF clears the four opcode bits and leaves the rest intact.
        self.flags &= 0x87FF
        self.flags |= dns.opcode.to_flags(opcode)

    def _get_one_rr_per_rrset(self, value):
        # What the caller picked is fine.
        return value

    # pylint: disable=unused-argument

    def _parse_rr_header(self, section, name, rdclass, rdtype):
        """Default RR header hook: accept *rdclass* and *rdtype* unchanged."""
        return (rdclass, rdtype, None, False)

    # pylint: enable=unused-argument

    def _parse_special_rr_header(self, section, count, position, name, rdclass, rdtype):
        """Validate the placement of an OPT or TSIG record.

        Raises ``BadEDNS`` if an OPT record is outside the additional
        section, is not owned by the root name, or the message already has
        an OPT.

        Raises ``BadTSIG`` if a TSIG record is outside the additional
        section, does not have class ANY, or is not the last record
        (*position* must equal *count* - 1).

        Returns the same tuple shape as ``_parse_rr_header()``.
        """
        if rdtype == dns.rdatatype.OPT:
            if (
                section != MessageSection.ADDITIONAL
                or self.opt
                or name != dns.name.root
            ):
                raise BadEDNS
        elif rdtype == dns.rdatatype.TSIG:
            if (
                section != MessageSection.ADDITIONAL
                # rdclass is a class, so compare it with the class constant.
                or rdclass != dns.rdataclass.ANY
                or position != count - 1
            ):
                raise BadTSIG
        return (rdclass, rdtype, None, False)

942 

943 

class ChainingResult:
    """Outcome of ``dns.message.QueryMessage.resolve_chaining()``.

    Attributes:

    ``canonical_name``: the name reached after every CNAME has been
    followed.  When ``answer`` is not ``None`` this is the same name as
    ``answer.name``.

    ``answer``: the answer RRset, or ``None`` when there is none.

    ``minimum_ttl``: the TTL to use when caching the data.  It is the
    smallest of the CNAME TTLs together with either the answer TTL, if an
    answer exists, or the SOA TTL and SOA minimum for negative answers.

    ``cnames``: the CNAME RRsets that were followed to arrive at the
    canonical name.
    """

    def __init__(
        self,
        canonical_name: dns.name.Name,
        answer: Optional[dns.rrset.RRset],
        minimum_ttl: int,
        cnames: List[dns.rrset.RRset],
    ):
        # Plain value holder: every argument is stored as given.
        self.cnames = cnames
        self.minimum_ttl = minimum_ttl
        self.answer = answer
        self.canonical_name = canonical_name

974 

975 

class QueryMessage(Message):
    """A message whose opcode is QUERY, with CNAME-chain helpers."""

    def resolve_chaining(self) -> ChainingResult:
        """Walk the CNAME chain in this response to locate the answer RRset.

        Raises ``dns.message.NotQueryResponse`` if the message is not
        a response.

        Raises ``dns.message.ChainTooLong`` if the CNAME chain is too long.

        Raises ``dns.message.AnswerForNXDOMAIN`` if the rcode is NXDOMAIN
        but an answer was found.

        Raises ``dns.exception.FormError`` if the question count is not 1.

        Returns a ChainingResult object.
        """
        if not self.flags & dns.flags.QR:
            raise NotQueryResponse
        if len(self.question) != 1:
            raise dns.exception.FormError
        question = self.question[0]
        current_name = question.name
        lowest_ttl = dns.ttl.MAX_TTL
        answer = None
        followed = []
        hops = 0
        while hops < MAX_CHAIN:
            try:
                answer = self.find_rrset(
                    self.answer, current_name, question.rdclass, question.rdtype
                )
                lowest_ttl = min(lowest_ttl, answer.ttl)
                # Found the RRset that was asked for; chaining is done.
                break
            except KeyError:
                if question.rdtype == dns.rdatatype.CNAME:
                    # A CNAME query is never chained through CNAMEs.
                    break
                try:
                    cname_rrset = self.find_rrset(
                        self.answer,
                        current_name,
                        question.rdclass,
                        dns.rdatatype.CNAME,
                    )
                    followed.append(cname_rrset)
                    lowest_ttl = min(lowest_ttl, cname_rrset.ttl)
                    # Continue from the target of the first CNAME rdata.
                    first = next(iter(cname_rrset), None)
                    if first is not None:
                        current_name = first.target
                    hops += 1
                except KeyError:
                    # No CNAME at this name either: the chain ends here.
                    break
        if hops >= MAX_CHAIN:
            raise ChainTooLong
        if answer is not None and self.rcode() == dns.rcode.NXDOMAIN:
            raise AnswerForNXDOMAIN
        if answer is None:
            # Negative answer: lower the TTL further using the SOA found at
            # the closest enclosing name in the authority section.
            candidate = current_name
            while True:
                try:
                    soa_rrset = self.find_rrset(
                        self.authority,
                        candidate,
                        question.rdclass,
                        dns.rdatatype.SOA,
                    )
                    lowest_ttl = min(
                        lowest_ttl, soa_rrset.ttl, soa_rrset[0].minimum
                    )
                    break
                except KeyError:
                    try:
                        candidate = candidate.parent()
                    except dns.name.NoParent:
                        # Ran out of superdomains without finding an SOA.
                        break
        return ChainingResult(current_name, answer, lowest_ttl, followed)

    def canonical_name(self) -> dns.name.Name:
        """Return the canonical name of the first name in the question
        section.

        Raises ``dns.message.NotQueryResponse`` if the message is not
        a response.

        Raises ``dns.message.ChainTooLong`` if the CNAME chain is too long.

        Raises ``dns.message.AnswerForNXDOMAIN`` if the rcode is NXDOMAIN
        but an answer was found.

        Raises ``dns.exception.FormError`` if the question count is not 1.
        """
        result = self.resolve_chaining()
        return result.canonical_name

1067 

1068 

def _maybe_import_update():
    """Import ``dns.update`` at call time rather than at module import time."""
    # We avoid circular imports by doing this here.  We do it in another
    # function as doing it in _message_factory_from_opcode() makes "dns"
    # a local symbol, and the first line fails :)

    # pylint: disable=redefined-outer-name,import-outside-toplevel,unused-import
    import dns.update  # noqa: F401

1076 

1077 

def _message_factory_from_opcode(opcode):
    """Return the message class to instantiate for *opcode*.

    UPDATE maps to ``dns.update.UpdateMessage`` (imported on demand),
    QUERY maps to ``QueryMessage``, and every other opcode maps to the
    generic ``Message`` class.
    """
    if opcode == dns.opcode.UPDATE:
        _maybe_import_update()
        return dns.update.UpdateMessage
    if opcode == dns.opcode.QUERY:
        return QueryMessage
    return Message

1086 

1087 

class _WireReader:

    """Wire format reader.

    parser: the binary parser
    message: The message object being built
    initialize_message: Callback to set message parsing options
    question_only: Are we only reading the question?
    one_rr_per_rrset: Put each RR into its own RRset?
    keyring: TSIG keyring
    ignore_trailing: Ignore trailing junk at end of request?
    multi: Is this message part of a multi-message sequence?
    continue_on_error: try to extract as much information as possible from
    the message, accumulating MessageErrors in the *errors* attribute instead of
    raising them.
    """

    def __init__(
        self,
        wire,
        initialize_message,
        question_only=False,
        one_rr_per_rrset=False,
        ignore_trailing=False,
        keyring=None,
        multi=False,
        continue_on_error=False,
    ):
        self.parser = dns.wire.Parser(wire)
        # The message is created in read(), once the header has been parsed.
        self.message = None
        self.initialize_message = initialize_message
        self.question_only = question_only
        self.one_rr_per_rrset = one_rr_per_rrset
        self.ignore_trailing = ignore_trailing
        self.keyring = keyring
        self.multi = multi
        self.continue_on_error = continue_on_error
        # MessageError objects collected when continue_on_error is set.
        self.errors = []

    def _get_question(self, section_number, qcount):
        """Read the next *qcount* records from the wire data and add them to
        the question section.
        """
        assert self.message is not None
        section = self.message.sections[section_number]
        for _ in range(qcount):
            qname = self.parser.get_name(self.message.origin)
            # Each question entry is a name followed by 16-bit type and class.
            (rdtype, rdclass) = self.parser.get_struct("!HH")
            (rdclass, rdtype, _, _) = self.message._parse_rr_header(
                section_number, qname, rdclass, rdtype
            )
            self.message.find_rrset(
                section, qname, rdclass, rdtype, create=True, force_unique=True
            )

    def _add_error(self, e):
        """Record exception *e* together with the parser's current offset."""
        self.errors.append(MessageError(e, self.parser.current))

    def _get_section(self, section_number, count):
        """Read the next I{count} records from the wire data and add them to
        the specified section.

        section_number: the section of the message to which to add records
        count: the number of records to read
        """
        assert self.message is not None
        section = self.message.sections[section_number]
        force_unique = self.one_rr_per_rrset
        for i in range(count):
            # Remember where the RR begins; TSIG validation is given this
            # offset.
            rr_start = self.parser.current
            absolute_name = self.parser.get_name()
            if self.message.origin is not None:
                name = absolute_name.relativize(self.message.origin)
            else:
                name = absolute_name
            # Fixed RR header: 16-bit type, 16-bit class, 32-bit TTL and
            # 16-bit rdata length.
            (rdtype, rdclass, ttl, rdlen) = self.parser.get_struct("!HHIH")
            if rdtype in (dns.rdatatype.OPT, dns.rdatatype.TSIG):
                (
                    rdclass,
                    rdtype,
                    deleting,
                    empty,
                ) = self.message._parse_special_rr_header(
                    section_number, count, i, name, rdclass, rdtype
                )
            else:
                (rdclass, rdtype, deleting, empty) = self.message._parse_rr_header(
                    section_number, name, rdclass, rdtype
                )
            # Saved so that, on error, parsing can resume just past this
            # record's rdata.
            rdata_start = self.parser.current
            try:
                if empty:
                    if rdlen > 0:
                        raise dns.exception.FormError
                    rd = None
                    covers = dns.rdatatype.NONE
                else:
                    with self.parser.restrict_to(rdlen):
                        rd = dns.rdata.from_wire_parser(
                            rdclass, rdtype, self.parser, self.message.origin
                        )
                    covers = rd.covers()
                # Once an SOA is seen in a zone transfer, this and every
                # later record in the section goes into its own RRset.
                if self.message.xfr and rdtype == dns.rdatatype.SOA:
                    force_unique = True
                if rdtype == dns.rdatatype.OPT:
                    self.message.opt = dns.rrset.from_rdata(name, ttl, rd)
                elif rdtype == dns.rdatatype.TSIG:
                    if self.keyring is None:
                        raise UnknownTSIGKey("got signed message without keyring")
                    # The keyring may be a dict keyed by name, a callable
                    # taking (message, name), or a single key object.
                    if isinstance(self.keyring, dict):
                        key = self.keyring.get(absolute_name)
                        if isinstance(key, bytes):
                            key = dns.tsig.Key(absolute_name, key, rd.algorithm)
                    elif callable(self.keyring):
                        key = self.keyring(self.message, absolute_name)
                    else:
                        key = self.keyring
                    if key is None:
                        raise UnknownTSIGKey("key '%s' unknown" % name)
                    self.message.keyring = key
                    self.message.tsig_ctx = dns.tsig.validate(
                        self.parser.wire,
                        key,
                        absolute_name,
                        rd,
                        int(time.time()),
                        self.message.request_mac,
                        rr_start,
                        self.message.tsig_ctx,
                        self.multi,
                    )
                    self.message.tsig = dns.rrset.from_rdata(absolute_name, 0, rd)
                else:
                    rrset = self.message.find_rrset(
                        section,
                        name,
                        rdclass,
                        rdtype,
                        covers,
                        deleting,
                        True,
                        force_unique,
                    )
                    if rd is not None:
                        # A TTL with the most significant bit set is treated
                        # as zero.
                        if ttl > 0x7FFFFFFF:
                            ttl = 0
                        rrset.add(rd, ttl)
            except Exception as e:
                if self.continue_on_error:
                    self._add_error(e)
                    # Skip the bad rdata and carry on with the next record.
                    self.parser.seek(rdata_start + rdlen)
                else:
                    raise

    def read(self):
        """Read a wire format DNS message and build a dns.message.Message
        object."""

        # The header unpacked below is six 16-bit fields, i.e. 12 octets.
        if self.parser.remaining() < 12:
            raise ShortHeader
        (id, flags, qcount, ancount, aucount, adcount) = self.parser.get_struct(
            "!HHHHHH"
        )
        # The opcode in the flags selects the message class to build.
        factory = _message_factory_from_opcode(dns.opcode.from_flags(flags))
        self.message = factory(id=id)
        self.message.flags = dns.flags.Flag(flags)
        self.initialize_message(self.message)
        self.one_rr_per_rrset = self.message._get_one_rr_per_rrset(
            self.one_rr_per_rrset
        )
        try:
            self._get_question(MessageSection.QUESTION, qcount)
            if self.question_only:
                return self.message
            self._get_section(MessageSection.ANSWER, ancount)
            self._get_section(MessageSection.AUTHORITY, aucount)
            self._get_section(MessageSection.ADDITIONAL, adcount)
            if not self.ignore_trailing and self.parser.remaining() != 0:
                raise TrailingJunk
            # In a multi-message sequence, a message without its own TSIG is
            # still fed into the ongoing TSIG context.
            if self.multi and self.message.tsig_ctx and not self.message.had_tsig:
                self.message.tsig_ctx.update(self.parser.wire)
        except Exception as e:
            if self.continue_on_error:
                self._add_error(e)
            else:
                raise
        return self.message

1276 

1277 

def from_wire(
    wire: bytes,
    keyring: Optional[Any] = None,
    request_mac: Optional[bytes] = b"",
    xfr: bool = False,
    origin: Optional[dns.name.Name] = None,
    tsig_ctx: Optional[Union[dns.tsig.HMACTSig, dns.tsig.GSSTSig]] = None,
    multi: bool = False,
    question_only: bool = False,
    one_rr_per_rrset: bool = False,
    ignore_trailing: bool = False,
    raise_on_truncation: bool = False,
    continue_on_error: bool = False,
) -> Message:
    """Build a message object from a DNS wire format message.

    *keyring*, a ``dns.tsig.Key`` or ``dict``, the key or keyring to use if the
    message is signed.

    *request_mac*, a ``bytes`` or ``None``.  If the message is a response to a
    TSIG-signed request, *request_mac* should be set to the MAC of that request.

    *xfr*, a ``bool``, should be set to ``True`` if this message is part of a
    zone transfer.

    *origin*, a ``dns.name.Name`` or ``None``.  If the message is part of a
    zone transfer, *origin* should be the origin name of the zone.  If not
    ``None``, names will be relativized to the origin.

    *tsig_ctx*, a ``dns.tsig.HMACTSig`` or ``dns.tsig.GSSTSig`` object, the
    ongoing TSIG context, used when validating zone transfers.

    *multi*, a ``bool``, should be set to ``True`` if this message is part of a
    multiple message sequence.

    *question_only*, a ``bool``.  If ``True``, read only up to the end of the
    question section.

    *one_rr_per_rrset*, a ``bool``.  If ``True``, put each RR into its own
    RRset.

    *ignore_trailing*, a ``bool``.  If ``True``, ignore trailing junk at end of
    the message.

    *raise_on_truncation*, a ``bool``.  If ``True``, raise an exception if the
    TC bit is set.

    *continue_on_error*, a ``bool``.  If ``True``, try to continue parsing even
    if errors occur.  Erroneous rdata will be ignored.  Errors will be
    accumulated as a list of MessageError objects in the message's ``errors``
    attribute.  This option is recommended only for DNS analysis tools, or for
    use in a server as part of an error handling path.  The default is
    ``False``.

    Raises ``dns.message.ShortHeader`` if the message is less than 12 octets
    long.

    Raises ``dns.message.TrailingJunk`` if there were octets in the message
    past the end of the proper DNS message, and *ignore_trailing* is ``False``.

    Raises ``dns.message.BadEDNS`` if an OPT record was in the wrong section,
    or occurred more than once.

    Raises ``dns.message.BadTSIG`` if a TSIG record was not the last record of
    the additional data section.

    Raises ``dns.message.Truncated`` if the TC flag is set and
    *raise_on_truncation* is ``True``.

    Returns a ``dns.message.Message``.
    """

    # None is still accepted for request_mac, purely for backwards
    # compatibility; it is treated as an empty MAC.
    effective_mac = b"" if request_mac is None else request_mac

    def initialize_message(message):
        message.request_mac = effective_mac
        message.xfr = xfr
        message.origin = origin
        message.tsig_ctx = tsig_ctx

    reader = _WireReader(
        wire,
        initialize_message,
        question_only=question_only,
        one_rr_per_rrset=one_rr_per_rrset,
        ignore_trailing=ignore_trailing,
        keyring=keyring,
        multi=multi,
        continue_on_error=continue_on_error,
    )
    try:
        m = reader.read()
    except dns.exception.FormError:
        partial = reader.message
        if partial and (partial.flags & dns.flags.TC) and raise_on_truncation:
            # Report the parse failure of a truncated message as Truncated,
            # handing back whatever was parsed so far.
            raise Truncated(message=partial)
        raise
    # A truncated message can also parse without any error, so the TC
    # check has to be repeated on the success path.
    if m.flags & dns.flags.TC and raise_on_truncation:
        raise Truncated(message=m)
    if continue_on_error:
        m.errors = reader.errors

    return m

1386 

1387 

class _TextReader:
    """Text format reader.

    tok: the tokenizer.
    message: The message object being built.
    last_name: The most recently read name when building a message object.
    one_rr_per_rrset: Put each RR into its own RRset?
    origin: The origin for relative names
    relativize: relativize names?
    relativize_to: the origin to relativize to.
    """

    def __init__(
        self,
        text,
        idna_codec,
        one_rr_per_rrset=False,
        origin=None,
        relativize=True,
        relativize_to=None,
    ):
        self.message = None
        self.tok = dns.tokenizer.Tokenizer(text, idna_codec=idna_codec)
        self.last_name = None
        self.one_rr_per_rrset = one_rr_per_rrset
        self.origin = origin
        self.relativize = relativize
        self.relativize_to = relativize_to
        # Header values gathered by _header_line() and later applied by
        # _make_message().  An edns value of -1 means EDNS is not in use.
        self.id = None
        self.edns = -1
        self.ednsflags = 0
        self.payload = DEFAULT_EDNS_PAYLOAD
        self.rcode = None
        self.opcode = dns.opcode.QUERY
        self.flags = 0

    def _header_line(self, _):
        """Process one line from the text format header section."""

        token = self.tok.get()
        what = token.value
        if what == "id":
            self.id = self.tok.get_int()
        elif what == "flags":
            while True:
                token = self.tok.get()
                if not token.is_identifier():
                    self.tok.unget(token)
                    break
                self.flags = self.flags | dns.flags.from_text(token.value)
        elif what == "edns":
            self.edns = self.tok.get_int()
            # The EDNS version is also folded into the EDNS flags, shifted
            # left by 16 bits.
            self.ednsflags = self.ednsflags | (self.edns << 16)
        elif what == "eflags":
            # Mentioning EDNS flags implies EDNS is in use.
            if self.edns < 0:
                self.edns = 0
            while True:
                token = self.tok.get()
                if not token.is_identifier():
                    self.tok.unget(token)
                    break
                self.ednsflags = self.ednsflags | dns.flags.edns_from_text(token.value)
        elif what == "payload":
            self.payload = self.tok.get_int()
            # Mentioning a payload size implies EDNS is in use.
            if self.edns < 0:
                self.edns = 0
        elif what == "opcode":
            text = self.tok.get_string()
            self.opcode = dns.opcode.from_text(text)
            self.flags = self.flags | dns.opcode.to_flags(self.opcode)
        elif what == "rcode":
            text = self.tok.get_string()
            self.rcode = dns.rcode.from_text(text)
        else:
            raise UnknownHeaderField
        self.tok.get_eol()

    def _read_owner_name(self):
        """Read the owner name that starts a question or RR line.

        A line beginning with whitespace reuses the most recently read name.

        Raises ``NoPreviousName`` if there is no name to reuse, and
        ``dns.exception.SyntaxError`` if the token after the name is not an
        identifier.

        Returns ``(name, token)`` where *token* is the identifier token that
        follows the name.
        """
        token = self.tok.get(want_leading=True)
        if not token.is_whitespace():
            self.last_name = self.tok.as_name(
                token, self.message.origin, self.relativize, self.relativize_to
            )
        name = self.last_name
        if name is None:
            raise NoPreviousName
        token = self.tok.get()
        if not token.is_identifier():
            raise dns.exception.SyntaxError
        return (name, token)

    def _read_rdclass(self, token):
        """Interpret *token* as an optional rdata class.

        If *token* names a class, the following token is read and must be an
        identifier.  Otherwise the class defaults to IN and *token* is left
        for the caller to interpret.

        Returns ``(rdclass, token)``.
        """
        try:
            rdclass = dns.rdataclass.from_text(token.value)
            token = self.tok.get()
            if not token.is_identifier():
                raise dns.exception.SyntaxError
        except dns.exception.SyntaxError:
            # Re-raise the original exception so that any detail supplied by
            # the tokenizer is preserved, and so the broad handler below
            # does not mistake a syntax error for "not a class".
            raise
        except Exception:
            rdclass = dns.rdataclass.IN
        return (rdclass, token)

    def _question_line(self, section_number):
        """Process one line from the text format question section."""

        section = self.message.sections[section_number]
        (name, token) = self._read_owner_name()
        # Class
        (rdclass, token) = self._read_rdclass(token)
        # Type
        rdtype = dns.rdatatype.from_text(token.value)
        (rdclass, rdtype, _, _) = self.message._parse_rr_header(
            section_number, name, rdclass, rdtype
        )
        self.message.find_rrset(
            section, name, rdclass, rdtype, create=True, force_unique=True
        )
        self.tok.get_eol()

    def _rr_line(self, section_number):
        """Process one line from the text format answer, authority, or
        additional data sections.
        """

        section = self.message.sections[section_number]
        # Name
        (name, token) = self._read_owner_name()
        # TTL
        try:
            # Base 0 lets int() accept prefixed literals such as 0x10.
            ttl = int(token.value, 0)
            token = self.tok.get()
            if not token.is_identifier():
                raise dns.exception.SyntaxError
        except dns.exception.SyntaxError:
            # Preserve the original exception and its detail.
            raise
        except Exception:
            # Not a TTL; default to 0 and reinterpret the token below.
            ttl = 0
        # Class
        (rdclass, token) = self._read_rdclass(token)
        # Type
        rdtype = dns.rdatatype.from_text(token.value)
        (rdclass, rdtype, deleting, empty) = self.message._parse_rr_header(
            section_number, name, rdclass, rdtype
        )
        token = self.tok.get()
        if empty and not token.is_eol_or_eof():
            raise dns.exception.SyntaxError
        if not empty and token.is_eol_or_eof():
            raise dns.exception.UnexpectedEnd
        if not token.is_eol_or_eof():
            self.tok.unget(token)
            rd = dns.rdata.from_text(
                rdclass,
                rdtype,
                self.tok,
                self.message.origin,
                self.relativize,
                self.relativize_to,
            )
            covers = rd.covers()
        else:
            rd = None
            covers = dns.rdatatype.NONE
        rrset = self.message.find_rrset(
            section,
            name,
            rdclass,
            rdtype,
            covers,
            deleting,
            True,
            self.one_rr_per_rrset,
        )
        if rd is not None:
            rrset.add(rd, ttl)

    def _make_message(self):
        """Create a message object from the header values read so far."""
        factory = _message_factory_from_opcode(self.opcode)
        message = factory(id=self.id)
        message.flags = self.flags
        if self.edns >= 0:
            message.use_edns(self.edns, self.ednsflags, self.payload)
        if self.rcode:
            message.set_rcode(self.rcode)
        if self.origin:
            message.origin = self.origin
        return message

    def read(self):
        """Read a text format DNS message and build a dns.message.Message
        object."""

        line_method = self._header_line
        section_number = None
        while True:
            token = self.tok.get(True, True)
            if token.is_eol_or_eof():
                break
            if token.is_comment():
                u = token.value.upper()
                if u == "HEADER":
                    line_method = self._header_line

                if self.message:
                    message = self.message
                else:
                    # If we don't have a message, create one with the current
                    # opcode, so that we know which section names to parse.
                    message = self._make_message()
                try:
                    section_number = message._section_enum.from_text(u)
                    # We found a section name.  If we don't have a message,
                    # use the one we just created.
                    if not self.message:
                        self.message = message
                        self.one_rr_per_rrset = message._get_one_rr_per_rrset(
                            self.one_rr_per_rrset
                        )
                    if section_number == MessageSection.QUESTION:
                        line_method = self._question_line
                    else:
                        line_method = self._rr_line
                except Exception:
                    # It's just a comment.
                    pass
                self.tok.get_eol()
                continue
            self.tok.unget(token)
            line_method(section_number)
        if not self.message:
            self.message = self._make_message()
        return self.message

1633 

1634 

def from_text(
    text: str,
    idna_codec: Optional[dns.name.IDNACodec] = None,
    one_rr_per_rrset: bool = False,
    origin: Optional[dns.name.Name] = None,
    relativize: bool = True,
    relativize_to: Optional[dns.name.Name] = None,
) -> Message:
    """Build a message object from a text format message.

    Reading stops after the first blank line in the input, which makes it
    possible to read several messages from a single file with
    ``dns.message.from_file()``.

    *text*, a ``str``, the text format message.

    *idna_codec*, a ``dns.name.IDNACodec``, specifies the IDNA
    encoder/decoder.  If ``None``, the default IDNA 2003 encoder/decoder
    is used.

    *one_rr_per_rrset*, a ``bool``.  If ``True``, then each RR is put
    into its own rrset.  The default is ``False``.

    *origin*, a ``dns.name.Name`` (or ``None``), the
    origin to use for relative names.

    *relativize*, a ``bool``.  If true, name will be relativized.

    *relativize_to*, a ``dns.name.Name`` (or ``None``), the origin to use
    when relativizing names.  If not set, the *origin* value will be used.

    Raises ``dns.message.UnknownHeaderField`` if a header is unknown.

    Raises ``dns.exception.SyntaxError`` if the text is badly formed.

    Returns a ``dns.message.Message`` object.
    """

    # 'text' may also be a file object, but that is an implementation
    # detail and is deliberately not advertised; the official file
    # interface is from_file().

    return _TextReader(
        text,
        idna_codec,
        one_rr_per_rrset=one_rr_per_rrset,
        origin=origin,
        relativize=relativize,
        relativize_to=relativize_to,
    ).read()

1681 

1682 

def from_file(
    f: Any,
    idna_codec: Optional[dns.name.IDNACodec] = None,
    one_rr_per_rrset: bool = False,
) -> Message:
    """Read the next text format message from the specified file.

    Message blocks are separated by a single blank line.

    *f*, a ``file`` or ``str``.  If *f* is text, it is treated as the
    pathname of a file to open.

    *idna_codec*, a ``dns.name.IDNACodec``, specifies the IDNA
    encoder/decoder.  If ``None``, the default IDNA 2003 encoder/decoder
    is used.

    *one_rr_per_rrset*, a ``bool``.  If ``True``, then each RR is put
    into its own rrset.  The default is ``False``.

    Raises ``dns.message.UnknownHeaderField`` if a header is unknown.

    Raises ``dns.exception.SyntaxError`` if the text is badly formed.

    Returns a ``dns.message.Message`` object.
    """

    # A pathname is opened here and closed on exit from the ``with`` block;
    # an object passed in by the caller is wrapped in a null context.
    cm: contextlib.AbstractContextManager = (
        open(f) if isinstance(f, str) else contextlib.nullcontext(f)
    )
    with cm as stream:
        return from_text(stream, idna_codec, one_rr_per_rrset)
    assert False  # for mypy  lgtm[py/unreachable-statement]

1716 

1717 

def make_query(
    qname: Union[dns.name.Name, str],
    rdtype: Union[dns.rdatatype.RdataType, str],
    rdclass: Union[dns.rdataclass.RdataClass, str] = dns.rdataclass.IN,
    use_edns: Optional[Union[int, bool]] = None,
    want_dnssec: bool = False,
    ednsflags: Optional[int] = None,
    payload: Optional[int] = None,
    request_payload: Optional[int] = None,
    options: Optional[List[dns.edns.Option]] = None,
    idna_codec: Optional[dns.name.IDNACodec] = None,
    id: Optional[int] = None,
    flags: int = dns.flags.RD,
    pad: int = 0,
) -> QueryMessage:
    """Make a query message.

    The query name, type, and class may all be specified either
    as objects of the appropriate type, or as strings.

    Unless *id* and *flags* are given, the query has a randomly chosen
    query id and its DNS flags are set to dns.flags.RD.

    *qname*, a ``dns.name.Name`` or ``str``, the query name.

    *rdtype*, an ``int`` or ``str``, the desired rdata type.

    *rdclass*, an ``int`` or ``str``, the desired rdata class; the default
    is class IN.

    *use_edns*, an ``int``, ``bool`` or ``None``.  The EDNS level to use; the
    default is ``None``.  If ``None``, EDNS will be enabled only if other
    parameters (*ednsflags*, *payload*, *request_payload*, or *options*) are
    set.
    See the description of dns.message.Message.use_edns() for the possible
    values for use_edns and their meanings.

    *want_dnssec*, a ``bool``.  If ``True``, DNSSEC data is desired.

    *ednsflags*, an ``int``, the EDNS flag values.

    *payload*, an ``int``, is the EDNS sender's payload field, which is the
    maximum size of UDP datagram the sender can handle.  I.e. how big
    a response to this message can be.

    *request_payload*, an ``int``, is the EDNS payload size to use when
    sending this message.  If not specified, defaults to the value of
    *payload*.

    *options*, a list of ``dns.edns.Option`` objects or ``None``, the EDNS
    options.

    *idna_codec*, a ``dns.name.IDNACodec``, specifies the IDNA
    encoder/decoder.  If ``None``, the default IDNA 2003 encoder/decoder
    is used.

    *id*, an ``int`` or ``None``, the desired query id.  The default is
    ``None``, which generates a random query id.

    *flags*, an ``int``, the desired query flags.  The default is
    ``dns.flags.RD``.

    *pad*, a non-negative ``int``.  If 0, the default, do not pad; otherwise add
    padding bytes to make the message size a multiple of *pad*.  Note that if
    padding is non-zero, an EDNS PADDING option will always be added to the
    message.

    Returns a ``dns.message.QueryMessage``
    """

    if isinstance(qname, str):
        qname = dns.name.from_text(qname, idna_codec=idna_codec)
    rdtype = dns.rdatatype.RdataType.make(rdtype)
    rdclass = dns.rdataclass.RdataClass.make(rdclass)
    m = QueryMessage(id=id)
    m.flags = dns.flags.Flag(flags)
    m.find_rrset(m.question, qname, rdclass, rdtype, create=True, force_unique=True)
    # Forward only the EDNS settings the caller actually supplied.  Supplying
    # any of them switches EDNS on when use_edns was left unspecified.
    supplied = {
        "ednsflags": ednsflags,
        "payload": payload,
        "request_payload": request_payload,
        "options": options,
    }
    edns_args: Dict[str, Any] = {
        key: value for key, value in supplied.items() if value is not None
    }
    if edns_args and use_edns is None:
        use_edns = 0
    edns_args["edns"] = use_edns
    edns_args["pad"] = pad
    m.use_edns(**edns_args)
    m.want_dnssec(want_dnssec)
    return m

1814 

1815 

def make_response(
    query: Message,
    recursion_available: bool = False,
    our_payload: int = 8192,
    fudge: int = 300,
    tsig_error: int = 0,
    pad: Optional[int] = None,
) -> Message:
    """Make a message which is a response for the specified query.
    The message returned is really a response skeleton; it has all of the
    infrastructure required of a response, but none of the content.

    The response's question section is a shallow copy of the query's question
    section, so the query's question RRsets should not be changed.

    *query*, a ``dns.message.Message``, the query to respond to.

    *recursion_available*, a ``bool``, should RA be set in the response?

    *our_payload*, an ``int``, the payload size to advertise in EDNS responses.

    *fudge*, an ``int``, the TSIG time fudge.

    *tsig_error*, an ``int``, the TSIG error.

    *pad*, a non-negative ``int`` or ``None``.  If 0, do not pad; otherwise if
    not ``None`` add padding bytes to make the message size a multiple of
    *pad*.  Note that if padding is non-zero, an EDNS PADDING option will
    always be added to the message.  If ``None``, the default, add padding
    following RFC 8467, namely if the request is padded, pad the response to
    468 otherwise do not pad.

    Raises ``dns.exception.FormError`` if *query* has the QR flag set, i.e. it
    is not a query.

    Returns a ``dns.message.Message`` object whose specific class is
    appropriate for the query.  For example, if query is a
    ``dns.update.UpdateMessage``, response will be too.
    """

    if query.flags & dns.flags.QR:
        raise dns.exception.FormError("specified query message is not a query")
    opcode_factory = _message_factory_from_opcode(query.opcode())
    response = opcode_factory(id=query.id)
    # Start from QR and copy the RD bit across from the query.
    response.flags = dns.flags.QR | (query.flags & dns.flags.RD)
    if recursion_available:
        response.flags |= dns.flags.RA
    response.set_opcode(query.opcode())
    response.question = list(query.question)
    if query.edns >= 0:
        if pad is None:
            # Set response padding per RFC 8467: pad only when the query
            # itself carried a PADDING option.
            query_was_padded = any(
                option.otype == dns.edns.OptionType.PADDING
                for option in query.options
            )
            pad = 468 if query_was_padded else 0
        response.use_edns(0, 0, our_payload, query.payload, pad=pad)
    if query.had_tsig:
        response.use_tsig(
            query.keyring,
            query.keyname,
            fudge,
            None,
            tsig_error,
            b"",
            query.keyalgorithm,
        )
        response.request_mac = query.mac
    return response

1881 

1882 

# Module-level aliases for the MessageSection members, so that each section
# can be referred to directly by name from this module.

### BEGIN generated MessageSection constants

QUESTION = MessageSection.QUESTION
ANSWER = MessageSection.ANSWER
AUTHORITY = MessageSection.AUTHORITY
ADDITIONAL = MessageSection.ADDITIONAL

### END generated MessageSection constants