Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/dns/rdata.py: 47%

377 statements  

« prev     ^ index     » next       coverage.py v7.4.1, created at 2024-02-02 06:07 +0000

1# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license 

2 

3# Copyright (C) 2001-2017 Nominum, Inc. 

4# 

5# Permission to use, copy, modify, and distribute this software and its 

6# documentation for any purpose with or without fee is hereby granted, 

7# provided that the above copyright notice and this permission notice 

8# appear in all copies. 

9# 

10# THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES 

11# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 

12# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR 

13# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 

14# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 

15# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT 

16# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 

17 

18"""DNS rdata.""" 

19 

20import base64 

21import binascii 

22import inspect 

23import io 

24import itertools 

25import random 

26from importlib import import_module 

27from typing import Any, Dict, Optional, Tuple, Union 

28 

29import dns.exception 

30import dns.immutable 

31import dns.ipv4 

32import dns.ipv6 

33import dns.name 

34import dns.rdataclass 

35import dns.rdatatype 

36import dns.tokenizer 

37import dns.ttl 

38import dns.wire 

39 

# Default chunk width used by _wordbreak(), _hexify() and _base64ify() when
# the caller does not supply *chunksize*.
_chunksize = 32

# We currently allow comparisons for rdata with relative names for backwards
# compatibility, but in the future we will not, as these kinds of comparisons
# can lead to subtle bugs if code is not carefully written.
#
# This switch allows the future behavior to be turned on so code can be
# tested with it.  When it is false, Rdata._cmp() raises
# NoRelativeRdataOrdering if either operand contains a relative name.
_allow_relative_comparisons = True

49 

50 

# Raised by Rdata._cmp() when relative comparisons are disabled (see
# _allow_relative_comparisons) and either operand has a relative name.
# NOTE(review): the docstring may double as the exception's default message,
# depending on dns.exception.DNSException -- confirm before rewording it.
class NoRelativeRdataOrdering(dns.exception.DNSException):
    """An attempt was made to do an ordered comparison of one or more
    rdata with relative names. The only reliable way of sorting rdata
    is to use non-relativized rdata.

    """

57 

58 

def _wordbreak(data, chunksize=_chunksize, separator=b" "):
    """Decode *data*, optionally inserting *separator* every *chunksize* bytes.

    *data*, a ``bytes``, the already-encoded text to split.

    *chunksize*, an ``int``; if it is zero (or otherwise false) the data is
    decoded without any separators.

    *separator*, a ``bytes``, placed between consecutive chunks.

    Returns a ``str``.
    """

    if chunksize:
        pieces = (
            data[start : start + chunksize]
            for start in range(0, len(data), chunksize)
        )
        data = separator.join(pieces)
    return data.decode()

69 

70 

71# pylint: disable=unused-argument 

72 

73 

def _hexify(data, chunksize=_chunksize, separator=b" ", **kw):
    """Return the hex encoding of *data* as text, split into chunks.

    The hex digits are grouped into runs of *chunksize* characters joined by
    *separator* (see ``_wordbreak``).  Extra keyword arguments are accepted
    and ignored so callers can forward their own ``**kw`` unchanged.

    Returns a ``str``.
    """

    hex_digits = binascii.hexlify(data)
    return _wordbreak(hex_digits, chunksize, separator)

80 

81 

def _base64ify(data, chunksize=_chunksize, separator=b" ", **kw):
    """Return the base64 encoding of *data* as text, split into chunks.

    The encoded characters are grouped into runs of *chunksize* characters
    joined by *separator* (see ``_wordbreak``).  Extra keyword arguments are
    accepted and ignored so callers can forward their own ``**kw`` unchanged.

    Returns a ``str``.
    """

    encoded = base64.b64encode(data)
    return _wordbreak(encoded, chunksize, separator)

88 

89 

90# pylint: enable=unused-argument 

91 

# Byte values that must be backslash-escaped inside a quoted string: the
# double quote and the backslash itself.
__escaped = b'"\\'


def _escapify(qstring):
    """Escape the characters in a quoted string which need it.

    *qstring*, a ``str``, ``bytes`` or ``bytearray``.  A ``str`` is encoded
    with the default codec (UTF-8) first, so non-ASCII characters are escaped
    byte by byte.

    Double quotes and backslashes are prefixed with a backslash, printable
    ASCII (0x20-0x7E) is copied through, and every other byte is written as a
    backslash followed by its three-digit decimal value.

    Returns a ``str``.
    """

    if isinstance(qstring, str):
        qstring = qstring.encode()
    if not isinstance(qstring, bytearray):
        qstring = bytearray(qstring)

    # Collect the pieces and join once; repeated ``+=`` on a str is quadratic
    # in the worst case.
    pieces = []
    for c in qstring:
        if c in __escaped:
            pieces.append("\\" + chr(c))
        elif 0x20 <= c < 0x7F:
            pieces.append(chr(c))
        else:
            pieces.append("\\%03d" % c)
    return "".join(pieces)

112 

113 

def _truncate_bitmap(what):
    """Strip trailing all-zero bytes from a bitmap.

    Returns the prefix of *what* that ends at the last non-zero byte.  If
    every byte is zero, the first byte alone is returned (which is an empty
    sequence when *what* itself is empty).
    """

    keep = len(what)
    # Walk backwards past the trailing zero bytes.
    while keep > 0 and what[keep - 1] == 0:
        keep -= 1
    if keep:
        return what[0:keep]
    return what[0:1]

123 

124 

# So we don't have to edit all the rdata classes...
# (module-level alias for dns.immutable.constify)
_constify = dns.immutable.constify

127 

128 

@dns.immutable.immutable
class Rdata:
    """Base class for all DNS rdata types."""

    __slots__ = ["rdclass", "rdtype", "rdcomment"]

    def __init__(self, rdclass, rdtype):
        """Initialize an rdata.

        *rdclass*, an ``int`` is the rdataclass of the Rdata.

        *rdtype*, an ``int`` is the rdatatype of the Rdata.
        """

        self.rdclass = self._as_rdataclass(rdclass)
        self.rdtype = self._as_rdatatype(rdtype)
        self.rdcomment = None

    def _get_all_slots(self):
        """Return an iterator over the slot names declared by every class in
        this object's MRO (classes without ``__slots__`` contribute nothing).
        """
        return itertools.chain.from_iterable(
            getattr(cls, "__slots__", []) for cls in self.__class__.__mro__
        )

    def __getstate__(self):
        # We used to try to do a tuple of all slots here, but it
        # doesn't work as self._all_slots isn't available at
        # __setstate__() time.  Before that we tried to store a tuple
        # of __slots__, but that didn't work as it didn't store the
        # slots defined by ancestors.  This older way didn't fail
        # outright, but ended up with partially broken objects, e.g.
        # if you unpickled an A RR it wouldn't have rdclass and rdtype
        # attributes, and would compare badly.
        return {slot: getattr(self, slot) for slot in self._get_all_slots()}

    def __setstate__(self, state):
        # object.__setattr__ is used rather than plain assignment;
        # presumably this bypasses the immutability added by the class
        # decorator -- confirm against dns.immutable.
        for slot, val in state.items():
            object.__setattr__(self, slot, val)
        if not hasattr(self, "rdcomment"):
            # Pickled rdata from 2.0.x might not have a rdcomment, so add
            # it if needed.
            object.__setattr__(self, "rdcomment", None)

    def covers(self) -> dns.rdatatype.RdataType:
        """Return the type a Rdata covers.

        DNS SIG/RRSIG rdatas apply to a specific type; this type is
        returned by the covers() function.  If the rdata type is not
        SIG or RRSIG, dns.rdatatype.NONE is returned.  This is useful when
        creating rdatasets, allowing the rdataset to contain only RRSIGs
        of a particular type, e.g. RRSIG(NS).

        Returns a ``dns.rdatatype.RdataType``.
        """

        return dns.rdatatype.NONE

    def extended_rdatatype(self) -> int:
        """Return a 32-bit type value, the least significant 16 bits of
        which are the ordinary DNS type, and the upper 16 bits of which are
        the "covered" type, if any.

        Returns an ``int``.
        """

        return self.covers() << 16 | self.rdtype

    def to_text(
        self,
        origin: Optional[dns.name.Name] = None,
        relativize: bool = True,
        **kw: Dict[str, Any],
    ) -> str:
        """Convert an rdata to text format.

        Subclasses must override this; the base implementation raises
        ``NotImplementedError``.

        Returns a ``str``.
        """

        raise NotImplementedError  # pragma: no cover

    def _to_wire(
        self,
        file: Optional[Any],
        compress: Optional[dns.name.CompressType] = None,
        origin: Optional[dns.name.Name] = None,
        canonicalize: bool = False,
    ) -> bytes:
        """Write the wire form of this rdata to *file*.

        Subclasses must override this; the base implementation raises
        ``NotImplementedError``.
        """
        raise NotImplementedError  # pragma: no cover

    def to_wire(
        self,
        file: Optional[Any] = None,
        compress: Optional[dns.name.CompressType] = None,
        origin: Optional[dns.name.Name] = None,
        canonicalize: bool = False,
    ) -> Optional[bytes]:
        """Convert an rdata to wire format.

        *file*, a writable binary file-like object or ``None``.  If given,
        the wire form is written to it and the result of ``_to_wire()`` is
        returned.  If ``None``, the wire form is built in memory and
        returned.

        *compress*, *origin* and *canonicalize* are passed through to
        ``_to_wire()`` unchanged.

        Returns a ``bytes`` or ``None``.
        """

        # Test identity, not truthiness: a file-like object that happens to
        # evaluate as false (e.g. one defining __len__) must still be used.
        if file is not None:
            return self._to_wire(file, compress, origin, canonicalize)
        f = io.BytesIO()
        self._to_wire(f, compress, origin, canonicalize)
        return f.getvalue()

    def to_generic(
        self, origin: Optional[dns.name.Name] = None
    ) -> "dns.rdata.GenericRdata":
        """Creates a dns.rdata.GenericRdata equivalent of this rdata.

        Returns a ``dns.rdata.GenericRdata``.
        """
        wire = self.to_wire(origin=origin)
        assert wire is not None  # for type checkers; no file was passed
        return dns.rdata.GenericRdata(self.rdclass, self.rdtype, wire)

    def to_digestable(self, origin: Optional[dns.name.Name] = None) -> bytes:
        """Convert rdata to a format suitable for digesting in hashes.  This
        is also the DNSSEC canonical form.

        Returns a ``bytes``.
        """

        wire = self.to_wire(origin=origin, canonicalize=True)
        assert wire is not None  # for type checkers; no file was passed
        return wire

    def __repr__(self):
        covers = self.covers()
        if covers == dns.rdatatype.NONE:
            ctext = ""
        else:
            ctext = "(" + dns.rdatatype.to_text(covers) + ")"
        return (
            "<DNS "
            + dns.rdataclass.to_text(self.rdclass)
            + " "
            + dns.rdatatype.to_text(self.rdtype)
            + ctext
            + " rdata: "
            + str(self)
            + ">"
        )

    def __str__(self):
        return self.to_text()

    def _cmp(self, other):
        """Compare an rdata with another rdata of the same rdtype and
        rdclass.

        For rdata with only absolute names:
            Return < 0 if self < other in the DNSSEC ordering, 0 if self
            == other, and > 0 if self > other.
        For rdata with at least one relative names:
            The rdata sorts before any rdata with only absolute names.
            When compared with another relative rdata, all names are
            made absolute as if they were relative to the root, as the
            proper origin is not available.  While this creates a stable
            ordering, it is NOT guaranteed to be the DNSSEC ordering.
            In the future, all ordering comparisons for rdata with
            relative names will be disallowed.
        """
        try:
            our = self.to_digestable()
            our_relative = False
        except dns.name.NeedAbsoluteNameOrOrigin:
            if _allow_relative_comparisons:
                our = self.to_digestable(dns.name.root)
            our_relative = True
        try:
            their = other.to_digestable()
            their_relative = False
        except dns.name.NeedAbsoluteNameOrOrigin:
            if _allow_relative_comparisons:
                their = other.to_digestable(dns.name.root)
            their_relative = True
        if _allow_relative_comparisons:
            if our_relative != their_relative:
                # For the purpose of comparison, all rdata with at least one
                # relative name is less than an rdata with only absolute names.
                if our_relative:
                    return -1
                else:
                    return 1
        elif our_relative or their_relative:
            # Relative comparisons are disabled; "our"/"their" may be unset
            # here, so we must raise before touching them.
            raise NoRelativeRdataOrdering
        if our == their:
            return 0
        elif our > their:
            return 1
        else:
            return -1

    def __eq__(self, other):
        if not isinstance(other, Rdata):
            return False
        if self.rdclass != other.rdclass or self.rdtype != other.rdtype:
            return False
        our_relative = False
        their_relative = False
        try:
            our = self.to_digestable()
        except dns.name.NeedAbsoluteNameOrOrigin:
            our = self.to_digestable(dns.name.root)
            our_relative = True
        try:
            their = other.to_digestable()
        except dns.name.NeedAbsoluteNameOrOrigin:
            their = other.to_digestable(dns.name.root)
            their_relative = True
        # An rdata with relative names never equals one with only absolute
        # names, even if their root-relative digestable forms coincide.
        if our_relative != their_relative:
            return False
        return our == their

    def __ne__(self, other):
        if not isinstance(other, Rdata):
            return True
        if self.rdclass != other.rdclass or self.rdtype != other.rdtype:
            return True
        return not self.__eq__(other)

    # The ordering operators only apply to rdata of the same class and type;
    # anything else yields NotImplemented so Python can try the reflected
    # operation or raise TypeError.

    def __lt__(self, other):
        if (
            not isinstance(other, Rdata)
            or self.rdclass != other.rdclass
            or self.rdtype != other.rdtype
        ):
            return NotImplemented
        return self._cmp(other) < 0

    def __le__(self, other):
        if (
            not isinstance(other, Rdata)
            or self.rdclass != other.rdclass
            or self.rdtype != other.rdtype
        ):
            return NotImplemented
        return self._cmp(other) <= 0

    def __ge__(self, other):
        if (
            not isinstance(other, Rdata)
            or self.rdclass != other.rdclass
            or self.rdtype != other.rdtype
        ):
            return NotImplemented
        return self._cmp(other) >= 0

    def __gt__(self, other):
        if (
            not isinstance(other, Rdata)
            or self.rdclass != other.rdclass
            or self.rdtype != other.rdtype
        ):
            return NotImplemented
        return self._cmp(other) > 0

    def __hash__(self):
        # Hash the canonical form computed relative to the root so rdata
        # containing relative names can be hashed too.
        return hash(self.to_digestable(dns.name.root))

    @classmethod
    def from_text(
        cls,
        rdclass: dns.rdataclass.RdataClass,
        rdtype: dns.rdatatype.RdataType,
        tok: dns.tokenizer.Tokenizer,
        origin: Optional[dns.name.Name] = None,
        relativize: bool = True,
        relativize_to: Optional[dns.name.Name] = None,
    ) -> "Rdata":
        """Build an rdata of this type from *tok*.  Subclasses must override
        this; the base implementation raises ``NotImplementedError``.
        """
        raise NotImplementedError  # pragma: no cover

    @classmethod
    def from_wire_parser(
        cls,
        rdclass: dns.rdataclass.RdataClass,
        rdtype: dns.rdatatype.RdataType,
        parser: dns.wire.Parser,
        origin: Optional[dns.name.Name] = None,
    ) -> "Rdata":
        """Build an rdata of this type from *parser*.  Subclasses must
        override this; the base implementation raises
        ``NotImplementedError``.
        """
        raise NotImplementedError  # pragma: no cover

    def replace(self, **kwargs: Any) -> "Rdata":
        """
        Create a new Rdata instance based on the instance replace was
        invoked on. It is possible to pass different parameters to
        override the corresponding properties of the base Rdata.

        Any field specific to the Rdata type can be replaced, but the
        *rdtype* and *rdclass* fields cannot.

        Raises ``AttributeError`` if a keyword is not a constructor
        parameter, or names *rdclass* or *rdtype*.

        Returns an instance of the same Rdata subclass as *self*.
        """

        # Get the constructor parameters.
        parameters = inspect.signature(self.__init__).parameters  # type: ignore

        # Ensure that all of the arguments correspond to valid fields.
        # Don't allow rdclass or rdtype to be changed, though.
        for key in kwargs:
            if key == "rdcomment":
                continue
            if key not in parameters:
                raise AttributeError(
                    "'{}' object has no attribute '{}'".format(
                        self.__class__.__name__, key
                    )
                )
            if key in ("rdclass", "rdtype"):
                raise AttributeError(
                    "Cannot overwrite '{}' attribute '{}'".format(
                        self.__class__.__name__, key
                    )
                )

        # Construct the parameter list.  For each field, use the value in
        # kwargs if present, and the current value otherwise.
        args = (kwargs.get(key, getattr(self, key)) for key in parameters)

        # Create, validate, and return the new object.
        rd = self.__class__(*args)
        # The comment is not set in the constructor, so give it special
        # handling.
        rdcomment = kwargs.get("rdcomment", self.rdcomment)
        if rdcomment is not None:
            object.__setattr__(rd, "rdcomment", rdcomment)
        return rd

    # Type checking and conversion helpers.  These are class methods as
    # they don't touch object state and may be useful to others.  Each one
    # returns the validated (possibly converted) value or raises ValueError.

    @classmethod
    def _as_rdataclass(cls, value):
        return dns.rdataclass.RdataClass.make(value)

    @classmethod
    def _as_rdatatype(cls, value):
        return dns.rdatatype.RdataType.make(value)

    @classmethod
    def _as_bytes(
        cls,
        value: Any,
        encode: bool = False,
        max_length: Optional[int] = None,
        empty_ok: bool = True,
    ) -> bytes:
        """Return *value* as ``bytes``.

        A ``str`` is accepted (and encoded) only when *encode* is true; a
        ``bytearray`` is copied to ``bytes``.  Raises ``ValueError`` for any
        other type, if the result is longer than *max_length*, or if it is
        empty and *empty_ok* is false.
        """
        if encode and isinstance(value, str):
            bvalue = value.encode()
        elif isinstance(value, bytearray):
            bvalue = bytes(value)
        elif isinstance(value, bytes):
            bvalue = value
        else:
            raise ValueError("not bytes")
        if max_length is not None and len(bvalue) > max_length:
            raise ValueError("too long")
        if not empty_ok and len(bvalue) == 0:
            raise ValueError("empty bytes not allowed")
        return bvalue

    @classmethod
    def _as_name(cls, value):
        # Note that proper name conversion (e.g. with origin and IDNA
        # awareness) is expected to be done via from_text.  This is just
        # a simple thing for people invoking the constructor directly.
        if isinstance(value, str):
            return dns.name.from_text(value)
        elif not isinstance(value, dns.name.Name):
            raise ValueError("not a name")
        return value

    @classmethod
    def _as_uint8(cls, value):
        if not isinstance(value, int):
            raise ValueError("not an integer")
        if value < 0 or value > 255:
            raise ValueError("not a uint8")
        return value

    @classmethod
    def _as_uint16(cls, value):
        if not isinstance(value, int):
            raise ValueError("not an integer")
        if value < 0 or value > 65535:
            raise ValueError("not a uint16")
        return value

    @classmethod
    def _as_uint32(cls, value):
        if not isinstance(value, int):
            raise ValueError("not an integer")
        if value < 0 or value > 4294967295:
            raise ValueError("not a uint32")
        return value

    @classmethod
    def _as_uint48(cls, value):
        if not isinstance(value, int):
            raise ValueError("not an integer")
        if value < 0 or value > 281474976710655:
            raise ValueError("not a uint48")
        return value

    @classmethod
    def _as_int(cls, value, low=None, high=None):
        """Return *value* if it is an ``int`` within the optional inclusive
        bounds *low* and *high*; otherwise raise ``ValueError``.
        """
        if not isinstance(value, int):
            raise ValueError("not an integer")
        if low is not None and value < low:
            raise ValueError("value too small")
        if high is not None and value > high:
            raise ValueError("value too large")
        return value

    @classmethod
    def _as_ipv4_address(cls, value):
        if isinstance(value, str):
            return dns.ipv4.canonicalize(value)
        elif isinstance(value, bytes):
            return dns.ipv4.inet_ntoa(value)
        else:
            raise ValueError("not an IPv4 address")

    @classmethod
    def _as_ipv6_address(cls, value):
        if isinstance(value, str):
            return dns.ipv6.canonicalize(value)
        elif isinstance(value, bytes):
            return dns.ipv6.inet_ntoa(value)
        else:
            raise ValueError("not an IPv6 address")

    @classmethod
    def _as_bool(cls, value):
        if isinstance(value, bool):
            return value
        else:
            raise ValueError("not a boolean")

    @classmethod
    def _as_ttl(cls, value):
        if isinstance(value, int):
            return cls._as_int(value, 0, dns.ttl.MAX_TTL)
        elif isinstance(value, str):
            return dns.ttl.from_text(value)
        else:
            raise ValueError("not a TTL")

    @classmethod
    def _as_tuple(cls, value, as_value):
        """Return a tuple of elements converted with *as_value*.

        *value* may be a single element or an iterable of elements.
        """
        try:
            # For user convenience, if value is a singleton of the list
            # element type, wrap it in a tuple.
            return (as_value(value),)
        except Exception:
            # Otherwise, check each element of the iterable *value*
            # against *as_value*.
            return tuple(as_value(v) for v in value)

    # Processing order

    @classmethod
    def _processing_order(cls, iterable):
        """Return the items of *iterable* as a new list in random order."""
        items = list(iterable)
        random.shuffle(items)
        return items

599 

600 

@dns.immutable.immutable
class GenericRdata(Rdata):
    """Generic Rdata Class

    This class is used for rdata types for which we have no better
    implementation.  It implements the DNS "unknown RRs" scheme: the rdata
    is kept as opaque wire data in the *data* attribute and is written in
    text form as ``\\# <length> <hex>``.
    """

    __slots__ = ["data"]

    def __init__(self, rdclass, rdtype, data):
        super().__init__(rdclass, rdtype)
        self.data = data

    def to_text(
        self,
        origin: Optional[dns.name.Name] = None,
        relativize: bool = True,
        **kw: Dict[str, Any],
    ) -> str:
        # Length prefix first, then the hex digits (chunked per **kw).
        header = r"\# %d " % len(self.data)
        return header + _hexify(self.data, **kw)

    @classmethod
    def from_text(
        cls, rdclass, rdtype, tok, origin=None, relativize=True, relativize_to=None
    ):
        # The generic form must begin with the literal "\#" marker.
        marker = tok.get()
        if not (marker.is_identifier() and marker.value == r"\#"):
            raise dns.exception.SyntaxError(r"generic rdata does not start with \#")
        expected_length = tok.get_int()
        hex_digits = tok.concatenate_remaining_identifiers(True).encode()
        payload = binascii.unhexlify(hex_digits)
        # The declared length has to agree with the decoded data.
        if len(payload) != expected_length:
            raise dns.exception.SyntaxError("generic rdata hex data has wrong length")
        return cls(rdclass, rdtype, payload)

    def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
        # The stored data already is the wire form; write it verbatim.
        file.write(self.data)

    @classmethod
    def from_wire_parser(cls, rdclass, rdtype, parser, origin=None):
        remaining = parser.get_remaining()
        return cls(rdclass, rdtype, remaining)

644 

645 

# Cache mapping (rdclass, rdtype) to the class implementing that rdata type.
# It is filled lazily by get_rdata_class() and directly by register_type().
_rdata_classes: Dict[
    Tuple[dns.rdataclass.RdataClass, dns.rdatatype.RdataType], Any
] = {}
# Package under which get_rdata_class() looks for implementation modules,
# as <prefix>.<class text>.<type text>.
_module_prefix = "dns.rdtypes"

650 

651 

def get_rdata_class(rdclass, rdtype):
    """Return the class implementing rdata of the given class and type.

    The cache is consulted first, for the exact (rdclass, rdtype) pair and
    then for the class-independent (ANY, rdtype) pair.  On a miss the
    implementation module is imported from ``_module_prefix``, trying the
    specific class package first and the ``ANY`` package second.  If no
    implementation can be found, ``GenericRdata`` is used.  The result of a
    dynamic lookup is cached.

    *rdclass*, the rdataclass.

    *rdtype*, the rdatatype.

    Returns a class.
    """
    cls = _rdata_classes.get((rdclass, rdtype))
    if not cls:
        # Class-independent implementations are stored under
        # dns.rdataclass.ANY (see below), so look them up with the same
        # constant rather than the numerically equal dns.rdatatype.ANY.
        cls = _rdata_classes.get((dns.rdataclass.ANY, rdtype))
        if not cls:
            rdclass_text = dns.rdataclass.to_text(rdclass)
            rdtype_text = dns.rdatatype.to_text(rdtype)
            # Module and class names use "_" where the type text has "-".
            rdtype_text = rdtype_text.replace("-", "_")
            try:
                mod = import_module(
                    ".".join([_module_prefix, rdclass_text, rdtype_text])
                )
                cls = getattr(mod, rdtype_text)
                _rdata_classes[(rdclass, rdtype)] = cls
            except ImportError:
                try:
                    mod = import_module(".".join([_module_prefix, "ANY", rdtype_text]))
                    cls = getattr(mod, rdtype_text)
                    _rdata_classes[(dns.rdataclass.ANY, rdtype)] = cls
                    _rdata_classes[(rdclass, rdtype)] = cls
                except ImportError:
                    # No specific implementation; fall through to generic.
                    pass
    if not cls:
        cls = GenericRdata
        _rdata_classes[(rdclass, rdtype)] = cls
    return cls

678 

679 

def from_text(
    rdclass: Union[dns.rdataclass.RdataClass, str],
    rdtype: Union[dns.rdatatype.RdataType, str],
    tok: Union[dns.tokenizer.Tokenizer, str],
    origin: Optional[dns.name.Name] = None,
    relativize: bool = True,
    relativize_to: Optional[dns.name.Name] = None,
    idna_codec: Optional[dns.name.IDNACodec] = None,
) -> Rdata:
    """Build an rdata object from text format.

    The implementing class is looked up with ``get_rdata_class()``; if
    there is no class-and-type-specific implementation, the GenericRdata
    class is used.  The chosen class's ``from_text()`` class method then
    parses the tokens.  A known type written in the generic ``\\#`` syntax
    is decoded from the embedded wire data instead.

    If *tok* is a ``str``, then a tokenizer is created and the string
    is used as its input.

    *rdclass*, a ``dns.rdataclass.RdataClass`` or ``str``, the rdataclass.

    *rdtype*, a ``dns.rdatatype.RdataType`` or ``str``, the rdatatype.

    *tok*, a ``dns.tokenizer.Tokenizer`` or a ``str``.

    *origin*, a ``dns.name.Name`` (or ``None``), the
    origin to use for relative names.

    *relativize*, a ``bool``.  If true, name will be relativized.

    *relativize_to*, a ``dns.name.Name`` (or ``None``), the origin to use
    when relativizing names.  If not set, the *origin* value will be used.

    *idna_codec*, a ``dns.name.IDNACodec``, specifies the IDNA
    encoder/decoder to use if a tokenizer needs to be created.  If
    ``None``, the default IDNA 2003 encoder/decoder is used.  If a
    tokenizer is not created, then the codec associated with the tokenizer
    is the one that is used.

    Errors raised while parsing are wrapped as ``dns.exception.SyntaxError``.

    Returns an instance of the chosen Rdata subclass.

    """
    if isinstance(tok, str):
        tok = dns.tokenizer.Tokenizer(tok, idna_codec=idna_codec)
    rdclass = dns.rdataclass.RdataClass.make(rdclass)
    rdtype = dns.rdatatype.RdataType.make(rdtype)
    impl = get_rdata_class(rdclass, rdtype)
    with dns.exception.ExceptionWrapper(dns.exception.SyntaxError):
        parsed = None
        if impl != GenericRdata:
            # Peek at the first token without consuming it.
            first = tok.get()
            tok.unget(first)
            if first.is_identifier() and first.value == r"\#":
                # Known type using the generic syntax.  Extract the wire
                # form from the generic syntax, and then run from_wire on
                # it.
                generic = GenericRdata.from_text(
                    rdclass, rdtype, tok, origin, relativize, relativize_to
                )
                payload = generic.data
                parsed = from_wire(rdclass, rdtype, payload, 0, len(payload), origin)
                # If re-encoding does not reproduce the input, then there
                # must have been compressed names in the wire format, which
                # is an error, there being no reasonable context to
                # decompress with.
                if parsed.to_wire() != payload:
                    raise dns.exception.SyntaxError(
                        "compressed data in generic syntax form of known rdatatype"
                    )
        if parsed is None:
            parsed = impl.from_text(
                rdclass, rdtype, tok, origin, relativize, relativize_to
            )
        # Consume the end of line and keep any trailing comment.
        eol = tok.get_eol_as_token()
        if eol.comment is not None:
            object.__setattr__(parsed, "rdcomment", eol.comment)
        return parsed

768 

769 

def from_wire_parser(
    rdclass: Union[dns.rdataclass.RdataClass, str],
    rdtype: Union[dns.rdatatype.RdataType, str],
    parser: dns.wire.Parser,
    origin: Optional[dns.name.Name] = None,
) -> Rdata:
    """Build an rdata object from wire format

    The implementing class is looked up with ``get_rdata_class()``; if
    there is no class-and-type-specific implementation, the GenericRdata
    class is used.  The chosen class's ``from_wire_parser()`` class method
    is then called with the normalized class and type, the parser and the
    origin.

    *rdclass*, a ``dns.rdataclass.RdataClass`` or ``str``, the rdataclass.

    *rdtype*, a ``dns.rdatatype.RdataType`` or ``str``, the rdatatype.

    *parser*, a ``dns.wire.Parser``, the parser, which should be
    restricted to the rdata length.

    *origin*, a ``dns.name.Name`` (or ``None``).  If not ``None``,
    then names will be relativized to this origin.

    Errors raised while parsing are wrapped as ``dns.exception.FormError``.

    Returns an instance of the chosen Rdata subclass.
    """

    klass = dns.rdataclass.RdataClass.make(rdclass)
    rtype = dns.rdatatype.RdataType.make(rdtype)
    impl = get_rdata_class(klass, rtype)
    with dns.exception.ExceptionWrapper(dns.exception.FormError):
        return impl.from_wire_parser(klass, rtype, parser, origin)

804 

805 

def from_wire(
    rdclass: Union[dns.rdataclass.RdataClass, str],
    rdtype: Union[dns.rdatatype.RdataType, str],
    wire: bytes,
    current: int,
    rdlen: int,
    origin: Optional[dns.name.Name] = None,
) -> Rdata:
    """Build an rdata object from wire format

    A ``dns.wire.Parser`` is created over *wire* starting at *current*,
    restricted to *rdlen* bytes, and handed to ``from_wire_parser()``,
    which selects the implementing class (GenericRdata if there is no
    class-and-type-specific implementation).

    *rdclass*, a ``dns.rdataclass.RdataClass`` or ``str``, the rdataclass.

    *rdtype*, a ``dns.rdatatype.RdataType`` or ``str``, the rdatatype.

    *wire*, a ``bytes``, the wire-format message.

    *current*, an ``int``, the offset in wire of the beginning of
    the rdata.

    *rdlen*, an ``int``, the length of the wire-format rdata

    *origin*, a ``dns.name.Name`` (or ``None``).  If not ``None``,
    then names will be relativized to this origin.

    Returns an instance of the chosen Rdata subclass.
    """
    wire_parser = dns.wire.Parser(wire, current)
    with wire_parser.restrict_to(rdlen):
        rdata = from_wire_parser(rdclass, rdtype, wire_parser, origin)
        return rdata

843 

844 

class RdatatypeExists(dns.exception.DNSException):
    """DNS rdatatype already exists."""

    # register_type() raises this with rdclass= and rdtype= keyword
    # arguments; the same two names are the placeholders used in fmt.
    # NOTE(review): how supp_kwargs/fmt are consumed is defined by
    # dns.exception.DNSException -- confirm there.
    supp_kwargs = {"rdclass", "rdtype"}
    fmt = (
        "The rdata type with class {rdclass:d} and rdtype {rdtype:d} "
        "already exists."
    )

853 

854 

def register_type(
    implementation: Any,
    rdtype: int,
    rdtype_text: str,
    is_singleton: bool = False,
    rdclass: dns.rdataclass.RdataClass = dns.rdataclass.IN,
) -> None:
    """Dynamically register a module to handle an rdatatype.

    *implementation*, a module implementing the type in the usual dnspython
    way.  The class is taken from the module attribute named after
    *rdtype_text* with ``-`` replaced by ``_``.

    *rdtype*, an ``int``, the rdatatype to register.

    *rdtype_text*, a ``str``, the textual form of the rdatatype.

    *is_singleton*, a ``bool``, indicating if the type is a singleton (i.e.
    RRsets of the type can have only one member.)

    *rdclass*, the rdataclass of the type, or ``dns.rdataclass.ANY`` if
    it applies to all classes.

    Raises ``RdatatypeExists`` if the class/type pair already has a
    non-generic implementation or if *rdtype* is a metatype.
    """

    rdtype = dns.rdatatype.RdataType.make(rdtype)
    already_known = get_rdata_class(rdclass, rdtype) != GenericRdata
    if already_known or dns.rdatatype.is_metatype(rdtype):
        raise RdatatypeExists(rdclass=rdclass, rdtype=rdtype)
    # Implementation classes are named after the type text, with "_" in
    # place of "-".
    attr_name = rdtype_text.replace("-", "_")
    _rdata_classes[(rdclass, rdtype)] = getattr(implementation, attr_name)
    dns.rdatatype.register_type(rdtype, rdtype_text, is_singleton)