Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/scapy/layers/ipsec.py: 23%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# SPDX-License-Identifier: GPL-2.0-only
2# This file is part of Scapy
3# See https://scapy.net/ for more information
4# Copyright (C) 2014 6WIND
6r"""
7IPsec layer
8===========
10Example of use:
12>>> sa = SecurityAssociation(ESP, spi=0xdeadbeef, crypt_algo='AES-CBC',
13... crypt_key=b'sixteenbytes key')
14>>> p = IP(src='1.1.1.1', dst='2.2.2.2')
15>>> p /= TCP(sport=45012, dport=80)
16>>> p /= Raw('testdata')
17>>> p = IP(raw(p))
18>>> p
19<IP version=4L ihl=5L tos=0x0 len=48 id=1 flags= frag=0L ttl=64 proto=tcp chksum=0x74c2 src=1.1.1.1 dst=2.2.2.2 options=[] |<TCP sport=45012 dport=http seq=0 ack=0 dataofs=5L reserved=0L flags=S window=8192 chksum=0x1914 urgptr=0 options=[] |<Raw load='testdata' |>>> # noqa: E501
20>>>
21>>> e = sa.encrypt(p)
22>>> e
23<IP version=4L ihl=5L tos=0x0 len=76 id=1 flags= frag=0L ttl=64 proto=esp chksum=0x747a src=1.1.1.1 dst=2.2.2.2 |<ESP spi=0xdeadbeef seq=1 data=b'\xf8\xdb\x1e\x83[T\xab\\\xd2\x1b\xed\xd1\xe5\xc8Y\xc2\xa5d\x92\xc1\x05\x17\xa6\x92\x831\xe6\xc1]\x9a\xd6K}W\x8bFfd\xa5B*+\xde\xc8\x89\xbf{\xa9' |>> # noqa: E501
24>>>
25>>> d = sa.decrypt(e)
26>>> d
27<IP version=4L ihl=5L tos=0x0 len=48 id=1 flags= frag=0L ttl=64 proto=tcp chksum=0x74c2 src=1.1.1.1 dst=2.2.2.2 |<TCP sport=45012 dport=http seq=0 ack=0 dataofs=5L reserved=0L flags=S window=8192 chksum=0x1914 urgptr=0 options=[] |<Raw load='testdata' |>>> # noqa: E501
28>>>
29>>> d == p
30True
31"""
33try:
34 from math import gcd
35except ImportError:
36 from fractions import gcd
37import os
38import socket
39import struct
40import warnings
42from scapy.config import conf, crypto_validator
43from scapy.compat import orb, raw
44from scapy.data import IP_PROTOS
45from scapy.error import log_loading
46from scapy.fields import (
47 ByteEnumField,
48 ByteField,
49 IntField,
50 PacketField,
51 ShortField,
52 StrField,
53 XByteField,
54 XIntField,
55 XStrField,
56 XStrLenField,
57)
58from scapy.packet import (
59 Packet,
60 Raw,
61 bind_bottom_up,
62 bind_layers,
63 bind_top_down,
64)
65from scapy.layers.inet import IP, UDP
66from scapy.layers.inet6 import IPv6, IPv6ExtHdrHopByHop, IPv6ExtHdrDestOpt, \
67 IPv6ExtHdrRouting
70###############################################################################
71class AH(Packet):
72 """
73 Authentication Header
75 See https://tools.ietf.org/rfc/rfc4302.txt
76 """
78 name = 'AH'
80 def __get_icv_len(self):
81 """
82 Compute the size of the ICV based on the payloadlen field.
83 Padding size is included as it can only be known from the authentication # noqa: E501
84 algorithm provided by the Security Association.
85 """
86 # payloadlen = length of AH in 32-bit words (4-byte units), minus "2"
87 # payloadlen = 3 32-bit word fixed fields + ICV + padding - 2
88 # ICV = (payloadlen + 2 - 3 - padding) in 32-bit words
89 return (self.payloadlen - 1) * 4
91 fields_desc = [
92 ByteEnumField('nh', None, IP_PROTOS),
93 ByteField('payloadlen', None),
94 ShortField('reserved', None),
95 XIntField('spi', 0x00000001),
96 IntField('seq', 0),
97 XStrLenField('icv', None, length_from=__get_icv_len),
98 # Padding len can only be known with the SecurityAssociation.auth_algo
99 XStrLenField('padding', None, length_from=lambda x: 0),
100 ]
102 overload_fields = {
103 IP: {'proto': socket.IPPROTO_AH},
104 IPv6: {'nh': socket.IPPROTO_AH},
105 IPv6ExtHdrHopByHop: {'nh': socket.IPPROTO_AH},
106 IPv6ExtHdrDestOpt: {'nh': socket.IPPROTO_AH},
107 IPv6ExtHdrRouting: {'nh': socket.IPPROTO_AH},
108 }
111bind_layers(IP, AH, proto=socket.IPPROTO_AH)
112bind_layers(IPv6, AH, nh=socket.IPPROTO_AH)
113bind_layers(AH, IP, nh=socket.IPPROTO_IP)
114bind_layers(AH, IPv6, nh=socket.IPPROTO_IPV6)
116###############################################################################
119class ESP(Packet):
120 """
121 Encapsulated Security Payload
123 See https://tools.ietf.org/rfc/rfc4303.txt
124 """
125 name = 'ESP'
127 fields_desc = [
128 XIntField('spi', 0x00000001),
129 IntField('seq', 0),
130 XStrField('data', None),
131 ]
133 @classmethod
134 def dispatch_hook(cls, _pkt=None, *args, **kargs):
135 if _pkt:
136 if len(_pkt) >= 4 and struct.unpack("!I", _pkt[0:4])[0] == 0x00:
137 return NON_ESP
138 elif len(_pkt) == 1 and struct.unpack("!B", _pkt)[0] == 0xff:
139 return NAT_KEEPALIVE
140 else:
141 return ESP
142 return cls
144 overload_fields = {
145 IP: {'proto': socket.IPPROTO_ESP},
146 IPv6: {'nh': socket.IPPROTO_ESP},
147 IPv6ExtHdrHopByHop: {'nh': socket.IPPROTO_ESP},
148 IPv6ExtHdrDestOpt: {'nh': socket.IPPROTO_ESP},
149 IPv6ExtHdrRouting: {'nh': socket.IPPROTO_ESP},
150 }
153class NON_ESP(Packet): # RFC 3948, section 2.2
154 fields_desc = [
155 XIntField("non_esp", 0x0)
156 ]
159class NAT_KEEPALIVE(Packet): # RFC 3948, section 2.2
160 fields_desc = [
161 XByteField("nat_keepalive", 0xFF)
162 ]
165bind_layers(IP, ESP, proto=socket.IPPROTO_ESP)
166bind_layers(IPv6, ESP, nh=socket.IPPROTO_ESP)
168# NAT-Traversal encapsulation
169bind_bottom_up(UDP, ESP, dport=4500)
170bind_bottom_up(UDP, ESP, sport=4500)
171bind_top_down(UDP, ESP, dport=4500, sport=4500)
172bind_top_down(UDP, NON_ESP, dport=4500, sport=4500)
173bind_top_down(UDP, NAT_KEEPALIVE, dport=4500, sport=4500)
175###############################################################################
178class _ESPPlain(Packet):
179 """
180 Internal class to represent unencrypted ESP packets.
181 """
182 name = 'ESP'
184 fields_desc = [
185 XIntField('spi', 0x0),
186 IntField('seq', 0),
188 StrField('iv', ''),
189 PacketField('data', '', Raw),
190 StrField('padding', ''),
192 ByteField('padlen', 0),
193 ByteEnumField('nh', 0, IP_PROTOS),
194 StrField('icv', ''),
195 ]
197 def data_for_encryption(self):
198 return raw(self.data) + self.padding + struct.pack("BB", self.padlen, self.nh) # noqa: E501
201###############################################################################
202if conf.crypto_valid:
203 from cryptography.exceptions import InvalidTag
204 from cryptography.hazmat.backends import default_backend
205 from cryptography.hazmat.primitives.ciphers import (
206 aead,
207 Cipher,
208 algorithms,
209 modes,
210 )
211 try:
212 # cryptography > 43.0
213 from cryptography.hazmat.decrepit.ciphers import (
214 algorithms as decrepit_algorithms
215 )
216 except ImportError:
217 decrepit_algorithms = algorithms
219 # cryptography's TripleDES can be used to simulate DES behavior
220 DES = lambda key: decrepit_algorithms.TripleDES(key * 3)
221 DES.key_sizes = decrepit_algorithms.TripleDES.key_sizes
222 DES.block_size = decrepit_algorithms.TripleDES.block_size
223else:
224 log_loading.info("Can't import python-cryptography v1.7+. "
225 "Disabled IPsec encryption/authentication.")
226 default_backend = None
227 InvalidTag = Exception
228 Cipher = algorithms = modes = DES = None
230###############################################################################
233def _lcm(a, b):
234 """
235 Least Common Multiple between 2 integers.
236 """
237 if a == 0 or b == 0:
238 return 0
239 else:
240 return abs(a * b) // gcd(a, b)
243class CryptAlgo(object):
244 """
245 IPsec encryption algorithm
246 """
248 def __init__(self, name, cipher, mode, block_size=None, iv_size=None,
249 key_size=None, icv_size=None, salt_size=None, format_mode_iv=None): # noqa: E501
250 """
251 :param name: the name of this encryption algorithm
252 :param cipher: a Cipher module
253 :param mode: the mode used with the cipher module
254 :param block_size: the length a block for this algo. Defaults to the
255 `block_size` of the cipher.
256 :param iv_size: the length of the initialization vector of this algo.
257 Defaults to the `block_size` of the cipher.
258 :param key_size: an integer or list/tuple of integers. If specified,
259 force the secret keys length to one of the values.
260 Defaults to the `key_size` of the cipher.
261 :param icv_size: the length of the Integrity Check Value of this algo.
262 Used by Combined Mode Algorithms e.g. GCM
263 :param salt_size: the length of the salt to use as the IV prefix.
264 Usually used by Counter modes e.g. CTR
265 :param format_mode_iv: function to format the Initialization Vector
266 e.g. handle the salt value
267 Default is the random buffer from `generate_iv`
268 """
269 self.name = name
270 self.cipher = cipher
271 self.mode = mode
272 self.icv_size = icv_size
274 self.is_aead = False
275 # If using cryptography.hazmat.primitives.cipher.aead
276 self.ciphers_aead_api = False
278 if modes:
279 if self.mode is not None:
280 self.is_aead = issubclass(self.mode,
281 modes.ModeWithAuthenticationTag)
282 elif self.cipher in (aead.AESGCM, aead.AESCCM,
283 aead.ChaCha20Poly1305):
284 self.is_aead = True
285 self.ciphers_aead_api = True
287 if block_size is not None:
288 self.block_size = block_size
289 elif cipher is not None:
290 self.block_size = cipher.block_size // 8
291 else:
292 self.block_size = 1
294 if iv_size is None:
295 self.iv_size = self.block_size
296 else:
297 self.iv_size = iv_size
299 if key_size is not None:
300 self.key_size = key_size
301 elif cipher is not None:
302 self.key_size = tuple(i // 8 for i in cipher.key_sizes)
303 else:
304 self.key_size = None
306 if salt_size is None:
307 self.salt_size = 0
308 else:
309 self.salt_size = salt_size
311 if format_mode_iv is None:
312 self._format_mode_iv = lambda iv, **kw: iv
313 else:
314 self._format_mode_iv = format_mode_iv
316 def check_key(self, key):
317 """
318 Check that the key length is valid.
320 :param key: a byte string
321 """
322 if self.key_size and not (len(key) == self.key_size or len(key) in self.key_size): # noqa: E501
323 raise TypeError('invalid key size %s, must be %s' %
324 (len(key), self.key_size))
326 def generate_iv(self):
327 """
328 Generate a random initialization vector.
329 """
330 # XXX: Handle counter modes with real counters? RFCs allow the use of
331 # XXX: random bytes for counters, so it is not wrong to do it that way
332 return os.urandom(self.iv_size)
334 @crypto_validator
335 def new_cipher(self, key, mode_iv, digest=None):
336 """
337 :param key: the secret key, a byte string
338 :param mode_iv: the initialization vector or nonce, a byte string.
339 Formatted by `format_mode_iv`.
340 :param digest: also known as tag or icv. A byte string containing the
341 digest of the encrypted data. Only use this during
342 decryption!
344 :returns: an initialized cipher object for this algo
345 """
346 if self.is_aead and digest is not None:
347 # With AEAD, the mode needs the digest during decryption.
348 return Cipher(
349 self.cipher(key),
350 self.mode(mode_iv, digest, len(digest)),
351 default_backend(),
352 )
353 else:
354 return Cipher(
355 self.cipher(key),
356 self.mode(mode_iv),
357 default_backend(),
358 )
360 def pad(self, esp):
361 """
362 Add the correct amount of padding so that the data to encrypt is
363 exactly a multiple of the algorithm's block size.
365 Also, make sure that the total ESP packet length is a multiple of 4
366 bytes.
368 :param esp: an unencrypted _ESPPlain packet
370 :returns: an unencrypted _ESPPlain packet with valid padding
371 """
372 # 2 extra bytes for padlen and nh
373 data_len = len(esp.data) + 2
375 # according to the RFC4303, section 2.4. Padding (for Encryption)
376 # the size of the ESP payload must be a multiple of 32 bits
377 align = _lcm(self.block_size, 4)
379 # pad for block size
380 esp.padlen = -data_len % align
382 # Still according to the RFC, the default value for padding *MUST* be an # noqa: E501
383 # array of bytes starting from 1 to padlen
384 # TODO: Handle padding function according to the encryption algo
385 esp.padding = struct.pack("B" * esp.padlen, *range(1, esp.padlen + 1))
387 # If the following test fails, it means that this algo does not comply
388 # with the RFC
389 payload_len = len(esp.iv) + len(esp.data) + len(esp.padding) + 2
390 if payload_len % 4 != 0:
391 raise ValueError('The size of the ESP data is not aligned to 32 bits after padding.') # noqa: E501
393 return esp
395 def encrypt(self, sa, esp, key, icv_size=None, esn_en=False, esn=0):
396 """
397 Encrypt an ESP packet
399 :param sa: the SecurityAssociation associated with the ESP packet.
400 :param esp: an unencrypted _ESPPlain packet with valid padding
401 :param key: the secret key used for encryption
402 :param icv_size: the length of the icv used for integrity check
403 :esn_en: extended sequence number enable which allows to use 64-bit
404 sequence number instead of 32-bit when using an AEAD
405 algorithm
406 :esn: extended sequence number (32 MSB)
407 :return: a valid ESP packet encrypted with this algorithm
408 """
409 if icv_size is None:
410 icv_size = self.icv_size if self.is_aead else 0
411 data = esp.data_for_encryption()
413 if self.cipher:
414 mode_iv = self._format_mode_iv(algo=self, sa=sa, iv=esp.iv)
415 aad = None
416 if self.is_aead:
417 if esn_en:
418 aad = struct.pack('!LLL', esp.spi, esn, esp.seq)
419 else:
420 aad = struct.pack('!LL', esp.spi, esp.seq)
421 if self.ciphers_aead_api:
422 # New API
423 if self.cipher == aead.AESCCM:
424 cipher = self.cipher(key, tag_length=icv_size)
425 else:
426 cipher = self.cipher(key)
427 if self.name == 'AES-NULL-GMAC':
428 # Special case for GMAC (rfc 4543 sect 3)
429 data = data + cipher.encrypt(mode_iv, b"", aad + esp.iv + data)
430 else:
431 data = cipher.encrypt(mode_iv, data, aad)
432 else:
433 cipher = self.new_cipher(key, mode_iv)
434 encryptor = cipher.encryptor()
436 if self.is_aead:
437 encryptor.authenticate_additional_data(aad)
438 data = encryptor.update(data) + encryptor.finalize()
439 data += encryptor.tag[:icv_size]
440 else:
441 data = encryptor.update(data) + encryptor.finalize()
443 return ESP(spi=esp.spi, seq=esp.seq, data=esp.iv + data)
445 def decrypt(self, sa, esp, key, icv_size=None, esn_en=False, esn=0):
446 """
447 Decrypt an ESP packet
449 :param sa: the SecurityAssociation associated with the ESP packet.
450 :param esp: an encrypted ESP packet
451 :param key: the secret key used for encryption
452 :param icv_size: the length of the icv used for integrity check
453 :param esn_en: extended sequence number enable which allows to use
454 64-bit sequence number instead of 32-bit when using an
455 AEAD algorithm
456 :param esn: extended sequence number (32 MSB)
457 :returns: a valid ESP packet encrypted with this algorithm
458 :raise scapy.layers.ipsec.IPSecIntegrityError: if the integrity check
459 fails with an AEAD algorithm
460 """
461 if icv_size is None:
462 icv_size = self.icv_size if self.is_aead else 0
464 iv = esp.data[:self.iv_size]
465 data = esp.data[self.iv_size:len(esp.data) - icv_size]
466 icv = esp.data[len(esp.data) - icv_size:]
468 if self.cipher:
469 mode_iv = self._format_mode_iv(sa=sa, iv=iv)
470 aad = None
471 if self.is_aead:
472 if esn_en:
473 aad = struct.pack('!LLL', esp.spi, esn, esp.seq)
474 else:
475 aad = struct.pack('!LL', esp.spi, esp.seq)
476 if self.ciphers_aead_api:
477 # New API
478 if self.cipher == aead.AESCCM:
479 cipher = self.cipher(key, tag_length=icv_size)
480 else:
481 cipher = self.cipher(key)
482 try:
483 if self.name == 'AES-NULL-GMAC':
484 # Special case for GMAC (rfc 4543 sect 3)
485 data = data + cipher.decrypt(mode_iv, icv, aad + iv + data)
486 else:
487 data = cipher.decrypt(mode_iv, data + icv, aad)
488 except InvalidTag as err:
489 raise IPSecIntegrityError(err)
490 else:
491 cipher = self.new_cipher(key, mode_iv, icv)
492 decryptor = cipher.decryptor()
494 if self.is_aead:
495 # Tag value check is done during the finalize method
496 decryptor.authenticate_additional_data(aad)
497 try:
498 data = decryptor.update(data) + decryptor.finalize()
499 except InvalidTag as err:
500 raise IPSecIntegrityError(err)
502 # extract padlen and nh
503 padlen = orb(data[-2])
504 nh = orb(data[-1])
506 # then use padlen to determine data and padding
507 padding = data[len(data) - padlen - 2: len(data) - 2]
508 data = data[:len(data) - padlen - 2]
510 return _ESPPlain(spi=esp.spi,
511 seq=esp.seq,
512 iv=iv,
513 data=data,
514 padding=padding,
515 padlen=padlen,
516 nh=nh,
517 icv=icv)
519###############################################################################
520# The names of the encryption algorithms are the same than in scapy.contrib.ikev2 # noqa: E501
521# see http://www.iana.org/assignments/ikev2-parameters/ikev2-parameters.xhtml
524CRYPT_ALGOS = {
525 'NULL': CryptAlgo('NULL', cipher=None, mode=None, iv_size=0),
526}
528if algorithms:
529 CRYPT_ALGOS['AES-CBC'] = CryptAlgo('AES-CBC',
530 cipher=algorithms.AES,
531 mode=modes.CBC)
532 _aes_ctr_format_mode_iv = lambda sa, iv, **kw: sa.crypt_salt + iv + b'\x00\x00\x00\x01' # noqa: E501
533 CRYPT_ALGOS['AES-CTR'] = CryptAlgo('AES-CTR',
534 cipher=algorithms.AES,
535 mode=modes.CTR,
536 block_size=1,
537 iv_size=8,
538 salt_size=4,
539 format_mode_iv=_aes_ctr_format_mode_iv)
540 _salt_format_mode_iv = lambda sa, iv, **kw: sa.crypt_salt + iv
541 CRYPT_ALGOS['AES-GCM'] = CryptAlgo('AES-GCM',
542 cipher=aead.AESGCM,
543 key_size=(16, 24, 32),
544 mode=None,
545 salt_size=4,
546 block_size=1,
547 iv_size=8,
548 icv_size=16,
549 format_mode_iv=_salt_format_mode_iv)
550 # GMAC: rfc 4543, "companion to the AES Galois/Counter Mode ESP"
551 # This is defined as a crypt_algo by rfc, but has the role of an auth_algo
552 CRYPT_ALGOS['AES-NULL-GMAC'] = CryptAlgo('AES-NULL-GMAC',
553 cipher=aead.AESGCM,
554 key_size=(16, 24, 32),
555 mode=None,
556 salt_size=4,
557 block_size=1,
558 iv_size=8,
559 icv_size=16,
560 format_mode_iv=_salt_format_mode_iv)
561 CRYPT_ALGOS['AES-CCM'] = CryptAlgo('AES-CCM',
562 cipher=aead.AESCCM,
563 mode=None,
564 key_size=(16, 24, 32),
565 block_size=1,
566 iv_size=8,
567 salt_size=3,
568 icv_size=16,
569 format_mode_iv=_salt_format_mode_iv)
570 CRYPT_ALGOS['CHACHA20-POLY1305'] = CryptAlgo('CHACHA20-POLY1305',
571 cipher=aead.ChaCha20Poly1305,
572 mode=None,
573 key_size=32,
574 block_size=1,
575 iv_size=8,
576 salt_size=4,
577 icv_size=16,
578 format_mode_iv=_salt_format_mode_iv) # noqa: E501
580 # Using a TripleDES cipher algorithm for DES is done by using the same 64
581 # bits key 3 times
582 CRYPT_ALGOS['DES'] = CryptAlgo('DES',
583 cipher=DES,
584 mode=modes.CBC,
585 key_size=(8,))
586 CRYPT_ALGOS['3DES'] = CryptAlgo('3DES',
587 cipher=decrepit_algorithms.TripleDES,
588 mode=modes.CBC)
589 if decrepit_algorithms is algorithms:
590 # cryptography < 43 raises a DeprecationWarning
591 from cryptography.utils import CryptographyDeprecationWarning
592 with warnings.catch_warnings():
593 # Hide deprecation warnings
594 warnings.filterwarnings("ignore",
595 category=CryptographyDeprecationWarning)
596 CRYPT_ALGOS['CAST'] = CryptAlgo('CAST',
597 cipher=decrepit_algorithms.CAST5,
598 mode=modes.CBC)
599 CRYPT_ALGOS['Blowfish'] = CryptAlgo('Blowfish',
600 cipher=decrepit_algorithms.Blowfish,
601 mode=modes.CBC)
602 else:
603 CRYPT_ALGOS['CAST'] = CryptAlgo('CAST',
604 cipher=decrepit_algorithms.CAST5,
605 mode=modes.CBC)
606 CRYPT_ALGOS['Blowfish'] = CryptAlgo('Blowfish',
607 cipher=decrepit_algorithms.Blowfish,
608 mode=modes.CBC)
611###############################################################################
612if conf.crypto_valid:
613 from cryptography.hazmat.primitives.hmac import HMAC
614 from cryptography.hazmat.primitives.cmac import CMAC
615 from cryptography.hazmat.primitives import hashes
616else:
617 # no error if cryptography is not available but authentication won't be supported # noqa: E501
618 HMAC = CMAC = hashes = None
620###############################################################################
623class IPSecIntegrityError(Exception):
624 """
625 Error risen when the integrity check fails.
626 """
627 pass
630class AuthAlgo(object):
631 """
632 IPsec integrity algorithm
633 """
635 def __init__(self, name, mac, digestmod, icv_size, key_size=None):
636 """
637 :param name: the name of this integrity algorithm
638 :param mac: a Message Authentication Code module
639 :param digestmod: a Hash or Cipher module
640 :param icv_size: the length of the integrity check value of this algo
641 :param key_size: an integer or list/tuple of integers. If specified,
642 force the secret keys length to one of the values.
643 Defaults to the `key_size` of the cipher.
644 """
645 self.name = name
646 self.mac = mac
647 self.digestmod = digestmod
648 self.icv_size = icv_size
649 self.key_size = key_size
651 def check_key(self, key):
652 """
653 Check that the key length is valid.
655 :param key: a byte string
656 """
657 if self.key_size and len(key) not in self.key_size:
658 raise TypeError('invalid key size %s, must be one of %s' %
659 (len(key), self.key_size))
661 @crypto_validator
662 def new_mac(self, key):
663 """
664 :param key: a byte string
665 :returns: an initialized mac object for this algo
666 """
667 if self.mac is CMAC:
668 return self.mac(self.digestmod(key), default_backend())
669 else:
670 return self.mac(key, self.digestmod(), default_backend())
672 def sign(self, pkt, key, esn_en=False, esn=0):
673 """
674 Sign an IPsec (ESP or AH) packet with this algo.
676 :param pkt: a packet that contains a valid encrypted ESP or AH layer
677 :param key: the authentication key, a byte string
678 :param esn_en: extended sequence number enable which allows to use
679 64-bit sequence number instead of 32-bit
680 :param esn: extended sequence number (32 MSB)
682 :returns: the signed packet
683 """
684 if not self.mac:
685 return pkt
687 mac = self.new_mac(key)
689 if pkt.haslayer(ESP):
690 mac.update(bytes(pkt[ESP]))
691 if esn_en:
692 # RFC4303 sect 2.2.1
693 mac.update(struct.pack('!L', esn))
694 pkt[ESP].data += mac.finalize()[:self.icv_size]
696 elif pkt.haslayer(AH):
697 mac.update(bytes(zero_mutable_fields(pkt.copy(), sending=True)))
698 if esn_en:
699 # RFC4302 sect 2.5.1
700 mac.update(struct.pack('!L', esn))
701 pkt[AH].icv = mac.finalize()[:self.icv_size]
703 return pkt
705 def verify(self, pkt, key, esn_en=False, esn=0):
706 """
707 Check that the integrity check value (icv) of a packet is valid.
709 :param pkt: a packet that contains a valid encrypted ESP or AH layer
710 :param key: the authentication key, a byte string
711 :param esn_en: extended sequence number enable which allows to use
712 64-bit sequence number instead of 32-bit
713 :param esn: extended sequence number (32 MSB)
715 :raise scapy.layers.ipsec.IPSecIntegrityError: if the integrity check
716 fails
717 """
718 if not self.mac or self.icv_size == 0:
719 return
721 mac = self.new_mac(key)
723 pkt_icv = 'not found'
725 if isinstance(pkt, ESP):
726 pkt_icv = pkt.data[len(pkt.data) - self.icv_size:]
727 clone = pkt.copy()
728 clone.data = clone.data[:len(clone.data) - self.icv_size]
729 mac.update(bytes(clone))
730 if esn_en:
731 # RFC4303 sect 2.2.1
732 mac.update(struct.pack('!L', esn))
734 elif pkt.haslayer(AH):
735 if len(pkt[AH].icv) != self.icv_size:
736 # Fill padding since we know the actual icv_size
737 pkt[AH].padding = pkt[AH].icv[self.icv_size:]
738 pkt[AH].icv = pkt[AH].icv[:self.icv_size]
739 pkt_icv = pkt[AH].icv
740 clone = zero_mutable_fields(pkt.copy(), sending=False)
741 mac.update(bytes(clone))
742 if esn_en:
743 # RFC4302 sect 2.5.1
744 mac.update(struct.pack('!L', esn))
746 computed_icv = mac.finalize()[:self.icv_size]
748 # XXX: Cannot use mac.verify because the ICV can be truncated
749 if pkt_icv != computed_icv:
750 raise IPSecIntegrityError('pkt_icv=%r, computed_icv=%r' %
751 (pkt_icv, computed_icv))
753###############################################################################
754# The names of the integrity algorithms are the same than in scapy.contrib.ikev2 # noqa: E501
755# see http://www.iana.org/assignments/ikev2-parameters/ikev2-parameters.xhtml
758AUTH_ALGOS = {
759 'NULL': AuthAlgo('NULL', mac=None, digestmod=None, icv_size=0),
760}
762if HMAC and hashes:
763 # XXX: NIST has deprecated SHA1 but is required by RFC7321
764 AUTH_ALGOS['HMAC-SHA1-96'] = AuthAlgo('HMAC-SHA1-96',
765 mac=HMAC,
766 digestmod=hashes.SHA1,
767 icv_size=12)
768 AUTH_ALGOS['SHA2-256-128'] = AuthAlgo('SHA2-256-128',
769 mac=HMAC,
770 digestmod=hashes.SHA256,
771 icv_size=16)
772 AUTH_ALGOS['SHA2-384-192'] = AuthAlgo('SHA2-384-192',
773 mac=HMAC,
774 digestmod=hashes.SHA384,
775 icv_size=24)
776 AUTH_ALGOS['SHA2-512-256'] = AuthAlgo('SHA2-512-256',
777 mac=HMAC,
778 digestmod=hashes.SHA512,
779 icv_size=32)
780 # XXX:Flagged as deprecated by 'cryptography'. Kept for backward compat
781 AUTH_ALGOS['HMAC-MD5-96'] = AuthAlgo('HMAC-MD5-96',
782 mac=HMAC,
783 digestmod=hashes.MD5,
784 icv_size=12)
785if CMAC and algorithms:
786 AUTH_ALGOS['AES-CMAC-96'] = AuthAlgo('AES-CMAC-96',
787 mac=CMAC,
788 digestmod=algorithms.AES,
789 icv_size=12,
790 key_size=(16,))
792###############################################################################
795def split_for_transport(orig_pkt, transport_proto):
796 """
797 Split an IP(v6) packet in the correct location to insert an ESP or AH
798 header.
800 :param orig_pkt: the packet to split. Must be an IP or IPv6 packet
801 :param transport_proto: the IPsec protocol number that will be inserted
802 at the split position.
803 :returns: a tuple (header, nh, payload) where nh is the protocol number of
804 payload.
805 """
806 # force resolution of default fields to avoid padding errors
807 header = orig_pkt.__class__(raw(orig_pkt))
808 next_hdr = header.payload
809 nh = None
811 if header.version == 4:
812 nh = header.proto
813 header.proto = transport_proto
814 header.remove_payload()
815 del header.chksum
816 del header.len
818 return header, nh, next_hdr
819 else:
820 found_rt_hdr = False
821 prev = header
823 # Since the RFC 4302 is vague about where the ESP/AH headers should be
824 # inserted in IPv6, I chose to follow the linux implementation.
825 while isinstance(next_hdr, (IPv6ExtHdrHopByHop, IPv6ExtHdrRouting, IPv6ExtHdrDestOpt)): # noqa: E501
826 if isinstance(next_hdr, IPv6ExtHdrHopByHop):
827 pass
828 if isinstance(next_hdr, IPv6ExtHdrRouting):
829 found_rt_hdr = True
830 elif isinstance(next_hdr, IPv6ExtHdrDestOpt) and found_rt_hdr:
831 break
833 prev = next_hdr
834 next_hdr = next_hdr.payload
836 nh = prev.nh
837 prev.nh = transport_proto
838 prev.remove_payload()
839 del header.plen
841 return header, nh, next_hdr
844###############################################################################
845# see RFC 4302 - Appendix A. Mutability of IP Options/Extension Headers
846IMMUTABLE_IPV4_OPTIONS = (
847 0, # End Of List
848 1, # No OPeration
849 2, # Security
850 5, # Extended Security
851 6, # Commercial Security
852 20, # Router Alert
853 21, # Sender Directed Multi-Destination Delivery
854)
857def zero_mutable_fields(pkt, sending=False):
858 """
859 When using AH, all "mutable" fields must be "zeroed" before calculating
860 the ICV. See RFC 4302, Section 3.3.3.1. Handling Mutable Fields.
862 :param pkt: an IP(v6) packet containing an AH layer.
863 NOTE: The packet will be modified
864 :param sending: if true, ipv6 routing headers will not be reordered
865 """
867 if pkt.haslayer(AH):
868 pkt[AH].icv = b"\x00" * len(pkt[AH].icv)
869 else:
870 raise TypeError('no AH layer found')
872 if pkt.version == 4:
873 # the tos field has been replaced by DSCP and ECN
874 # Routers may rewrite the DS field as needed to provide a
875 # desired local or end-to-end service
876 pkt.tos = 0
877 # an intermediate router might set the DF bit, even if the source
878 # did not select it.
879 pkt.flags = 0
880 # changed en route as a normal course of processing by routers
881 pkt.ttl = 0
882 # will change if any of these other fields change
883 pkt.chksum = 0
885 immutable_opts = []
886 for opt in pkt.options:
887 if opt.option in IMMUTABLE_IPV4_OPTIONS:
888 immutable_opts.append(opt)
889 else:
890 immutable_opts.append(Raw(b"\x00" * len(opt)))
891 pkt.options = immutable_opts
893 else:
894 # holds DSCP and ECN
895 pkt.tc = 0
896 # The flow label described in AHv1 was mutable, and in RFC 2460 [DH98]
897 # was potentially mutable. To retain compatibility with existing AH
898 # implementations, the flow label is not included in the ICV in AHv2.
899 pkt.fl = 0
900 # same as ttl
901 pkt.hlim = 0
903 next_hdr = pkt.payload
905 while isinstance(next_hdr, (IPv6ExtHdrHopByHop, IPv6ExtHdrRouting, IPv6ExtHdrDestOpt)): # noqa: E501
906 if isinstance(next_hdr, (IPv6ExtHdrHopByHop, IPv6ExtHdrDestOpt)):
907 for opt in next_hdr.options:
908 if opt.otype & 0x20:
909 # option data can change en-route and must be zeroed
910 opt.optdata = b"\x00" * opt.optlen
911 elif isinstance(next_hdr, IPv6ExtHdrRouting) and sending:
912 # The sender must order the field so that it appears as it
913 # will at the receiver, prior to performing the ICV computation. # noqa: E501
914 next_hdr.segleft = 0
915 if next_hdr.addresses:
916 final = next_hdr.addresses.pop()
917 next_hdr.addresses.insert(0, pkt.dst)
918 pkt.dst = final
919 else:
920 break
922 next_hdr = next_hdr.payload
924 return pkt
926###############################################################################
929class SecurityAssociation(object):
930 """
931 This class is responsible of "encryption" and "decryption" of IPsec packets. # noqa: E501
932 """
934 SUPPORTED_PROTOS = (IP, IPv6)
936 def __init__(self, proto, spi, seq_num=1, crypt_algo=None, crypt_key=None,
937 crypt_icv_size=None,
938 auth_algo=None, auth_key=None,
939 tunnel_header=None, nat_t_header=None, esn_en=False, esn=0):
940 """
941 :param proto: the IPsec proto to use (ESP or AH)
942 :param spi: the Security Parameters Index of this SA
943 :param seq_num: the initial value for the sequence number on encrypted
944 packets
945 :param crypt_algo: the encryption algorithm name (only used with ESP)
946 :param crypt_key: the encryption key (only used with ESP)
947 :param crypt_icv_size: change the default size of the crypt_algo
948 (only used with ESP)
949 :param auth_algo: the integrity algorithm name
950 :param auth_key: the integrity key
951 :param tunnel_header: an instance of a IP(v6) header that will be used
952 to encapsulate the encrypted packets.
953 :param nat_t_header: an instance of a UDP header that will be used
954 for NAT-Traversal.
955 :param esn_en: extended sequence number enable which allows to use
956 64-bit sequence number instead of 32-bit when using an
957 AEAD algorithm
958 :param esn: extended sequence number (32 MSB)
959 """
961 if proto not in {ESP, AH, ESP.name, AH.name}:
962 raise ValueError("proto must be either ESP or AH")
963 if isinstance(proto, str):
964 self.proto = {ESP.name: ESP, AH.name: AH}[proto]
965 else:
966 self.proto = proto
968 self.spi = spi
969 self.seq_num = seq_num
970 self.esn_en = esn_en
971 # Get Extended Sequence (32 MSB)
972 self.esn = esn
973 if crypt_algo:
974 if crypt_algo not in CRYPT_ALGOS:
975 raise TypeError('unsupported encryption algo %r, try %r' %
976 (crypt_algo, list(CRYPT_ALGOS)))
977 self.crypt_algo = CRYPT_ALGOS[crypt_algo]
979 if crypt_key:
980 salt_size = self.crypt_algo.salt_size
981 self.crypt_key = crypt_key[:len(crypt_key) - salt_size]
982 self.crypt_salt = crypt_key[len(crypt_key) - salt_size:]
983 else:
984 self.crypt_key = None
985 self.crypt_salt = None
987 else:
988 self.crypt_algo = CRYPT_ALGOS['NULL']
989 self.crypt_key = None
990 self.crypt_salt = None
991 self.crypt_icv_size = crypt_icv_size
993 if auth_algo:
994 if auth_algo not in AUTH_ALGOS:
995 raise TypeError('unsupported integrity algo %r, try %r' %
996 (auth_algo, list(AUTH_ALGOS)))
997 self.auth_algo = AUTH_ALGOS[auth_algo]
998 self.auth_key = auth_key
999 else:
1000 self.auth_algo = AUTH_ALGOS['NULL']
1001 self.auth_key = None
1003 if tunnel_header and not isinstance(tunnel_header, (IP, IPv6)):
1004 raise TypeError('tunnel_header must be %s or %s' % (IP.name, IPv6.name)) # noqa: E501
1005 self.tunnel_header = tunnel_header
1007 if nat_t_header:
1008 if proto is not ESP:
1009 raise TypeError('nat_t_header is only allowed with ESP')
1010 if not isinstance(nat_t_header, UDP):
1011 raise TypeError('nat_t_header must be %s' % UDP.name)
1012 self.nat_t_header = nat_t_header
1014 def check_spi(self, pkt):
1015 if pkt.spi != self.spi:
1016 raise TypeError('packet spi=0x%x does not match the SA spi=0x%x' %
1017 (pkt.spi, self.spi))
1019 def _encrypt_esp(self, pkt, seq_num=None, iv=None, esn_en=None, esn=None):
1021 if iv is None:
1022 iv = self.crypt_algo.generate_iv()
1023 else:
1024 if len(iv) != self.crypt_algo.iv_size:
1025 raise TypeError('iv length must be %s' % self.crypt_algo.iv_size) # noqa: E501
1027 esp = _ESPPlain(spi=self.spi, seq=seq_num or self.seq_num, iv=iv)
1029 if self.tunnel_header:
1030 tunnel = self.tunnel_header.copy()
1032 if tunnel.version == 4:
1033 del tunnel.proto
1034 del tunnel.len
1035 del tunnel.chksum
1036 else:
1037 del tunnel.nh
1038 del tunnel.plen
1040 pkt = tunnel.__class__(raw(tunnel / pkt))
1042 ip_header, nh, payload = split_for_transport(pkt, socket.IPPROTO_ESP)
1043 esp.data = payload
1044 esp.nh = nh
1046 esp = self.crypt_algo.pad(esp)
1047 esp = self.crypt_algo.encrypt(self, esp, self.crypt_key,
1048 self.crypt_icv_size,
1049 esn_en=esn_en or self.esn_en,
1050 esn=esn or self.esn)
1052 self.auth_algo.sign(esp,
1053 self.auth_key,
1054 esn_en=esn_en or self.esn_en,
1055 esn=esn or self.esn)
1057 if self.nat_t_header:
1058 nat_t_header = self.nat_t_header.copy()
1059 nat_t_header.chksum = 0
1060 del nat_t_header.len
1061 if ip_header.version == 4:
1062 del ip_header.proto
1063 else:
1064 del ip_header.nh
1065 ip_header /= nat_t_header
1067 if ip_header.version == 4:
1068 del ip_header.len
1069 del ip_header.chksum
1070 else:
1071 del ip_header.plen
1073 # sequence number must always change, unless specified by the user
1074 if seq_num is None:
1075 self.seq_num += 1
1077 return ip_header.__class__(raw(ip_header / esp))
1079 def _encrypt_ah(self, pkt, seq_num=None, esn_en=False, esn=0):
1081 ah = AH(spi=self.spi, seq=seq_num or self.seq_num,
1082 icv=b"\x00" * self.auth_algo.icv_size)
1084 if self.tunnel_header:
1085 tunnel = self.tunnel_header.copy()
1087 if tunnel.version == 4:
1088 del tunnel.proto
1089 del tunnel.len
1090 del tunnel.chksum
1091 else:
1092 del tunnel.nh
1093 del tunnel.plen
1095 pkt = tunnel.__class__(raw(tunnel / pkt))
1097 ip_header, nh, payload = split_for_transport(pkt, socket.IPPROTO_AH)
1098 ah.nh = nh
1100 if ip_header.version == 6 and len(ah) % 8 != 0:
1101 # For IPv6, the total length of the header must be a multiple of
1102 # 8-octet units.
1103 ah.padding = b"\x00" * (-len(ah) % 8)
1104 elif len(ah) % 4 != 0:
1105 # For IPv4, the total length of the header must be a multiple of
1106 # 4-octet units.
1107 ah.padding = b"\x00" * (-len(ah) % 4)
1109 # RFC 4302 - Section 2.2. Payload Length
1110 # This 8-bit field specifies the length of AH in 32-bit words (4-byte
1111 # units), minus "2".
1112 ah.payloadlen = len(ah) // 4 - 2
1114 if ip_header.version == 4:
1115 ip_header.len = len(ip_header) + len(ah) + len(payload)
1116 del ip_header.chksum
1117 ip_header = ip_header.__class__(raw(ip_header))
1118 else:
1119 ip_header.plen = len(ip_header.payload) + len(ah) + len(payload)
1121 signed_pkt = self.auth_algo.sign(ip_header / ah / payload,
1122 self.auth_key,
1123 esn_en=esn_en or self.esn_en,
1124 esn=esn or self.esn)
1126 # sequence number must always change, unless specified by the user
1127 if seq_num is None:
1128 self.seq_num += 1
1130 return signed_pkt
1132 def encrypt(self, pkt, seq_num=None, iv=None, esn_en=None, esn=None):
1133 """
1134 Encrypt (and encapsulate) an IP(v6) packet with ESP or AH according
1135 to this SecurityAssociation.
1137 :param pkt: the packet to encrypt
1138 :param seq_num: if specified, use this sequence number instead of the
1139 generated one
1140 :param esn_en: extended sequence number enable which allows to
1141 use 64-bit sequence number instead of 32-bit when
1142 using an AEAD algorithm
1143 :param esn: extended sequence number (32 MSB)
1144 :param iv: if specified, use this initialization vector for
1145 encryption instead of a random one.
1147 :returns: the encrypted/encapsulated packet
1148 """
1149 if not isinstance(pkt, self.SUPPORTED_PROTOS):
1150 raise TypeError('cannot encrypt %s, supported protos are %s'
1151 % (pkt.__class__, self.SUPPORTED_PROTOS))
1152 if self.proto is ESP:
1153 return self._encrypt_esp(pkt, seq_num=seq_num,
1154 iv=iv, esn_en=esn_en,
1155 esn=esn)
1156 else:
1157 return self._encrypt_ah(pkt, seq_num=seq_num,
1158 esn_en=esn_en, esn=esn)
1160 def _decrypt_esp(self, pkt, verify=True, esn_en=None, esn=None):
1162 encrypted = pkt[ESP]
1164 if verify:
1165 self.check_spi(pkt)
1166 self.auth_algo.verify(encrypted, self.auth_key,
1167 esn_en=esn_en or self.esn_en,
1168 esn=esn or self.esn)
1170 esp = self.crypt_algo.decrypt(self, encrypted, self.crypt_key,
1171 self.crypt_icv_size or
1172 self.crypt_algo.icv_size or
1173 self.auth_algo.icv_size,
1174 esn_en=esn_en or self.esn_en,
1175 esn=esn or self.esn)
1177 if self.tunnel_header:
1178 # drop the tunnel header and return the payload untouched
1180 pkt.remove_payload()
1181 if pkt.version == 4:
1182 pkt.proto = esp.nh
1183 else:
1184 pkt.nh = esp.nh
1185 cls = pkt.guess_payload_class(esp.data)
1187 return cls(esp.data)
1188 else:
1189 ip_header = pkt
1191 if ip_header.version == 4:
1192 ip_header.proto = esp.nh
1193 del ip_header.chksum
1194 ip_header.remove_payload()
1195 ip_header.len = len(ip_header) + len(esp.data)
1196 # recompute checksum
1197 ip_header = ip_header.__class__(raw(ip_header))
1198 else:
1199 if self.nat_t_header:
1200 # drop the UDP header and return the payload untouched
1201 ip_header.nh = esp.nh
1202 ip_header.remove_payload()
1203 else:
1204 encrypted.underlayer.nh = esp.nh
1205 encrypted.underlayer.remove_payload()
1206 ip_header.plen = len(ip_header.payload) + len(esp.data)
1208 cls = ip_header.guess_payload_class(esp.data)
1210 # reassemble the ip_header with the ESP payload
1211 return ip_header / cls(esp.data)
1213 def _decrypt_ah(self, pkt, verify=True, esn_en=None, esn=None):
1215 if verify:
1216 self.check_spi(pkt)
1217 self.auth_algo.verify(pkt, self.auth_key,
1218 esn_en=esn_en or self.esn_en,
1219 esn=esn or self.esn)
1221 ah = pkt[AH]
1222 payload = ah.payload
1223 payload.remove_underlayer(None) # useless argument...
1225 if self.tunnel_header:
1226 return payload
1227 else:
1228 ip_header = pkt
1230 if ip_header.version == 4:
1231 ip_header.proto = ah.nh
1232 del ip_header.chksum
1233 ip_header.remove_payload()
1234 ip_header.len = len(ip_header) + len(payload)
1235 # recompute checksum
1236 ip_header = ip_header.__class__(raw(ip_header))
1237 else:
1238 ah.underlayer.nh = ah.nh
1239 ah.underlayer.remove_payload()
1240 ip_header.plen = len(ip_header.payload) + len(payload)
1242 # reassemble the ip_header with the AH payload
1243 return ip_header / payload
1245 def decrypt(self, pkt, verify=True, esn_en=None, esn=None):
1246 """
1247 Decrypt (and decapsulate) an IP(v6) packet containing ESP or AH.
1249 :param pkt: the packet to decrypt
1250 :param verify: if False, do not perform the integrity check
1251 :param esn_en: extended sequence number enable which allows to use
1252 64-bit sequence number instead of 32-bit when using an
1253 AEAD algorithm
1254 :param esn: extended sequence number (32 MSB)
1255 :returns: the decrypted/decapsulated packet
1256 :raise scapy.layers.ipsec.IPSecIntegrityError: if the integrity check
1257 fails
1258 """
1259 if not isinstance(pkt, self.SUPPORTED_PROTOS):
1260 raise TypeError('cannot decrypt %s, supported protos are %s'
1261 % (pkt.__class__, self.SUPPORTED_PROTOS))
1263 if self.proto is ESP and pkt.haslayer(ESP):
1264 return self._decrypt_esp(pkt, verify=verify,
1265 esn_en=esn_en, esn=esn)
1266 elif self.proto is AH and pkt.haslayer(AH):
1267 return self._decrypt_ah(pkt, verify=verify, esn_en=esn_en, esn=esn)
1268 else:
1269 raise TypeError('%s has no %s layer' % (pkt, self.proto.name))