Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/dns/rdata.py: 59%

379 statements  

« prev     ^ index     » next       coverage.py v7.3.1, created at 2023-09-25 07:09 +0000

1# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license 

2 

3# Copyright (C) 2001-2017 Nominum, Inc. 

4# 

5# Permission to use, copy, modify, and distribute this software and its 

6# documentation for any purpose with or without fee is hereby granted, 

7# provided that the above copyright notice and this permission notice 

8# appear in all copies. 

9# 

10# THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES 

11# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 

12# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR 

13# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 

14# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 

15# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT 

16# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 

17 

18"""DNS rdata.""" 

19 

20import base64 

21import binascii 

22import inspect 

23import io 

24import itertools 

25import random 

26from importlib import import_module 

27from typing import Any, Dict, Optional, Tuple, Union 

28 

29import dns.exception 

30import dns.immutable 

31import dns.ipv4 

32import dns.ipv6 

33import dns.name 

34import dns.rdataclass 

35import dns.rdatatype 

36import dns.tokenizer 

37import dns.ttl 

38import dns.wire 

39 

40_chunksize = 32 

41 

42# We currently allow comparisons for rdata with relative names for backwards 

43# compatibility, but in the future we will not, as these kinds of comparisons 

44# can lead to subtle bugs if code is not carefully written. 

45# 

46# This switch allows the future behavior to be turned on so code can be 

47# tested with it. 

48_allow_relative_comparisons = True 

49 

50 

51class NoRelativeRdataOrdering(dns.exception.DNSException): 

52 """An attempt was made to do an ordered comparison of one or more 

53 rdata with relative names. The only reliable way of sorting rdata 

54 is to use non-relativized rdata. 

55 

56 """ 

57 

58 

59def _wordbreak(data, chunksize=_chunksize, separator=b" "): 

60 """Break a binary string into chunks of chunksize characters separated by 

61 a space. 

62 """ 

63 

64 if not chunksize: 

65 return data.decode() 

66 return separator.join( 

67 [data[i : i + chunksize] for i in range(0, len(data), chunksize)] 

68 ).decode() 

69 

70 

71# pylint: disable=unused-argument 

72 

73 

74def _hexify(data, chunksize=_chunksize, separator=b" ", **kw): 

75 """Convert a binary string into its hex encoding, broken up into chunks 

76 of chunksize characters separated by a separator. 

77 """ 

78 

79 return _wordbreak(binascii.hexlify(data), chunksize, separator) 

80 

81 

82def _base64ify(data, chunksize=_chunksize, separator=b" ", **kw): 

83 """Convert a binary string into its base64 encoding, broken up into chunks 

84 of chunksize characters separated by a separator. 

85 """ 

86 

87 return _wordbreak(base64.b64encode(data), chunksize, separator) 

88 

89 

90# pylint: enable=unused-argument 

91 

92__escaped = b'"\\' 

93 

94 

95def _escapify(qstring): 

96 """Escape the characters in a quoted string which need it.""" 

97 

98 if isinstance(qstring, str): 

99 qstring = qstring.encode() 

100 if not isinstance(qstring, bytearray): 

101 qstring = bytearray(qstring) 

102 

103 text = "" 

104 for c in qstring: 

105 if c in __escaped: 

106 text += "\\" + chr(c) 

107 elif c >= 0x20 and c < 0x7F: 

108 text += chr(c) 

109 else: 

110 text += "\\%03d" % c 

111 return text 

112 

113 

114def _truncate_bitmap(what): 

115 """Determine the index of greatest byte that isn't all zeros, and 

116 return the bitmap that contains all the bytes less than that index. 

117 """ 

118 

119 for i in range(len(what) - 1, -1, -1): 

120 if what[i] != 0: 

121 return what[0 : i + 1] 

122 return what[0:1] 

123 

124 

125# So we don't have to edit all the rdata classes... 

126_constify = dns.immutable.constify 

127 

128 

129@dns.immutable.immutable 

130class Rdata: 

131 """Base class for all DNS rdata types.""" 

132 

133 __slots__ = ["rdclass", "rdtype", "rdcomment"] 

134 

135 def __init__(self, rdclass, rdtype): 

136 """Initialize an rdata. 

137 

138 *rdclass*, an ``int`` is the rdataclass of the Rdata. 

139 

140 *rdtype*, an ``int`` is the rdatatype of the Rdata. 

141 """ 

142 

143 self.rdclass = self._as_rdataclass(rdclass) 

144 self.rdtype = self._as_rdatatype(rdtype) 

145 self.rdcomment = None 

146 

147 def _get_all_slots(self): 

148 return itertools.chain.from_iterable( 

149 getattr(cls, "__slots__", []) for cls in self.__class__.__mro__ 

150 ) 

151 

152 def __getstate__(self): 

153 # We used to try to do a tuple of all slots here, but it 

154 # doesn't work as self._all_slots isn't available at 

155 # __setstate__() time. Before that we tried to store a tuple 

156 # of __slots__, but that didn't work as it didn't store the 

157 # slots defined by ancestors. This older way didn't fail 

158 # outright, but ended up with partially broken objects, e.g. 

159 # if you unpickled an A RR it wouldn't have rdclass and rdtype 

160 # attributes, and would compare badly. 

161 state = {} 

162 for slot in self._get_all_slots(): 

163 state[slot] = getattr(self, slot) 

164 return state 

165 

166 def __setstate__(self, state): 

167 for slot, val in state.items(): 

168 object.__setattr__(self, slot, val) 

169 if not hasattr(self, "rdcomment"): 

170 # Pickled rdata from 2.0.x might not have a rdcomment, so add 

171 # it if needed. 

172 object.__setattr__(self, "rdcomment", None) 

173 

174 def covers(self) -> dns.rdatatype.RdataType: 

175 """Return the type a Rdata covers. 

176 

177 DNS SIG/RRSIG rdatas apply to a specific type; this type is 

178 returned by the covers() function. If the rdata type is not 

179 SIG or RRSIG, dns.rdatatype.NONE is returned. This is useful when 

180 creating rdatasets, allowing the rdataset to contain only RRSIGs 

181 of a particular type, e.g. RRSIG(NS). 

182 

183 Returns a ``dns.rdatatype.RdataType``. 

184 """ 

185 

186 return dns.rdatatype.NONE 

187 

188 def extended_rdatatype(self) -> int: 

189 """Return a 32-bit type value, the least significant 16 bits of 

190 which are the ordinary DNS type, and the upper 16 bits of which are 

191 the "covered" type, if any. 

192 

193 Returns an ``int``. 

194 """ 

195 

196 return self.covers() << 16 | self.rdtype 

197 

198 def to_text( 

199 self, 

200 origin: Optional[dns.name.Name] = None, 

201 relativize: bool = True, 

202 **kw: Dict[str, Any] 

203 ) -> str: 

204 """Convert an rdata to text format. 

205 

206 Returns a ``str``. 

207 """ 

208 

209 raise NotImplementedError # pragma: no cover 

210 

211 def _to_wire( 

212 self, 

213 file: Optional[Any], 

214 compress: Optional[dns.name.CompressType] = None, 

215 origin: Optional[dns.name.Name] = None, 

216 canonicalize: bool = False, 

217 ) -> bytes: 

218 raise NotImplementedError # pragma: no cover 

219 

220 def to_wire( 

221 self, 

222 file: Optional[Any] = None, 

223 compress: Optional[dns.name.CompressType] = None, 

224 origin: Optional[dns.name.Name] = None, 

225 canonicalize: bool = False, 

226 ) -> bytes: 

227 """Convert an rdata to wire format. 

228 

229 Returns a ``bytes`` or ``None``. 

230 """ 

231 

232 if file: 

233 return self._to_wire(file, compress, origin, canonicalize) 

234 else: 

235 f = io.BytesIO() 

236 self._to_wire(f, compress, origin, canonicalize) 

237 return f.getvalue() 

238 

239 def to_generic( 

240 self, origin: Optional[dns.name.Name] = None 

241 ) -> "dns.rdata.GenericRdata": 

242 """Creates a dns.rdata.GenericRdata equivalent of this rdata. 

243 

244 Returns a ``dns.rdata.GenericRdata``. 

245 """ 

246 return dns.rdata.GenericRdata( 

247 self.rdclass, self.rdtype, self.to_wire(origin=origin) 

248 ) 

249 

250 def to_digestable(self, origin: Optional[dns.name.Name] = None) -> bytes: 

251 """Convert rdata to a format suitable for digesting in hashes. This 

252 is also the DNSSEC canonical form. 

253 

254 Returns a ``bytes``. 

255 """ 

256 

257 return self.to_wire(origin=origin, canonicalize=True) 

258 

259 def __repr__(self): 

260 covers = self.covers() 

261 if covers == dns.rdatatype.NONE: 

262 ctext = "" 

263 else: 

264 ctext = "(" + dns.rdatatype.to_text(covers) + ")" 

265 return ( 

266 "<DNS " 

267 + dns.rdataclass.to_text(self.rdclass) 

268 + " " 

269 + dns.rdatatype.to_text(self.rdtype) 

270 + ctext 

271 + " rdata: " 

272 + str(self) 

273 + ">" 

274 ) 

275 

276 def __str__(self): 

277 return self.to_text() 

278 

279 def _cmp(self, other): 

280 """Compare an rdata with another rdata of the same rdtype and 

281 rdclass. 

282 

283 For rdata with only absolute names: 

284 Return < 0 if self < other in the DNSSEC ordering, 0 if self 

285 == other, and > 0 if self > other. 

286 For rdata with at least one relative names: 

287 The rdata sorts before any rdata with only absolute names. 

288 When compared with another relative rdata, all names are 

289 made absolute as if they were relative to the root, as the 

290 proper origin is not available. While this creates a stable 

291 ordering, it is NOT guaranteed to be the DNSSEC ordering. 

292 In the future, all ordering comparisons for rdata with 

293 relative names will be disallowed. 

294 """ 

295 try: 

296 our = self.to_digestable() 

297 our_relative = False 

298 except dns.name.NeedAbsoluteNameOrOrigin: 

299 if _allow_relative_comparisons: 

300 our = self.to_digestable(dns.name.root) 

301 our_relative = True 

302 try: 

303 their = other.to_digestable() 

304 their_relative = False 

305 except dns.name.NeedAbsoluteNameOrOrigin: 

306 if _allow_relative_comparisons: 

307 their = other.to_digestable(dns.name.root) 

308 their_relative = True 

309 if _allow_relative_comparisons: 

310 if our_relative != their_relative: 

311 # For the purpose of comparison, all rdata with at least one 

312 # relative name is less than an rdata with only absolute names. 

313 if our_relative: 

314 return -1 

315 else: 

316 return 1 

317 elif our_relative or their_relative: 

318 raise NoRelativeRdataOrdering 

319 if our == their: 

320 return 0 

321 elif our > their: 

322 return 1 

323 else: 

324 return -1 

325 

326 def __eq__(self, other): 

327 if not isinstance(other, Rdata): 

328 return False 

329 if self.rdclass != other.rdclass or self.rdtype != other.rdtype: 

330 return False 

331 our_relative = False 

332 their_relative = False 

333 try: 

334 our = self.to_digestable() 

335 except dns.name.NeedAbsoluteNameOrOrigin: 

336 our = self.to_digestable(dns.name.root) 

337 our_relative = True 

338 try: 

339 their = other.to_digestable() 

340 except dns.name.NeedAbsoluteNameOrOrigin: 

341 their = other.to_digestable(dns.name.root) 

342 their_relative = True 

343 if our_relative != their_relative: 

344 return False 

345 return our == their 

346 

347 def __ne__(self, other): 

348 if not isinstance(other, Rdata): 

349 return True 

350 if self.rdclass != other.rdclass or self.rdtype != other.rdtype: 

351 return True 

352 return not self.__eq__(other) 

353 

354 def __lt__(self, other): 

355 if ( 

356 not isinstance(other, Rdata) 

357 or self.rdclass != other.rdclass 

358 or self.rdtype != other.rdtype 

359 ): 

360 return NotImplemented 

361 return self._cmp(other) < 0 

362 

363 def __le__(self, other): 

364 if ( 

365 not isinstance(other, Rdata) 

366 or self.rdclass != other.rdclass 

367 or self.rdtype != other.rdtype 

368 ): 

369 return NotImplemented 

370 return self._cmp(other) <= 0 

371 

372 def __ge__(self, other): 

373 if ( 

374 not isinstance(other, Rdata) 

375 or self.rdclass != other.rdclass 

376 or self.rdtype != other.rdtype 

377 ): 

378 return NotImplemented 

379 return self._cmp(other) >= 0 

380 

381 def __gt__(self, other): 

382 if ( 

383 not isinstance(other, Rdata) 

384 or self.rdclass != other.rdclass 

385 or self.rdtype != other.rdtype 

386 ): 

387 return NotImplemented 

388 return self._cmp(other) > 0 

389 

390 def __hash__(self): 

391 return hash(self.to_digestable(dns.name.root)) 

392 

393 @classmethod 

394 def from_text( 

395 cls, 

396 rdclass: dns.rdataclass.RdataClass, 

397 rdtype: dns.rdatatype.RdataType, 

398 tok: dns.tokenizer.Tokenizer, 

399 origin: Optional[dns.name.Name] = None, 

400 relativize: bool = True, 

401 relativize_to: Optional[dns.name.Name] = None, 

402 ) -> "Rdata": 

403 raise NotImplementedError # pragma: no cover 

404 

405 @classmethod 

406 def from_wire_parser( 

407 cls, 

408 rdclass: dns.rdataclass.RdataClass, 

409 rdtype: dns.rdatatype.RdataType, 

410 parser: dns.wire.Parser, 

411 origin: Optional[dns.name.Name] = None, 

412 ) -> "Rdata": 

413 raise NotImplementedError # pragma: no cover 

414 

415 def replace(self, **kwargs: Any) -> "Rdata": 

416 """ 

417 Create a new Rdata instance based on the instance replace was 

418 invoked on. It is possible to pass different parameters to 

419 override the corresponding properties of the base Rdata. 

420 

421 Any field specific to the Rdata type can be replaced, but the 

422 *rdtype* and *rdclass* fields cannot. 

423 

424 Returns an instance of the same Rdata subclass as *self*. 

425 """ 

426 

427 # Get the constructor parameters. 

428 parameters = inspect.signature(self.__init__).parameters # type: ignore 

429 

430 # Ensure that all of the arguments correspond to valid fields. 

431 # Don't allow rdclass or rdtype to be changed, though. 

432 for key in kwargs: 

433 if key == "rdcomment": 

434 continue 

435 if key not in parameters: 

436 raise AttributeError( 

437 "'{}' object has no attribute '{}'".format( 

438 self.__class__.__name__, key 

439 ) 

440 ) 

441 if key in ("rdclass", "rdtype"): 

442 raise AttributeError( 

443 "Cannot overwrite '{}' attribute '{}'".format( 

444 self.__class__.__name__, key 

445 ) 

446 ) 

447 

448 # Construct the parameter list. For each field, use the value in 

449 # kwargs if present, and the current value otherwise. 

450 args = (kwargs.get(key, getattr(self, key)) for key in parameters) 

451 

452 # Create, validate, and return the new object. 

453 rd = self.__class__(*args) 

454 # The comment is not set in the constructor, so give it special 

455 # handling. 

456 rdcomment = kwargs.get("rdcomment", self.rdcomment) 

457 if rdcomment is not None: 

458 object.__setattr__(rd, "rdcomment", rdcomment) 

459 return rd 

460 

461 # Type checking and conversion helpers. These are class methods as 

462 # they don't touch object state and may be useful to others. 

463 

464 @classmethod 

465 def _as_rdataclass(cls, value): 

466 return dns.rdataclass.RdataClass.make(value) 

467 

468 @classmethod 

469 def _as_rdatatype(cls, value): 

470 return dns.rdatatype.RdataType.make(value) 

471 

472 @classmethod 

473 def _as_bytes( 

474 cls, 

475 value: Any, 

476 encode: bool = False, 

477 max_length: Optional[int] = None, 

478 empty_ok: bool = True, 

479 ) -> bytes: 

480 if encode and isinstance(value, str): 

481 bvalue = value.encode() 

482 elif isinstance(value, bytearray): 

483 bvalue = bytes(value) 

484 elif isinstance(value, bytes): 

485 bvalue = value 

486 else: 

487 raise ValueError("not bytes") 

488 if max_length is not None and len(bvalue) > max_length: 

489 raise ValueError("too long") 

490 if not empty_ok and len(bvalue) == 0: 

491 raise ValueError("empty bytes not allowed") 

492 return bvalue 

493 

494 @classmethod 

495 def _as_name(cls, value): 

496 # Note that proper name conversion (e.g. with origin and IDNA 

497 # awareness) is expected to be done via from_text. This is just 

498 # a simple thing for people invoking the constructor directly. 

499 if isinstance(value, str): 

500 return dns.name.from_text(value) 

501 elif not isinstance(value, dns.name.Name): 

502 raise ValueError("not a name") 

503 return value 

504 

505 @classmethod 

506 def _as_uint8(cls, value): 

507 if not isinstance(value, int): 

508 raise ValueError("not an integer") 

509 if value < 0 or value > 255: 

510 raise ValueError("not a uint8") 

511 return value 

512 

513 @classmethod 

514 def _as_uint16(cls, value): 

515 if not isinstance(value, int): 

516 raise ValueError("not an integer") 

517 if value < 0 or value > 65535: 

518 raise ValueError("not a uint16") 

519 return value 

520 

521 @classmethod 

522 def _as_uint32(cls, value): 

523 if not isinstance(value, int): 

524 raise ValueError("not an integer") 

525 if value < 0 or value > 4294967295: 

526 raise ValueError("not a uint32") 

527 return value 

528 

529 @classmethod 

530 def _as_uint48(cls, value): 

531 if not isinstance(value, int): 

532 raise ValueError("not an integer") 

533 if value < 0 or value > 281474976710655: 

534 raise ValueError("not a uint48") 

535 return value 

536 

537 @classmethod 

538 def _as_int(cls, value, low=None, high=None): 

539 if not isinstance(value, int): 

540 raise ValueError("not an integer") 

541 if low is not None and value < low: 

542 raise ValueError("value too small") 

543 if high is not None and value > high: 

544 raise ValueError("value too large") 

545 return value 

546 

547 @classmethod 

548 def _as_ipv4_address(cls, value): 

549 if isinstance(value, str): 

550 # call to check validity 

551 dns.ipv4.inet_aton(value) 

552 return value 

553 elif isinstance(value, bytes): 

554 return dns.ipv4.inet_ntoa(value) 

555 else: 

556 raise ValueError("not an IPv4 address") 

557 

558 @classmethod 

559 def _as_ipv6_address(cls, value): 

560 if isinstance(value, str): 

561 # call to check validity 

562 dns.ipv6.inet_aton(value) 

563 return value 

564 elif isinstance(value, bytes): 

565 return dns.ipv6.inet_ntoa(value) 

566 else: 

567 raise ValueError("not an IPv6 address") 

568 

569 @classmethod 

570 def _as_bool(cls, value): 

571 if isinstance(value, bool): 

572 return value 

573 else: 

574 raise ValueError("not a boolean") 

575 

576 @classmethod 

577 def _as_ttl(cls, value): 

578 if isinstance(value, int): 

579 return cls._as_int(value, 0, dns.ttl.MAX_TTL) 

580 elif isinstance(value, str): 

581 return dns.ttl.from_text(value) 

582 else: 

583 raise ValueError("not a TTL") 

584 

585 @classmethod 

586 def _as_tuple(cls, value, as_value): 

587 try: 

588 # For user convenience, if value is a singleton of the list 

589 # element type, wrap it in a tuple. 

590 return (as_value(value),) 

591 except Exception: 

592 # Otherwise, check each element of the iterable *value* 

593 # against *as_value*. 

594 return tuple(as_value(v) for v in value) 

595 

596 # Processing order 

597 

598 @classmethod 

599 def _processing_order(cls, iterable): 

600 items = list(iterable) 

601 random.shuffle(items) 

602 return items 

603 

604 

605@dns.immutable.immutable 

606class GenericRdata(Rdata): 

607 

608 """Generic Rdata Class 

609 

610 This class is used for rdata types for which we have no better 

611 implementation. It implements the DNS "unknown RRs" scheme. 

612 """ 

613 

614 __slots__ = ["data"] 

615 

616 def __init__(self, rdclass, rdtype, data): 

617 super().__init__(rdclass, rdtype) 

618 self.data = data 

619 

620 def to_text( 

621 self, 

622 origin: Optional[dns.name.Name] = None, 

623 relativize: bool = True, 

624 **kw: Dict[str, Any] 

625 ) -> str: 

626 return r"\# %d " % len(self.data) + _hexify(self.data, **kw) 

627 

628 @classmethod 

629 def from_text( 

630 cls, rdclass, rdtype, tok, origin=None, relativize=True, relativize_to=None 

631 ): 

632 token = tok.get() 

633 if not token.is_identifier() or token.value != r"\#": 

634 raise dns.exception.SyntaxError(r"generic rdata does not start with \#") 

635 length = tok.get_int() 

636 hex = tok.concatenate_remaining_identifiers(True).encode() 

637 data = binascii.unhexlify(hex) 

638 if len(data) != length: 

639 raise dns.exception.SyntaxError("generic rdata hex data has wrong length") 

640 return cls(rdclass, rdtype, data) 

641 

642 def _to_wire(self, file, compress=None, origin=None, canonicalize=False): 

643 file.write(self.data) 

644 

645 @classmethod 

646 def from_wire_parser(cls, rdclass, rdtype, parser, origin=None): 

647 return cls(rdclass, rdtype, parser.get_remaining()) 

648 

649 

650_rdata_classes: Dict[ 

651 Tuple[dns.rdataclass.RdataClass, dns.rdatatype.RdataType], Any 

652] = {} 

653_module_prefix = "dns.rdtypes" 

654 

655 

656def get_rdata_class(rdclass, rdtype): 

657 cls = _rdata_classes.get((rdclass, rdtype)) 

658 if not cls: 

659 cls = _rdata_classes.get((dns.rdatatype.ANY, rdtype)) 

660 if not cls: 

661 rdclass_text = dns.rdataclass.to_text(rdclass) 

662 rdtype_text = dns.rdatatype.to_text(rdtype) 

663 rdtype_text = rdtype_text.replace("-", "_") 

664 try: 

665 mod = import_module( 

666 ".".join([_module_prefix, rdclass_text, rdtype_text]) 

667 ) 

668 cls = getattr(mod, rdtype_text) 

669 _rdata_classes[(rdclass, rdtype)] = cls 

670 except ImportError: 

671 try: 

672 mod = import_module(".".join([_module_prefix, "ANY", rdtype_text])) 

673 cls = getattr(mod, rdtype_text) 

674 _rdata_classes[(dns.rdataclass.ANY, rdtype)] = cls 

675 _rdata_classes[(rdclass, rdtype)] = cls 

676 except ImportError: 

677 pass 

678 if not cls: 

679 cls = GenericRdata 

680 _rdata_classes[(rdclass, rdtype)] = cls 

681 return cls 

682 

683 

684def from_text( 

685 rdclass: Union[dns.rdataclass.RdataClass, str], 

686 rdtype: Union[dns.rdatatype.RdataType, str], 

687 tok: Union[dns.tokenizer.Tokenizer, str], 

688 origin: Optional[dns.name.Name] = None, 

689 relativize: bool = True, 

690 relativize_to: Optional[dns.name.Name] = None, 

691 idna_codec: Optional[dns.name.IDNACodec] = None, 

692) -> Rdata: 

693 """Build an rdata object from text format. 

694 

695 This function attempts to dynamically load a class which 

696 implements the specified rdata class and type. If there is no 

697 class-and-type-specific implementation, the GenericRdata class 

698 is used. 

699 

700 Once a class is chosen, its from_text() class method is called 

701 with the parameters to this function. 

702 

703 If *tok* is a ``str``, then a tokenizer is created and the string 

704 is used as its input. 

705 

706 *rdclass*, a ``dns.rdataclass.RdataClass`` or ``str``, the rdataclass. 

707 

708 *rdtype*, a ``dns.rdatatype.RdataType`` or ``str``, the rdatatype. 

709 

710 *tok*, a ``dns.tokenizer.Tokenizer`` or a ``str``. 

711 

712 *origin*, a ``dns.name.Name`` (or ``None``), the 

713 origin to use for relative names. 

714 

715 *relativize*, a ``bool``. If true, name will be relativized. 

716 

717 *relativize_to*, a ``dns.name.Name`` (or ``None``), the origin to use 

718 when relativizing names. If not set, the *origin* value will be used. 

719 

720 *idna_codec*, a ``dns.name.IDNACodec``, specifies the IDNA 

721 encoder/decoder to use if a tokenizer needs to be created. If 

722 ``None``, the default IDNA 2003 encoder/decoder is used. If a 

723 tokenizer is not created, then the codec associated with the tokenizer 

724 is the one that is used. 

725 

726 Returns an instance of the chosen Rdata subclass. 

727 

728 """ 

729 if isinstance(tok, str): 

730 tok = dns.tokenizer.Tokenizer(tok, idna_codec=idna_codec) 

731 rdclass = dns.rdataclass.RdataClass.make(rdclass) 

732 rdtype = dns.rdatatype.RdataType.make(rdtype) 

733 cls = get_rdata_class(rdclass, rdtype) 

734 with dns.exception.ExceptionWrapper(dns.exception.SyntaxError): 

735 rdata = None 

736 if cls != GenericRdata: 

737 # peek at first token 

738 token = tok.get() 

739 tok.unget(token) 

740 if token.is_identifier() and token.value == r"\#": 

741 # 

742 # Known type using the generic syntax. Extract the 

743 # wire form from the generic syntax, and then run 

744 # from_wire on it. 

745 # 

746 grdata = GenericRdata.from_text( 

747 rdclass, rdtype, tok, origin, relativize, relativize_to 

748 ) 

749 rdata = from_wire( 

750 rdclass, rdtype, grdata.data, 0, len(grdata.data), origin 

751 ) 

752 # 

753 # If this comparison isn't equal, then there must have been 

754 # compressed names in the wire format, which is an error, 

755 # there being no reasonable context to decompress with. 

756 # 

757 rwire = rdata.to_wire() 

758 if rwire != grdata.data: 

759 raise dns.exception.SyntaxError( 

760 "compressed data in " 

761 "generic syntax form " 

762 "of known rdatatype" 

763 ) 

764 if rdata is None: 

765 rdata = cls.from_text( 

766 rdclass, rdtype, tok, origin, relativize, relativize_to 

767 ) 

768 token = tok.get_eol_as_token() 

769 if token.comment is not None: 

770 object.__setattr__(rdata, "rdcomment", token.comment) 

771 return rdata 

772 

773 

774def from_wire_parser( 

775 rdclass: Union[dns.rdataclass.RdataClass, str], 

776 rdtype: Union[dns.rdatatype.RdataType, str], 

777 parser: dns.wire.Parser, 

778 origin: Optional[dns.name.Name] = None, 

779) -> Rdata: 

780 """Build an rdata object from wire format 

781 

782 This function attempts to dynamically load a class which 

783 implements the specified rdata class and type. If there is no 

784 class-and-type-specific implementation, the GenericRdata class 

785 is used. 

786 

787 Once a class is chosen, its from_wire() class method is called 

788 with the parameters to this function. 

789 

790 *rdclass*, a ``dns.rdataclass.RdataClass`` or ``str``, the rdataclass. 

791 

792 *rdtype*, a ``dns.rdatatype.RdataType`` or ``str``, the rdatatype. 

793 

794 *parser*, a ``dns.wire.Parser``, the parser, which should be 

795 restricted to the rdata length. 

796 

797 *origin*, a ``dns.name.Name`` (or ``None``). If not ``None``, 

798 then names will be relativized to this origin. 

799 

800 Returns an instance of the chosen Rdata subclass. 

801 """ 

802 

803 rdclass = dns.rdataclass.RdataClass.make(rdclass) 

804 rdtype = dns.rdatatype.RdataType.make(rdtype) 

805 cls = get_rdata_class(rdclass, rdtype) 

806 with dns.exception.ExceptionWrapper(dns.exception.FormError): 

807 return cls.from_wire_parser(rdclass, rdtype, parser, origin) 

808 

809 

810def from_wire( 

811 rdclass: Union[dns.rdataclass.RdataClass, str], 

812 rdtype: Union[dns.rdatatype.RdataType, str], 

813 wire: bytes, 

814 current: int, 

815 rdlen: int, 

816 origin: Optional[dns.name.Name] = None, 

817) -> Rdata: 

818 """Build an rdata object from wire format 

819 

820 This function attempts to dynamically load a class which 

821 implements the specified rdata class and type. If there is no 

822 class-and-type-specific implementation, the GenericRdata class 

823 is used. 

824 

825 Once a class is chosen, its from_wire() class method is called 

826 with the parameters to this function. 

827 

828 *rdclass*, an ``int``, the rdataclass. 

829 

830 *rdtype*, an ``int``, the rdatatype. 

831 

832 *wire*, a ``bytes``, the wire-format message. 

833 

834 *current*, an ``int``, the offset in wire of the beginning of 

835 the rdata. 

836 

837 *rdlen*, an ``int``, the length of the wire-format rdata 

838 

839 *origin*, a ``dns.name.Name`` (or ``None``). If not ``None``, 

840 then names will be relativized to this origin. 

841 

842 Returns an instance of the chosen Rdata subclass. 

843 """ 

844 parser = dns.wire.Parser(wire, current) 

845 with parser.restrict_to(rdlen): 

846 return from_wire_parser(rdclass, rdtype, parser, origin) 

847 

848 

849class RdatatypeExists(dns.exception.DNSException): 

850 """DNS rdatatype already exists.""" 

851 

852 supp_kwargs = {"rdclass", "rdtype"} 

853 fmt = ( 

854 "The rdata type with class {rdclass:d} and rdtype {rdtype:d} " 

855 + "already exists." 

856 ) 

857 

858 

859def register_type( 

860 implementation: Any, 

861 rdtype: int, 

862 rdtype_text: str, 

863 is_singleton: bool = False, 

864 rdclass: dns.rdataclass.RdataClass = dns.rdataclass.IN, 

865) -> None: 

866 """Dynamically register a module to handle an rdatatype. 

867 

868 *implementation*, a module implementing the type in the usual dnspython 

869 way. 

870 

871 *rdtype*, an ``int``, the rdatatype to register. 

872 

873 *rdtype_text*, a ``str``, the textual form of the rdatatype. 

874 

875 *is_singleton*, a ``bool``, indicating if the type is a singleton (i.e. 

876 RRsets of the type can have only one member.) 

877 

878 *rdclass*, the rdataclass of the type, or ``dns.rdataclass.ANY`` if 

879 it applies to all classes. 

880 """ 

881 

882 rdtype = dns.rdatatype.RdataType.make(rdtype) 

883 existing_cls = get_rdata_class(rdclass, rdtype) 

884 if existing_cls != GenericRdata or dns.rdatatype.is_metatype(rdtype): 

885 raise RdatatypeExists(rdclass=rdclass, rdtype=rdtype) 

886 _rdata_classes[(rdclass, rdtype)] = getattr( 

887 implementation, rdtype_text.replace("-", "_") 

888 ) 

889 dns.rdatatype.register_type(rdtype, rdtype_text, is_singleton)