Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/dns/update.py: 43%

141 statements  

« prev     ^ index     » next       coverage.py v7.4.1, created at 2024-02-02 06:07 +0000

1# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license 

2 

3# Copyright (C) 2003-2007, 2009-2011 Nominum, Inc. 

4# 

5# Permission to use, copy, modify, and distribute this software and its 

6# documentation for any purpose with or without fee is hereby granted, 

7# provided that the above copyright notice and this permission notice 

8# appear in all copies. 

9# 

10# THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES 

11# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 

12# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR 

13# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 

14# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 

15# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT 

16# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 

17 

18"""DNS Dynamic Update Support""" 

19 

20from typing import Any, List, Optional, Union 

21 

22import dns.message 

23import dns.name 

24import dns.opcode 

25import dns.rdata 

26import dns.rdataclass 

27import dns.rdataset 

28import dns.rdatatype 

29import dns.tsig 

30 

31 

32class UpdateSection(dns.enum.IntEnum): 

33 """Update sections""" 

34 

35 ZONE = 0 

36 PREREQ = 1 

37 UPDATE = 2 

38 ADDITIONAL = 3 

39 

40 @classmethod 

41 def _maximum(cls): 

42 return 3 

43 

44 

45class UpdateMessage(dns.message.Message): # lgtm[py/missing-equals] 

46 # ignore the mypy error here as we mean to use a different enum 

47 _section_enum = UpdateSection # type: ignore 

48 

49 def __init__( 

50 self, 

51 zone: Optional[Union[dns.name.Name, str]] = None, 

52 rdclass: dns.rdataclass.RdataClass = dns.rdataclass.IN, 

53 keyring: Optional[Any] = None, 

54 keyname: Optional[dns.name.Name] = None, 

55 keyalgorithm: Union[dns.name.Name, str] = dns.tsig.default_algorithm, 

56 id: Optional[int] = None, 

57 ): 

58 """Initialize a new DNS Update object. 

59 

60 See the documentation of the Message class for a complete 

61 description of the keyring dictionary. 

62 

63 *zone*, a ``dns.name.Name``, ``str``, or ``None``, the zone 

64 which is being updated. ``None`` should only be used by dnspython's 

65 message constructors, as a zone is required for the convenience 

66 methods like ``add()``, ``replace()``, etc. 

67 

68 *rdclass*, an ``int`` or ``str``, the class of the zone. 

69 

70 The *keyring*, *keyname*, and *keyalgorithm* parameters are passed to 

71 ``use_tsig()``; see its documentation for details. 

72 """ 

73 super().__init__(id=id) 

74 self.flags |= dns.opcode.to_flags(dns.opcode.UPDATE) 

75 if isinstance(zone, str): 

76 zone = dns.name.from_text(zone) 

77 self.origin = zone 

78 rdclass = dns.rdataclass.RdataClass.make(rdclass) 

79 self.zone_rdclass = rdclass 

80 if self.origin: 

81 self.find_rrset( 

82 self.zone, 

83 self.origin, 

84 rdclass, 

85 dns.rdatatype.SOA, 

86 create=True, 

87 force_unique=True, 

88 ) 

89 if keyring is not None: 

90 self.use_tsig(keyring, keyname, algorithm=keyalgorithm) 

91 

92 @property 

93 def zone(self) -> List[dns.rrset.RRset]: 

94 """The zone section.""" 

95 return self.sections[0] 

96 

97 @zone.setter 

98 def zone(self, v): 

99 self.sections[0] = v 

100 

101 @property 

102 def prerequisite(self) -> List[dns.rrset.RRset]: 

103 """The prerequisite section.""" 

104 return self.sections[1] 

105 

106 @prerequisite.setter 

107 def prerequisite(self, v): 

108 self.sections[1] = v 

109 

110 @property 

111 def update(self) -> List[dns.rrset.RRset]: 

112 """The update section.""" 

113 return self.sections[2] 

114 

115 @update.setter 

116 def update(self, v): 

117 self.sections[2] = v 

118 

119 def _add_rr(self, name, ttl, rd, deleting=None, section=None): 

120 """Add a single RR to the update section.""" 

121 

122 if section is None: 

123 section = self.update 

124 covers = rd.covers() 

125 rrset = self.find_rrset( 

126 section, name, self.zone_rdclass, rd.rdtype, covers, deleting, True, True 

127 ) 

128 rrset.add(rd, ttl) 

129 

130 def _add(self, replace, section, name, *args): 

131 """Add records. 

132 

133 *replace* is the replacement mode. If ``False``, 

134 RRs are added to an existing RRset; if ``True``, the RRset 

135 is replaced with the specified contents. The second 

136 argument is the section to add to. The third argument 

137 is always a name. The other arguments can be: 

138 

139 - rdataset... 

140 

141 - ttl, rdata... 

142 

143 - ttl, rdtype, string... 

144 """ 

145 

146 if isinstance(name, str): 

147 name = dns.name.from_text(name, None) 

148 if isinstance(args[0], dns.rdataset.Rdataset): 

149 for rds in args: 

150 if replace: 

151 self.delete(name, rds.rdtype) 

152 for rd in rds: 

153 self._add_rr(name, rds.ttl, rd, section=section) 

154 else: 

155 args = list(args) 

156 ttl = int(args.pop(0)) 

157 if isinstance(args[0], dns.rdata.Rdata): 

158 if replace: 

159 self.delete(name, args[0].rdtype) 

160 for rd in args: 

161 self._add_rr(name, ttl, rd, section=section) 

162 else: 

163 rdtype = dns.rdatatype.RdataType.make(args.pop(0)) 

164 if replace: 

165 self.delete(name, rdtype) 

166 for s in args: 

167 rd = dns.rdata.from_text(self.zone_rdclass, rdtype, s, self.origin) 

168 self._add_rr(name, ttl, rd, section=section) 

169 

170 def add(self, name: Union[dns.name.Name, str], *args: Any) -> None: 

171 """Add records. 

172 

173 The first argument is always a name. The other 

174 arguments can be: 

175 

176 - rdataset... 

177 

178 - ttl, rdata... 

179 

180 - ttl, rdtype, string... 

181 """ 

182 

183 self._add(False, self.update, name, *args) 

184 

185 def delete(self, name: Union[dns.name.Name, str], *args: Any) -> None: 

186 """Delete records. 

187 

188 The first argument is always a name. The other 

189 arguments can be: 

190 

191 - *empty* 

192 

193 - rdataset... 

194 

195 - rdata... 

196 

197 - rdtype, [string...] 

198 """ 

199 

200 if isinstance(name, str): 

201 name = dns.name.from_text(name, None) 

202 if len(args) == 0: 

203 self.find_rrset( 

204 self.update, 

205 name, 

206 dns.rdataclass.ANY, 

207 dns.rdatatype.ANY, 

208 dns.rdatatype.NONE, 

209 dns.rdataclass.ANY, 

210 True, 

211 True, 

212 ) 

213 elif isinstance(args[0], dns.rdataset.Rdataset): 

214 for rds in args: 

215 for rd in rds: 

216 self._add_rr(name, 0, rd, dns.rdataclass.NONE) 

217 else: 

218 largs = list(args) 

219 if isinstance(largs[0], dns.rdata.Rdata): 

220 for rd in largs: 

221 self._add_rr(name, 0, rd, dns.rdataclass.NONE) 

222 else: 

223 rdtype = dns.rdatatype.RdataType.make(largs.pop(0)) 

224 if len(largs) == 0: 

225 self.find_rrset( 

226 self.update, 

227 name, 

228 self.zone_rdclass, 

229 rdtype, 

230 dns.rdatatype.NONE, 

231 dns.rdataclass.ANY, 

232 True, 

233 True, 

234 ) 

235 else: 

236 for s in largs: 

237 rd = dns.rdata.from_text( 

238 self.zone_rdclass, 

239 rdtype, 

240 s, # type: ignore[arg-type] 

241 self.origin, 

242 ) 

243 self._add_rr(name, 0, rd, dns.rdataclass.NONE) 

244 

245 def replace(self, name: Union[dns.name.Name, str], *args: Any) -> None: 

246 """Replace records. 

247 

248 The first argument is always a name. The other 

249 arguments can be: 

250 

251 - rdataset... 

252 

253 - ttl, rdata... 

254 

255 - ttl, rdtype, string... 

256 

257 Note that if you want to replace the entire node, you should do 

258 a delete of the name followed by one or more calls to add. 

259 """ 

260 

261 self._add(True, self.update, name, *args) 

262 

263 def present(self, name: Union[dns.name.Name, str], *args: Any) -> None: 

264 """Require that an owner name (and optionally an rdata type, 

265 or specific rdataset) exists as a prerequisite to the 

266 execution of the update. 

267 

268 The first argument is always a name. 

269 The other arguments can be: 

270 

271 - rdataset... 

272 

273 - rdata... 

274 

275 - rdtype, string... 

276 """ 

277 

278 if isinstance(name, str): 

279 name = dns.name.from_text(name, None) 

280 if len(args) == 0: 

281 self.find_rrset( 

282 self.prerequisite, 

283 name, 

284 dns.rdataclass.ANY, 

285 dns.rdatatype.ANY, 

286 dns.rdatatype.NONE, 

287 None, 

288 True, 

289 True, 

290 ) 

291 elif ( 

292 isinstance(args[0], dns.rdataset.Rdataset) 

293 or isinstance(args[0], dns.rdata.Rdata) 

294 or len(args) > 1 

295 ): 

296 if not isinstance(args[0], dns.rdataset.Rdataset): 

297 # Add a 0 TTL 

298 largs = list(args) 

299 largs.insert(0, 0) # type: ignore[arg-type] 

300 self._add(False, self.prerequisite, name, *largs) 

301 else: 

302 self._add(False, self.prerequisite, name, *args) 

303 else: 

304 rdtype = dns.rdatatype.RdataType.make(args[0]) 

305 self.find_rrset( 

306 self.prerequisite, 

307 name, 

308 dns.rdataclass.ANY, 

309 rdtype, 

310 dns.rdatatype.NONE, 

311 None, 

312 True, 

313 True, 

314 ) 

315 

316 def absent( 

317 self, 

318 name: Union[dns.name.Name, str], 

319 rdtype: Optional[Union[dns.rdatatype.RdataType, str]] = None, 

320 ) -> None: 

321 """Require that an owner name (and optionally an rdata type) does 

322 not exist as a prerequisite to the execution of the update.""" 

323 

324 if isinstance(name, str): 

325 name = dns.name.from_text(name, None) 

326 if rdtype is None: 

327 self.find_rrset( 

328 self.prerequisite, 

329 name, 

330 dns.rdataclass.NONE, 

331 dns.rdatatype.ANY, 

332 dns.rdatatype.NONE, 

333 None, 

334 True, 

335 True, 

336 ) 

337 else: 

338 rdtype = dns.rdatatype.RdataType.make(rdtype) 

339 self.find_rrset( 

340 self.prerequisite, 

341 name, 

342 dns.rdataclass.NONE, 

343 rdtype, 

344 dns.rdatatype.NONE, 

345 None, 

346 True, 

347 True, 

348 ) 

349 

350 def _get_one_rr_per_rrset(self, value): 

351 # Updates are always one_rr_per_rrset 

352 return True 

353 

354 def _parse_rr_header(self, section, name, rdclass, rdtype): 

355 deleting = None 

356 empty = False 

357 if section == UpdateSection.ZONE: 

358 if ( 

359 dns.rdataclass.is_metaclass(rdclass) 

360 or rdtype != dns.rdatatype.SOA 

361 or self.zone 

362 ): 

363 raise dns.exception.FormError 

364 else: 

365 if not self.zone: 

366 raise dns.exception.FormError 

367 if rdclass in (dns.rdataclass.ANY, dns.rdataclass.NONE): 

368 deleting = rdclass 

369 rdclass = self.zone[0].rdclass 

370 empty = ( 

371 deleting == dns.rdataclass.ANY or section == UpdateSection.PREREQ 

372 ) 

373 return (rdclass, rdtype, deleting, empty) 

374 

375 

376# backwards compatibility 

377Update = UpdateMessage 

378 

379### BEGIN generated UpdateSection constants 

380 

381ZONE = UpdateSection.ZONE 

382PREREQ = UpdateSection.PREREQ 

383UPDATE = UpdateSection.UPDATE 

384ADDITIONAL = UpdateSection.ADDITIONAL 

385 

386### END generated UpdateSection constants