Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/dns/rdataset.py: 39%

193 statements  

« prev     ^ index     » next       coverage.py v7.4.1, created at 2024-02-02 06:07 +0000

1# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license 

2 

3# Copyright (C) 2001-2017 Nominum, Inc. 

4# 

5# Permission to use, copy, modify, and distribute this software and its 

6# documentation for any purpose with or without fee is hereby granted, 

7# provided that the above copyright notice and this permission notice 

8# appear in all copies. 

9# 

10# THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES 

11# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 

12# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR 

13# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 

14# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 

15# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT 

16# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 

17 

18"""DNS rdatasets (an rdataset is a set of rdatas of a given type and class)""" 

19 

20import io 

21import random 

22import struct 

23from typing import Any, Collection, Dict, List, Optional, Union, cast 

24 

25import dns.exception 

26import dns.immutable 

27import dns.name 

28import dns.rdata 

29import dns.rdataclass 

30import dns.rdatatype 

31import dns.renderer 

32import dns.set 

33import dns.ttl 

34 

# SimpleSet is kept as an alias of dns.set.Set for backwards compatibility.
SimpleSet = dns.set.Set

37 

38 

class DifferingCovers(dns.exception.DNSException):
    """Raised when a DNS SIG/RRSIG is added whose covered type differs
    from the covered type of the other rdatas in the rdataset."""

42 

43 

class IncompatibleTypes(dns.exception.DNSException):
    """Raised when DNS RR data of an incompatible type is added."""

46 

47 

class Rdataset(dns.set.Set):
    """A DNS rdataset: a set of rdatas sharing one class, type, and TTL.

    Besides the set contents, each instance stores ``rdclass``, ``rdtype``,
    ``covers`` (the covered type, relevant for SIG/RRSIG rdatasets), and
    ``ttl``.
    """

    __slots__ = ["rdclass", "rdtype", "covers", "ttl"]

    def __init__(
        self,
        rdclass: dns.rdataclass.RdataClass,
        rdtype: dns.rdatatype.RdataType,
        covers: dns.rdatatype.RdataType = dns.rdatatype.NONE,
        ttl: int = 0,
    ):
        """Create a new rdataset of the specified class and type.

        *rdclass*, a ``dns.rdataclass.RdataClass``, the rdataclass.

        *rdtype*, a ``dns.rdatatype.RdataType``, the rdatatype.

        *covers*, a ``dns.rdatatype.RdataType``, the covered rdatatype.

        *ttl*, an ``int``, the TTL.
        """

        super().__init__()
        self.rdclass = rdclass
        self.rdtype: dns.rdatatype.RdataType = rdtype
        self.covers: dns.rdatatype.RdataType = covers
        self.ttl = ttl

    def _clone(self):
        """Return the superclass clone with this rdataset's class, type,
        covers, and TTL copied onto it."""
        obj = super()._clone()
        obj.rdclass = self.rdclass
        obj.rdtype = self.rdtype
        obj.covers = self.covers
        obj.ttl = self.ttl
        return obj

    def update_ttl(self, ttl: Union[int, str]) -> None:
        """Perform TTL minimization.

        Set the TTL of the rdataset to be the lesser of the set's current
        TTL or the specified TTL.  If the set contains no rdatas, set the TTL
        to the specified TTL.

        *ttl*, an ``int`` or ``str``; it is normalized with
        ``dns.ttl.make()`` before being compared.
        """
        new_ttl = dns.ttl.make(ttl)
        # An empty rdataset has no meaningful TTL yet, so adopt the new one
        # unconditionally; otherwise only ever lower the TTL.
        if len(self) == 0 or new_ttl < self.ttl:
            self.ttl = new_ttl

    def add(  # pylint: disable=arguments-differ,arguments-renamed
        self, rd: dns.rdata.Rdata, ttl: Optional[int] = None
    ) -> None:
        """Add the specified rdata to the rdataset.

        If the optional *ttl* parameter is supplied, then
        ``self.update_ttl(ttl)`` will be called prior to adding the rdata.

        *rd*, a ``dns.rdata.Rdata``, the rdata

        *ttl*, an ``int``, the TTL.

        Raises ``dns.rdataset.IncompatibleTypes`` if the type and class
        do not match the type and class of the rdataset.

        Raises ``dns.rdataset.DifferingCovers`` if the type is a signature
        type and the covered type does not match that of the rdataset.

        All type and covers validation happens before any state is changed,
        so an rdata rejected with one of the exceptions above leaves the
        rdataset (including its TTL) untouched.
        """

        if self.rdclass != rd.rdclass or self.rdtype != rd.rdtype:
            raise IncompatibleTypes
        #
        # If we're adding a signature, do some special handling to
        # check that the signature covers the same type as the
        # other rdatas in this rdataset.  If this is the first rdata
        # in the set, remember its covered type so the covers field can be
        # initialized once validation has succeeded.
        #
        adopted_covers = None
        if self.rdtype == dns.rdatatype.RRSIG or self.rdtype == dns.rdatatype.SIG:
            covers = rd.covers()
            if len(self) == 0 and self.covers == dns.rdatatype.NONE:
                adopted_covers = covers
            elif self.covers != covers:
                raise DifferingCovers
        # Only mutate once the rdata is known to be acceptable.
        if ttl is not None:
            self.update_ttl(ttl)
        if adopted_covers is not None:
            self.covers = adopted_covers
        # A singleton type holds at most one rdata: the new one replaces
        # whatever is currently stored.
        if dns.rdatatype.is_singleton(rd.rdtype) and len(self) > 0:
            self.clear()
        super().add(rd)

    def union_update(self, other):
        """Minimize the TTL against *other*'s TTL, then apply the
        superclass ``union_update()`` with *other*."""
        self.update_ttl(other.ttl)
        super().union_update(other)

    def intersection_update(self, other):
        """Minimize the TTL against *other*'s TTL, then apply the
        superclass ``intersection_update()`` with *other*."""
        self.update_ttl(other.ttl)
        super().intersection_update(other)

    def update(self, other):
        """Add all rdatas in other to self.

        *other*, a ``dns.rdataset.Rdataset``, the rdataset from which
        to update.
        """

        self.update_ttl(other.ttl)
        super().update(other)

    def _rdata_repr(self):
        """Return a bracketed, comma-separated rendering of the rdatas for
        use in ``__repr__``; each rdata's text is cut to 100 characters
        followed by ``...`` when longer."""
        rendered = []
        for rr in self:
            text = str(rr)
            if len(text) > 100:
                text = text[:100] + "..."
            rendered.append("<" + text + ">")
        return "[" + ", ".join(rendered) + "]"

    def __repr__(self):
        # Omit the covered type when ``covers`` is 0.
        if self.covers == 0:
            ctext = ""
        else:
            ctext = "(" + dns.rdatatype.to_text(self.covers) + ")"
        return (
            "<DNS "
            + dns.rdataclass.to_text(self.rdclass)
            + " "
            + dns.rdatatype.to_text(self.rdtype)
            + ctext
            + " rdataset: "
            + self._rdata_repr()
            + ">"
        )

    def __str__(self):
        return self.to_text()

    def __eq__(self, other):
        """Two rdatasets are equal when class, type, and covers match and
        the superclass considers their contents equal.  The TTL is not
        compared.  Anything that is not an ``Rdataset`` is unequal."""
        if not isinstance(other, Rdataset):
            return False
        if (
            self.rdclass != other.rdclass
            or self.rdtype != other.rdtype
            or self.covers != other.covers
        ):
            return False
        return super().__eq__(other)

    def __ne__(self, other):
        return not self.__eq__(other)

    def to_text(
        self,
        name: Optional[dns.name.Name] = None,
        origin: Optional[dns.name.Name] = None,
        relativize: bool = True,
        override_rdclass: Optional[dns.rdataclass.RdataClass] = None,
        want_comments: bool = False,
        **kw: Any,
    ) -> str:
        """Convert the rdataset into DNS zone file format.

        See ``dns.name.Name.choose_relativity`` for more information
        on how *origin* and *relativize* determine the way names
        are emitted.

        Any additional keyword arguments are passed on to the rdata
        ``to_text()`` method.

        *name*, a ``dns.name.Name``.  If name is not ``None``, emit RRs with
        *name* as the owner name.

        *origin*, a ``dns.name.Name`` or ``None``, the origin for relative
        names.

        *relativize*, a ``bool``.  If ``True``, names will be relativized
        to *origin*.

        *override_rdclass*, a ``dns.rdataclass.RdataClass`` or ``None``.
        If not ``None``, use this class instead of the Rdataset's class.

        *want_comments*, a ``bool``.  If ``True``, emit comments for rdata
        which have them.  The default is ``False``.

        Returns a ``str`` with one line per rdata (or a single line for an
        empty rdataset) and no trailing newline.
        """

        # Owner-name prefix, including its separating space, or nothing at
        # all when no name was supplied.
        if name is not None:
            owner = str(name.choose_relativity(origin, relativize)) + " "
        else:
            owner = ""
        rdclass = self.rdclass if override_rdclass is None else override_rdclass
        class_text = dns.rdataclass.to_text(rdclass)
        type_text = dns.rdatatype.to_text(self.rdtype)
        if len(self) == 0:
            #
            # Empty rdatasets are used for the question section, and in
            # some dynamic updates, so we don't need to print out the TTL
            # (which is meaningless anyway).
            #
            return "{}{} {}".format(owner, class_text, type_text)
        lines = []
        for rd in self:
            extra = ""
            if want_comments and rd.rdcomment:
                extra = f" ;{rd.rdcomment}"
            lines.append(
                "%s%d %s %s %s%s"
                % (
                    owner,
                    self.ttl,
                    class_text,
                    type_text,
                    rd.to_text(origin=origin, relativize=relativize, **kw),
                    extra,
                )
            )
        # Joining (rather than terminating each line) leaves no final
        # newline, for the caller's convenience in printing.
        return "\n".join(lines)

    def to_wire(
        self,
        name: dns.name.Name,
        file: Any,
        compress: Optional[dns.name.CompressType] = None,
        origin: Optional[dns.name.Name] = None,
        override_rdclass: Optional[dns.rdataclass.RdataClass] = None,
        want_shuffle: bool = True,
    ) -> int:
        """Convert the rdataset to wire format.

        *name*, a ``dns.name.Name`` is the owner name to use.

        *file* is the file where the name is emitted (typically a
        BytesIO file).

        *compress*, the compression table to use.  If ``None`` (the
        default), names will not be compressed.

        *origin* is a ``dns.name.Name`` or ``None``.  If the name is
        relative and origin is not ``None``, then *origin* will be appended
        to it.

        *override_rdclass*, a ``dns.rdataclass.RdataClass`` or ``None``, is
        used as the class instead of the class of the rdataset.  This is
        useful when rendering rdatasets associated with dynamic updates.
        Supplying it also disables shuffling.

        *want_shuffle*, a ``bool``.  If ``True``, then the order of the
        Rdatas within the Rdataset will be shuffled before rendering.

        Returns an ``int``, the number of records emitted.  An empty
        rdataset emits a single record with a TTL of 0 and no rdata.
        """

        if override_rdclass is not None:
            rdclass = override_rdclass
            want_shuffle = False
        else:
            rdclass = self.rdclass
        if len(self) == 0:
            name.to_wire(file, compress, origin)
            # type, class, TTL of 0, and an rdata length of 0.
            file.write(struct.pack("!HHIH", self.rdtype, rdclass, 0, 0))
            return 1
        rdatas: Union[Rdataset, List[dns.rdata.Rdata]]
        if want_shuffle:
            rdatas = list(self)
            random.shuffle(rdatas)
        else:
            rdatas = self
        for rd in rdatas:
            name.to_wire(file, compress, origin)
            file.write(struct.pack("!HHI", self.rdtype, rdclass, self.ttl))
            # The rdata is written under a 2-byte length prefix.
            with dns.renderer.prefixed_length(file, 2):
                rd.to_wire(file, compress, origin)
        return len(self)

    def match(
        self,
        rdclass: dns.rdataclass.RdataClass,
        rdtype: dns.rdatatype.RdataType,
        covers: dns.rdatatype.RdataType,
    ) -> bool:
        """Returns ``True`` if this rdataset matches the specified class,
        type, and covers.
        """
        if self.rdclass == rdclass and self.rdtype == rdtype and self.covers == covers:
            return True
        return False

    def processing_order(self) -> List[dns.rdata.Rdata]:
        """Return rdatas in a valid processing order according to the type's
        specification.  For example, MX records are in preference order from
        lowest to highest preferences, with items of the same preference
        shuffled.

        For types that do not define a processing order, the rdatas are
        simply shuffled.

        An empty rdataset yields an empty list.
        """
        if len(self) == 0:
            return []
        # The ordering is delegated to the first rdata, which is handed an
        # iterator over the whole set.
        return self[0]._processing_order(iter(self))

364 

365 

@dns.immutable.immutable
class ImmutableRdataset(Rdataset):  # lgtm[py/missing-equals]
    """An immutable DNS rdataset.

    Every mutating operation raises ``TypeError("immutable")``; the
    set-algebra operations return new ``ImmutableRdataset`` objects.
    """

    _clone_class = Rdataset

    def __init__(self, rdataset: Rdataset):
        """Create an immutable rdataset from the specified rdataset.

        The class, type, covers, and TTL are taken from *rdataset*, and its
        items are wrapped in a ``dns.immutable.Dict``.
        """

        source = rdataset
        super().__init__(source.rdclass, source.rdtype, source.covers, source.ttl)
        self.items = dns.immutable.Dict(source.items)

    def update_ttl(self, ttl):
        """Not supported; always raises ``TypeError``."""
        raise TypeError("immutable")

    def add(self, rd, ttl=None):
        """Not supported; always raises ``TypeError``."""
        raise TypeError("immutable")

    def union_update(self, other):
        """Not supported; always raises ``TypeError``."""
        raise TypeError("immutable")

    def intersection_update(self, other):
        """Not supported; always raises ``TypeError``."""
        raise TypeError("immutable")

    def update(self, other):
        """Not supported; always raises ``TypeError``."""
        raise TypeError("immutable")

    def __delitem__(self, i):
        raise TypeError("immutable")

    # lgtm complains about these not raising ArithmeticError, but there is
    # precedent for overrides of these methods in other classes to raise
    # TypeError, and it seems like the better exception.

    def __ior__(self, other):  # lgtm[py/unexpected-raise-in-special-method]
        raise TypeError("immutable")

    def __iand__(self, other):  # lgtm[py/unexpected-raise-in-special-method]
        raise TypeError("immutable")

    def __iadd__(self, other):  # lgtm[py/unexpected-raise-in-special-method]
        raise TypeError("immutable")

    def __isub__(self, other):  # lgtm[py/unexpected-raise-in-special-method]
        raise TypeError("immutable")

    def clear(self):
        """Not supported; always raises ``TypeError``."""
        raise TypeError("immutable")

    def __copy__(self):
        duplicate = super().copy()
        return ImmutableRdataset(duplicate)

    def copy(self):
        """Return an ``ImmutableRdataset`` built from the superclass copy."""
        duplicate = super().copy()
        return ImmutableRdataset(duplicate)

    def union(self, other):
        """Return the superclass union with *other* as an
        ``ImmutableRdataset``."""
        combined = super().union(other)
        return ImmutableRdataset(combined)

    def intersection(self, other):
        """Return the superclass intersection with *other* as an
        ``ImmutableRdataset``."""
        common = super().intersection(other)
        return ImmutableRdataset(common)

    def difference(self, other):
        """Return the superclass difference with *other* as an
        ``ImmutableRdataset``."""
        remaining = super().difference(other)
        return ImmutableRdataset(remaining)

    def symmetric_difference(self, other):
        """Return the superclass symmetric difference with *other* as an
        ``ImmutableRdataset``."""
        exclusive = super().symmetric_difference(other)
        return ImmutableRdataset(exclusive)

435 

436 

def from_text_list(
    rdclass: Union[dns.rdataclass.RdataClass, str],
    rdtype: Union[dns.rdatatype.RdataType, str],
    ttl: int,
    text_rdatas: Collection[str],
    idna_codec: Optional[dns.name.IDNACodec] = None,
    origin: Optional[dns.name.Name] = None,
    relativize: bool = True,
    relativize_to: Optional[dns.name.Name] = None,
) -> Rdataset:
    """Build an rdataset of the given class, type, and TTL from a list of
    rdatas in text format.

    *rdclass*, a ``dns.rdataclass.RdataClass`` or ``str``, the rdataclass.

    *rdtype*, a ``dns.rdatatype.RdataType`` or ``str``, the rdatatype.

    *ttl*, an ``int``, the TTL.

    *text_rdatas*, a collection of ``str``, each the text form of one rdata.

    *idna_codec*, a ``dns.name.IDNACodec``, specifies the IDNA
    encoder/decoder to use; if ``None``, the default IDNA 2003
    encoder/decoder is used.

    *origin*, a ``dns.name.Name`` (or ``None``), the
    origin to use for relative names.

    *relativize*, a ``bool``.  If true, name will be relativized.

    *relativize_to*, a ``dns.name.Name`` (or ``None``), the origin to use
    when relativizing names.  If not set, the *origin* value will be used.

    Returns a ``dns.rdataset.Rdataset`` object.
    """

    # Normalize the class and type (which may have been given as text)
    # before creating the empty rdataset.
    made_rdclass = dns.rdataclass.RdataClass.make(rdclass)
    made_rdtype = dns.rdatatype.RdataType.make(rdtype)
    rdataset = Rdataset(made_rdclass, made_rdtype)
    rdataset.update_ttl(ttl)
    for text in text_rdatas:
        parsed = dns.rdata.from_text(
            rdataset.rdclass,
            rdataset.rdtype,
            text,
            origin,
            relativize,
            relativize_to,
            idna_codec,
        )
        rdataset.add(parsed)
    return rdataset

475 

476 

def from_text(
    rdclass: Union[dns.rdataclass.RdataClass, str],
    rdtype: Union[dns.rdatatype.RdataType, str],
    ttl: int,
    *text_rdatas: Any,
) -> Rdataset:
    """Build an rdataset of the given class, type, and TTL from rdatas in
    text format supplied as positional arguments.

    This is the variadic form of ``from_text_list()``, to which the work is
    delegated.

    Returns a ``dns.rdataset.Rdataset`` object.
    """

    texts = cast(Collection[str], text_rdatas)
    return from_text_list(rdclass, rdtype, ttl, texts)

490 

491 

def from_rdata_list(ttl: int, rdatas: Collection[dns.rdata.Rdata]) -> Rdataset:
    """Create an rdataset with the specified TTL, and with
    the specified list of rdata objects.

    *ttl*, an ``int``, the TTL.

    *rdatas*, a collection of ``dns.rdata.Rdata``.  The class and type of
    the rdataset are taken from the first rdata.  The collection is only
    iterated, so iterables without a length are accepted too.

    Raises ``ValueError`` if *rdatas* is empty.

    Returns a ``dns.rdataset.Rdataset`` object.
    """

    r: Optional[Rdataset] = None
    for rd in rdatas:
        if r is None:
            # The first rdata determines the class and type of the set.
            r = Rdataset(rd.rdclass, rd.rdtype)
            r.update_ttl(ttl)
        r.add(rd)
    # Detect emptiness after iterating instead of calling len() up front, so
    # the check also works for inputs that have no length.
    if r is None:
        raise ValueError("rdata list must not be empty")
    return r

509 

510 

def from_rdata(ttl: int, *rdatas: Any) -> Rdataset:
    """Build an rdataset with the given TTL from rdata objects supplied as
    positional arguments.

    This is the variadic form of ``from_rdata_list()``, to which the work is
    delegated.

    Returns a ``dns.rdataset.Rdataset`` object.
    """

    rdata_objects = cast(Collection[dns.rdata.Rdata], rdatas)
    return from_rdata_list(ttl, rdata_objects)