Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/dns/message.py: 48%

777 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:58 +0000

1# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license 

2 

3# Copyright (C) 2001-2017 Nominum, Inc. 

4# 

5# Permission to use, copy, modify, and distribute this software and its 

6# documentation for any purpose with or without fee is hereby granted, 

7# provided that the above copyright notice and this permission notice 

8# appear in all copies. 

9# 

10# THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES 

11# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 

12# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR 

13# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 

14# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 

15# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT 

16# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 

17 

18"""DNS Messages""" 

19 

20import contextlib 

21import io 

22import time 

23from typing import Any, Dict, List, Optional, Tuple, Union 

24 

25import dns.edns 

26import dns.entropy 

27import dns.enum 

28import dns.exception 

29import dns.flags 

30import dns.name 

31import dns.opcode 

32import dns.rcode 

33import dns.rdata 

34import dns.rdataclass 

35import dns.rdatatype 

36import dns.rdtypes.ANY.OPT 

37import dns.rdtypes.ANY.TSIG 

38import dns.renderer 

39import dns.rrset 

40import dns.tsig 

41import dns.ttl 

42import dns.wire 

43 

44 

45class ShortHeader(dns.exception.FormError): 

46 """The DNS packet passed to from_wire() is too short.""" 

47 

48 

49class TrailingJunk(dns.exception.FormError): 

50 """The DNS packet passed to from_wire() has extra junk at the end of it.""" 

51 

52 

53class UnknownHeaderField(dns.exception.DNSException): 

54 """The header field name was not recognized when converting from text 

55 into a message.""" 

56 

57 

58class BadEDNS(dns.exception.FormError): 

59 """An OPT record occurred somewhere other than 

60 the additional data section.""" 

61 

62 

63class BadTSIG(dns.exception.FormError): 

64 """A TSIG record occurred somewhere other than the end of 

65 the additional data section.""" 

66 

67 

68class UnknownTSIGKey(dns.exception.DNSException): 

69 """A TSIG with an unknown key was received.""" 

70 

71 

72class Truncated(dns.exception.DNSException): 

73 """The truncated flag is set.""" 

74 

75 supp_kwargs = {"message"} 

76 

77 # We do this as otherwise mypy complains about unexpected keyword argument 

78 # idna_exception 

79 def __init__(self, *args, **kwargs): 

80 super().__init__(*args, **kwargs) 

81 

82 def message(self): 

83 """As much of the message as could be processed. 

84 

85 Returns a ``dns.message.Message``. 

86 """ 

87 return self.kwargs["message"] 

88 

89 

90class NotQueryResponse(dns.exception.DNSException): 

91 """Message is not a response to a query.""" 

92 

93 

94class ChainTooLong(dns.exception.DNSException): 

95 """The CNAME chain is too long.""" 

96 

97 

98class AnswerForNXDOMAIN(dns.exception.DNSException): 

99 """The rcode is NXDOMAIN but an answer was found.""" 

100 

101 

102class NoPreviousName(dns.exception.SyntaxError): 

103 """No previous name was known.""" 

104 

105 

106class MessageSection(dns.enum.IntEnum): 

107 """Message sections""" 

108 

109 QUESTION = 0 

110 ANSWER = 1 

111 AUTHORITY = 2 

112 ADDITIONAL = 3 

113 

114 @classmethod 

115 def _maximum(cls): 

116 return 3 

117 

118 

119class MessageError: 

120 def __init__(self, exception: Exception, offset: int): 

121 self.exception = exception 

122 self.offset = offset 

123 

124 

125DEFAULT_EDNS_PAYLOAD = 1232 

126MAX_CHAIN = 16 

127 

128IndexKeyType = Tuple[ 

129 int, 

130 dns.name.Name, 

131 dns.rdataclass.RdataClass, 

132 dns.rdatatype.RdataType, 

133 Optional[dns.rdatatype.RdataType], 

134 Optional[dns.rdataclass.RdataClass], 

135] 

136IndexType = Dict[IndexKeyType, dns.rrset.RRset] 

137SectionType = Union[int, str, List[dns.rrset.RRset]] 

138 

139 

140class Message: 

141 """A DNS message.""" 

142 

143 _section_enum = MessageSection 

144 

145 def __init__(self, id: Optional[int] = None): 

146 if id is None: 

147 self.id = dns.entropy.random_16() 

148 else: 

149 self.id = id 

150 self.flags = 0 

151 self.sections: List[List[dns.rrset.RRset]] = [[], [], [], []] 

152 self.opt: Optional[dns.rrset.RRset] = None 

153 self.request_payload = 0 

154 self.pad = 0 

155 self.keyring: Any = None 

156 self.tsig: Optional[dns.rrset.RRset] = None 

157 self.request_mac = b"" 

158 self.xfr = False 

159 self.origin: Optional[dns.name.Name] = None 

160 self.tsig_ctx: Optional[Any] = None 

161 self.index: IndexType = {} 

162 self.errors: List[MessageError] = [] 

163 self.time = 0.0 

164 

165 @property 

166 def question(self) -> List[dns.rrset.RRset]: 

167 """The question section.""" 

168 return self.sections[0] 

169 

170 @question.setter 

171 def question(self, v): 

172 self.sections[0] = v 

173 

174 @property 

175 def answer(self) -> List[dns.rrset.RRset]: 

176 """The answer section.""" 

177 return self.sections[1] 

178 

179 @answer.setter 

180 def answer(self, v): 

181 self.sections[1] = v 

182 

183 @property 

184 def authority(self) -> List[dns.rrset.RRset]: 

185 """The authority section.""" 

186 return self.sections[2] 

187 

188 @authority.setter 

189 def authority(self, v): 

190 self.sections[2] = v 

191 

192 @property 

193 def additional(self) -> List[dns.rrset.RRset]: 

194 """The additional data section.""" 

195 return self.sections[3] 

196 

197 @additional.setter 

198 def additional(self, v): 

199 self.sections[3] = v 

200 

201 def __repr__(self): 

202 return "<DNS message, ID " + repr(self.id) + ">" 

203 

204 def __str__(self): 

205 return self.to_text() 

206 

207 def to_text( 

208 self, 

209 origin: Optional[dns.name.Name] = None, 

210 relativize: bool = True, 

211 **kw: Dict[str, Any], 

212 ) -> str: 

213 """Convert the message to text. 

214 

215 The *origin*, *relativize*, and any other keyword 

216 arguments are passed to the RRset ``to_wire()`` method. 

217 

218 Returns a ``str``. 

219 """ 

220 

221 s = io.StringIO() 

222 s.write("id %d\n" % self.id) 

223 s.write("opcode %s\n" % dns.opcode.to_text(self.opcode())) 

224 s.write("rcode %s\n" % dns.rcode.to_text(self.rcode())) 

225 s.write("flags %s\n" % dns.flags.to_text(self.flags)) 

226 if self.edns >= 0: 

227 s.write("edns %s\n" % self.edns) 

228 if self.ednsflags != 0: 

229 s.write("eflags %s\n" % dns.flags.edns_to_text(self.ednsflags)) 

230 s.write("payload %d\n" % self.payload) 

231 for opt in self.options: 

232 s.write("option %s\n" % opt.to_text()) 

233 for name, which in self._section_enum.__members__.items(): 

234 s.write(f";{name}\n") 

235 for rrset in self.section_from_number(which): 

236 s.write(rrset.to_text(origin, relativize, **kw)) 

237 s.write("\n") 

238 # 

239 # We strip off the final \n so the caller can print the result without 

240 # doing weird things to get around eccentricities in Python print 

241 # formatting 

242 # 

243 return s.getvalue()[:-1] 

244 

245 def __eq__(self, other): 

246 """Two messages are equal if they have the same content in the 

247 header, question, answer, and authority sections. 

248 

249 Returns a ``bool``. 

250 """ 

251 

252 if not isinstance(other, Message): 

253 return False 

254 if self.id != other.id: 

255 return False 

256 if self.flags != other.flags: 

257 return False 

258 for i, section in enumerate(self.sections): 

259 other_section = other.sections[i] 

260 for n in section: 

261 if n not in other_section: 

262 return False 

263 for n in other_section: 

264 if n not in section: 

265 return False 

266 return True 

267 

268 def __ne__(self, other): 

269 return not self.__eq__(other) 

270 

271 def is_response(self, other: "Message") -> bool: 

272 """Is *other*, also a ``dns.message.Message``, a response to this 

273 message? 

274 

275 Returns a ``bool``. 

276 """ 

277 

278 if ( 

279 other.flags & dns.flags.QR == 0 

280 or self.id != other.id 

281 or dns.opcode.from_flags(self.flags) != dns.opcode.from_flags(other.flags) 

282 ): 

283 return False 

284 if other.rcode() in { 

285 dns.rcode.FORMERR, 

286 dns.rcode.SERVFAIL, 

287 dns.rcode.NOTIMP, 

288 dns.rcode.REFUSED, 

289 }: 

290 # We don't check the question section in these cases if 

291 # the other question section is empty, even though they 

292 # still really ought to have a question section. 

293 if len(other.question) == 0: 

294 return True 

295 if dns.opcode.is_update(self.flags): 

296 # This is assuming the "sender doesn't include anything 

297 # from the update", but we don't care to check the other 

298 # case, which is that all the sections are returned and 

299 # identical. 

300 return True 

301 for n in self.question: 

302 if n not in other.question: 

303 return False 

304 for n in other.question: 

305 if n not in self.question: 

306 return False 

307 return True 

308 

309 def section_number(self, section: List[dns.rrset.RRset]) -> int: 

310 """Return the "section number" of the specified section for use 

311 in indexing. 

312 

313 *section* is one of the section attributes of this message. 

314 

315 Raises ``ValueError`` if the section isn't known. 

316 

317 Returns an ``int``. 

318 """ 

319 

320 for i, our_section in enumerate(self.sections): 

321 if section is our_section: 

322 return self._section_enum(i) 

323 raise ValueError("unknown section") 

324 

325 def section_from_number(self, number: int) -> List[dns.rrset.RRset]: 

326 """Return the section list associated with the specified section 

327 number. 

328 

329 *number* is a section number `int` or the text form of a section 

330 name. 

331 

332 Raises ``ValueError`` if the section isn't known. 

333 

334 Returns a ``list``. 

335 """ 

336 

337 section = self._section_enum.make(number) 

338 return self.sections[section] 

339 

340 def find_rrset( 

341 self, 

342 section: SectionType, 

343 name: dns.name.Name, 

344 rdclass: dns.rdataclass.RdataClass, 

345 rdtype: dns.rdatatype.RdataType, 

346 covers: dns.rdatatype.RdataType = dns.rdatatype.NONE, 

347 deleting: Optional[dns.rdataclass.RdataClass] = None, 

348 create: bool = False, 

349 force_unique: bool = False, 

350 idna_codec: Optional[dns.name.IDNACodec] = None, 

351 ) -> dns.rrset.RRset: 

352 """Find the RRset with the given attributes in the specified section. 

353 

354 *section*, an ``int`` section number, a ``str`` section name, or one of 

355 the section attributes of this message. This specifies the 

356 the section of the message to search. For example:: 

357 

358 my_message.find_rrset(my_message.answer, name, rdclass, rdtype) 

359 my_message.find_rrset(dns.message.ANSWER, name, rdclass, rdtype) 

360 my_message.find_rrset("ANSWER", name, rdclass, rdtype) 

361 

362 *name*, a ``dns.name.Name`` or ``str``, the name of the RRset. 

363 

364 *rdclass*, an ``int`` or ``str``, the class of the RRset. 

365 

366 *rdtype*, an ``int`` or ``str``, the type of the RRset. 

367 

368 *covers*, an ``int`` or ``str``, the covers value of the RRset. 

369 The default is ``dns.rdatatype.NONE``. 

370 

371 *deleting*, an ``int``, ``str``, or ``None``, the deleting value of the 

372 RRset. The default is ``None``. 

373 

374 *create*, a ``bool``. If ``True``, create the RRset if it is not found. 

375 The created RRset is appended to *section*. 

376 

377 *force_unique*, a ``bool``. If ``True`` and *create* is also ``True``, 

378 create a new RRset regardless of whether a matching RRset exists 

379 already. The default is ``False``. This is useful when creating 

380 DDNS Update messages, as order matters for them. 

381 

382 *idna_codec*, a ``dns.name.IDNACodec``, specifies the IDNA 

383 encoder/decoder. If ``None``, the default IDNA 2003 encoder/decoder 

384 is used. 

385 

386 Raises ``KeyError`` if the RRset was not found and create was 

387 ``False``. 

388 

389 Returns a ``dns.rrset.RRset object``. 

390 """ 

391 

392 if isinstance(section, int): 

393 section_number = section 

394 section = self.section_from_number(section_number) 

395 elif isinstance(section, str): 

396 section_number = MessageSection.from_text(section) 

397 section = self.section_from_number(section_number) 

398 else: 

399 section_number = self.section_number(section) 

400 if isinstance(name, str): 

401 name = dns.name.from_text(name, idna_codec=idna_codec) 

402 rdtype = dns.rdatatype.RdataType.make(rdtype) 

403 rdclass = dns.rdataclass.RdataClass.make(rdclass) 

404 covers = dns.rdatatype.RdataType.make(covers) 

405 if deleting is not None: 

406 deleting = dns.rdataclass.RdataClass.make(deleting) 

407 key = (section_number, name, rdclass, rdtype, covers, deleting) 

408 if not force_unique: 

409 if self.index is not None: 

410 rrset = self.index.get(key) 

411 if rrset is not None: 

412 return rrset 

413 else: 

414 for rrset in section: 

415 if rrset.full_match(name, rdclass, rdtype, covers, deleting): 

416 return rrset 

417 if not create: 

418 raise KeyError 

419 rrset = dns.rrset.RRset(name, rdclass, rdtype, covers, deleting) 

420 section.append(rrset) 

421 if self.index is not None: 

422 self.index[key] = rrset 

423 return rrset 

424 

425 def get_rrset( 

426 self, 

427 section: SectionType, 

428 name: dns.name.Name, 

429 rdclass: dns.rdataclass.RdataClass, 

430 rdtype: dns.rdatatype.RdataType, 

431 covers: dns.rdatatype.RdataType = dns.rdatatype.NONE, 

432 deleting: Optional[dns.rdataclass.RdataClass] = None, 

433 create: bool = False, 

434 force_unique: bool = False, 

435 idna_codec: Optional[dns.name.IDNACodec] = None, 

436 ) -> Optional[dns.rrset.RRset]: 

437 """Get the RRset with the given attributes in the specified section. 

438 

439 If the RRset is not found, None is returned. 

440 

441 *section*, an ``int`` section number, a ``str`` section name, or one of 

442 the section attributes of this message. This specifies the 

443 the section of the message to search. For example:: 

444 

445 my_message.get_rrset(my_message.answer, name, rdclass, rdtype) 

446 my_message.get_rrset(dns.message.ANSWER, name, rdclass, rdtype) 

447 my_message.get_rrset("ANSWER", name, rdclass, rdtype) 

448 

449 *name*, a ``dns.name.Name`` or ``str``, the name of the RRset. 

450 

451 *rdclass*, an ``int`` or ``str``, the class of the RRset. 

452 

453 *rdtype*, an ``int`` or ``str``, the type of the RRset. 

454 

455 *covers*, an ``int`` or ``str``, the covers value of the RRset. 

456 The default is ``dns.rdatatype.NONE``. 

457 

458 *deleting*, an ``int``, ``str``, or ``None``, the deleting value of the 

459 RRset. The default is ``None``. 

460 

461 *create*, a ``bool``. If ``True``, create the RRset if it is not found. 

462 The created RRset is appended to *section*. 

463 

464 *force_unique*, a ``bool``. If ``True`` and *create* is also ``True``, 

465 create a new RRset regardless of whether a matching RRset exists 

466 already. The default is ``False``. This is useful when creating 

467 DDNS Update messages, as order matters for them. 

468 

469 *idna_codec*, a ``dns.name.IDNACodec``, specifies the IDNA 

470 encoder/decoder. If ``None``, the default IDNA 2003 encoder/decoder 

471 is used. 

472 

473 Returns a ``dns.rrset.RRset object`` or ``None``. 

474 """ 

475 

476 try: 

477 rrset = self.find_rrset( 

478 section, 

479 name, 

480 rdclass, 

481 rdtype, 

482 covers, 

483 deleting, 

484 create, 

485 force_unique, 

486 idna_codec, 

487 ) 

488 except KeyError: 

489 rrset = None 

490 return rrset 

491 

492 def _compute_opt_reserve(self) -> int: 

493 """Compute the size required for the OPT RR, padding excluded""" 

494 if not self.opt: 

495 return 0 

496 # 1 byte for the root name, 10 for the standard RR fields 

497 size = 11 

498 # This would be more efficient if options had a size() method, but we won't 

499 # worry about that for now. We also don't worry if there is an existing padding 

500 # option, as it is unlikely and probably harmless, as the worst case is that we 

501 # may add another, and this seems to be legal. 

502 for option in self.opt[0].options: 

503 wire = option.to_wire() 

504 # We add 4 here to account for the option type and length 

505 size += len(wire) + 4 

506 if self.pad: 

507 # Padding will be added, so again add the option type and length. 

508 size += 4 

509 return size 

510 

511 def _compute_tsig_reserve(self) -> int: 

512 """Compute the size required for the TSIG RR""" 

513 # This would be more efficient if TSIGs had a size method, but we won't 

514 # worry about for now. Also, we can't really cope with the potential 

515 # compressibility of the TSIG owner name, so we estimate with the uncompressed 

516 # size. We will disable compression when TSIG and padding are both is active 

517 # so that the padding comes out right. 

518 if not self.tsig: 

519 return 0 

520 f = io.BytesIO() 

521 self.tsig.to_wire(f) 

522 return len(f.getvalue()) 

523 

524 def to_wire( 

525 self, 

526 origin: Optional[dns.name.Name] = None, 

527 max_size: int = 0, 

528 multi: bool = False, 

529 tsig_ctx: Optional[Any] = None, 

530 prepend_length: bool = False, 

531 **kw: Dict[str, Any], 

532 ) -> bytes: 

533 """Return a string containing the message in DNS compressed wire 

534 format. 

535 

536 Additional keyword arguments are passed to the RRset ``to_wire()`` 

537 method. 

538 

539 *origin*, a ``dns.name.Name`` or ``None``, the origin to be appended 

540 to any relative names. If ``None``, and the message has an origin 

541 attribute that is not ``None``, then it will be used. 

542 

543 *max_size*, an ``int``, the maximum size of the wire format 

544 output; default is 0, which means "the message's request 

545 payload, if nonzero, or 65535". 

546 

547 *multi*, a ``bool``, should be set to ``True`` if this message is 

548 part of a multiple message sequence. 

549 

550 *tsig_ctx*, a ``dns.tsig.HMACTSig`` or ``dns.tsig.GSSTSig`` object, the 

551 ongoing TSIG context, used when signing zone transfers. 

552 

553 *prepend_length*, a ``bool``, should be set to ``True`` if the caller 

554 wants the message length prepended to the message itself. This is 

555 useful for messages sent over TCP, TLS (DoT), or QUIC (DoQ). 

556 

557 Raises ``dns.exception.TooBig`` if *max_size* was exceeded. 

558 

559 Returns a ``bytes``. 

560 """ 

561 

562 if origin is None and self.origin is not None: 

563 origin = self.origin 

564 if max_size == 0: 

565 if self.request_payload != 0: 

566 max_size = self.request_payload 

567 else: 

568 max_size = 65535 

569 if max_size < 512: 

570 max_size = 512 

571 elif max_size > 65535: 

572 max_size = 65535 

573 r = dns.renderer.Renderer(self.id, self.flags, max_size, origin) 

574 opt_reserve = self._compute_opt_reserve() 

575 r.reserve(opt_reserve) 

576 tsig_reserve = self._compute_tsig_reserve() 

577 r.reserve(tsig_reserve) 

578 for rrset in self.question: 

579 r.add_question(rrset.name, rrset.rdtype, rrset.rdclass) 

580 for rrset in self.answer: 

581 r.add_rrset(dns.renderer.ANSWER, rrset, **kw) 

582 for rrset in self.authority: 

583 r.add_rrset(dns.renderer.AUTHORITY, rrset, **kw) 

584 for rrset in self.additional: 

585 r.add_rrset(dns.renderer.ADDITIONAL, rrset, **kw) 

586 r.release_reserved() 

587 if self.opt is not None: 

588 r.add_opt(self.opt, self.pad, opt_reserve, tsig_reserve) 

589 r.write_header() 

590 if self.tsig is not None: 

591 (new_tsig, ctx) = dns.tsig.sign( 

592 r.get_wire(), 

593 self.keyring, 

594 self.tsig[0], 

595 int(time.time()), 

596 self.request_mac, 

597 tsig_ctx, 

598 multi, 

599 ) 

600 self.tsig.clear() 

601 self.tsig.add(new_tsig) 

602 r.add_rrset(dns.renderer.ADDITIONAL, self.tsig) 

603 r.write_header() 

604 if multi: 

605 self.tsig_ctx = ctx 

606 wire = r.get_wire() 

607 if prepend_length: 

608 wire = len(wire).to_bytes(2, "big") + wire 

609 return wire 

610 

611 @staticmethod 

612 def _make_tsig( 

613 keyname, algorithm, time_signed, fudge, mac, original_id, error, other 

614 ): 

615 tsig = dns.rdtypes.ANY.TSIG.TSIG( 

616 dns.rdataclass.ANY, 

617 dns.rdatatype.TSIG, 

618 algorithm, 

619 time_signed, 

620 fudge, 

621 mac, 

622 original_id, 

623 error, 

624 other, 

625 ) 

626 return dns.rrset.from_rdata(keyname, 0, tsig) 

627 

628 def use_tsig( 

629 self, 

630 keyring: Any, 

631 keyname: Optional[Union[dns.name.Name, str]] = None, 

632 fudge: int = 300, 

633 original_id: Optional[int] = None, 

634 tsig_error: int = 0, 

635 other_data: bytes = b"", 

636 algorithm: Union[dns.name.Name, str] = dns.tsig.default_algorithm, 

637 ) -> None: 

638 """When sending, a TSIG signature using the specified key 

639 should be added. 

640 

641 *key*, a ``dns.tsig.Key`` is the key to use. If a key is specified, 

642 the *keyring* and *algorithm* fields are not used. 

643 

644 *keyring*, a ``dict``, ``callable`` or ``dns.tsig.Key``, is either 

645 the TSIG keyring or key to use. 

646 

647 The format of a keyring dict is a mapping from TSIG key name, as 

648 ``dns.name.Name`` to ``dns.tsig.Key`` or a TSIG secret, a ``bytes``. 

649 If a ``dict`` *keyring* is specified but a *keyname* is not, the key 

650 used will be the first key in the *keyring*. Note that the order of 

651 keys in a dictionary is not defined, so applications should supply a 

652 keyname when a ``dict`` keyring is used, unless they know the keyring 

653 contains only one key. If a ``callable`` keyring is specified, the 

654 callable will be called with the message and the keyname, and is 

655 expected to return a key. 

656 

657 *keyname*, a ``dns.name.Name``, ``str`` or ``None``, the name of 

658 this TSIG key to use; defaults to ``None``. If *keyring* is a 

659 ``dict``, the key must be defined in it. If *keyring* is a 

660 ``dns.tsig.Key``, this is ignored. 

661 

662 *fudge*, an ``int``, the TSIG time fudge. 

663 

664 *original_id*, an ``int``, the TSIG original id. If ``None``, 

665 the message's id is used. 

666 

667 *tsig_error*, an ``int``, the TSIG error code. 

668 

669 *other_data*, a ``bytes``, the TSIG other data. 

670 

671 *algorithm*, a ``dns.name.Name`` or ``str``, the TSIG algorithm to use. This is 

672 only used if *keyring* is a ``dict``, and the key entry is a ``bytes``. 

673 """ 

674 

675 if isinstance(keyring, dns.tsig.Key): 

676 key = keyring 

677 keyname = key.name 

678 elif callable(keyring): 

679 key = keyring(self, keyname) 

680 else: 

681 if isinstance(keyname, str): 

682 keyname = dns.name.from_text(keyname) 

683 if keyname is None: 

684 keyname = next(iter(keyring)) 

685 key = keyring[keyname] 

686 if isinstance(key, bytes): 

687 key = dns.tsig.Key(keyname, key, algorithm) 

688 self.keyring = key 

689 if original_id is None: 

690 original_id = self.id 

691 self.tsig = self._make_tsig( 

692 keyname, 

693 self.keyring.algorithm, 

694 0, 

695 fudge, 

696 b"\x00" * dns.tsig.mac_sizes[self.keyring.algorithm], 

697 original_id, 

698 tsig_error, 

699 other_data, 

700 ) 

701 

702 @property 

703 def keyname(self) -> Optional[dns.name.Name]: 

704 if self.tsig: 

705 return self.tsig.name 

706 else: 

707 return None 

708 

709 @property 

710 def keyalgorithm(self) -> Optional[dns.name.Name]: 

711 if self.tsig: 

712 return self.tsig[0].algorithm 

713 else: 

714 return None 

715 

716 @property 

717 def mac(self) -> Optional[bytes]: 

718 if self.tsig: 

719 return self.tsig[0].mac 

720 else: 

721 return None 

722 

723 @property 

724 def tsig_error(self) -> Optional[int]: 

725 if self.tsig: 

726 return self.tsig[0].error 

727 else: 

728 return None 

729 

730 @property 

731 def had_tsig(self) -> bool: 

732 return bool(self.tsig) 

733 

734 @staticmethod 

735 def _make_opt(flags=0, payload=DEFAULT_EDNS_PAYLOAD, options=None): 

736 opt = dns.rdtypes.ANY.OPT.OPT(payload, dns.rdatatype.OPT, options or ()) 

737 return dns.rrset.from_rdata(dns.name.root, int(flags), opt) 

738 

739 def use_edns( 

740 self, 

741 edns: Optional[Union[int, bool]] = 0, 

742 ednsflags: int = 0, 

743 payload: int = DEFAULT_EDNS_PAYLOAD, 

744 request_payload: Optional[int] = None, 

745 options: Optional[List[dns.edns.Option]] = None, 

746 pad: int = 0, 

747 ) -> None: 

748 """Configure EDNS behavior. 

749 

750 *edns*, an ``int``, is the EDNS level to use. Specifying ``None``, ``False``, 

751 or ``-1`` means "do not use EDNS", and in this case the other parameters are 

752 ignored. Specifying ``True`` is equivalent to specifying 0, i.e. "use EDNS0". 

753 

754 *ednsflags*, an ``int``, the EDNS flag values. 

755 

756 *payload*, an ``int``, is the EDNS sender's payload field, which is the maximum 

757 size of UDP datagram the sender can handle. I.e. how big a response to this 

758 message can be. 

759 

760 *request_payload*, an ``int``, is the EDNS payload size to use when sending this 

761 message. If not specified, defaults to the value of *payload*. 

762 

763 *options*, a list of ``dns.edns.Option`` objects or ``None``, the EDNS options. 

764 

765 *pad*, a non-negative ``int``. If 0, the default, do not pad; otherwise add 

766 padding bytes to make the message size a multiple of *pad*. Note that if 

767 padding is non-zero, an EDNS PADDING option will always be added to the 

768 message. 

769 """ 

770 

771 if edns is None or edns is False: 

772 edns = -1 

773 elif edns is True: 

774 edns = 0 

775 if edns < 0: 

776 self.opt = None 

777 self.request_payload = 0 

778 else: 

779 # make sure the EDNS version in ednsflags agrees with edns 

780 ednsflags &= 0xFF00FFFF 

781 ednsflags |= edns << 16 

782 if options is None: 

783 options = [] 

784 self.opt = self._make_opt(ednsflags, payload, options) 

785 if request_payload is None: 

786 request_payload = payload 

787 self.request_payload = request_payload 

788 self.pad = pad 

789 

790 @property 

791 def edns(self) -> int: 

792 if self.opt: 

793 return (self.ednsflags & 0xFF0000) >> 16 

794 else: 

795 return -1 

796 

797 @property 

798 def ednsflags(self) -> int: 

799 if self.opt: 

800 return self.opt.ttl 

801 else: 

802 return 0 

803 

804 @ednsflags.setter 

805 def ednsflags(self, v): 

806 if self.opt: 

807 self.opt.ttl = v 

808 elif v: 

809 self.opt = self._make_opt(v) 

810 

811 @property 

812 def payload(self) -> int: 

813 if self.opt: 

814 return self.opt[0].payload 

815 else: 

816 return 0 

817 

818 @property 

819 def options(self) -> Tuple: 

820 if self.opt: 

821 return self.opt[0].options 

822 else: 

823 return () 

824 

825 def want_dnssec(self, wanted: bool = True) -> None: 

826 """Enable or disable 'DNSSEC desired' flag in requests. 

827 

828 *wanted*, a ``bool``. If ``True``, then DNSSEC data is 

829 desired in the response, EDNS is enabled if required, and then 

830 the DO bit is set. If ``False``, the DO bit is cleared if 

831 EDNS is enabled. 

832 """ 

833 

834 if wanted: 

835 self.ednsflags |= dns.flags.DO 

836 elif self.opt: 

837 self.ednsflags &= ~int(dns.flags.DO) 

838 

839 def rcode(self) -> dns.rcode.Rcode: 

840 """Return the rcode. 

841 

842 Returns a ``dns.rcode.Rcode``. 

843 """ 

844 return dns.rcode.from_flags(int(self.flags), int(self.ednsflags)) 

845 

846 def set_rcode(self, rcode: dns.rcode.Rcode) -> None: 

847 """Set the rcode. 

848 

849 *rcode*, a ``dns.rcode.Rcode``, is the rcode to set. 

850 """ 

851 (value, evalue) = dns.rcode.to_flags(rcode) 

852 self.flags &= 0xFFF0 

853 self.flags |= value 

854 self.ednsflags &= 0x00FFFFFF 

855 self.ednsflags |= evalue 

856 

857 def opcode(self) -> dns.opcode.Opcode: 

858 """Return the opcode. 

859 

860 Returns a ``dns.opcode.Opcode``. 

861 """ 

862 return dns.opcode.from_flags(int(self.flags)) 

863 

864 def set_opcode(self, opcode: dns.opcode.Opcode) -> None: 

865 """Set the opcode. 

866 

867 *opcode*, a ``dns.opcode.Opcode``, is the opcode to set. 

868 """ 

869 self.flags &= 0x87FF 

870 self.flags |= dns.opcode.to_flags(opcode) 

871 

872 def _get_one_rr_per_rrset(self, value): 

873 # What the caller picked is fine. 

874 return value 

875 

876 # pylint: disable=unused-argument 

877 

878 def _parse_rr_header(self, section, name, rdclass, rdtype): 

879 return (rdclass, rdtype, None, False) 

880 

881 # pylint: enable=unused-argument 

882 

883 def _parse_special_rr_header(self, section, count, position, name, rdclass, rdtype): 

884 if rdtype == dns.rdatatype.OPT: 

885 if ( 

886 section != MessageSection.ADDITIONAL 

887 or self.opt 

888 or name != dns.name.root 

889 ): 

890 raise BadEDNS 

891 elif rdtype == dns.rdatatype.TSIG: 

892 if ( 

893 section != MessageSection.ADDITIONAL 

894 or rdclass != dns.rdatatype.ANY 

895 or position != count - 1 

896 ): 

897 raise BadTSIG 

898 return (rdclass, rdtype, None, False) 

899 

900 

901class ChainingResult: 

902 """The result of a call to dns.message.QueryMessage.resolve_chaining(). 

903 

904 The ``answer`` attribute is the answer RRSet, or ``None`` if it doesn't 

905 exist. 

906 

907 The ``canonical_name`` attribute is the canonical name after all 

908 chaining has been applied (this is the same name as ``rrset.name`` in cases 

909 where rrset is not ``None``). 

910 

911 The ``minimum_ttl`` attribute is the minimum TTL, i.e. the TTL to 

912 use if caching the data. It is the smallest of all the CNAME TTLs 

913 and either the answer TTL if it exists or the SOA TTL and SOA 

914 minimum values for negative answers. 

915 

916 The ``cnames`` attribute is a list of all the CNAME RRSets followed to 

917 get to the canonical name. 

918 """ 

919 

920 def __init__( 

921 self, 

922 canonical_name: dns.name.Name, 

923 answer: Optional[dns.rrset.RRset], 

924 minimum_ttl: int, 

925 cnames: List[dns.rrset.RRset], 

926 ): 

927 self.canonical_name = canonical_name 

928 self.answer = answer 

929 self.minimum_ttl = minimum_ttl 

930 self.cnames = cnames 

931 

932 

933class QueryMessage(Message): 

934 def resolve_chaining(self) -> ChainingResult: 

935 """Follow the CNAME chain in the response to determine the answer 

936 RRset. 

937 

938 Raises ``dns.message.NotQueryResponse`` if the message is not 

939 a response. 

940 

941 Raises ``dns.message.ChainTooLong`` if the CNAME chain is too long. 

942 

943 Raises ``dns.message.AnswerForNXDOMAIN`` if the rcode is NXDOMAIN 

944 but an answer was found. 

945 

946 Raises ``dns.exception.FormError`` if the question count is not 1. 

947 

948 Returns a ChainingResult object. 

949 """ 

950 if self.flags & dns.flags.QR == 0: 

951 raise NotQueryResponse 

952 if len(self.question) != 1: 

953 raise dns.exception.FormError 

954 question = self.question[0] 

955 qname = question.name 

956 min_ttl = dns.ttl.MAX_TTL 

957 answer = None 

958 count = 0 

959 cnames = [] 

960 while count < MAX_CHAIN: 

961 try: 

962 answer = self.find_rrset( 

963 self.answer, qname, question.rdclass, question.rdtype 

964 ) 

965 min_ttl = min(min_ttl, answer.ttl) 

966 break 

967 except KeyError: 

968 if question.rdtype != dns.rdatatype.CNAME: 

969 try: 

970 crrset = self.find_rrset( 

971 self.answer, qname, question.rdclass, dns.rdatatype.CNAME 

972 ) 

973 cnames.append(crrset) 

974 min_ttl = min(min_ttl, crrset.ttl) 

975 for rd in crrset: 

976 qname = rd.target 

977 break 

978 count += 1 

979 continue 

980 except KeyError: 

981 # Exit the chaining loop 

982 break 

983 else: 

984 # Exit the chaining loop 

985 break 

986 if count >= MAX_CHAIN: 

987 raise ChainTooLong 

988 if self.rcode() == dns.rcode.NXDOMAIN and answer is not None: 

989 raise AnswerForNXDOMAIN 

990 if answer is None: 

991 # Further minimize the TTL with NCACHE. 

992 auname = qname 

993 while True: 

994 # Look for an SOA RR whose owner name is a superdomain 

995 # of qname. 

996 try: 

997 srrset = self.find_rrset( 

998 self.authority, auname, question.rdclass, dns.rdatatype.SOA 

999 ) 

1000 min_ttl = min(min_ttl, srrset.ttl, srrset[0].minimum) 

1001 break 

1002 except KeyError: 

1003 try: 

1004 auname = auname.parent() 

1005 except dns.name.NoParent: 

1006 break 

1007 return ChainingResult(qname, answer, min_ttl, cnames) 

1008 

1009 def canonical_name(self) -> dns.name.Name: 

1010 """Return the canonical name of the first name in the question 

1011 section. 

1012 

1013 Raises ``dns.message.NotQueryResponse`` if the message is not 

1014 a response. 

1015 

1016 Raises ``dns.message.ChainTooLong`` if the CNAME chain is too long. 

1017 

1018 Raises ``dns.message.AnswerForNXDOMAIN`` if the rcode is NXDOMAIN 

1019 but an answer was found. 

1020 

1021 Raises ``dns.exception.FormError`` if the question count is not 1. 

1022 """ 

1023 return self.resolve_chaining().canonical_name 

1024 

1025 

1026def _maybe_import_update(): 

1027 # We avoid circular imports by doing this here. We do it in another 

1028 # function as doing it in _message_factory_from_opcode() makes "dns" 

1029 # a local symbol, and the first line fails :) 

1030 

1031 # pylint: disable=redefined-outer-name,import-outside-toplevel,unused-import 

1032 import dns.update # noqa: F401 

1033 

1034 

1035def _message_factory_from_opcode(opcode): 

1036 if opcode == dns.opcode.QUERY: 

1037 return QueryMessage 

1038 elif opcode == dns.opcode.UPDATE: 

1039 _maybe_import_update() 

1040 return dns.update.UpdateMessage 

1041 else: 

1042 return Message 

1043 

1044 

1045class _WireReader: 

1046 

1047 """Wire format reader. 

1048 

1049 parser: the binary parser 

1050 message: The message object being built 

1051 initialize_message: Callback to set message parsing options 

1052 question_only: Are we only reading the question? 

1053 one_rr_per_rrset: Put each RR into its own RRset? 

1054 keyring: TSIG keyring 

1055 ignore_trailing: Ignore trailing junk at end of request? 

1056 multi: Is this message part of a multi-message sequence? 

1057 DNS dynamic updates. 

1058 continue_on_error: try to extract as much information as possible from 

1059 the message, accumulating MessageErrors in the *errors* attribute instead of 

1060 raising them. 

1061 """ 

1062 

1063 def __init__( 

1064 self, 

1065 wire, 

1066 initialize_message, 

1067 question_only=False, 

1068 one_rr_per_rrset=False, 

1069 ignore_trailing=False, 

1070 keyring=None, 

1071 multi=False, 

1072 continue_on_error=False, 

1073 ): 

1074 self.parser = dns.wire.Parser(wire) 

1075 self.message = None 

1076 self.initialize_message = initialize_message 

1077 self.question_only = question_only 

1078 self.one_rr_per_rrset = one_rr_per_rrset 

1079 self.ignore_trailing = ignore_trailing 

1080 self.keyring = keyring 

1081 self.multi = multi 

1082 self.continue_on_error = continue_on_error 

1083 self.errors = [] 

1084 

1085 def _get_question(self, section_number, qcount): 

1086 """Read the next *qcount* records from the wire data and add them to 

1087 the question section. 

1088 """ 

1089 assert self.message is not None 

1090 section = self.message.sections[section_number] 

1091 for _ in range(qcount): 

1092 qname = self.parser.get_name(self.message.origin) 

1093 (rdtype, rdclass) = self.parser.get_struct("!HH") 

1094 (rdclass, rdtype, _, _) = self.message._parse_rr_header( 

1095 section_number, qname, rdclass, rdtype 

1096 ) 

1097 self.message.find_rrset( 

1098 section, qname, rdclass, rdtype, create=True, force_unique=True 

1099 ) 

1100 

1101 def _add_error(self, e): 

1102 self.errors.append(MessageError(e, self.parser.current)) 

1103 

1104 def _get_section(self, section_number, count): 

1105 """Read the next I{count} records from the wire data and add them to 

1106 the specified section. 

1107 

1108 section_number: the section of the message to which to add records 

1109 count: the number of records to read 

1110 """ 

1111 assert self.message is not None 

1112 section = self.message.sections[section_number] 

1113 force_unique = self.one_rr_per_rrset 

1114 for i in range(count): 

1115 rr_start = self.parser.current 

1116 absolute_name = self.parser.get_name() 

1117 if self.message.origin is not None: 

1118 name = absolute_name.relativize(self.message.origin) 

1119 else: 

1120 name = absolute_name 

1121 (rdtype, rdclass, ttl, rdlen) = self.parser.get_struct("!HHIH") 

1122 if rdtype in (dns.rdatatype.OPT, dns.rdatatype.TSIG): 

1123 ( 

1124 rdclass, 

1125 rdtype, 

1126 deleting, 

1127 empty, 

1128 ) = self.message._parse_special_rr_header( 

1129 section_number, count, i, name, rdclass, rdtype 

1130 ) 

1131 else: 

1132 (rdclass, rdtype, deleting, empty) = self.message._parse_rr_header( 

1133 section_number, name, rdclass, rdtype 

1134 ) 

1135 rdata_start = self.parser.current 

1136 try: 

1137 if empty: 

1138 if rdlen > 0: 

1139 raise dns.exception.FormError 

1140 rd = None 

1141 covers = dns.rdatatype.NONE 

1142 else: 

1143 with self.parser.restrict_to(rdlen): 

1144 rd = dns.rdata.from_wire_parser( 

1145 rdclass, rdtype, self.parser, self.message.origin 

1146 ) 

1147 covers = rd.covers() 

1148 if self.message.xfr and rdtype == dns.rdatatype.SOA: 

1149 force_unique = True 

1150 if rdtype == dns.rdatatype.OPT: 

1151 self.message.opt = dns.rrset.from_rdata(name, ttl, rd) 

1152 elif rdtype == dns.rdatatype.TSIG: 

1153 if self.keyring is None: 

1154 raise UnknownTSIGKey("got signed message without keyring") 

1155 if isinstance(self.keyring, dict): 

1156 key = self.keyring.get(absolute_name) 

1157 if isinstance(key, bytes): 

1158 key = dns.tsig.Key(absolute_name, key, rd.algorithm) 

1159 elif callable(self.keyring): 

1160 key = self.keyring(self.message, absolute_name) 

1161 else: 

1162 key = self.keyring 

1163 if key is None: 

1164 raise UnknownTSIGKey("key '%s' unknown" % name) 

1165 self.message.keyring = key 

1166 self.message.tsig_ctx = dns.tsig.validate( 

1167 self.parser.wire, 

1168 key, 

1169 absolute_name, 

1170 rd, 

1171 int(time.time()), 

1172 self.message.request_mac, 

1173 rr_start, 

1174 self.message.tsig_ctx, 

1175 self.multi, 

1176 ) 

1177 self.message.tsig = dns.rrset.from_rdata(absolute_name, 0, rd) 

1178 else: 

1179 rrset = self.message.find_rrset( 

1180 section, 

1181 name, 

1182 rdclass, 

1183 rdtype, 

1184 covers, 

1185 deleting, 

1186 True, 

1187 force_unique, 

1188 ) 

1189 if rd is not None: 

1190 if ttl > 0x7FFFFFFF: 

1191 ttl = 0 

1192 rrset.add(rd, ttl) 

1193 except Exception as e: 

1194 if self.continue_on_error: 

1195 self._add_error(e) 

1196 self.parser.seek(rdata_start + rdlen) 

1197 else: 

1198 raise 

1199 

1200 def read(self): 

1201 """Read a wire format DNS message and build a dns.message.Message 

1202 object.""" 

1203 

1204 if self.parser.remaining() < 12: 

1205 raise ShortHeader 

1206 (id, flags, qcount, ancount, aucount, adcount) = self.parser.get_struct( 

1207 "!HHHHHH" 

1208 ) 

1209 factory = _message_factory_from_opcode(dns.opcode.from_flags(flags)) 

1210 self.message = factory(id=id) 

1211 self.message.flags = dns.flags.Flag(flags) 

1212 self.initialize_message(self.message) 

1213 self.one_rr_per_rrset = self.message._get_one_rr_per_rrset( 

1214 self.one_rr_per_rrset 

1215 ) 

1216 try: 

1217 self._get_question(MessageSection.QUESTION, qcount) 

1218 if self.question_only: 

1219 return self.message 

1220 self._get_section(MessageSection.ANSWER, ancount) 

1221 self._get_section(MessageSection.AUTHORITY, aucount) 

1222 self._get_section(MessageSection.ADDITIONAL, adcount) 

1223 if not self.ignore_trailing and self.parser.remaining() != 0: 

1224 raise TrailingJunk 

1225 if self.multi and self.message.tsig_ctx and not self.message.had_tsig: 

1226 self.message.tsig_ctx.update(self.parser.wire) 

1227 except Exception as e: 

1228 if self.continue_on_error: 

1229 self._add_error(e) 

1230 else: 

1231 raise 

1232 return self.message 

1233 

1234 

1235def from_wire( 

1236 wire: bytes, 

1237 keyring: Optional[Any] = None, 

1238 request_mac: Optional[bytes] = b"", 

1239 xfr: bool = False, 

1240 origin: Optional[dns.name.Name] = None, 

1241 tsig_ctx: Optional[Union[dns.tsig.HMACTSig, dns.tsig.GSSTSig]] = None, 

1242 multi: bool = False, 

1243 question_only: bool = False, 

1244 one_rr_per_rrset: bool = False, 

1245 ignore_trailing: bool = False, 

1246 raise_on_truncation: bool = False, 

1247 continue_on_error: bool = False, 

1248) -> Message: 

1249 """Convert a DNS wire format message into a message object. 

1250 

1251 *keyring*, a ``dns.tsig.Key`` or ``dict``, the key or keyring to use if the message 

1252 is signed. 

1253 

1254 *request_mac*, a ``bytes`` or ``None``. If the message is a response to a 

1255 TSIG-signed request, *request_mac* should be set to the MAC of that request. 

1256 

1257 *xfr*, a ``bool``, should be set to ``True`` if this message is part of a zone 

1258 transfer. 

1259 

1260 *origin*, a ``dns.name.Name`` or ``None``. If the message is part of a zone 

1261 transfer, *origin* should be the origin name of the zone. If not ``None``, names 

1262 will be relativized to the origin. 

1263 

1264 *tsig_ctx*, a ``dns.tsig.HMACTSig`` or ``dns.tsig.GSSTSig`` object, the ongoing TSIG 

1265 context, used when validating zone transfers. 

1266 

1267 *multi*, a ``bool``, should be set to ``True`` if this message is part of a multiple 

1268 message sequence. 

1269 

1270 *question_only*, a ``bool``. If ``True``, read only up to the end of the question 

1271 section. 

1272 

1273 *one_rr_per_rrset*, a ``bool``. If ``True``, put each RR into its own RRset. 

1274 

1275 *ignore_trailing*, a ``bool``. If ``True``, ignore trailing junk at end of the 

1276 message. 

1277 

1278 *raise_on_truncation*, a ``bool``. If ``True``, raise an exception if the TC bit is 

1279 set. 

1280 

1281 *continue_on_error*, a ``bool``. If ``True``, try to continue parsing even if 

1282 errors occur. Erroneous rdata will be ignored. Errors will be accumulated as a 

1283 list of MessageError objects in the message's ``errors`` attribute. This option is 

1284 recommended only for DNS analysis tools, or for use in a server as part of an error 

1285 handling path. The default is ``False``. 

1286 

1287 Raises ``dns.message.ShortHeader`` if the message is less than 12 octets long. 

1288 

1289 Raises ``dns.message.TrailingJunk`` if there were octets in the message past the end 

1290 of the proper DNS message, and *ignore_trailing* is ``False``. 

1291 

1292 Raises ``dns.message.BadEDNS`` if an OPT record was in the wrong section, or 

1293 occurred more than once. 

1294 

1295 Raises ``dns.message.BadTSIG`` if a TSIG record was not the last record of the 

1296 additional data section. 

1297 

1298 Raises ``dns.message.Truncated`` if the TC flag is set and *raise_on_truncation* is 

1299 ``True``. 

1300 

1301 Returns a ``dns.message.Message``. 

1302 """ 

1303 

1304 # We permit None for request_mac solely for backwards compatibility 

1305 if request_mac is None: 

1306 request_mac = b"" 

1307 

1308 def initialize_message(message): 

1309 message.request_mac = request_mac 

1310 message.xfr = xfr 

1311 message.origin = origin 

1312 message.tsig_ctx = tsig_ctx 

1313 

1314 reader = _WireReader( 

1315 wire, 

1316 initialize_message, 

1317 question_only, 

1318 one_rr_per_rrset, 

1319 ignore_trailing, 

1320 keyring, 

1321 multi, 

1322 continue_on_error, 

1323 ) 

1324 try: 

1325 m = reader.read() 

1326 except dns.exception.FormError: 

1327 if ( 

1328 reader.message 

1329 and (reader.message.flags & dns.flags.TC) 

1330 and raise_on_truncation 

1331 ): 

1332 raise Truncated(message=reader.message) 

1333 else: 

1334 raise 

1335 # Reading a truncated message might not have any errors, so we 

1336 # have to do this check here too. 

1337 if m.flags & dns.flags.TC and raise_on_truncation: 

1338 raise Truncated(message=m) 

1339 if continue_on_error: 

1340 m.errors = reader.errors 

1341 

1342 return m 

1343 

1344 

1345class _TextReader: 

1346 

1347 """Text format reader. 

1348 

1349 tok: the tokenizer. 

1350 message: The message object being built. 

1351 DNS dynamic updates. 

1352 last_name: The most recently read name when building a message object. 

1353 one_rr_per_rrset: Put each RR into its own RRset? 

1354 origin: The origin for relative names 

1355 relativize: relativize names? 

1356 relativize_to: the origin to relativize to. 

1357 """ 

1358 

1359 def __init__( 

1360 self, 

1361 text, 

1362 idna_codec, 

1363 one_rr_per_rrset=False, 

1364 origin=None, 

1365 relativize=True, 

1366 relativize_to=None, 

1367 ): 

1368 self.message = None 

1369 self.tok = dns.tokenizer.Tokenizer(text, idna_codec=idna_codec) 

1370 self.last_name = None 

1371 self.one_rr_per_rrset = one_rr_per_rrset 

1372 self.origin = origin 

1373 self.relativize = relativize 

1374 self.relativize_to = relativize_to 

1375 self.id = None 

1376 self.edns = -1 

1377 self.ednsflags = 0 

1378 self.payload = DEFAULT_EDNS_PAYLOAD 

1379 self.rcode = None 

1380 self.opcode = dns.opcode.QUERY 

1381 self.flags = 0 

1382 

1383 def _header_line(self, _): 

1384 """Process one line from the text format header section.""" 

1385 

1386 token = self.tok.get() 

1387 what = token.value 

1388 if what == "id": 

1389 self.id = self.tok.get_int() 

1390 elif what == "flags": 

1391 while True: 

1392 token = self.tok.get() 

1393 if not token.is_identifier(): 

1394 self.tok.unget(token) 

1395 break 

1396 self.flags = self.flags | dns.flags.from_text(token.value) 

1397 elif what == "edns": 

1398 self.edns = self.tok.get_int() 

1399 self.ednsflags = self.ednsflags | (self.edns << 16) 

1400 elif what == "eflags": 

1401 if self.edns < 0: 

1402 self.edns = 0 

1403 while True: 

1404 token = self.tok.get() 

1405 if not token.is_identifier(): 

1406 self.tok.unget(token) 

1407 break 

1408 self.ednsflags = self.ednsflags | dns.flags.edns_from_text(token.value) 

1409 elif what == "payload": 

1410 self.payload = self.tok.get_int() 

1411 if self.edns < 0: 

1412 self.edns = 0 

1413 elif what == "opcode": 

1414 text = self.tok.get_string() 

1415 self.opcode = dns.opcode.from_text(text) 

1416 self.flags = self.flags | dns.opcode.to_flags(self.opcode) 

1417 elif what == "rcode": 

1418 text = self.tok.get_string() 

1419 self.rcode = dns.rcode.from_text(text) 

1420 else: 

1421 raise UnknownHeaderField 

1422 self.tok.get_eol() 

1423 

1424 def _question_line(self, section_number): 

1425 """Process one line from the text format question section.""" 

1426 

1427 section = self.message.sections[section_number] 

1428 token = self.tok.get(want_leading=True) 

1429 if not token.is_whitespace(): 

1430 self.last_name = self.tok.as_name( 

1431 token, self.message.origin, self.relativize, self.relativize_to 

1432 ) 

1433 name = self.last_name 

1434 if name is None: 

1435 raise NoPreviousName 

1436 token = self.tok.get() 

1437 if not token.is_identifier(): 

1438 raise dns.exception.SyntaxError 

1439 # Class 

1440 try: 

1441 rdclass = dns.rdataclass.from_text(token.value) 

1442 token = self.tok.get() 

1443 if not token.is_identifier(): 

1444 raise dns.exception.SyntaxError 

1445 except dns.exception.SyntaxError: 

1446 raise dns.exception.SyntaxError 

1447 except Exception: 

1448 rdclass = dns.rdataclass.IN 

1449 # Type 

1450 rdtype = dns.rdatatype.from_text(token.value) 

1451 (rdclass, rdtype, _, _) = self.message._parse_rr_header( 

1452 section_number, name, rdclass, rdtype 

1453 ) 

1454 self.message.find_rrset( 

1455 section, name, rdclass, rdtype, create=True, force_unique=True 

1456 ) 

1457 self.tok.get_eol() 

1458 

1459 def _rr_line(self, section_number): 

1460 """Process one line from the text format answer, authority, or 

1461 additional data sections. 

1462 """ 

1463 

1464 section = self.message.sections[section_number] 

1465 # Name 

1466 token = self.tok.get(want_leading=True) 

1467 if not token.is_whitespace(): 

1468 self.last_name = self.tok.as_name( 

1469 token, self.message.origin, self.relativize, self.relativize_to 

1470 ) 

1471 name = self.last_name 

1472 if name is None: 

1473 raise NoPreviousName 

1474 token = self.tok.get() 

1475 if not token.is_identifier(): 

1476 raise dns.exception.SyntaxError 

1477 # TTL 

1478 try: 

1479 ttl = int(token.value, 0) 

1480 token = self.tok.get() 

1481 if not token.is_identifier(): 

1482 raise dns.exception.SyntaxError 

1483 except dns.exception.SyntaxError: 

1484 raise dns.exception.SyntaxError 

1485 except Exception: 

1486 ttl = 0 

1487 # Class 

1488 try: 

1489 rdclass = dns.rdataclass.from_text(token.value) 

1490 token = self.tok.get() 

1491 if not token.is_identifier(): 

1492 raise dns.exception.SyntaxError 

1493 except dns.exception.SyntaxError: 

1494 raise dns.exception.SyntaxError 

1495 except Exception: 

1496 rdclass = dns.rdataclass.IN 

1497 # Type 

1498 rdtype = dns.rdatatype.from_text(token.value) 

1499 (rdclass, rdtype, deleting, empty) = self.message._parse_rr_header( 

1500 section_number, name, rdclass, rdtype 

1501 ) 

1502 token = self.tok.get() 

1503 if empty and not token.is_eol_or_eof(): 

1504 raise dns.exception.SyntaxError 

1505 if not empty and token.is_eol_or_eof(): 

1506 raise dns.exception.UnexpectedEnd 

1507 if not token.is_eol_or_eof(): 

1508 self.tok.unget(token) 

1509 rd = dns.rdata.from_text( 

1510 rdclass, 

1511 rdtype, 

1512 self.tok, 

1513 self.message.origin, 

1514 self.relativize, 

1515 self.relativize_to, 

1516 ) 

1517 covers = rd.covers() 

1518 else: 

1519 rd = None 

1520 covers = dns.rdatatype.NONE 

1521 rrset = self.message.find_rrset( 

1522 section, 

1523 name, 

1524 rdclass, 

1525 rdtype, 

1526 covers, 

1527 deleting, 

1528 True, 

1529 self.one_rr_per_rrset, 

1530 ) 

1531 if rd is not None: 

1532 rrset.add(rd, ttl) 

1533 

1534 def _make_message(self): 

1535 factory = _message_factory_from_opcode(self.opcode) 

1536 message = factory(id=self.id) 

1537 message.flags = self.flags 

1538 if self.edns >= 0: 

1539 message.use_edns(self.edns, self.ednsflags, self.payload) 

1540 if self.rcode: 

1541 message.set_rcode(self.rcode) 

1542 if self.origin: 

1543 message.origin = self.origin 

1544 return message 

1545 

1546 def read(self): 

1547 """Read a text format DNS message and build a dns.message.Message 

1548 object.""" 

1549 

1550 line_method = self._header_line 

1551 section_number = None 

1552 while 1: 

1553 token = self.tok.get(True, True) 

1554 if token.is_eol_or_eof(): 

1555 break 

1556 if token.is_comment(): 

1557 u = token.value.upper() 

1558 if u == "HEADER": 

1559 line_method = self._header_line 

1560 

1561 if self.message: 

1562 message = self.message 

1563 else: 

1564 # If we don't have a message, create one with the current 

1565 # opcode, so that we know which section names to parse. 

1566 message = self._make_message() 

1567 try: 

1568 section_number = message._section_enum.from_text(u) 

1569 # We found a section name. If we don't have a message, 

1570 # use the one we just created. 

1571 if not self.message: 

1572 self.message = message 

1573 self.one_rr_per_rrset = message._get_one_rr_per_rrset( 

1574 self.one_rr_per_rrset 

1575 ) 

1576 if section_number == MessageSection.QUESTION: 

1577 line_method = self._question_line 

1578 else: 

1579 line_method = self._rr_line 

1580 except Exception: 

1581 # It's just a comment. 

1582 pass 

1583 self.tok.get_eol() 

1584 continue 

1585 self.tok.unget(token) 

1586 line_method(section_number) 

1587 if not self.message: 

1588 self.message = self._make_message() 

1589 return self.message 

1590 

1591 

1592def from_text( 

1593 text: str, 

1594 idna_codec: Optional[dns.name.IDNACodec] = None, 

1595 one_rr_per_rrset: bool = False, 

1596 origin: Optional[dns.name.Name] = None, 

1597 relativize: bool = True, 

1598 relativize_to: Optional[dns.name.Name] = None, 

1599) -> Message: 

1600 """Convert the text format message into a message object. 

1601 

1602 The reader stops after reading the first blank line in the input to 

1603 facilitate reading multiple messages from a single file with 

1604 ``dns.message.from_file()``. 

1605 

1606 *text*, a ``str``, the text format message. 

1607 

1608 *idna_codec*, a ``dns.name.IDNACodec``, specifies the IDNA 

1609 encoder/decoder. If ``None``, the default IDNA 2003 encoder/decoder 

1610 is used. 

1611 

1612 *one_rr_per_rrset*, a ``bool``. If ``True``, then each RR is put 

1613 into its own rrset. The default is ``False``. 

1614 

1615 *origin*, a ``dns.name.Name`` (or ``None``), the 

1616 origin to use for relative names. 

1617 

1618 *relativize*, a ``bool``. If true, name will be relativized. 

1619 

1620 *relativize_to*, a ``dns.name.Name`` (or ``None``), the origin to use 

1621 when relativizing names. If not set, the *origin* value will be used. 

1622 

1623 Raises ``dns.message.UnknownHeaderField`` if a header is unknown. 

1624 

1625 Raises ``dns.exception.SyntaxError`` if the text is badly formed. 

1626 

1627 Returns a ``dns.message.Message object`` 

1628 """ 

1629 

1630 # 'text' can also be a file, but we don't publish that fact 

1631 # since it's an implementation detail. The official file 

1632 # interface is from_file(). 

1633 

1634 reader = _TextReader( 

1635 text, idna_codec, one_rr_per_rrset, origin, relativize, relativize_to 

1636 ) 

1637 return reader.read() 

1638 

1639 

1640def from_file( 

1641 f: Any, 

1642 idna_codec: Optional[dns.name.IDNACodec] = None, 

1643 one_rr_per_rrset: bool = False, 

1644) -> Message: 

1645 """Read the next text format message from the specified file. 

1646 

1647 Message blocks are separated by a single blank line. 

1648 

1649 *f*, a ``file`` or ``str``. If *f* is text, it is treated as the 

1650 pathname of a file to open. 

1651 

1652 *idna_codec*, a ``dns.name.IDNACodec``, specifies the IDNA 

1653 encoder/decoder. If ``None``, the default IDNA 2003 encoder/decoder 

1654 is used. 

1655 

1656 *one_rr_per_rrset*, a ``bool``. If ``True``, then each RR is put 

1657 into its own rrset. The default is ``False``. 

1658 

1659 Raises ``dns.message.UnknownHeaderField`` if a header is unknown. 

1660 

1661 Raises ``dns.exception.SyntaxError`` if the text is badly formed. 

1662 

1663 Returns a ``dns.message.Message object`` 

1664 """ 

1665 

1666 if isinstance(f, str): 

1667 cm: contextlib.AbstractContextManager = open(f) 

1668 else: 

1669 cm = contextlib.nullcontext(f) 

1670 with cm as f: 

1671 return from_text(f, idna_codec, one_rr_per_rrset) 

1672 assert False # for mypy lgtm[py/unreachable-statement] 

1673 

1674 

1675def make_query( 

1676 qname: Union[dns.name.Name, str], 

1677 rdtype: Union[dns.rdatatype.RdataType, str], 

1678 rdclass: Union[dns.rdataclass.RdataClass, str] = dns.rdataclass.IN, 

1679 use_edns: Optional[Union[int, bool]] = None, 

1680 want_dnssec: bool = False, 

1681 ednsflags: Optional[int] = None, 

1682 payload: Optional[int] = None, 

1683 request_payload: Optional[int] = None, 

1684 options: Optional[List[dns.edns.Option]] = None, 

1685 idna_codec: Optional[dns.name.IDNACodec] = None, 

1686 id: Optional[int] = None, 

1687 flags: int = dns.flags.RD, 

1688 pad: int = 0, 

1689) -> QueryMessage: 

1690 """Make a query message. 

1691 

1692 The query name, type, and class may all be specified either 

1693 as objects of the appropriate type, or as strings. 

1694 

1695 The query will have a randomly chosen query id, and its DNS flags 

1696 will be set to dns.flags.RD. 

1697 

1698 qname, a ``dns.name.Name`` or ``str``, the query name. 

1699 

1700 *rdtype*, an ``int`` or ``str``, the desired rdata type. 

1701 

1702 *rdclass*, an ``int`` or ``str``, the desired rdata class; the default 

1703 is class IN. 

1704 

1705 *use_edns*, an ``int``, ``bool`` or ``None``. The EDNS level to use; the 

1706 default is ``None``. If ``None``, EDNS will be enabled only if other 

1707 parameters (*ednsflags*, *payload*, *request_payload*, or *options*) are 

1708 set. 

1709 See the description of dns.message.Message.use_edns() for the possible 

1710 values for use_edns and their meanings. 

1711 

1712 *want_dnssec*, a ``bool``. If ``True``, DNSSEC data is desired. 

1713 

1714 *ednsflags*, an ``int``, the EDNS flag values. 

1715 

1716 *payload*, an ``int``, is the EDNS sender's payload field, which is the 

1717 maximum size of UDP datagram the sender can handle. I.e. how big 

1718 a response to this message can be. 

1719 

1720 *request_payload*, an ``int``, is the EDNS payload size to use when 

1721 sending this message. If not specified, defaults to the value of 

1722 *payload*. 

1723 

1724 *options*, a list of ``dns.edns.Option`` objects or ``None``, the EDNS 

1725 options. 

1726 

1727 *idna_codec*, a ``dns.name.IDNACodec``, specifies the IDNA 

1728 encoder/decoder. If ``None``, the default IDNA 2003 encoder/decoder 

1729 is used. 

1730 

1731 *id*, an ``int`` or ``None``, the desired query id. The default is 

1732 ``None``, which generates a random query id. 

1733 

1734 *flags*, an ``int``, the desired query flags. The default is 

1735 ``dns.flags.RD``. 

1736 

1737 *pad*, a non-negative ``int``. If 0, the default, do not pad; otherwise add 

1738 padding bytes to make the message size a multiple of *pad*. Note that if 

1739 padding is non-zero, an EDNS PADDING option will always be added to the 

1740 message. 

1741 

1742 Returns a ``dns.message.QueryMessage`` 

1743 """ 

1744 

1745 if isinstance(qname, str): 

1746 qname = dns.name.from_text(qname, idna_codec=idna_codec) 

1747 rdtype = dns.rdatatype.RdataType.make(rdtype) 

1748 rdclass = dns.rdataclass.RdataClass.make(rdclass) 

1749 m = QueryMessage(id=id) 

1750 m.flags = dns.flags.Flag(flags) 

1751 m.find_rrset(m.question, qname, rdclass, rdtype, create=True, force_unique=True) 

1752 # only pass keywords on to use_edns if they have been set to a 

1753 # non-None value. Setting a field will turn EDNS on if it hasn't 

1754 # been configured. 

1755 kwargs: Dict[str, Any] = {} 

1756 if ednsflags is not None: 

1757 kwargs["ednsflags"] = ednsflags 

1758 if payload is not None: 

1759 kwargs["payload"] = payload 

1760 if request_payload is not None: 

1761 kwargs["request_payload"] = request_payload 

1762 if options is not None: 

1763 kwargs["options"] = options 

1764 if kwargs and use_edns is None: 

1765 use_edns = 0 

1766 kwargs["edns"] = use_edns 

1767 kwargs["pad"] = pad 

1768 m.use_edns(**kwargs) 

1769 m.want_dnssec(want_dnssec) 

1770 return m 

1771 

1772 

1773def make_response( 

1774 query: Message, 

1775 recursion_available: bool = False, 

1776 our_payload: int = 8192, 

1777 fudge: int = 300, 

1778 tsig_error: int = 0, 

1779) -> Message: 

1780 """Make a message which is a response for the specified query. 

1781 The message returned is really a response skeleton; it has all 

1782 of the infrastructure required of a response, but none of the 

1783 content. 

1784 

1785 The response's question section is a shallow copy of the query's 

1786 question section, so the query's question RRsets should not be 

1787 changed. 

1788 

1789 *query*, a ``dns.message.Message``, the query to respond to. 

1790 

1791 *recursion_available*, a ``bool``, should RA be set in the response? 

1792 

1793 *our_payload*, an ``int``, the payload size to advertise in EDNS 

1794 responses. 

1795 

1796 *fudge*, an ``int``, the TSIG time fudge. 

1797 

1798 *tsig_error*, an ``int``, the TSIG error. 

1799 

1800 Returns a ``dns.message.Message`` object whose specific class is 

1801 appropriate for the query. For example, if query is a 

1802 ``dns.update.UpdateMessage``, response will be too. 

1803 """ 

1804 

1805 if query.flags & dns.flags.QR: 

1806 raise dns.exception.FormError("specified query message is not a query") 

1807 factory = _message_factory_from_opcode(query.opcode()) 

1808 response = factory(id=query.id) 

1809 response.flags = dns.flags.QR | (query.flags & dns.flags.RD) 

1810 if recursion_available: 

1811 response.flags |= dns.flags.RA 

1812 response.set_opcode(query.opcode()) 

1813 response.question = list(query.question) 

1814 if query.edns >= 0: 

1815 response.use_edns(0, 0, our_payload, query.payload) 

1816 if query.had_tsig: 

1817 response.use_tsig( 

1818 query.keyring, 

1819 query.keyname, 

1820 fudge, 

1821 None, 

1822 tsig_error, 

1823 b"", 

1824 query.keyalgorithm, 

1825 ) 

1826 response.request_mac = query.mac 

1827 return response 

1828 

1829 

1830### BEGIN generated MessageSection constants 

1831 

1832QUESTION = MessageSection.QUESTION 

1833ANSWER = MessageSection.ANSWER 

1834AUTHORITY = MessageSection.AUTHORITY 

1835ADDITIONAL = MessageSection.ADDITIONAL 

1836 

1837### END generated MessageSection constants