Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/dns/message.py: 48%

774 statements  

« prev     ^ index     » next       coverage.py v7.3.1, created at 2023-09-25 07:09 +0000

1# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license 

2 

3# Copyright (C) 2001-2017 Nominum, Inc. 

4# 

5# Permission to use, copy, modify, and distribute this software and its 

6# documentation for any purpose with or without fee is hereby granted, 

7# provided that the above copyright notice and this permission notice 

8# appear in all copies. 

9# 

10# THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES 

11# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 

12# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR 

13# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 

14# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 

15# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT 

16# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 

17 

18"""DNS Messages""" 

19 

20import contextlib 

21import io 

22import time 

23from typing import Any, Dict, List, Optional, Tuple, Union 

24 

25import dns.edns 

26import dns.entropy 

27import dns.enum 

28import dns.exception 

29import dns.flags 

30import dns.name 

31import dns.opcode 

32import dns.rcode 

33import dns.rdata 

34import dns.rdataclass 

35import dns.rdatatype 

36import dns.rdtypes.ANY.OPT 

37import dns.rdtypes.ANY.TSIG 

38import dns.renderer 

39import dns.rrset 

40import dns.tsig 

41import dns.ttl 

42import dns.wire 

43 

44 

# NOTE(review): docstring kept verbatim -- dns.exception.DNSException appears
# to use the class docstring as the default error message; confirm before
# rewording it.
class ShortHeader(dns.exception.FormError):
    """The DNS packet passed to from_wire() is too short."""

47 

48 

# NOTE(review): docstring kept verbatim -- dns.exception.DNSException appears
# to use the class docstring as the default error message; confirm before
# rewording it.
class TrailingJunk(dns.exception.FormError):
    """The DNS packet passed to from_wire() has extra junk at the end of it."""

51 

52 

# NOTE(review): docstring kept verbatim -- dns.exception.DNSException appears
# to use the class docstring as the default error message; confirm before
# rewording it.
class UnknownHeaderField(dns.exception.DNSException):
    """The header field name was not recognized when converting from text
    into a message."""

56 

57 

# NOTE(review): docstring kept verbatim -- dns.exception.DNSException appears
# to use the class docstring as the default error message; confirm before
# rewording it.
class BadEDNS(dns.exception.FormError):
    """An OPT record occurred somewhere other than
    the additional data section."""

61 

62 

# NOTE(review): docstring kept verbatim -- dns.exception.DNSException appears
# to use the class docstring as the default error message; confirm before
# rewording it.
class BadTSIG(dns.exception.FormError):
    """A TSIG record occurred somewhere other than the end of
    the additional data section."""

66 

67 

# NOTE(review): docstring kept verbatim -- dns.exception.DNSException appears
# to use the class docstring as the default error message; confirm before
# rewording it.
class UnknownTSIGKey(dns.exception.DNSException):
    """A TSIG with an unknown key was received."""

70 

71 

# NOTE(review): class docstring kept verbatim -- dns.exception.DNSException
# appears to use it as the default error message; confirm before rewording.
class Truncated(dns.exception.DNSException):
    """The truncated flag is set."""

    # The only keyword argument this exception accepts.
    supp_kwargs = {"message"}

    # We do this as otherwise mypy complains about unexpected keyword argument
    # message
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def message(self):
        """As much of the message as could be processed.

        Reads the ``message`` keyword argument the exception was raised with.

        Returns a ``dns.message.Message``.
        """
        return self.kwargs["message"]

88 

89 

# NOTE(review): docstring kept verbatim -- dns.exception.DNSException appears
# to use the class docstring as the default error message; confirm before
# rewording it.
class NotQueryResponse(dns.exception.DNSException):
    """Message is not a response to a query."""

92 

93 

# NOTE(review): docstring kept verbatim -- dns.exception.DNSException appears
# to use the class docstring as the default error message; confirm before
# rewording it.
class ChainTooLong(dns.exception.DNSException):
    """The CNAME chain is too long."""

96 

97 

# NOTE(review): docstring kept verbatim -- dns.exception.DNSException appears
# to use the class docstring as the default error message; confirm before
# rewording it.
class AnswerForNXDOMAIN(dns.exception.DNSException):
    """The rcode is NXDOMAIN but an answer was found."""

100 

101 

# NOTE(review): docstring kept verbatim -- dns.exception exceptions appear
# to use the class docstring as the default error message; confirm before
# rewording it.
class NoPreviousName(dns.exception.SyntaxError):
    """No previous name was known."""

104 

105 

class MessageSection(dns.enum.IntEnum):
    """Message sections.

    Each member's value is the index of that section in
    ``Message.sections``.
    """

    QUESTION = 0
    ANSWER = 1
    AUTHORITY = 2
    ADDITIONAL = 3

    @classmethod
    def _maximum(cls):
        # Largest valid section number (the value of ADDITIONAL).
        # NOTE(review): presumably consulted by dns.enum.IntEnum when
        # validating values -- confirm against dns.enum.
        return 3

117 

118 

class MessageError:
    """An exception paired with an integer offset.

    NOTE(review): presumably the offset is the position in the wire data at
    which *exception* was raised -- confirm against the code that fills
    ``Message.errors``.
    """

    def __init__(self, exception: Exception, offset: int):
        self.offset = offset
        self.exception = exception

123 

124 

# Default value of the *payload* argument of Message.use_edns() and
# Message._make_opt().
DEFAULT_EDNS_PAYLOAD = 1232
# Upper bound on the number of CNAMEs QueryMessage.resolve_chaining() will
# follow before raising ChainTooLong.
MAX_CHAIN = 16

# Key used by Message.index; Message.find_rrset() builds it as
# (section_number, name, rdclass, rdtype, covers, deleting).
IndexKeyType = Tuple[
    int,
    dns.name.Name,
    dns.rdataclass.RdataClass,
    dns.rdatatype.RdataType,
    Optional[dns.rdatatype.RdataType],
    Optional[dns.rdataclass.RdataClass],
]
# Maps an index key to the RRset stored in the message for that key.
IndexType = Dict[IndexKeyType, dns.rrset.RRset]
# A section may be named by number, by text name, or by passing the section
# list object itself (see Message.find_rrset()).
SectionType = Union[int, str, List[dns.rrset.RRset]]

138 

139 

class Message:
    """A DNS message."""

    # Enum used to translate between section numbers and section names;
    # subclasses may substitute their own.
    _section_enum = MessageSection

    def __init__(self, id: Optional[int] = None):
        """Create an empty message.

        *id*, an ``int`` or ``None``, the message id.  If ``None``, an id is
        obtained from ``dns.entropy.random_16()``.
        """
        if id is None:
            self.id = dns.entropy.random_16()
        else:
            self.id = id
        self.flags = 0
        # One list of RRsets per section, indexed by section number.
        self.sections: List[List[dns.rrset.RRset]] = [[], [], [], []]
        # The OPT RRset carrying EDNS state, or None when EDNS is not in use.
        self.opt: Optional[dns.rrset.RRset] = None
        self.request_payload = 0
        self.pad = 0
        self.keyring: Any = None
        # The TSIG RRset, or None when the message is not signed.
        self.tsig: Optional[dns.rrset.RRset] = None
        self.request_mac = b""
        self.xfr = False
        self.origin: Optional[dns.name.Name] = None
        self.tsig_ctx: Optional[Any] = None
        # Lookup table used by find_rrset(); keys are IndexKeyType tuples.
        self.index: IndexType = {}
        self.errors: List[MessageError] = []
        self.time = 0.0

    @property
    def question(self) -> List[dns.rrset.RRset]:
        """The question section."""
        return self.sections[0]

    @question.setter
    def question(self, v):
        self.sections[0] = v

    @property
    def answer(self) -> List[dns.rrset.RRset]:
        """The answer section."""
        return self.sections[1]

    @answer.setter
    def answer(self, v):
        self.sections[1] = v

    @property
    def authority(self) -> List[dns.rrset.RRset]:
        """The authority section."""
        return self.sections[2]

    @authority.setter
    def authority(self, v):
        self.sections[2] = v

    @property
    def additional(self) -> List[dns.rrset.RRset]:
        """The additional data section."""
        return self.sections[3]

    @additional.setter
    def additional(self, v):
        self.sections[3] = v

    def __repr__(self):
        return "<DNS message, ID " + repr(self.id) + ">"

    def __str__(self):
        return self.to_text()

    def to_text(
        self,
        origin: Optional[dns.name.Name] = None,
        relativize: bool = True,
        **kw: Any,
    ) -> str:
        """Convert the message to text.

        The *origin*, *relativize*, and any other keyword
        arguments are passed to the RRset ``to_text()`` method.

        Returns a ``str``.
        """

        s = io.StringIO()
        s.write("id %d\n" % self.id)
        s.write("opcode %s\n" % dns.opcode.to_text(self.opcode()))
        s.write("rcode %s\n" % dns.rcode.to_text(self.rcode()))
        s.write("flags %s\n" % dns.flags.to_text(self.flags))
        if self.edns >= 0:
            s.write("edns %s\n" % self.edns)
            if self.ednsflags != 0:
                s.write("eflags %s\n" % dns.flags.edns_to_text(self.ednsflags))
            s.write("payload %d\n" % self.payload)
        for opt in self.options:
            s.write("option %s\n" % opt.to_text())
        for name, which in self._section_enum.__members__.items():
            s.write(f";{name}\n")
            for rrset in self.section_from_number(which):
                s.write(rrset.to_text(origin, relativize, **kw))
                s.write("\n")
        #
        # We strip off the final \n so the caller can print the result without
        # doing weird things to get around eccentricities in Python print
        # formatting
        #
        return s.getvalue()[:-1]

    def __eq__(self, other):
        """Two messages are equal if they have the same id and flags, and
        the same content in the question, answer, authority, and additional
        sections.  The order of RRsets within a section does not matter.

        Returns a ``bool``.
        """

        if not isinstance(other, Message):
            return False
        if self.id != other.id:
            return False
        if self.flags != other.flags:
            return False
        for i, section in enumerate(self.sections):
            other_section = other.sections[i]
            for n in section:
                if n not in other_section:
                    return False
            for n in other_section:
                if n not in section:
                    return False
        return True

    def __ne__(self, other):
        return not self.__eq__(other)

    def is_response(self, other: "Message") -> bool:
        """Is *other*, also a ``dns.message.Message``, a response to this
        message?

        Returns a ``bool``.
        """

        if (
            other.flags & dns.flags.QR == 0
            or self.id != other.id
            or dns.opcode.from_flags(self.flags) != dns.opcode.from_flags(other.flags)
        ):
            return False
        if other.rcode() in {
            dns.rcode.FORMERR,
            dns.rcode.SERVFAIL,
            dns.rcode.NOTIMP,
            dns.rcode.REFUSED,
        }:
            # We don't check the question section in these cases if
            # the other question section is empty, even though they
            # still really ought to have a question section.
            if len(other.question) == 0:
                return True
        if dns.opcode.is_update(self.flags):
            # This is assuming the "sender doesn't include anything
            # from the update", but we don't care to check the other
            # case, which is that all the sections are returned and
            # identical.
            return True
        for n in self.question:
            if n not in other.question:
                return False
        for n in other.question:
            if n not in self.question:
                return False
        return True

    def section_number(self, section: List[dns.rrset.RRset]) -> int:
        """Return the "section number" of the specified section for use
        in indexing.

        *section* is one of the section attributes of this message.  It is
        matched by identity, so an equal but distinct list is not accepted.

        Raises ``ValueError`` if the section isn't known.

        Returns an ``int``.
        """

        for i, our_section in enumerate(self.sections):
            if section is our_section:
                return self._section_enum(i)
        raise ValueError("unknown section")

    def section_from_number(self, number: Union[int, str]) -> List[dns.rrset.RRset]:
        """Return the section list associated with the specified section
        number.

        *number* is a section number `int` or the text form of a section
        name.

        Raises ``ValueError`` if the section isn't known.

        Returns a ``list``.
        """

        section = self._section_enum.make(number)
        return self.sections[section]

    def find_rrset(
        self,
        section: SectionType,
        name: dns.name.Name,
        rdclass: dns.rdataclass.RdataClass,
        rdtype: dns.rdatatype.RdataType,
        covers: dns.rdatatype.RdataType = dns.rdatatype.NONE,
        deleting: Optional[dns.rdataclass.RdataClass] = None,
        create: bool = False,
        force_unique: bool = False,
        idna_codec: Optional[dns.name.IDNACodec] = None,
    ) -> dns.rrset.RRset:
        """Find the RRset with the given attributes in the specified section.

        *section*, an ``int`` section number, a ``str`` section name, or one of
        the section attributes of this message.  This specifies the
        section of the message to search.  For example::

            my_message.find_rrset(my_message.answer, name, rdclass, rdtype)
            my_message.find_rrset(dns.message.ANSWER, name, rdclass, rdtype)
            my_message.find_rrset("ANSWER", name, rdclass, rdtype)

        *name*, a ``dns.name.Name`` or ``str``, the name of the RRset.

        *rdclass*, an ``int`` or ``str``, the class of the RRset.

        *rdtype*, an ``int`` or ``str``, the type of the RRset.

        *covers*, an ``int`` or ``str``, the covers value of the RRset.
        The default is ``dns.rdatatype.NONE``.

        *deleting*, an ``int``, ``str``, or ``None``, the deleting value of the
        RRset.  The default is ``None``.

        *create*, a ``bool``.  If ``True``, create the RRset if it is not found.
        The created RRset is appended to *section*.

        *force_unique*, a ``bool``.  If ``True`` and *create* is also ``True``,
        create a new RRset regardless of whether a matching RRset exists
        already.  The default is ``False``.  This is useful when creating
        DDNS Update messages, as order matters for them.

        *idna_codec*, a ``dns.name.IDNACodec``, specifies the IDNA
        encoder/decoder.  If ``None``, the default IDNA 2003 encoder/decoder
        is used.

        Raises ``KeyError`` if the RRset was not found and create was
        ``False``.

        Returns a ``dns.rrset.RRset object``.
        """

        if isinstance(section, int):
            section_number = section
            section = self.section_from_number(section_number)
        elif isinstance(section, str):
            # Resolve the name through this class's own section enum so that
            # subclasses overriding _section_enum have their section names
            # accepted, consistently with section_from_number() and to_text().
            section_number = self._section_enum.from_text(section)
            section = self.section_from_number(section_number)
        else:
            section_number = self.section_number(section)
        if isinstance(name, str):
            name = dns.name.from_text(name, idna_codec=idna_codec)
        rdtype = dns.rdatatype.RdataType.make(rdtype)
        rdclass = dns.rdataclass.RdataClass.make(rdclass)
        covers = dns.rdatatype.RdataType.make(covers)
        if deleting is not None:
            deleting = dns.rdataclass.RdataClass.make(deleting)
        key = (section_number, name, rdclass, rdtype, covers, deleting)
        if not force_unique:
            if self.index is not None:
                rrset = self.index.get(key)
                if rrset is not None:
                    return rrset
            else:
                # No index available: fall back to a linear scan.
                for rrset in section:
                    if rrset.full_match(name, rdclass, rdtype, covers, deleting):
                        return rrset
        if not create:
            raise KeyError
        rrset = dns.rrset.RRset(name, rdclass, rdtype, covers, deleting)
        section.append(rrset)
        if self.index is not None:
            self.index[key] = rrset
        return rrset

    def get_rrset(
        self,
        section: SectionType,
        name: dns.name.Name,
        rdclass: dns.rdataclass.RdataClass,
        rdtype: dns.rdatatype.RdataType,
        covers: dns.rdatatype.RdataType = dns.rdatatype.NONE,
        deleting: Optional[dns.rdataclass.RdataClass] = None,
        create: bool = False,
        force_unique: bool = False,
        idna_codec: Optional[dns.name.IDNACodec] = None,
    ) -> Optional[dns.rrset.RRset]:
        """Get the RRset with the given attributes in the specified section.

        If the RRset is not found, None is returned.

        *section*, an ``int`` section number, a ``str`` section name, or one of
        the section attributes of this message.  This specifies the
        section of the message to search.  For example::

            my_message.get_rrset(my_message.answer, name, rdclass, rdtype)
            my_message.get_rrset(dns.message.ANSWER, name, rdclass, rdtype)
            my_message.get_rrset("ANSWER", name, rdclass, rdtype)

        *name*, a ``dns.name.Name`` or ``str``, the name of the RRset.

        *rdclass*, an ``int`` or ``str``, the class of the RRset.

        *rdtype*, an ``int`` or ``str``, the type of the RRset.

        *covers*, an ``int`` or ``str``, the covers value of the RRset.
        The default is ``dns.rdatatype.NONE``.

        *deleting*, an ``int``, ``str``, or ``None``, the deleting value of the
        RRset.  The default is ``None``.

        *create*, a ``bool``.  If ``True``, create the RRset if it is not found.
        The created RRset is appended to *section*.

        *force_unique*, a ``bool``.  If ``True`` and *create* is also ``True``,
        create a new RRset regardless of whether a matching RRset exists
        already.  The default is ``False``.  This is useful when creating
        DDNS Update messages, as order matters for them.

        *idna_codec*, a ``dns.name.IDNACodec``, specifies the IDNA
        encoder/decoder.  If ``None``, the default IDNA 2003 encoder/decoder
        is used.

        Returns a ``dns.rrset.RRset object`` or ``None``.
        """

        try:
            rrset = self.find_rrset(
                section,
                name,
                rdclass,
                rdtype,
                covers,
                deleting,
                create,
                force_unique,
                idna_codec,
            )
        except KeyError:
            rrset = None
        return rrset

    def _compute_opt_reserve(self) -> int:
        """Compute the size required for the OPT RR, padding excluded"""
        if not self.opt:
            return 0
        # 1 byte for the root name, 10 for the standard RR fields
        size = 11
        # This would be more efficient if options had a size() method, but we won't
        # worry about that for now.  We also don't worry if there is an existing padding
        # option, as it is unlikely and probably harmless, as the worst case is that we
        # may add another, and this seems to be legal.
        for option in self.opt[0].options:
            wire = option.to_wire()
            # We add 4 here to account for the option type and length
            size += len(wire) + 4
        if self.pad:
            # Padding will be added, so again add the option type and length.
            size += 4
        return size

    def _compute_tsig_reserve(self) -> int:
        """Compute the size required for the TSIG RR"""
        # This would be more efficient if TSIGs had a size method, but we won't
        # worry about that for now.  Also, we can't really cope with the potential
        # compressibility of the TSIG owner name, so we estimate with the uncompressed
        # size.  We will disable compression when TSIG and padding are both active
        # so that the padding comes out right.
        if not self.tsig:
            return 0
        f = io.BytesIO()
        self.tsig.to_wire(f)
        return len(f.getvalue())

    def to_wire(
        self,
        origin: Optional[dns.name.Name] = None,
        max_size: int = 0,
        multi: bool = False,
        tsig_ctx: Optional[Any] = None,
        **kw: Any,
    ) -> bytes:
        """Return the message in DNS compressed wire format.

        Additional keyword arguments are passed to the RRset ``to_wire()``
        method.

        *origin*, a ``dns.name.Name`` or ``None``, the origin to be appended
        to any relative names.  If ``None``, and the message has an origin
        attribute that is not ``None``, then it will be used.

        *max_size*, an ``int``, the maximum size of the wire format
        output; default is 0, which means "the message's request
        payload, if nonzero, or 65535".  The effective value is clamped to
        the range 512..65535.

        *multi*, a ``bool``, should be set to ``True`` if this message is
        part of a multiple message sequence.

        *tsig_ctx*, a ``dns.tsig.HMACTSig`` or ``dns.tsig.GSSTSig`` object, the
        ongoing TSIG context, used when signing zone transfers.

        Raises ``dns.exception.TooBig`` if *max_size* was exceeded.

        Returns a ``bytes``.
        """

        if origin is None and self.origin is not None:
            origin = self.origin
        if max_size == 0:
            if self.request_payload != 0:
                max_size = self.request_payload
            else:
                max_size = 65535
        if max_size < 512:
            max_size = 512
        elif max_size > 65535:
            max_size = 65535
        r = dns.renderer.Renderer(self.id, self.flags, max_size, origin)
        # Set aside room for the OPT and TSIG records, which are rendered
        # after the regular sections.
        opt_reserve = self._compute_opt_reserve()
        r.reserve(opt_reserve)
        tsig_reserve = self._compute_tsig_reserve()
        r.reserve(tsig_reserve)
        for rrset in self.question:
            r.add_question(rrset.name, rrset.rdtype, rrset.rdclass)
        for rrset in self.answer:
            r.add_rrset(dns.renderer.ANSWER, rrset, **kw)
        for rrset in self.authority:
            r.add_rrset(dns.renderer.AUTHORITY, rrset, **kw)
        for rrset in self.additional:
            r.add_rrset(dns.renderer.ADDITIONAL, rrset, **kw)
        r.release_reserved()
        if self.opt is not None:
            r.add_opt(self.opt, self.pad, opt_reserve, tsig_reserve)
        r.write_header()
        if self.tsig is not None:
            # Sign what has been rendered so far, then replace the placeholder
            # TSIG rdata with the signed one and append it.
            (new_tsig, ctx) = dns.tsig.sign(
                r.get_wire(),
                self.keyring,
                self.tsig[0],
                int(time.time()),
                self.request_mac,
                tsig_ctx,
                multi,
            )
            self.tsig.clear()
            self.tsig.add(new_tsig)
            r.add_rrset(dns.renderer.ADDITIONAL, self.tsig)
            r.write_header()
            if multi:
                self.tsig_ctx = ctx
        return r.get_wire()

    @staticmethod
    def _make_tsig(
        keyname, algorithm, time_signed, fudge, mac, original_id, error, other
    ):
        """Build a single-rdata TSIG RRset owned by *keyname* with TTL 0."""
        tsig = dns.rdtypes.ANY.TSIG.TSIG(
            dns.rdataclass.ANY,
            dns.rdatatype.TSIG,
            algorithm,
            time_signed,
            fudge,
            mac,
            original_id,
            error,
            other,
        )
        return dns.rrset.from_rdata(keyname, 0, tsig)

    def use_tsig(
        self,
        keyring: Any,
        keyname: Optional[Union[dns.name.Name, str]] = None,
        fudge: int = 300,
        original_id: Optional[int] = None,
        tsig_error: int = 0,
        other_data: bytes = b"",
        algorithm: Union[dns.name.Name, str] = dns.tsig.default_algorithm,
    ) -> None:
        """When sending, a TSIG signature using the specified key
        should be added.

        *keyring*, a ``dict``, ``callable`` or ``dns.tsig.Key``, is either
        the TSIG keyring or key to use.  If a ``dns.tsig.Key`` is given, the
        *keyname* and *algorithm* arguments are not used.

        The format of a keyring dict is a mapping from TSIG key name, as
        ``dns.name.Name`` to ``dns.tsig.Key`` or a TSIG secret, a ``bytes``.
        If a ``dict`` *keyring* is specified but a *keyname* is not, the key
        used will be the first key in the *keyring*.  Note that the order of
        keys in a dictionary is not defined, so applications should supply a
        keyname when a ``dict`` keyring is used, unless they know the keyring
        contains only one key.  If a ``callable`` keyring is specified, the
        callable will be called with the message and the keyname, and is
        expected to return a key.

        *keyname*, a ``dns.name.Name``, ``str`` or ``None``, the name of
        this TSIG key to use; defaults to ``None``.  If *keyring* is a
        ``dict``, the key must be defined in it.  If *keyring* is a
        ``dns.tsig.Key``, this is ignored.

        *fudge*, an ``int``, the TSIG time fudge.

        *original_id*, an ``int``, the TSIG original id.  If ``None``,
        the message's id is used.

        *tsig_error*, an ``int``, the TSIG error code.

        *other_data*, a ``bytes``, the TSIG other data.

        *algorithm*, a ``dns.name.Name`` or ``str``, the TSIG algorithm to use.  This is
        only used if *keyring* is a ``dict``, and the key entry is a ``bytes``.
        """

        if isinstance(keyring, dns.tsig.Key):
            key = keyring
            keyname = key.name
        elif callable(keyring):
            key = keyring(self, keyname)
        else:
            if isinstance(keyname, str):
                keyname = dns.name.from_text(keyname)
            if keyname is None:
                keyname = next(iter(keyring))
            key = keyring[keyname]
            if isinstance(key, bytes):
                key = dns.tsig.Key(keyname, key, algorithm)
        self.keyring = key
        if original_id is None:
            original_id = self.id
        # Placeholder TSIG: time signed is 0 and the MAC is all zero bytes of
        # the algorithm's MAC size; to_wire() swaps in the signed rdata.
        self.tsig = self._make_tsig(
            keyname,
            self.keyring.algorithm,
            0,
            fudge,
            b"\x00" * dns.tsig.mac_sizes[self.keyring.algorithm],
            original_id,
            tsig_error,
            other_data,
        )

    @property
    def keyname(self) -> Optional[dns.name.Name]:
        """The owner name of the TSIG RRset, or ``None`` if there is none."""
        if self.tsig:
            return self.tsig.name
        else:
            return None

    @property
    def keyalgorithm(self) -> Optional[dns.name.Name]:
        """The algorithm of the TSIG rdata, or ``None`` if there is none."""
        if self.tsig:
            return self.tsig[0].algorithm
        else:
            return None

    @property
    def mac(self) -> Optional[bytes]:
        """The MAC of the TSIG rdata, or ``None`` if there is none."""
        if self.tsig:
            return self.tsig[0].mac
        else:
            return None

    @property
    def tsig_error(self) -> Optional[int]:
        """The error field of the TSIG rdata, or ``None`` if there is none."""
        if self.tsig:
            return self.tsig[0].error
        else:
            return None

    @property
    def had_tsig(self) -> bool:
        """``True`` if the message has a non-empty TSIG RRset."""
        return bool(self.tsig)

    @staticmethod
    def _make_opt(flags=0, payload=DEFAULT_EDNS_PAYLOAD, options=None):
        """Build an OPT RRset at the root name.

        The EDNS *flags* are carried in the RRset's TTL and *payload* in the
        OPT rdata; *options* defaults to an empty tuple.
        """
        opt = dns.rdtypes.ANY.OPT.OPT(payload, dns.rdatatype.OPT, options or ())
        return dns.rrset.from_rdata(dns.name.root, int(flags), opt)

    def use_edns(
        self,
        edns: Optional[Union[int, bool]] = 0,
        ednsflags: int = 0,
        payload: int = DEFAULT_EDNS_PAYLOAD,
        request_payload: Optional[int] = None,
        options: Optional[List[dns.edns.Option]] = None,
        pad: int = 0,
    ) -> None:
        """Configure EDNS behavior.

        *edns*, an ``int``, is the EDNS level to use.  Specifying ``None``, ``False``,
        or ``-1`` means "do not use EDNS", and in this case the other parameters are
        ignored.  Specifying ``True`` is equivalent to specifying 0, i.e. "use EDNS0".

        *ednsflags*, an ``int``, the EDNS flag values.

        *payload*, an ``int``, is the EDNS sender's payload field, which is the maximum
        size of UDP datagram the sender can handle.  I.e. how big a response to this
        message can be.

        *request_payload*, an ``int``, is the EDNS payload size to use when sending this
        message.  If not specified, defaults to the value of *payload*.

        *options*, a list of ``dns.edns.Option`` objects or ``None``, the EDNS options.

        *pad*, a non-negative ``int``.  If 0, the default, do not pad; otherwise add
        padding bytes to make the message size a multiple of *pad*.  Note that if
        padding is non-zero, an EDNS PADDING option will always be added to the
        message.
        """

        if edns is None or edns is False:
            edns = -1
        elif edns is True:
            edns = 0
        if edns < 0:
            self.opt = None
            self.request_payload = 0
        else:
            # make sure the EDNS version in ednsflags agrees with edns
            ednsflags &= 0xFF00FFFF
            ednsflags |= edns << 16
            if options is None:
                options = []
            self.opt = self._make_opt(ednsflags, payload, options)
            if request_payload is None:
                request_payload = payload
            self.request_payload = request_payload
            self.pad = pad

    @property
    def edns(self) -> int:
        """The EDNS version taken from the EDNS flags, or -1 if there is no
        OPT record."""
        if self.opt:
            return (self.ednsflags & 0xFF0000) >> 16
        else:
            return -1

    @property
    def ednsflags(self) -> int:
        """The EDNS flags, read from the OPT RRset's TTL; 0 if there is no
        OPT record."""
        if self.opt:
            return self.opt.ttl
        else:
            return 0

    @ednsflags.setter
    def ednsflags(self, v):
        if self.opt:
            self.opt.ttl = v
        elif v:
            # Setting non-zero flags creates a default OPT record on demand.
            self.opt = self._make_opt(v)

    @property
    def payload(self) -> int:
        """The payload value of the OPT rdata, or 0 if there is no OPT
        record."""
        if self.opt:
            return self.opt[0].payload
        else:
            return 0

    @property
    def options(self) -> Tuple:
        """The options of the OPT rdata, or ``()`` if there is no OPT
        record."""
        if self.opt:
            return self.opt[0].options
        else:
            return ()

    def want_dnssec(self, wanted: bool = True) -> None:
        """Enable or disable 'DNSSEC desired' flag in requests.

        *wanted*, a ``bool``.  If ``True``, then DNSSEC data is
        desired in the response, EDNS is enabled if required, and then
        the DO bit is set.  If ``False``, the DO bit is cleared if
        EDNS is enabled.
        """

        if wanted:
            self.ednsflags |= dns.flags.DO
        elif self.opt:
            self.ednsflags &= ~dns.flags.DO

    def rcode(self) -> dns.rcode.Rcode:
        """Return the rcode.

        Returns a ``dns.rcode.Rcode``.
        """
        return dns.rcode.from_flags(int(self.flags), int(self.ednsflags))

    def set_rcode(self, rcode: dns.rcode.Rcode) -> None:
        """Set the rcode.

        *rcode*, a ``dns.rcode.Rcode``, is the rcode to set.
        """
        (value, evalue) = dns.rcode.to_flags(rcode)
        # Low 4 bits of the header flags, plus the top 8 bits of the EDNS
        # flags, hold the rcode.
        self.flags &= 0xFFF0
        self.flags |= value
        self.ednsflags &= 0x00FFFFFF
        self.ednsflags |= evalue

    def opcode(self) -> dns.opcode.Opcode:
        """Return the opcode.

        Returns a ``dns.opcode.Opcode``.
        """
        return dns.opcode.from_flags(int(self.flags))

    def set_opcode(self, opcode: dns.opcode.Opcode) -> None:
        """Set the opcode.

        *opcode*, a ``dns.opcode.Opcode``, is the opcode to set.
        """
        self.flags &= 0x87FF
        self.flags |= dns.opcode.to_flags(opcode)

    def _get_one_rr_per_rrset(self, value):
        # What the caller picked is fine.
        return value

    # pylint: disable=unused-argument

    def _parse_rr_header(self, section, name, rdclass, rdtype):
        # Default hook: pass the class and type through unchanged.
        # NOTE(review): the trailing (None, False) are presumably the
        # "deleting" and "empty" values expected by the wire reader -- confirm.
        return (rdclass, rdtype, None, False)

    # pylint: enable=unused-argument

    def _parse_special_rr_header(self, section, count, position, name, rdclass, rdtype):
        """Validate the placement of an OPT or TSIG record.

        Raises ``BadEDNS`` for an OPT record that is outside the additional
        section, is not owned by the root name, or follows an earlier OPT.

        Raises ``BadTSIG`` for a TSIG record that is outside the additional
        section, is not of class ANY, or is not at position ``count - 1``.

        Returns the tuple ``(rdclass, rdtype, None, False)``.
        """
        if rdtype == dns.rdatatype.OPT:
            if (
                section != MessageSection.ADDITIONAL
                or self.opt
                or name != dns.name.root
            ):
                raise BadEDNS
        elif rdtype == dns.rdatatype.TSIG:
            if (
                section != MessageSection.ADDITIONAL
                # Compare against the class enum; rdatatype.ANY is a type.
                or rdclass != dns.rdataclass.ANY
                or position != count - 1
            ):
                raise BadTSIG
        return (rdclass, rdtype, None, False)

891 

892 

class ChainingResult:
    """The result of a call to dns.message.QueryMessage.resolve_chaining().

    Attributes:

    ``canonical_name``: the name reached after following every CNAME in the
    chain.  When ``answer`` is not ``None`` it is the name the answer was
    looked up under.

    ``answer``: the answer RRset, or ``None`` if none was found.

    ``minimum_ttl``: the TTL to use if caching the data.  It is the smallest
    of the TTLs of the CNAMEs followed and either the answer TTL, if there is
    an answer, or the SOA TTL and SOA minimum values for negative answers.

    ``cnames``: the list of CNAME RRsets followed to reach the canonical
    name, in the order they were followed.
    """

    def __init__(
        self,
        canonical_name: dns.name.Name,
        answer: Optional[dns.rrset.RRset],
        minimum_ttl: int,
        cnames: List[dns.rrset.RRset],
    ):
        self.cnames = cnames
        self.minimum_ttl = minimum_ttl
        self.answer = answer
        self.canonical_name = canonical_name

923 

924 

class QueryMessage(Message):
    def resolve_chaining(self) -> ChainingResult:
        """Walk the CNAME chain of this response and report where it ends.

        Starting from the single question, the answer section is searched for
        an RRset of the question's type; if there is none, a CNAME at the
        current name is followed, up to ``MAX_CHAIN`` times.  If no answer is
        found, the authority section is searched for an SOA at the final name
        or one of its ancestors to further lower the minimum TTL.

        Raises ``dns.message.NotQueryResponse`` if the message is not
        a response.

        Raises ``dns.message.ChainTooLong`` if the CNAME chain is too long.

        Raises ``dns.message.AnswerForNXDOMAIN`` if the rcode is NXDOMAIN
        but an answer was found.

        Raises ``dns.exception.FormError`` if the question count is not 1.

        Returns a ChainingResult object.
        """
        if self.flags & dns.flags.QR == 0:
            raise NotQueryResponse
        if len(self.question) != 1:
            raise dns.exception.FormError
        question = self.question[0]
        current = question.name
        lowest_ttl = dns.ttl.MAX_TTL
        found = None
        followed = []
        hops = 0
        while hops < MAX_CHAIN:
            try:
                found = self.find_rrset(
                    self.answer, current, question.rdclass, question.rdtype
                )
                lowest_ttl = min(lowest_ttl, found.ttl)
                break
            except KeyError:
                if question.rdtype == dns.rdatatype.CNAME:
                    # A CNAME query has no chain to follow; stop here.
                    break
                try:
                    alias = self.find_rrset(
                        self.answer, current, question.rdclass, dns.rdatatype.CNAME
                    )
                    followed.append(alias)
                    lowest_ttl = min(lowest_ttl, alias.ttl)
                    # Continue from the target of the first CNAME rdata.
                    for rdata in alias:
                        current = rdata.target
                        break
                    hops += 1
                    continue
                except KeyError:
                    # No CNAME at this name either; exit the chaining loop.
                    break
        if hops >= MAX_CHAIN:
            raise ChainTooLong
        if self.rcode() == dns.rcode.NXDOMAIN and found is not None:
            raise AnswerForNXDOMAIN
        if found is None:
            # Further minimize the TTL with NCACHE: look for an SOA RR whose
            # owner name is the final name or a superdomain of it.
            candidate = current
            while True:
                try:
                    soa = self.find_rrset(
                        self.authority, candidate, question.rdclass, dns.rdatatype.SOA
                    )
                    lowest_ttl = min(lowest_ttl, soa.ttl, soa[0].minimum)
                    break
                except KeyError:
                    try:
                        candidate = candidate.parent()
                    except dns.name.NoParent:
                        break
        return ChainingResult(current, found, lowest_ttl, followed)

    def canonical_name(self) -> dns.name.Name:
        """Return the canonical name of the first name in the question
        section, i.e. ``resolve_chaining().canonical_name``.

        Raises ``dns.message.NotQueryResponse`` if the message is not
        a response.

        Raises ``dns.message.ChainTooLong`` if the CNAME chain is too long.

        Raises ``dns.message.AnswerForNXDOMAIN`` if the rcode is NXDOMAIN
        but an answer was found.

        Raises ``dns.exception.FormError`` if the question count is not 1.
        """
        result = self.resolve_chaining()
        return result.canonical_name

1016 

1017 

def _maybe_import_update():
    """Import ``dns.update`` lazily to avoid a circular import.

    The import lives in its own function because performing it inside
    ``_message_factory_from_opcode()`` would make "dns" a local symbol
    there, and that function's first line would then fail.
    """
    # pylint: disable=redefined-outer-name,import-outside-toplevel,unused-import
    import dns.update  # noqa: F401

1025 

1026 

def _message_factory_from_opcode(opcode):
    """Return the message class to instantiate for *opcode*.

    QUERY maps to ``QueryMessage``, UPDATE maps to
    ``dns.update.UpdateMessage`` (imported on demand), and every other
    opcode maps to the generic ``Message``.
    """
    if opcode == dns.opcode.QUERY:
        return QueryMessage
    if opcode == dns.opcode.UPDATE:
        _maybe_import_update()
        return dns.update.UpdateMessage
    return Message

1035 

1036 

class _WireReader:

    """Wire format reader.

    parser: the binary parser
    message: The message object being built
    initialize_message: Callback to set message parsing options
    question_only: Are we only reading the question?
    one_rr_per_rrset: Put each RR into its own RRset?
    keyring: TSIG keyring
    ignore_trailing: Ignore trailing junk at end of request?
    multi: Is this message part of a multi-message sequence?
    continue_on_error: try to extract as much information as possible from
    the message, accumulating MessageErrors in the *errors* attribute instead of
    raising them.
    errors: the MessageErrors accumulated so far
    """

    def __init__(
        self,
        wire,
        initialize_message,
        question_only=False,
        one_rr_per_rrset=False,
        ignore_trailing=False,
        keyring=None,
        multi=False,
        continue_on_error=False,
    ):
        self.parser = dns.wire.Parser(wire)
        # The message is created by read() once the header has been parsed.
        self.message = None
        self.initialize_message = initialize_message
        self.question_only = question_only
        self.one_rr_per_rrset = one_rr_per_rrset
        self.ignore_trailing = ignore_trailing
        self.keyring = keyring
        self.multi = multi
        self.continue_on_error = continue_on_error
        self.errors = []

    def _get_question(self, section_number, qcount):
        """Read the next *qcount* records from the wire data and add them to
        the question section.
        """
        assert self.message is not None
        message = self.message
        parser = self.parser
        section = message.sections[section_number]
        for _ in range(qcount):
            qname = parser.get_name(message.origin)
            rdtype, rdclass = parser.get_struct("!HH")
            rdclass, rdtype, _, _ = message._parse_rr_header(
                section_number, qname, rdclass, rdtype
            )
            message.find_rrset(
                section, qname, rdclass, rdtype, create=True, force_unique=True
            )

    def _add_error(self, e):
        """Record exception *e* together with the parser's current offset."""
        self.errors.append(MessageError(e, self.parser.current))

    def _lookup_tsig_key(self, absolute_name, name, rd):
        """Return the TSIG key to use for the TSIG record owned by
        *absolute_name*.

        The keyring may be a ``dict`` (looked up by *absolute_name*; a
        ``bytes`` value is wrapped in a ``dns.tsig.Key`` using the
        algorithm from *rd*), a callable (called with the message and
        *absolute_name*), or anything else, which is used as the key
        itself.

        Raises ``UnknownTSIGKey`` if there is no keyring or no key is found.
        """
        keyring = self.keyring
        if keyring is None:
            raise UnknownTSIGKey("got signed message without keyring")
        if isinstance(keyring, dict):
            key = keyring.get(absolute_name)
            if isinstance(key, bytes):
                key = dns.tsig.Key(absolute_name, key, rd.algorithm)
        elif callable(keyring):
            key = keyring(self.message, absolute_name)
        else:
            key = keyring
        if key is None:
            raise UnknownTSIGKey("key '%s' unknown" % name)
        return key

    def _get_section(self, section_number, count):
        """Read the next *count* records from the wire data and add them to
        the specified section.

        section_number: the section of the message to which to add records
        count: the number of records to read
        """
        assert self.message is not None
        message = self.message
        parser = self.parser
        section = message.sections[section_number]
        force_unique = self.one_rr_per_rrset
        for index in range(count):
            rr_start = parser.current
            absolute_name = parser.get_name()
            if message.origin is None:
                name = absolute_name
            else:
                name = absolute_name.relativize(message.origin)
            rdtype, rdclass, ttl, rdlen = parser.get_struct("!HHIH")
            if rdtype in (dns.rdatatype.OPT, dns.rdatatype.TSIG):
                header = message._parse_special_rr_header(
                    section_number, count, index, name, rdclass, rdtype
                )
            else:
                header = message._parse_rr_header(
                    section_number, name, rdclass, rdtype
                )
            rdclass, rdtype, deleting, empty = header
            rdata_start = parser.current
            try:
                if empty:
                    if rdlen > 0:
                        raise dns.exception.FormError
                    rd = None
                    covers = dns.rdatatype.NONE
                else:
                    with parser.restrict_to(rdlen):
                        rd = dns.rdata.from_wire_parser(
                            rdclass, rdtype, parser, message.origin
                        )
                    covers = rd.covers()
                if message.xfr and rdtype == dns.rdatatype.SOA:
                    # Once set, this stays in force for the remaining
                    # records of this section.
                    force_unique = True
                if rdtype == dns.rdatatype.OPT:
                    message.opt = dns.rrset.from_rdata(name, ttl, rd)
                elif rdtype == dns.rdatatype.TSIG:
                    key = self._lookup_tsig_key(absolute_name, name, rd)
                    message.keyring = key
                    message.tsig_ctx = dns.tsig.validate(
                        parser.wire,
                        key,
                        absolute_name,
                        rd,
                        int(time.time()),
                        message.request_mac,
                        rr_start,
                        message.tsig_ctx,
                        self.multi,
                    )
                    message.tsig = dns.rrset.from_rdata(absolute_name, 0, rd)
                else:
                    rrset = message.find_rrset(
                        section,
                        name,
                        rdclass,
                        rdtype,
                        covers,
                        deleting,
                        True,
                        force_unique,
                    )
                    if rd is not None:
                        # A TTL with the top bit set is stored as 0.
                        rrset.add(rd, 0 if ttl > 0x7FFFFFFF else ttl)
            except Exception as e:
                if not self.continue_on_error:
                    raise
                self._add_error(e)
                # Skip past this record's rdata so parsing can resume with
                # the next record.
                parser.seek(rdata_start + rdlen)

    def read(self):
        """Read a wire format DNS message and build a dns.message.Message
        object."""

        parser = self.parser
        if parser.remaining() < 12:
            raise ShortHeader
        msg_id, flags, qcount, ancount, aucount, adcount = parser.get_struct(
            "!HHHHHH"
        )
        factory = _message_factory_from_opcode(dns.opcode.from_flags(flags))
        message = factory(id=msg_id)
        self.message = message
        message.flags = dns.flags.Flag(flags)
        self.initialize_message(message)
        self.one_rr_per_rrset = message._get_one_rr_per_rrset(self.one_rr_per_rrset)
        record_sections = (
            (MessageSection.ANSWER, ancount),
            (MessageSection.AUTHORITY, aucount),
            (MessageSection.ADDITIONAL, adcount),
        )
        try:
            self._get_question(MessageSection.QUESTION, qcount)
            if self.question_only:
                return message
            for section_number, rr_count in record_sections:
                self._get_section(section_number, rr_count)
            if not self.ignore_trailing and parser.remaining() != 0:
                raise TrailingJunk
            if self.multi and message.tsig_ctx and not message.had_tsig:
                message.tsig_ctx.update(parser.wire)
        except Exception as e:
            if not self.continue_on_error:
                raise
            self._add_error(e)
        return message

1225 

1226 

def from_wire(
    wire: bytes,
    keyring: Optional[Any] = None,
    request_mac: Optional[bytes] = b"",
    xfr: bool = False,
    origin: Optional[dns.name.Name] = None,
    tsig_ctx: Optional[Union[dns.tsig.HMACTSig, dns.tsig.GSSTSig]] = None,
    multi: bool = False,
    question_only: bool = False,
    one_rr_per_rrset: bool = False,
    ignore_trailing: bool = False,
    raise_on_truncation: bool = False,
    continue_on_error: bool = False,
) -> Message:
    """Convert a DNS wire format message into a message object.

    *wire*, a ``bytes``, the message in wire format.

    *keyring*, a ``dns.tsig.Key`` or ``dict``, the key or keyring to use if the message
    is signed.

    *request_mac*, a ``bytes`` or ``None``.  If the message is a response to a
    TSIG-signed request, *request_mac* should be set to the MAC of that request.
    ``None`` is accepted for backwards compatibility and is treated as ``b""``.

    *xfr*, a ``bool``, should be set to ``True`` if this message is part of a zone
    transfer.

    *origin*, a ``dns.name.Name`` or ``None``.  If the message is part of a zone
    transfer, *origin* should be the origin name of the zone.  If not ``None``, names
    will be relativized to the origin.

    *tsig_ctx*, a ``dns.tsig.HMACTSig`` or ``dns.tsig.GSSTSig`` object, the ongoing TSIG
    context, used when validating zone transfers.

    *multi*, a ``bool``, should be set to ``True`` if this message is part of a multiple
    message sequence.

    *question_only*, a ``bool``.  If ``True``, read only up to the end of the question
    section.

    *one_rr_per_rrset*, a ``bool``.  If ``True``, put each RR into its own RRset.

    *ignore_trailing*, a ``bool``.  If ``True``, ignore trailing junk at end of the
    message.

    *raise_on_truncation*, a ``bool``.  If ``True``, raise an exception if the TC bit is
    set.

    *continue_on_error*, a ``bool``.  If ``True``, try to continue parsing even if
    errors occur.  Erroneous rdata will be ignored.  Errors will be accumulated as a
    list of MessageError objects in the message's ``errors`` attribute.  This option is
    recommended only for DNS analysis tools, or for use in a server as part of an error
    handling path.  The default is ``False``.

    Raises ``dns.message.ShortHeader`` if the message is less than 12 octets long.

    Raises ``dns.message.TrailingJunk`` if there were octets in the message past the end
    of the proper DNS message, and *ignore_trailing* is ``False``.

    Raises ``dns.message.BadEDNS`` if an OPT record was in the wrong section, or
    occurred more than once.

    Raises ``dns.message.BadTSIG`` if a TSIG record was not the last record of the
    additional data section.

    Raises ``dns.message.Truncated`` if the TC flag is set and *raise_on_truncation* is
    ``True``.

    Returns a ``dns.message.Message``.
    """

    # We permit None for request_mac solely for backwards compatibility
    mac = b"" if request_mac is None else request_mac

    def configure(message):
        # Called by the reader right after it creates the message, before
        # any section is parsed.
        message.request_mac = mac
        message.xfr = xfr
        message.origin = origin
        message.tsig_ctx = tsig_ctx

    reader = _WireReader(
        wire,
        configure,
        question_only=question_only,
        one_rr_per_rrset=one_rr_per_rrset,
        ignore_trailing=ignore_trailing,
        keyring=keyring,
        multi=multi,
        continue_on_error=continue_on_error,
    )
    try:
        parsed = reader.read()
    except dns.exception.FormError:
        partial = reader.message
        # A truncated message that failed to parse is reported as Truncated
        # (carrying whatever was parsed) when the caller asked for that.
        if partial and (partial.flags & dns.flags.TC) and raise_on_truncation:
            raise Truncated(message=partial)
        raise
    # Reading a truncated message might not have any errors, so we
    # have to do this check here too.
    if parsed.flags & dns.flags.TC and raise_on_truncation:
        raise Truncated(message=parsed)
    if continue_on_error:
        parsed.errors = reader.errors

    return parsed

1335 

1336 

class _TextReader:

    """Text format reader.

    tok: the tokenizer.
    message: The message object being built.
    last_name: The most recently read name when building a message object.
    one_rr_per_rrset: Put each RR into its own RRset?
    origin: The origin for relative names
    relativize: relativize names?
    relativize_to: the origin to relativize to.
    id, flags, opcode, rcode, edns, ednsflags, payload: header values
    collected from the header section and applied when the message is made.
    """

    def __init__(
        self,
        text,
        idna_codec,
        one_rr_per_rrset=False,
        origin=None,
        relativize=True,
        relativize_to=None,
    ):
        self.message = None
        self.tok = dns.tokenizer.Tokenizer(text, idna_codec=idna_codec)
        self.last_name = None
        self.one_rr_per_rrset = one_rr_per_rrset
        self.origin = origin
        self.relativize = relativize
        self.relativize_to = relativize_to
        # Header state; edns < 0 means EDNS has not been requested.
        self.id = None
        self.edns = -1
        self.ednsflags = 0
        self.payload = DEFAULT_EDNS_PAYLOAD
        self.rcode = None
        self.opcode = dns.opcode.QUERY
        self.flags = 0

    def _header_line(self, _):
        """Process one line from the text format header section."""

        field = self.tok.get().value
        if field == "id":
            self.id = self.tok.get_int()
        elif field == "flags":
            while True:
                token = self.tok.get()
                if not token.is_identifier():
                    self.tok.unget(token)
                    break
                self.flags |= dns.flags.from_text(token.value)
        elif field == "edns":
            self.edns = self.tok.get_int()
            self.ednsflags |= self.edns << 16
        elif field == "eflags":
            if self.edns < 0:
                self.edns = 0
            while True:
                token = self.tok.get()
                if not token.is_identifier():
                    self.tok.unget(token)
                    break
                self.ednsflags |= dns.flags.edns_from_text(token.value)
        elif field == "payload":
            self.payload = self.tok.get_int()
            if self.edns < 0:
                self.edns = 0
        elif field == "opcode":
            self.opcode = dns.opcode.from_text(self.tok.get_string())
            self.flags |= dns.opcode.to_flags(self.opcode)
        elif field == "rcode":
            self.rcode = dns.rcode.from_text(self.tok.get_string())
        else:
            raise UnknownHeaderField
        self.tok.get_eol()

    def _question_line(self, section_number):
        """Process one line from the text format question section."""

        section = self.message.sections[section_number]
        token = self.tok.get(want_leading=True)
        # Leading whitespace means "reuse the previous owner name".
        if not token.is_whitespace():
            self.last_name = self.tok.as_name(
                token, self.message.origin, self.relativize, self.relativize_to
            )
        owner = self.last_name
        if owner is None:
            raise NoPreviousName
        token = self.tok.get()
        if not token.is_identifier():
            raise dns.exception.SyntaxError
        # Class (optional; defaults to IN when the token is not a class)
        try:
            rdclass = dns.rdataclass.from_text(token.value)
            token = self.tok.get()
            if not token.is_identifier():
                raise dns.exception.SyntaxError
        except dns.exception.SyntaxError:
            raise dns.exception.SyntaxError
        except Exception:
            rdclass = dns.rdataclass.IN
        # Type
        rdtype = dns.rdatatype.from_text(token.value)
        rdclass, rdtype, _, _ = self.message._parse_rr_header(
            section_number, owner, rdclass, rdtype
        )
        self.message.find_rrset(
            section, owner, rdclass, rdtype, create=True, force_unique=True
        )
        self.tok.get_eol()

    def _rr_line(self, section_number):
        """Process one line from the text format answer, authority, or
        additional data sections.
        """

        section = self.message.sections[section_number]
        # Name (leading whitespace means "reuse the previous owner name")
        token = self.tok.get(want_leading=True)
        if not token.is_whitespace():
            self.last_name = self.tok.as_name(
                token, self.message.origin, self.relativize, self.relativize_to
            )
        owner = self.last_name
        if owner is None:
            raise NoPreviousName
        token = self.tok.get()
        if not token.is_identifier():
            raise dns.exception.SyntaxError
        # TTL (optional; defaults to 0 when the token is not an integer)
        try:
            ttl = int(token.value, 0)
            token = self.tok.get()
            if not token.is_identifier():
                raise dns.exception.SyntaxError
        except dns.exception.SyntaxError:
            raise dns.exception.SyntaxError
        except Exception:
            ttl = 0
        # Class (optional; defaults to IN when the token is not a class)
        try:
            rdclass = dns.rdataclass.from_text(token.value)
            token = self.tok.get()
            if not token.is_identifier():
                raise dns.exception.SyntaxError
        except dns.exception.SyntaxError:
            raise dns.exception.SyntaxError
        except Exception:
            rdclass = dns.rdataclass.IN
        # Type
        rdtype = dns.rdatatype.from_text(token.value)
        rdclass, rdtype, deleting, empty = self.message._parse_rr_header(
            section_number, owner, rdclass, rdtype
        )
        token = self.tok.get()
        at_end = token.is_eol_or_eof()
        if empty and not at_end:
            raise dns.exception.SyntaxError
        if at_end and not empty:
            raise dns.exception.UnexpectedEnd
        if at_end:
            rd = None
            covers = dns.rdatatype.NONE
        else:
            self.tok.unget(token)
            rd = dns.rdata.from_text(
                rdclass,
                rdtype,
                self.tok,
                self.message.origin,
                self.relativize,
                self.relativize_to,
            )
            covers = rd.covers()
        rrset = self.message.find_rrset(
            section,
            owner,
            rdclass,
            rdtype,
            covers,
            deleting,
            True,
            self.one_rr_per_rrset,
        )
        if rd is not None:
            rrset.add(rd, ttl)

    def _make_message(self):
        """Create a message of the class matching the current opcode and
        apply the header values collected so far.
        """
        message_class = _message_factory_from_opcode(self.opcode)
        message = message_class(id=self.id)
        message.flags = self.flags
        if self.edns >= 0:
            message.use_edns(self.edns, self.ednsflags, self.payload)
        if self.rcode:
            message.set_rcode(self.rcode)
        if self.origin:
            message.origin = self.origin
        return message

    def read(self):
        """Read a text format DNS message and build a dns.message.Message
        object."""

        line_method = self._header_line
        section_number = None
        while True:
            token = self.tok.get(True, True)
            if token.is_eol_or_eof():
                break
            if not token.is_comment():
                # An ordinary line: hand it to the current section's parser.
                self.tok.unget(token)
                line_method(section_number)
                continue
            label = token.value.upper()
            if label == "HEADER":
                line_method = self._header_line

            # If we don't have a message, create one with the current
            # opcode, so that we know which section names to parse.
            candidate = self.message or self._make_message()
            try:
                section_number = candidate._section_enum.from_text(label)
                # We found a section name.  If we don't have a message,
                # use the one we just created.
                if not self.message:
                    self.message = candidate
                    self.one_rr_per_rrset = candidate._get_one_rr_per_rrset(
                        self.one_rr_per_rrset
                    )
                if section_number == MessageSection.QUESTION:
                    line_method = self._question_line
                else:
                    line_method = self._rr_line
            except Exception:
                # It's just a comment.
                pass
            self.tok.get_eol()
        if not self.message:
            self.message = self._make_message()
        return self.message

1582 

1583 

def from_text(
    text: str,
    idna_codec: Optional[dns.name.IDNACodec] = None,
    one_rr_per_rrset: bool = False,
    origin: Optional[dns.name.Name] = None,
    relativize: bool = True,
    relativize_to: Optional[dns.name.Name] = None,
) -> Message:
    """Convert the text format message into a message object.

    Reading stops at the first blank line of the input, which makes it
    possible to read several messages from one file with
    ``dns.message.from_file()``.

    *text*, a ``str``, the text format message.

    *idna_codec*, a ``dns.name.IDNACodec``, specifies the IDNA
    encoder/decoder.  If ``None``, the default IDNA 2003 encoder/decoder
    is used.

    *one_rr_per_rrset*, a ``bool``.  If ``True``, then each RR is put
    into its own rrset.  The default is ``False``.

    *origin*, a ``dns.name.Name`` (or ``None``), the
    origin to use for relative names.

    *relativize*, a ``bool``.  If true, name will be relativized.

    *relativize_to*, a ``dns.name.Name`` (or ``None``), the origin to use
    when relativizing names.  If not set, the *origin* value will be used.

    Raises ``dns.message.UnknownHeaderField`` if a header is unknown.

    Raises ``dns.exception.SyntaxError`` if the text is badly formed.

    Returns a ``dns.message.Message object``
    """

    # 'text' can also be a file, but we don't publish that fact
    # since it's an implementation detail.  The official file
    # interface is from_file().

    text_reader = _TextReader(
        text,
        idna_codec,
        one_rr_per_rrset,
        origin,
        relativize,
        relativize_to,
    )
    message = text_reader.read()
    return message

1630 

1631 

def from_file(
    f: Any,
    idna_codec: Optional[dns.name.IDNACodec] = None,
    one_rr_per_rrset: bool = False,
) -> Message:
    """Read the next text format message from the specified file.

    Message blocks are separated by a single blank line.

    *f*, a ``file`` or ``str``.  If *f* is text, it is treated as the
    pathname of a file to open; that file is closed again before
    returning.  Any other object is passed through unopened and is not
    closed.

    *idna_codec*, a ``dns.name.IDNACodec``, specifies the IDNA
    encoder/decoder.  If ``None``, the default IDNA 2003 encoder/decoder
    is used.

    *one_rr_per_rrset*, a ``bool``.  If ``True``, then each RR is put
    into its own rrset.  The default is ``False``.

    Raises ``dns.message.UnknownHeaderField`` if a header is unknown.

    Raises ``dns.exception.SyntaxError`` if the text is badly formed.

    Returns a ``dns.message.Message object``
    """

    source: contextlib.AbstractContextManager = (
        open(f) if isinstance(f, str) else contextlib.nullcontext(f)
    )
    with source as stream:
        return from_text(stream, idna_codec, one_rr_per_rrset)
    assert False  # for mypy  lgtm[py/unreachable-statement]

1665 

1666 

def make_query(
    qname: Union[dns.name.Name, str],
    rdtype: Union[dns.rdatatype.RdataType, str],
    rdclass: Union[dns.rdataclass.RdataClass, str] = dns.rdataclass.IN,
    use_edns: Optional[Union[int, bool]] = None,
    want_dnssec: bool = False,
    ednsflags: Optional[int] = None,
    payload: Optional[int] = None,
    request_payload: Optional[int] = None,
    options: Optional[List[dns.edns.Option]] = None,
    idna_codec: Optional[dns.name.IDNACodec] = None,
    id: Optional[int] = None,
    flags: int = dns.flags.RD,
    pad: int = 0,
) -> QueryMessage:
    """Make a query message.

    The query name, type, and class may all be specified either
    as objects of the appropriate type, or as strings.

    Unless *id* and *flags* say otherwise, the query has a randomly
    chosen query id and its DNS flags are set to dns.flags.RD.

    *qname*, a ``dns.name.Name`` or ``str``, the query name.

    *rdtype*, an ``int`` or ``str``, the desired rdata type.

    *rdclass*, an ``int`` or ``str``,  the desired rdata class; the default
    is class IN.

    *use_edns*, an ``int``, ``bool`` or ``None``.  The EDNS level to use; the
    default is ``None``.  If ``None``, EDNS will be enabled only if other
    parameters (*ednsflags*, *payload*, *request_payload*, or *options*) are
    set.
    See the description of dns.message.Message.use_edns() for the possible
    values for use_edns and their meanings.

    *want_dnssec*, a ``bool``.  If ``True``, DNSSEC data is desired.

    *ednsflags*, an ``int``, the EDNS flag values.

    *payload*, an ``int``, is the EDNS sender's payload field, which is the
    maximum size of UDP datagram the sender can handle.  I.e. how big
    a response to this message can be.

    *request_payload*, an ``int``, is the EDNS payload size to use when
    sending this message.  If not specified, defaults to the value of
    *payload*.

    *options*, a list of ``dns.edns.Option`` objects or ``None``, the EDNS
    options.

    *idna_codec*, a ``dns.name.IDNACodec``, specifies the IDNA
    encoder/decoder.  If ``None``, the default IDNA 2003 encoder/decoder
    is used.

    *id*, an ``int`` or ``None``, the desired query id.  The default is
    ``None``, which generates a random query id.

    *flags*, an ``int``, the desired query flags.  The default is
    ``dns.flags.RD``.

    *pad*, a non-negative ``int``.  If 0, the default, do not pad; otherwise add
    padding bytes to make the message size a multiple of *pad*.  Note that if
    padding is non-zero, an EDNS PADDING option will always be added to the
    message.

    Returns a ``dns.message.QueryMessage``
    """

    if isinstance(qname, str):
        qname = dns.name.from_text(qname, idna_codec=idna_codec)
    rdtype = dns.rdatatype.RdataType.make(rdtype)
    rdclass = dns.rdataclass.RdataClass.make(rdclass)

    query = QueryMessage(id=id)
    query.flags = dns.flags.Flag(flags)
    query.find_rrset(
        query.question, qname, rdclass, rdtype, create=True, force_unique=True
    )

    # only pass keywords on to use_edns if they have been set to a
    # non-None value.  Setting a field will turn EDNS on if it hasn't
    # been configured.
    optional_settings = (
        ("ednsflags", ednsflags),
        ("payload", payload),
        ("request_payload", request_payload),
        ("options", options),
    )
    kwargs: Dict[str, Any] = {
        key: value for key, value in optional_settings if value is not None
    }
    if kwargs and use_edns is None:
        use_edns = 0
    kwargs["edns"] = use_edns
    kwargs["pad"] = pad
    query.use_edns(**kwargs)
    query.want_dnssec(want_dnssec)
    return query

1763 

1764 

def make_response(
    query: Message,
    recursion_available: bool = False,
    our_payload: int = 8192,
    fudge: int = 300,
    tsig_error: int = 0,
) -> Message:
    """Make a message which is a response for the specified query.

    The returned message is a response skeleton: it carries the id,
    opcode, RD flag, question, and (when the query used them) EDNS and
    TSIG settings needed for a response, but no answer content.

    The response's question section is a shallow copy of the query's
    question section, so the query's question RRsets should not be
    changed.

    *query*, a ``dns.message.Message``, the query to respond to.

    *recursion_available*, a ``bool``, should RA be set in the response?

    *our_payload*, an ``int``, the payload size to advertise in EDNS
    responses.

    *fudge*, an ``int``, the TSIG time fudge.

    *tsig_error*, an ``int``, the TSIG error.

    Raises ``dns.exception.FormError`` if *query* has the QR flag set,
    i.e. it is not a query.

    Returns a ``dns.message.Message`` object whose specific class is
    appropriate for the query.  For example, if query is a
    ``dns.update.UpdateMessage``, response will be too.
    """

    if query.flags & dns.flags.QR:
        raise dns.exception.FormError("specified query message is not a query")

    opcode = query.opcode()
    message_class = _message_factory_from_opcode(opcode)
    response = message_class(id=query.id)

    # Start from QR plus the query's RD bit; RA is added on request.
    response_flags = dns.flags.QR | (query.flags & dns.flags.RD)
    if recursion_available:
        response_flags |= dns.flags.RA
    response.flags = response_flags
    response.set_opcode(query.opcode())

    response.question = list(query.question)
    if query.edns >= 0:
        response.use_edns(0, 0, our_payload, query.payload)
    if query.had_tsig:
        response.use_tsig(
            query.keyring,
            query.keyname,
            fudge,
            None,
            tsig_error,
            b"",
            query.keyalgorithm,
        )
        response.request_mac = query.mac
    return response

1820 

1821 

# Module-level aliases for the four MessageSection members.
### BEGIN generated MessageSection constants

QUESTION = MessageSection.QUESTION
ANSWER = MessageSection.ANSWER
AUTHORITY = MessageSection.AUTHORITY
ADDITIONAL = MessageSection.ADDITIONAL

### END generated MessageSection constants