Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/dns/tsig.py: 31%
177 statements
« prev ^ index » next coverage.py v7.4.1, created at 2024-02-02 06:07 +0000
« prev ^ index » next coverage.py v7.4.1, created at 2024-02-02 06:07 +0000
1# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
3# Copyright (C) 2001-2007, 2009-2011 Nominum, Inc.
4#
5# Permission to use, copy, modify, and distribute this software and its
6# documentation for any purpose with or without fee is hereby granted,
7# provided that the above copyright notice and this permission notice
8# appear in all copies.
9#
10# THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES
11# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR
13# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
16# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
18"""DNS TSIG support."""
20import base64
21import hashlib
22import hmac
23import struct
25import dns.exception
26import dns.name
27import dns.rcode
28import dns.rdataclass
31class BadTime(dns.exception.DNSException):
33 """The current time is not within the TSIG's validity time."""
36class BadSignature(dns.exception.DNSException):
38 """The TSIG signature fails to verify."""
41class BadKey(dns.exception.DNSException):
43 """The TSIG record owner name does not match the key."""
46class BadAlgorithm(dns.exception.DNSException):
48 """The TSIG algorithm does not match the key."""
51class PeerError(dns.exception.DNSException):
53 """Base class for all TSIG errors generated by the remote peer"""
56class PeerBadKey(PeerError):
58 """The peer didn't know the key we used"""
61class PeerBadSignature(PeerError):
63 """The peer didn't like the signature we sent"""
66class PeerBadTime(PeerError):
68 """The peer didn't like the time we sent"""
71class PeerBadTruncation(PeerError):
73 """The peer didn't like amount of truncation in the TSIG we sent"""
76# TSIG Algorithms
78HMAC_MD5 = dns.name.from_text("HMAC-MD5.SIG-ALG.REG.INT")
79HMAC_SHA1 = dns.name.from_text("hmac-sha1")
80HMAC_SHA224 = dns.name.from_text("hmac-sha224")
81HMAC_SHA256 = dns.name.from_text("hmac-sha256")
82HMAC_SHA256_128 = dns.name.from_text("hmac-sha256-128")
83HMAC_SHA384 = dns.name.from_text("hmac-sha384")
84HMAC_SHA384_192 = dns.name.from_text("hmac-sha384-192")
85HMAC_SHA512 = dns.name.from_text("hmac-sha512")
86HMAC_SHA512_256 = dns.name.from_text("hmac-sha512-256")
87GSS_TSIG = dns.name.from_text("gss-tsig")
89default_algorithm = HMAC_SHA256
91mac_sizes = {
92 HMAC_SHA1: 20,
93 HMAC_SHA224: 28,
94 HMAC_SHA256: 32,
95 HMAC_SHA256_128: 16,
96 HMAC_SHA384: 48,
97 HMAC_SHA384_192: 24,
98 HMAC_SHA512: 64,
99 HMAC_SHA512_256: 32,
100 HMAC_MD5: 16,
101 GSS_TSIG: 128, # This is what we assume to be the worst case!
102}
105class GSSTSig:
106 """
107 GSS-TSIG TSIG implementation. This uses the GSS-API context established
108 in the TKEY message handshake to sign messages using GSS-API message
109 integrity codes, per the RFC.
111 In order to avoid a direct GSSAPI dependency, the keyring holds a ref
112 to the GSSAPI object required, rather than the key itself.
113 """
115 def __init__(self, gssapi_context):
116 self.gssapi_context = gssapi_context
117 self.data = b""
118 self.name = "gss-tsig"
120 def update(self, data):
121 self.data += data
123 def sign(self):
124 # defer to the GSSAPI function to sign
125 return self.gssapi_context.get_signature(self.data)
127 def verify(self, expected):
128 try:
129 # defer to the GSSAPI function to verify
130 return self.gssapi_context.verify_signature(self.data, expected)
131 except Exception:
132 # note the usage of a bare exception
133 raise BadSignature
136class GSSTSigAdapter:
137 def __init__(self, keyring):
138 self.keyring = keyring
140 def __call__(self, message, keyname):
141 if keyname in self.keyring:
142 key = self.keyring[keyname]
143 if isinstance(key, Key) and key.algorithm == GSS_TSIG:
144 if message:
145 GSSTSigAdapter.parse_tkey_and_step(key, message, keyname)
146 return key
147 else:
148 return None
150 @classmethod
151 def parse_tkey_and_step(cls, key, message, keyname):
152 # if the message is a TKEY type, absorb the key material
153 # into the context using step(); this is used to allow the
154 # client to complete the GSSAPI negotiation before attempting
155 # to verify the signed response to a TKEY message exchange
156 try:
157 rrset = message.find_rrset(
158 message.answer, keyname, dns.rdataclass.ANY, dns.rdatatype.TKEY
159 )
160 if rrset:
161 token = rrset[0].key
162 gssapi_context = key.secret
163 return gssapi_context.step(token)
164 except KeyError:
165 pass
168class HMACTSig:
169 """
170 HMAC TSIG implementation. This uses the HMAC python module to handle the
171 sign/verify operations.
172 """
174 _hashes = {
175 HMAC_SHA1: hashlib.sha1,
176 HMAC_SHA224: hashlib.sha224,
177 HMAC_SHA256: hashlib.sha256,
178 HMAC_SHA256_128: (hashlib.sha256, 128),
179 HMAC_SHA384: hashlib.sha384,
180 HMAC_SHA384_192: (hashlib.sha384, 192),
181 HMAC_SHA512: hashlib.sha512,
182 HMAC_SHA512_256: (hashlib.sha512, 256),
183 HMAC_MD5: hashlib.md5,
184 }
186 def __init__(self, key, algorithm):
187 try:
188 hashinfo = self._hashes[algorithm]
189 except KeyError:
190 raise NotImplementedError(f"TSIG algorithm {algorithm} is not supported")
192 # create the HMAC context
193 if isinstance(hashinfo, tuple):
194 self.hmac_context = hmac.new(key, digestmod=hashinfo[0])
195 self.size = hashinfo[1]
196 else:
197 self.hmac_context = hmac.new(key, digestmod=hashinfo)
198 self.size = None
199 self.name = self.hmac_context.name
200 if self.size:
201 self.name += f"-{self.size}"
203 def update(self, data):
204 return self.hmac_context.update(data)
206 def sign(self):
207 # defer to the HMAC digest() function for that digestmod
208 digest = self.hmac_context.digest()
209 if self.size:
210 digest = digest[: (self.size // 8)]
211 return digest
213 def verify(self, expected):
214 # re-digest and compare the results
215 mac = self.sign()
216 if not hmac.compare_digest(mac, expected):
217 raise BadSignature
220def _digest(wire, key, rdata, time=None, request_mac=None, ctx=None, multi=None):
221 """Return a context containing the TSIG rdata for the input parameters
222 @rtype: dns.tsig.HMACTSig or dns.tsig.GSSTSig object
223 @raises ValueError: I{other_data} is too long
224 @raises NotImplementedError: I{algorithm} is not supported
225 """
227 first = not (ctx and multi)
228 if first:
229 ctx = get_context(key)
230 if request_mac:
231 ctx.update(struct.pack("!H", len(request_mac)))
232 ctx.update(request_mac)
233 ctx.update(struct.pack("!H", rdata.original_id))
234 ctx.update(wire[2:])
235 if first:
236 ctx.update(key.name.to_digestable())
237 ctx.update(struct.pack("!H", dns.rdataclass.ANY))
238 ctx.update(struct.pack("!I", 0))
239 if time is None:
240 time = rdata.time_signed
241 upper_time = (time >> 32) & 0xFFFF
242 lower_time = time & 0xFFFFFFFF
243 time_encoded = struct.pack("!HIH", upper_time, lower_time, rdata.fudge)
244 other_len = len(rdata.other)
245 if other_len > 65535:
246 raise ValueError("TSIG Other Data is > 65535 bytes")
247 if first:
248 ctx.update(key.algorithm.to_digestable() + time_encoded)
249 ctx.update(struct.pack("!HH", rdata.error, other_len) + rdata.other)
250 else:
251 ctx.update(time_encoded)
252 return ctx
255def _maybe_start_digest(key, mac, multi):
256 """If this is the first message in a multi-message sequence,
257 start a new context.
258 @rtype: dns.tsig.HMACTSig or dns.tsig.GSSTSig object
259 """
260 if multi:
261 ctx = get_context(key)
262 ctx.update(struct.pack("!H", len(mac)))
263 ctx.update(mac)
264 return ctx
265 else:
266 return None
269def sign(wire, key, rdata, time=None, request_mac=None, ctx=None, multi=False):
270 """Return a (tsig_rdata, mac, ctx) tuple containing the HMAC TSIG rdata
271 for the input parameters, the HMAC MAC calculated by applying the
272 TSIG signature algorithm, and the TSIG digest context.
273 @rtype: (string, dns.tsig.HMACTSig or dns.tsig.GSSTSig object)
274 @raises ValueError: I{other_data} is too long
275 @raises NotImplementedError: I{algorithm} is not supported
276 """
278 ctx = _digest(wire, key, rdata, time, request_mac, ctx, multi)
279 mac = ctx.sign()
280 tsig = rdata.replace(time_signed=time, mac=mac)
282 return (tsig, _maybe_start_digest(key, mac, multi))
285def validate(
286 wire, key, owner, rdata, now, request_mac, tsig_start, ctx=None, multi=False
287):
288 """Validate the specified TSIG rdata against the other input parameters.
290 @raises FormError: The TSIG is badly formed.
291 @raises BadTime: There is too much time skew between the client and the
292 server.
293 @raises BadSignature: The TSIG signature did not validate
294 @rtype: dns.tsig.HMACTSig or dns.tsig.GSSTSig object"""
296 (adcount,) = struct.unpack("!H", wire[10:12])
297 if adcount == 0:
298 raise dns.exception.FormError
299 adcount -= 1
300 new_wire = wire[0:10] + struct.pack("!H", adcount) + wire[12:tsig_start]
301 if rdata.error != 0:
302 if rdata.error == dns.rcode.BADSIG:
303 raise PeerBadSignature
304 elif rdata.error == dns.rcode.BADKEY:
305 raise PeerBadKey
306 elif rdata.error == dns.rcode.BADTIME:
307 raise PeerBadTime
308 elif rdata.error == dns.rcode.BADTRUNC:
309 raise PeerBadTruncation
310 else:
311 raise PeerError("unknown TSIG error code %d" % rdata.error)
312 if abs(rdata.time_signed - now) > rdata.fudge:
313 raise BadTime
314 if key.name != owner:
315 raise BadKey
316 if key.algorithm != rdata.algorithm:
317 raise BadAlgorithm
318 ctx = _digest(new_wire, key, rdata, None, request_mac, ctx, multi)
319 ctx.verify(rdata.mac)
320 return _maybe_start_digest(key, rdata.mac, multi)
323def get_context(key):
324 """Returns an HMAC context for the specified key.
326 @rtype: HMAC context
327 @raises NotImplementedError: I{algorithm} is not supported
328 """
330 if key.algorithm == GSS_TSIG:
331 return GSSTSig(key.secret)
332 else:
333 return HMACTSig(key.secret, key.algorithm)
336class Key:
337 def __init__(self, name, secret, algorithm=default_algorithm):
338 if isinstance(name, str):
339 name = dns.name.from_text(name)
340 self.name = name
341 if isinstance(secret, str):
342 secret = base64.decodebytes(secret.encode())
343 self.secret = secret
344 if isinstance(algorithm, str):
345 algorithm = dns.name.from_text(algorithm)
346 self.algorithm = algorithm
348 def __eq__(self, other):
349 return (
350 isinstance(other, Key)
351 and self.name == other.name
352 and self.secret == other.secret
353 and self.algorithm == other.algorithm
354 )
356 def __repr__(self):
357 r = f"<DNS key name='{self.name}', " + f"algorithm='{self.algorithm}'"
358 if self.algorithm != GSS_TSIG:
359 r += f", secret='{base64.b64encode(self.secret).decode()}'"
360 r += ">"
361 return r