Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/dns/tsig.py: 31%

177 statements  

« prev     ^ index     » next       coverage.py v7.4.1, created at 2024-02-02 06:07 +0000

1# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license 

2 

3# Copyright (C) 2001-2007, 2009-2011 Nominum, Inc. 

4# 

5# Permission to use, copy, modify, and distribute this software and its 

6# documentation for any purpose with or without fee is hereby granted, 

7# provided that the above copyright notice and this permission notice 

8# appear in all copies. 

9# 

10# THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES 

11# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 

12# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR 

13# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 

14# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 

15# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT 

16# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 

17 

18"""DNS TSIG support.""" 

19 

import base64
import hashlib
import hmac
import struct

import dns.exception
import dns.name
import dns.rcode
import dns.rdataclass
import dns.rdatatype

29 

30 

# Raised by validate() when the gap between the TSIG's time_signed and the
# caller-supplied "now" is larger than the record's fudge value.
# NOTE(review): the docstring may double as the default exception message
# (see dns.exception.DNSException) -- confirm before rewording it.
class BadTime(dns.exception.DNSException):

    """The current time is not within the TSIG's validity time."""

34 

35 

# Raised by HMACTSig.verify() when the recomputed MAC differs from the one
# received, and by GSSTSig.verify() when the GSSAPI verification call raises.
# NOTE(review): the docstring may double as the default exception message
# (see dns.exception.DNSException) -- confirm before rewording it.
class BadSignature(dns.exception.DNSException):

    """The TSIG signature fails to verify."""

39 

40 

# Raised by validate() when key.name differs from the TSIG record's owner.
# NOTE(review): the docstring may double as the default exception message
# (see dns.exception.DNSException) -- confirm before rewording it.
class BadKey(dns.exception.DNSException):

    """The TSIG record owner name does not match the key."""

44 

45 

# Raised by validate() when key.algorithm differs from rdata.algorithm.
# NOTE(review): the docstring may double as the default exception message
# (see dns.exception.DNSException) -- confirm before rewording it.
class BadAlgorithm(dns.exception.DNSException):

    """The TSIG algorithm does not match the key."""

49 

50 

# Common base for errors reported by the peer in the TSIG error field.
# validate() raises this class directly (with an "unknown TSIG error code"
# message) when the non-zero error code is not one it recognizes.
# NOTE(review): the docstring may double as the default exception message
# (see dns.exception.DNSException) -- confirm before rewording it.
class PeerError(dns.exception.DNSException):

    """Base class for all TSIG errors generated by the remote peer"""

54 

55 

# Raised by validate() when the TSIG error field equals dns.rcode.BADKEY.
# NOTE(review): the docstring may double as the default exception message
# (see dns.exception.DNSException) -- confirm before rewording it.
class PeerBadKey(PeerError):

    """The peer didn't know the key we used"""

59 

60 

# Raised by validate() when the TSIG error field equals dns.rcode.BADSIG.
# NOTE(review): the docstring may double as the default exception message
# (see dns.exception.DNSException) -- confirm before rewording it.
class PeerBadSignature(PeerError):

    """The peer didn't like the signature we sent"""

64 

65 

# Raised by validate() when the TSIG error field equals dns.rcode.BADTIME.
# NOTE(review): the docstring may double as the default exception message
# (see dns.exception.DNSException) -- confirm before rewording it.
class PeerBadTime(PeerError):

    """The peer didn't like the time we sent"""

69 

70 

# Raised by validate() when the TSIG error field equals dns.rcode.BADTRUNC.
# NOTE(review): the docstring may double as the default exception message
# (see dns.exception.DNSException) -- confirm before rewording it.
class PeerBadTruncation(PeerError):

    """The peer didn't like amount of truncation in the TSIG we sent"""

74 

75 

# TSIG Algorithms
#
# Every algorithm is identified by a DNS name.  The names carrying a numeric
# suffix (-128, -192, -256) are the truncated variants: HMACTSig pairs them
# with a bit count and cuts the digest down to that many bits when signing.

HMAC_MD5 = dns.name.from_text("HMAC-MD5.SIG-ALG.REG.INT")
HMAC_SHA1 = dns.name.from_text("hmac-sha1")
HMAC_SHA224 = dns.name.from_text("hmac-sha224")
HMAC_SHA256 = dns.name.from_text("hmac-sha256")
HMAC_SHA256_128 = dns.name.from_text("hmac-sha256-128")
HMAC_SHA384 = dns.name.from_text("hmac-sha384")
HMAC_SHA384_192 = dns.name.from_text("hmac-sha384-192")
HMAC_SHA512 = dns.name.from_text("hmac-sha512")
HMAC_SHA512_256 = dns.name.from_text("hmac-sha512-256")
GSS_TSIG = dns.name.from_text("gss-tsig")

# Algorithm used by Key() when the caller does not pass one.
default_algorithm = HMAC_SHA256

# MAC size per algorithm.  The values match the digest lengths in bytes
# (e.g. 32 for SHA-256, 16 for the 128-bit truncated variant); nothing in
# this module reads the table.
# NOTE(review): presumably consumed by callers that need to reserve space
# for the TSIG record -- confirm against the users of dns.tsig.mac_sizes.
mac_sizes = {
    HMAC_SHA1: 20,
    HMAC_SHA224: 28,
    HMAC_SHA256: 32,
    HMAC_SHA256_128: 16,
    HMAC_SHA384: 48,
    HMAC_SHA384_192: 24,
    HMAC_SHA512: 64,
    HMAC_SHA512_256: 32,
    HMAC_MD5: 16,
    GSS_TSIG: 128,  # This is what we assume to be the worst case!
}

103 

104 

class GSSTSig:
    """
    TSIG signing/verification context backed by a GSS-API security context.

    The GSS-API context is the one established during the TKEY message
    handshake; signatures are the GSS-API message integrity codes it
    produces, per the RFC.

    To keep this module free of a direct GSSAPI dependency, the keyring
    stores a reference to the GSSAPI context object instead of raw key
    material, and that object is what gets passed in here.
    """

    def __init__(self, gssapi_context):
        self.name = "gss-tsig"
        # bytes accumulated by update(); signed or verified in one shot
        self.data = b""
        self.gssapi_context = gssapi_context

    def update(self, data):
        """Append *data* to the buffer that will be signed or verified."""
        self.data = self.data + data

    def sign(self):
        """Return the signature the GSSAPI context computes over the buffer."""
        make_signature = self.gssapi_context.get_signature
        return make_signature(self.data)

    def verify(self, expected):
        """Check *expected* against the buffered data via the GSSAPI context.

        Returns whatever the context's verify_signature() returns.

        @raises BadSignature: the GSSAPI verification call raised
        """
        try:
            outcome = self.gssapi_context.verify_signature(self.data, expected)
        except Exception:
            # deliberately broad: any failure in the GSSAPI layer is
            # reported to the caller as a bad signature
            raise BadSignature
        return outcome

134 

135 

class GSSTSigAdapter:
    """Callable wrapper around a keyring that also drives GSS-TSIG negotiation.

    Calling the adapter with ``(message, keyname)`` looks the key up in the
    wrapped keyring.  When the entry is a GSS-TSIG ``Key`` and a message is
    supplied, any TKEY answer in that message is first fed into the key's
    GSSAPI context.
    """

    def __init__(self, keyring):
        # mapping of key name -> key; only ``in`` and ``[]`` are used on it
        self.keyring = keyring

    def __call__(self, message, keyname):
        """Return the keyring entry for *keyname*, or None if there is none.

        The entry is returned whatever its type; the TKEY processing is an
        extra step applied only to GSS-TSIG ``Key`` objects.
        """
        if keyname not in self.keyring:
            return None
        key = self.keyring[keyname]
        if isinstance(key, Key) and key.algorithm == GSS_TSIG and message:
            GSSTSigAdapter.parse_tkey_and_step(key, message, keyname)
        return key

    @classmethod
    def parse_tkey_and_step(cls, key, message, keyname):
        """Feed the TKEY token from *message*, if any, into the GSSAPI context.

        If the message is a TKEY type, absorb the key material into the
        context using step(); this is used to allow the client to complete
        the GSSAPI negotiation before attempting to verify the signed
        response to a TKEY message exchange.

        Returns the result of ``step()``, or None when the answer section
        has no (non-empty) TKEY rrset for *keyname*.
        """
        # Only the rrset lookup is guarded: a KeyError from find_rrset()
        # means "no TKEY here", whereas a KeyError escaping from step()
        # would be a real failure and must not be silently discarded.
        try:
            rrset = message.find_rrset(
                message.answer, keyname, dns.rdataclass.ANY, dns.rdatatype.TKEY
            )
        except KeyError:
            return None
        if not rrset:
            return None
        token = rrset[0].key
        gssapi_context = key.secret
        return gssapi_context.step(token)

166 

167 

class HMACTSig:
    """
    TSIG signing/verification context built on the standard ``hmac`` module.
    """

    # Algorithm name -> hash constructor, or (hash constructor, bits) for
    # the variants whose MAC is truncated to the given number of bits.
    _hashes = {
        HMAC_SHA1: hashlib.sha1,
        HMAC_SHA224: hashlib.sha224,
        HMAC_SHA256: hashlib.sha256,
        HMAC_SHA256_128: (hashlib.sha256, 128),
        HMAC_SHA384: hashlib.sha384,
        HMAC_SHA384_192: (hashlib.sha384, 192),
        HMAC_SHA512: hashlib.sha512,
        HMAC_SHA512_256: (hashlib.sha512, 256),
        HMAC_MD5: hashlib.md5,
    }

    def __init__(self, key, algorithm):
        """Create an HMAC context keyed with *key* for *algorithm*.

        @raises NotImplementedError: I{algorithm} is not in the table above
        """
        try:
            hashinfo = self._hashes[algorithm]
        except KeyError:
            raise NotImplementedError(f"TSIG algorithm {algorithm} is not supported")

        # normalize both table shapes to (digestmod, truncation-in-bits)
        if isinstance(hashinfo, tuple):
            digestmod, truncated_bits = hashinfo
        else:
            digestmod, truncated_bits = hashinfo, None

        self.hmac_context = hmac.new(key, digestmod=digestmod)
        self.size = truncated_bits
        # truncated variants advertise their size in the name
        suffix = f"-{self.size}" if self.size else ""
        self.name = self.hmac_context.name + suffix

    def update(self, data):
        """Feed *data* into the underlying HMAC context."""
        return self.hmac_context.update(data)

    def sign(self):
        """Return the MAC, cut down to ``size`` bits for truncated variants."""
        mac = self.hmac_context.digest()
        if not self.size:
            return mac
        return mac[: (self.size // 8)]

    def verify(self, expected):
        """Recompute the MAC and compare it with *expected*.

        @raises BadSignature: the two MACs differ
        """
        if hmac.compare_digest(self.sign(), expected):
            return
        raise BadSignature

218 

219 

def _digest(wire, key, rdata, time=None, request_mac=None, ctx=None, multi=None):
    """Feed the TSIG input for this message into a signing context and
    return that context.

    A new context is created from I{key} unless both I{ctx} and I{multi}
    are truthy, in which case the supplied context is continued and only
    the message and the timers are digested.

    @rtype: dns.tsig.HMACTSig or dns.tsig.GSSTSig object
    @raises ValueError: I{other_data} is too long
    @raises NotImplementedError: I{algorithm} is not supported
    """

    starting = not (ctx and multi)

    if starting:
        ctx = get_context(key)
        if request_mac:
            # length-prefixed MAC of the request this message answers
            ctx.update(struct.pack("!H", len(request_mac)))
            ctx.update(request_mac)

    # the message, with its first two bytes replaced by the original id
    ctx.update(struct.pack("!H", rdata.original_id))
    ctx.update(wire[2:])

    if starting:
        ctx.update(key.name.to_digestable())
        ctx.update(struct.pack("!H", dns.rdataclass.ANY))
        ctx.update(struct.pack("!I", 0))

    signed_at = rdata.time_signed if time is None else time
    # 48-bit time as a 16-bit high part and a 32-bit low part, then fudge
    time_encoded = struct.pack(
        "!HIH", (signed_at >> 32) & 0xFFFF, signed_at & 0xFFFFFFFF, rdata.fudge
    )

    other_len = len(rdata.other)
    if other_len > 65535:
        raise ValueError("TSIG Other Data is > 65535 bytes")

    if not starting:
        # continuation of a multi-message sequence: timers only
        ctx.update(time_encoded)
        return ctx

    ctx.update(key.algorithm.to_digestable() + time_encoded)
    ctx.update(struct.pack("!HH", rdata.error, other_len) + rdata.other)
    return ctx

253 

254 

def _maybe_start_digest(key, mac, multi):
    """Prepare the context for the next message of a multi-message sequence.

    When I{multi} is truthy, a new context is created for I{key} and primed
    with the length-prefixed I{mac}; otherwise there is nothing to carry
    over and None is returned.

    @rtype: dns.tsig.HMACTSig or dns.tsig.GSSTSig object
    """
    if not multi:
        return None
    next_ctx = get_context(key)
    next_ctx.update(struct.pack("!H", len(mac)))
    next_ctx.update(mac)
    return next_ctx

267 

268 

def sign(wire, key, rdata, time=None, request_mac=None, ctx=None, multi=False):
    """Return a (tsig_rdata, ctx) tuple containing the TSIG rdata for the
    input parameters, with its MAC computed by the key's signature
    algorithm, and the digest context to use for the next message (None
    unless I{multi} is true).

    If I{time} is None, the I{time_signed} already present in I{rdata} is
    used both for the MAC and in the returned rdata.

    @rtype: (TSIG rdata, dns.tsig.HMACTSig or dns.tsig.GSSTSig object or None)
    @raises ValueError: I{other_data} is too long
    @raises NotImplementedError: I{algorithm} is not supported
    """

    # Resolve the default here rather than leaving it to _digest(): the
    # same value must go into the MAC and into the rdata we hand back,
    # otherwise replace() would be given time_signed=None.
    if time is None:
        time = rdata.time_signed

    ctx = _digest(wire, key, rdata, time, request_mac, ctx, multi)
    mac = ctx.sign()
    tsig = rdata.replace(time_signed=time, mac=mac)

    return (tsig, _maybe_start_digest(key, mac, multi))

283 

284 

def validate(
    wire, key, owner, rdata, now, request_mac, tsig_start, ctx=None, multi=False
):
    """Validate the specified TSIG rdata against the other input parameters.

    Checks are made in this order: additional-record count, error code
    reported by the peer, time skew, key name, algorithm, and finally the
    MAC itself.

    @raises FormError: The TSIG is badly formed.
    @raises PeerError: The peer reported an error in the TSIG (a more
    specific subclass is raised for the known error codes).
    @raises BadTime: There is too much time skew between the client and the
    server.
    @raises BadKey: The TSIG owner name does not match the key name.
    @raises BadAlgorithm: The TSIG algorithm does not match the key.
    @raises BadSignature: The TSIG signature did not validate
    @rtype: dns.tsig.HMACTSig or dns.tsig.GSSTSig object"""

    (adcount,) = struct.unpack("!H", wire[10:12])
    if adcount == 0:
        raise dns.exception.FormError

    # Rebuild the message as it was signed: one additional record fewer,
    # and nothing from the start of the TSIG record onwards.
    new_wire = wire[:10] + struct.pack("!H", adcount - 1) + wire[12:tsig_start]

    if rdata.error != 0:
        known_peer_errors = (
            (dns.rcode.BADSIG, PeerBadSignature),
            (dns.rcode.BADKEY, PeerBadKey),
            (dns.rcode.BADTIME, PeerBadTime),
            (dns.rcode.BADTRUNC, PeerBadTruncation),
        )
        for code, peer_exception in known_peer_errors:
            if rdata.error == code:
                raise peer_exception
        raise PeerError("unknown TSIG error code %d" % rdata.error)

    skew = abs(rdata.time_signed - now)
    if skew > rdata.fudge:
        raise BadTime
    if key.name != owner:
        raise BadKey
    if key.algorithm != rdata.algorithm:
        raise BadAlgorithm

    verifier = _digest(new_wire, key, rdata, None, request_mac, ctx, multi)
    verifier.verify(rdata.mac)
    return _maybe_start_digest(key, rdata.mac, multi)

321 

322 

def get_context(key):
    """Return a new signing context appropriate for I{key}.

    GSS-TSIG keys get a GSSTSig wrapping the key's secret (the GSSAPI
    context); every other algorithm gets an HMACTSig.

    @rtype: dns.tsig.HMACTSig or dns.tsig.GSSTSig object
    @raises NotImplementedError: I{algorithm} is not supported
    """

    uses_gssapi = key.algorithm == GSS_TSIG
    if uses_gssapi:
        return GSSTSig(key.secret)
    return HMACTSig(key.secret, key.algorithm)

334 

335 

class Key:
    """A TSIG key: a name, a secret and the algorithm it is used with."""

    def __init__(self, name, secret, algorithm=default_algorithm):
        """Store the key, converting textual arguments.

        A str I{name} or I{algorithm} is parsed with dns.name.from_text(); a
        str I{secret} is treated as base64 text and decoded to bytes.  Values
        of any other type are stored unchanged.
        """
        self.name = dns.name.from_text(name) if isinstance(name, str) else name
        self.secret = (
            base64.decodebytes(secret.encode()) if isinstance(secret, str) else secret
        )
        self.algorithm = (
            dns.name.from_text(algorithm) if isinstance(algorithm, str) else algorithm
        )

    # Note: defining __eq__ without __hash__ leaves instances unhashable.
    def __eq__(self, other):
        """Two keys are equal when name, secret and algorithm all match."""
        if not isinstance(other, Key):
            return False
        return (
            self.name == other.name
            and self.secret == other.secret
            and self.algorithm == other.algorithm
        )

    def __repr__(self):
        """Describe the key; the secret is left out for GSS-TSIG keys."""
        pieces = [f"<DNS key name='{self.name}', algorithm='{self.algorithm}'"]
        if self.algorithm != GSS_TSIG:
            pieces.append(f", secret='{base64.b64encode(self.secret).decode()}'")
        pieces.append(">")
        return "".join(pieces)