Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/scapy/layers/ipsec.py: 23%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

511 statements  

1# SPDX-License-Identifier: GPL-2.0-only 

2# This file is part of Scapy 

3# See https://scapy.net/ for more information 

4# Copyright (C) 2014 6WIND 

5 

6r""" 

7IPsec layer 

8=========== 

9 

10Example of use: 

11 

12>>> sa = SecurityAssociation(ESP, spi=0xdeadbeef, crypt_algo='AES-CBC', 

13... crypt_key=b'sixteenbytes key') 

14>>> p = IP(src='1.1.1.1', dst='2.2.2.2') 

15>>> p /= TCP(sport=45012, dport=80) 

16>>> p /= Raw('testdata') 

17>>> p = IP(raw(p)) 

18>>> p 

19<IP version=4L ihl=5L tos=0x0 len=48 id=1 flags= frag=0L ttl=64 proto=tcp chksum=0x74c2 src=1.1.1.1 dst=2.2.2.2 options=[] |<TCP sport=45012 dport=http seq=0 ack=0 dataofs=5L reserved=0L flags=S window=8192 chksum=0x1914 urgptr=0 options=[] |<Raw load='testdata' |>>> # noqa: E501 

20>>> 

21>>> e = sa.encrypt(p) 

22>>> e 

23<IP version=4L ihl=5L tos=0x0 len=76 id=1 flags= frag=0L ttl=64 proto=esp chksum=0x747a src=1.1.1.1 dst=2.2.2.2 |<ESP spi=0xdeadbeef seq=1 data=b'\xf8\xdb\x1e\x83[T\xab\\\xd2\x1b\xed\xd1\xe5\xc8Y\xc2\xa5d\x92\xc1\x05\x17\xa6\x92\x831\xe6\xc1]\x9a\xd6K}W\x8bFfd\xa5B*+\xde\xc8\x89\xbf{\xa9' |>> # noqa: E501 

24>>> 

25>>> d = sa.decrypt(e) 

26>>> d 

27<IP version=4L ihl=5L tos=0x0 len=48 id=1 flags= frag=0L ttl=64 proto=tcp chksum=0x74c2 src=1.1.1.1 dst=2.2.2.2 |<TCP sport=45012 dport=http seq=0 ack=0 dataofs=5L reserved=0L flags=S window=8192 chksum=0x1914 urgptr=0 options=[] |<Raw load='testdata' |>>> # noqa: E501 

28>>> 

29>>> d == p 

30True 

31""" 

32 

33try: 

34 from math import gcd 

35except ImportError: 

36 from fractions import gcd 

37import os 

38import socket 

39import struct 

40import warnings 

41 

42from scapy.config import conf, crypto_validator 

43from scapy.compat import orb, raw 

44from scapy.data import IP_PROTOS 

45from scapy.error import log_loading 

46from scapy.fields import ( 

47 ByteEnumField, 

48 ByteField, 

49 IntField, 

50 PacketField, 

51 ShortField, 

52 StrField, 

53 XByteField, 

54 XIntField, 

55 XStrField, 

56 XStrLenField, 

57) 

58from scapy.packet import ( 

59 Packet, 

60 Raw, 

61 bind_bottom_up, 

62 bind_layers, 

63 bind_top_down, 

64) 

65from scapy.layers.inet import IP, UDP 

66from scapy.layers.inet6 import IPv6, IPv6ExtHdrHopByHop, IPv6ExtHdrDestOpt, \ 

67 IPv6ExtHdrRouting 

68 

69 

70############################################################################### 

71class AH(Packet): 

72 """ 

73 Authentication Header 

74 

75 See https://tools.ietf.org/rfc/rfc4302.txt 

76 """ 

77 

78 name = 'AH' 

79 

80 def __get_icv_len(self): 

81 """ 

82 Compute the size of the ICV based on the payloadlen field. 

83 Padding size is included as it can only be known from the authentication # noqa: E501 

84 algorithm provided by the Security Association. 

85 """ 

86 # payloadlen = length of AH in 32-bit words (4-byte units), minus "2" 

87 # payloadlen = 3 32-bit word fixed fields + ICV + padding - 2 

88 # ICV = (payloadlen + 2 - 3 - padding) in 32-bit words 

89 return (self.payloadlen - 1) * 4 

90 

91 fields_desc = [ 

92 ByteEnumField('nh', None, IP_PROTOS), 

93 ByteField('payloadlen', None), 

94 ShortField('reserved', None), 

95 XIntField('spi', 0x00000001), 

96 IntField('seq', 0), 

97 XStrLenField('icv', None, length_from=__get_icv_len), 

98 # Padding len can only be known with the SecurityAssociation.auth_algo 

99 XStrLenField('padding', None, length_from=lambda x: 0), 

100 ] 

101 

102 overload_fields = { 

103 IP: {'proto': socket.IPPROTO_AH}, 

104 IPv6: {'nh': socket.IPPROTO_AH}, 

105 IPv6ExtHdrHopByHop: {'nh': socket.IPPROTO_AH}, 

106 IPv6ExtHdrDestOpt: {'nh': socket.IPPROTO_AH}, 

107 IPv6ExtHdrRouting: {'nh': socket.IPPROTO_AH}, 

108 } 

109 

110 

111bind_layers(IP, AH, proto=socket.IPPROTO_AH) 

112bind_layers(IPv6, AH, nh=socket.IPPROTO_AH) 

113bind_layers(AH, IP, nh=socket.IPPROTO_IP) 

114bind_layers(AH, IPv6, nh=socket.IPPROTO_IPV6) 

115 

116############################################################################### 

117 

118 

119class ESP(Packet): 

120 """ 

121 Encapsulated Security Payload 

122 

123 See https://tools.ietf.org/rfc/rfc4303.txt 

124 """ 

125 name = 'ESP' 

126 

127 fields_desc = [ 

128 XIntField('spi', 0x00000001), 

129 IntField('seq', 0), 

130 XStrField('data', None), 

131 ] 

132 

133 @classmethod 

134 def dispatch_hook(cls, _pkt=None, *args, **kargs): 

135 if _pkt: 

136 if len(_pkt) >= 4 and struct.unpack("!I", _pkt[0:4])[0] == 0x00: 

137 return NON_ESP 

138 elif len(_pkt) == 1 and struct.unpack("!B", _pkt)[0] == 0xff: 

139 return NAT_KEEPALIVE 

140 else: 

141 return ESP 

142 return cls 

143 

144 overload_fields = { 

145 IP: {'proto': socket.IPPROTO_ESP}, 

146 IPv6: {'nh': socket.IPPROTO_ESP}, 

147 IPv6ExtHdrHopByHop: {'nh': socket.IPPROTO_ESP}, 

148 IPv6ExtHdrDestOpt: {'nh': socket.IPPROTO_ESP}, 

149 IPv6ExtHdrRouting: {'nh': socket.IPPROTO_ESP}, 

150 } 

151 

152 

153class NON_ESP(Packet): # RFC 3948, section 2.2 

154 fields_desc = [ 

155 XIntField("non_esp", 0x0) 

156 ] 

157 

158 

159class NAT_KEEPALIVE(Packet): # RFC 3948, section 2.2 

160 fields_desc = [ 

161 XByteField("nat_keepalive", 0xFF) 

162 ] 

163 

164 

165bind_layers(IP, ESP, proto=socket.IPPROTO_ESP) 

166bind_layers(IPv6, ESP, nh=socket.IPPROTO_ESP) 

167 

168# NAT-Traversal encapsulation 

169bind_bottom_up(UDP, ESP, dport=4500) 

170bind_bottom_up(UDP, ESP, sport=4500) 

171bind_top_down(UDP, ESP, dport=4500, sport=4500) 

172bind_top_down(UDP, NON_ESP, dport=4500, sport=4500) 

173bind_top_down(UDP, NAT_KEEPALIVE, dport=4500, sport=4500) 

174 

175############################################################################### 

176 

177 

178class _ESPPlain(Packet): 

179 """ 

180 Internal class to represent unencrypted ESP packets. 

181 """ 

182 name = 'ESP' 

183 

184 fields_desc = [ 

185 XIntField('spi', 0x0), 

186 IntField('seq', 0), 

187 

188 StrField('iv', ''), 

189 PacketField('data', '', Raw), 

190 StrField('padding', ''), 

191 

192 ByteField('padlen', 0), 

193 ByteEnumField('nh', 0, IP_PROTOS), 

194 StrField('icv', ''), 

195 ] 

196 

197 def data_for_encryption(self): 

198 return raw(self.data) + self.padding + struct.pack("BB", self.padlen, self.nh) # noqa: E501 

199 

200 

201############################################################################### 

202if conf.crypto_valid: 

203 from cryptography.exceptions import InvalidTag 

204 from cryptography.hazmat.backends import default_backend 

205 from cryptography.hazmat.primitives.ciphers import ( 

206 aead, 

207 Cipher, 

208 algorithms, 

209 modes, 

210 ) 

211 try: 

212 # cryptography > 43.0 

213 from cryptography.hazmat.decrepit.ciphers import ( 

214 algorithms as decrepit_algorithms 

215 ) 

216 except ImportError: 

217 decrepit_algorithms = algorithms 

218 

219 # cryptography's TripleDES can be used to simulate DES behavior 

220 DES = lambda key: decrepit_algorithms.TripleDES(key * 3) 

221 DES.key_sizes = decrepit_algorithms.TripleDES.key_sizes 

222 DES.block_size = decrepit_algorithms.TripleDES.block_size 

223else: 

224 log_loading.info("Can't import python-cryptography v1.7+. " 

225 "Disabled IPsec encryption/authentication.") 

226 default_backend = None 

227 InvalidTag = Exception 

228 Cipher = algorithms = modes = DES = None 

229 

230############################################################################### 

231 

232 

233def _lcm(a, b): 

234 """ 

235 Least Common Multiple between 2 integers. 

236 """ 

237 if a == 0 or b == 0: 

238 return 0 

239 else: 

240 return abs(a * b) // gcd(a, b) 

241 

242 

243class CryptAlgo(object): 

244 """ 

245 IPsec encryption algorithm 

246 """ 

247 

248 def __init__(self, name, cipher, mode, block_size=None, iv_size=None, 

249 key_size=None, icv_size=None, salt_size=None, format_mode_iv=None): # noqa: E501 

250 """ 

251 :param name: the name of this encryption algorithm 

252 :param cipher: a Cipher module 

253 :param mode: the mode used with the cipher module 

254 :param block_size: the length a block for this algo. Defaults to the 

255 `block_size` of the cipher. 

256 :param iv_size: the length of the initialization vector of this algo. 

257 Defaults to the `block_size` of the cipher. 

258 :param key_size: an integer or list/tuple of integers. If specified, 

259 force the secret keys length to one of the values. 

260 Defaults to the `key_size` of the cipher. 

261 :param icv_size: the length of the Integrity Check Value of this algo. 

262 Used by Combined Mode Algorithms e.g. GCM 

263 :param salt_size: the length of the salt to use as the IV prefix. 

264 Usually used by Counter modes e.g. CTR 

265 :param format_mode_iv: function to format the Initialization Vector 

266 e.g. handle the salt value 

267 Default is the random buffer from `generate_iv` 

268 """ 

269 self.name = name 

270 self.cipher = cipher 

271 self.mode = mode 

272 self.icv_size = icv_size 

273 

274 self.is_aead = False 

275 # If using cryptography.hazmat.primitives.cipher.aead 

276 self.ciphers_aead_api = False 

277 

278 if modes: 

279 if self.mode is not None: 

280 self.is_aead = issubclass(self.mode, 

281 modes.ModeWithAuthenticationTag) 

282 elif self.cipher in (aead.AESGCM, aead.AESCCM, 

283 aead.ChaCha20Poly1305): 

284 self.is_aead = True 

285 self.ciphers_aead_api = True 

286 

287 if block_size is not None: 

288 self.block_size = block_size 

289 elif cipher is not None: 

290 self.block_size = cipher.block_size // 8 

291 else: 

292 self.block_size = 1 

293 

294 if iv_size is None: 

295 self.iv_size = self.block_size 

296 else: 

297 self.iv_size = iv_size 

298 

299 if key_size is not None: 

300 self.key_size = key_size 

301 elif cipher is not None: 

302 self.key_size = tuple(i // 8 for i in cipher.key_sizes) 

303 else: 

304 self.key_size = None 

305 

306 if salt_size is None: 

307 self.salt_size = 0 

308 else: 

309 self.salt_size = salt_size 

310 

311 if format_mode_iv is None: 

312 self._format_mode_iv = lambda iv, **kw: iv 

313 else: 

314 self._format_mode_iv = format_mode_iv 

315 

316 def check_key(self, key): 

317 """ 

318 Check that the key length is valid. 

319 

320 :param key: a byte string 

321 """ 

322 if self.key_size and not (len(key) == self.key_size or len(key) in self.key_size): # noqa: E501 

323 raise TypeError('invalid key size %s, must be %s' % 

324 (len(key), self.key_size)) 

325 

326 def generate_iv(self): 

327 """ 

328 Generate a random initialization vector. 

329 """ 

330 # XXX: Handle counter modes with real counters? RFCs allow the use of 

331 # XXX: random bytes for counters, so it is not wrong to do it that way 

332 return os.urandom(self.iv_size) 

333 

334 @crypto_validator 

335 def new_cipher(self, key, mode_iv, digest=None): 

336 """ 

337 :param key: the secret key, a byte string 

338 :param mode_iv: the initialization vector or nonce, a byte string. 

339 Formatted by `format_mode_iv`. 

340 :param digest: also known as tag or icv. A byte string containing the 

341 digest of the encrypted data. Only use this during 

342 decryption! 

343 

344 :returns: an initialized cipher object for this algo 

345 """ 

346 if self.is_aead and digest is not None: 

347 # With AEAD, the mode needs the digest during decryption. 

348 return Cipher( 

349 self.cipher(key), 

350 self.mode(mode_iv, digest, len(digest)), 

351 default_backend(), 

352 ) 

353 else: 

354 return Cipher( 

355 self.cipher(key), 

356 self.mode(mode_iv), 

357 default_backend(), 

358 ) 

359 

360 def pad(self, esp): 

361 """ 

362 Add the correct amount of padding so that the data to encrypt is 

363 exactly a multiple of the algorithm's block size. 

364 

365 Also, make sure that the total ESP packet length is a multiple of 4 

366 bytes. 

367 

368 :param esp: an unencrypted _ESPPlain packet 

369 

370 :returns: an unencrypted _ESPPlain packet with valid padding 

371 """ 

372 # 2 extra bytes for padlen and nh 

373 data_len = len(esp.data) + 2 

374 

375 # according to the RFC4303, section 2.4. Padding (for Encryption) 

376 # the size of the ESP payload must be a multiple of 32 bits 

377 align = _lcm(self.block_size, 4) 

378 

379 # pad for block size 

380 esp.padlen = -data_len % align 

381 

382 # Still according to the RFC, the default value for padding *MUST* be an # noqa: E501 

383 # array of bytes starting from 1 to padlen 

384 # TODO: Handle padding function according to the encryption algo 

385 esp.padding = struct.pack("B" * esp.padlen, *range(1, esp.padlen + 1)) 

386 

387 # If the following test fails, it means that this algo does not comply 

388 # with the RFC 

389 payload_len = len(esp.iv) + len(esp.data) + len(esp.padding) + 2 

390 if payload_len % 4 != 0: 

391 raise ValueError('The size of the ESP data is not aligned to 32 bits after padding.') # noqa: E501 

392 

393 return esp 

394 

395 def encrypt(self, sa, esp, key, icv_size=None, esn_en=False, esn=0): 

396 """ 

397 Encrypt an ESP packet 

398 

399 :param sa: the SecurityAssociation associated with the ESP packet. 

400 :param esp: an unencrypted _ESPPlain packet with valid padding 

401 :param key: the secret key used for encryption 

402 :param icv_size: the length of the icv used for integrity check 

403 :esn_en: extended sequence number enable which allows to use 64-bit 

404 sequence number instead of 32-bit when using an AEAD 

405 algorithm 

406 :esn: extended sequence number (32 MSB) 

407 :return: a valid ESP packet encrypted with this algorithm 

408 """ 

409 if icv_size is None: 

410 icv_size = self.icv_size if self.is_aead else 0 

411 data = esp.data_for_encryption() 

412 

413 if self.cipher: 

414 mode_iv = self._format_mode_iv(algo=self, sa=sa, iv=esp.iv) 

415 aad = None 

416 if self.is_aead: 

417 if esn_en: 

418 aad = struct.pack('!LLL', esp.spi, esn, esp.seq) 

419 else: 

420 aad = struct.pack('!LL', esp.spi, esp.seq) 

421 if self.ciphers_aead_api: 

422 # New API 

423 if self.cipher == aead.AESCCM: 

424 cipher = self.cipher(key, tag_length=icv_size) 

425 else: 

426 cipher = self.cipher(key) 

427 if self.name == 'AES-NULL-GMAC': 

428 # Special case for GMAC (rfc 4543 sect 3) 

429 data = data + cipher.encrypt(mode_iv, b"", aad + esp.iv + data) 

430 else: 

431 data = cipher.encrypt(mode_iv, data, aad) 

432 else: 

433 cipher = self.new_cipher(key, mode_iv) 

434 encryptor = cipher.encryptor() 

435 

436 if self.is_aead: 

437 encryptor.authenticate_additional_data(aad) 

438 data = encryptor.update(data) + encryptor.finalize() 

439 data += encryptor.tag[:icv_size] 

440 else: 

441 data = encryptor.update(data) + encryptor.finalize() 

442 

443 return ESP(spi=esp.spi, seq=esp.seq, data=esp.iv + data) 

444 

445 def decrypt(self, sa, esp, key, icv_size=None, esn_en=False, esn=0): 

446 """ 

447 Decrypt an ESP packet 

448 

449 :param sa: the SecurityAssociation associated with the ESP packet. 

450 :param esp: an encrypted ESP packet 

451 :param key: the secret key used for encryption 

452 :param icv_size: the length of the icv used for integrity check 

453 :param esn_en: extended sequence number enable which allows to use 

454 64-bit sequence number instead of 32-bit when using an 

455 AEAD algorithm 

456 :param esn: extended sequence number (32 MSB) 

457 :returns: a valid ESP packet encrypted with this algorithm 

458 :raise scapy.layers.ipsec.IPSecIntegrityError: if the integrity check 

459 fails with an AEAD algorithm 

460 """ 

461 if icv_size is None: 

462 icv_size = self.icv_size if self.is_aead else 0 

463 

464 iv = esp.data[:self.iv_size] 

465 data = esp.data[self.iv_size:len(esp.data) - icv_size] 

466 icv = esp.data[len(esp.data) - icv_size:] 

467 

468 if self.cipher: 

469 mode_iv = self._format_mode_iv(sa=sa, iv=iv) 

470 aad = None 

471 if self.is_aead: 

472 if esn_en: 

473 aad = struct.pack('!LLL', esp.spi, esn, esp.seq) 

474 else: 

475 aad = struct.pack('!LL', esp.spi, esp.seq) 

476 if self.ciphers_aead_api: 

477 # New API 

478 if self.cipher == aead.AESCCM: 

479 cipher = self.cipher(key, tag_length=icv_size) 

480 else: 

481 cipher = self.cipher(key) 

482 try: 

483 if self.name == 'AES-NULL-GMAC': 

484 # Special case for GMAC (rfc 4543 sect 3) 

485 data = data + cipher.decrypt(mode_iv, icv, aad + iv + data) 

486 else: 

487 data = cipher.decrypt(mode_iv, data + icv, aad) 

488 except InvalidTag as err: 

489 raise IPSecIntegrityError(err) 

490 else: 

491 cipher = self.new_cipher(key, mode_iv, icv) 

492 decryptor = cipher.decryptor() 

493 

494 if self.is_aead: 

495 # Tag value check is done during the finalize method 

496 decryptor.authenticate_additional_data(aad) 

497 try: 

498 data = decryptor.update(data) + decryptor.finalize() 

499 except InvalidTag as err: 

500 raise IPSecIntegrityError(err) 

501 

502 # extract padlen and nh 

503 padlen = orb(data[-2]) 

504 nh = orb(data[-1]) 

505 

506 # then use padlen to determine data and padding 

507 padding = data[len(data) - padlen - 2: len(data) - 2] 

508 data = data[:len(data) - padlen - 2] 

509 

510 return _ESPPlain(spi=esp.spi, 

511 seq=esp.seq, 

512 iv=iv, 

513 data=data, 

514 padding=padding, 

515 padlen=padlen, 

516 nh=nh, 

517 icv=icv) 

518 

519############################################################################### 

520# The names of the encryption algorithms are the same than in scapy.contrib.ikev2 # noqa: E501 

521# see http://www.iana.org/assignments/ikev2-parameters/ikev2-parameters.xhtml 

522 

523 

524CRYPT_ALGOS = { 

525 'NULL': CryptAlgo('NULL', cipher=None, mode=None, iv_size=0), 

526} 

527 

528if algorithms: 

529 CRYPT_ALGOS['AES-CBC'] = CryptAlgo('AES-CBC', 

530 cipher=algorithms.AES, 

531 mode=modes.CBC) 

532 _aes_ctr_format_mode_iv = lambda sa, iv, **kw: sa.crypt_salt + iv + b'\x00\x00\x00\x01' # noqa: E501 

533 CRYPT_ALGOS['AES-CTR'] = CryptAlgo('AES-CTR', 

534 cipher=algorithms.AES, 

535 mode=modes.CTR, 

536 block_size=1, 

537 iv_size=8, 

538 salt_size=4, 

539 format_mode_iv=_aes_ctr_format_mode_iv) 

540 _salt_format_mode_iv = lambda sa, iv, **kw: sa.crypt_salt + iv 

541 CRYPT_ALGOS['AES-GCM'] = CryptAlgo('AES-GCM', 

542 cipher=aead.AESGCM, 

543 key_size=(16, 24, 32), 

544 mode=None, 

545 salt_size=4, 

546 block_size=1, 

547 iv_size=8, 

548 icv_size=16, 

549 format_mode_iv=_salt_format_mode_iv) 

550 # GMAC: rfc 4543, "companion to the AES Galois/Counter Mode ESP" 

551 # This is defined as a crypt_algo by rfc, but has the role of an auth_algo 

552 CRYPT_ALGOS['AES-NULL-GMAC'] = CryptAlgo('AES-NULL-GMAC', 

553 cipher=aead.AESGCM, 

554 key_size=(16, 24, 32), 

555 mode=None, 

556 salt_size=4, 

557 block_size=1, 

558 iv_size=8, 

559 icv_size=16, 

560 format_mode_iv=_salt_format_mode_iv) 

561 CRYPT_ALGOS['AES-CCM'] = CryptAlgo('AES-CCM', 

562 cipher=aead.AESCCM, 

563 mode=None, 

564 key_size=(16, 24, 32), 

565 block_size=1, 

566 iv_size=8, 

567 salt_size=3, 

568 icv_size=16, 

569 format_mode_iv=_salt_format_mode_iv) 

570 CRYPT_ALGOS['CHACHA20-POLY1305'] = CryptAlgo('CHACHA20-POLY1305', 

571 cipher=aead.ChaCha20Poly1305, 

572 mode=None, 

573 key_size=32, 

574 block_size=1, 

575 iv_size=8, 

576 salt_size=4, 

577 icv_size=16, 

578 format_mode_iv=_salt_format_mode_iv) # noqa: E501 

579 

580 # Using a TripleDES cipher algorithm for DES is done by using the same 64 

581 # bits key 3 times 

582 CRYPT_ALGOS['DES'] = CryptAlgo('DES', 

583 cipher=DES, 

584 mode=modes.CBC, 

585 key_size=(8,)) 

586 CRYPT_ALGOS['3DES'] = CryptAlgo('3DES', 

587 cipher=decrepit_algorithms.TripleDES, 

588 mode=modes.CBC) 

589 if decrepit_algorithms is algorithms: 

590 # cryptography < 43 raises a DeprecationWarning 

591 from cryptography.utils import CryptographyDeprecationWarning 

592 with warnings.catch_warnings(): 

593 # Hide deprecation warnings 

594 warnings.filterwarnings("ignore", 

595 category=CryptographyDeprecationWarning) 

596 CRYPT_ALGOS['CAST'] = CryptAlgo('CAST', 

597 cipher=decrepit_algorithms.CAST5, 

598 mode=modes.CBC) 

599 CRYPT_ALGOS['Blowfish'] = CryptAlgo('Blowfish', 

600 cipher=decrepit_algorithms.Blowfish, 

601 mode=modes.CBC) 

602 else: 

603 CRYPT_ALGOS['CAST'] = CryptAlgo('CAST', 

604 cipher=decrepit_algorithms.CAST5, 

605 mode=modes.CBC) 

606 CRYPT_ALGOS['Blowfish'] = CryptAlgo('Blowfish', 

607 cipher=decrepit_algorithms.Blowfish, 

608 mode=modes.CBC) 

609 

610 

611############################################################################### 

612if conf.crypto_valid: 

613 from cryptography.hazmat.primitives.hmac import HMAC 

614 from cryptography.hazmat.primitives.cmac import CMAC 

615 from cryptography.hazmat.primitives import hashes 

616else: 

617 # no error if cryptography is not available but authentication won't be supported # noqa: E501 

618 HMAC = CMAC = hashes = None 

619 

620############################################################################### 

621 

622 

623class IPSecIntegrityError(Exception): 

624 """ 

625 Error risen when the integrity check fails. 

626 """ 

627 pass 

628 

629 

630class AuthAlgo(object): 

631 """ 

632 IPsec integrity algorithm 

633 """ 

634 

635 def __init__(self, name, mac, digestmod, icv_size, key_size=None): 

636 """ 

637 :param name: the name of this integrity algorithm 

638 :param mac: a Message Authentication Code module 

639 :param digestmod: a Hash or Cipher module 

640 :param icv_size: the length of the integrity check value of this algo 

641 :param key_size: an integer or list/tuple of integers. If specified, 

642 force the secret keys length to one of the values. 

643 Defaults to the `key_size` of the cipher. 

644 """ 

645 self.name = name 

646 self.mac = mac 

647 self.digestmod = digestmod 

648 self.icv_size = icv_size 

649 self.key_size = key_size 

650 

651 def check_key(self, key): 

652 """ 

653 Check that the key length is valid. 

654 

655 :param key: a byte string 

656 """ 

657 if self.key_size and len(key) not in self.key_size: 

658 raise TypeError('invalid key size %s, must be one of %s' % 

659 (len(key), self.key_size)) 

660 

661 @crypto_validator 

662 def new_mac(self, key): 

663 """ 

664 :param key: a byte string 

665 :returns: an initialized mac object for this algo 

666 """ 

667 if self.mac is CMAC: 

668 return self.mac(self.digestmod(key), default_backend()) 

669 else: 

670 return self.mac(key, self.digestmod(), default_backend()) 

671 

672 def sign(self, pkt, key, esn_en=False, esn=0): 

673 """ 

674 Sign an IPsec (ESP or AH) packet with this algo. 

675 

676 :param pkt: a packet that contains a valid encrypted ESP or AH layer 

677 :param key: the authentication key, a byte string 

678 :param esn_en: extended sequence number enable which allows to use 

679 64-bit sequence number instead of 32-bit 

680 :param esn: extended sequence number (32 MSB) 

681 

682 :returns: the signed packet 

683 """ 

684 if not self.mac: 

685 return pkt 

686 

687 mac = self.new_mac(key) 

688 

689 if pkt.haslayer(ESP): 

690 mac.update(bytes(pkt[ESP])) 

691 if esn_en: 

692 # RFC4303 sect 2.2.1 

693 mac.update(struct.pack('!L', esn)) 

694 pkt[ESP].data += mac.finalize()[:self.icv_size] 

695 

696 elif pkt.haslayer(AH): 

697 mac.update(bytes(zero_mutable_fields(pkt.copy(), sending=True))) 

698 if esn_en: 

699 # RFC4302 sect 2.5.1 

700 mac.update(struct.pack('!L', esn)) 

701 pkt[AH].icv = mac.finalize()[:self.icv_size] 

702 

703 return pkt 

704 

705 def verify(self, pkt, key, esn_en=False, esn=0): 

706 """ 

707 Check that the integrity check value (icv) of a packet is valid. 

708 

709 :param pkt: a packet that contains a valid encrypted ESP or AH layer 

710 :param key: the authentication key, a byte string 

711 :param esn_en: extended sequence number enable which allows to use 

712 64-bit sequence number instead of 32-bit 

713 :param esn: extended sequence number (32 MSB) 

714 

715 :raise scapy.layers.ipsec.IPSecIntegrityError: if the integrity check 

716 fails 

717 """ 

718 if not self.mac or self.icv_size == 0: 

719 return 

720 

721 mac = self.new_mac(key) 

722 

723 pkt_icv = 'not found' 

724 

725 if isinstance(pkt, ESP): 

726 pkt_icv = pkt.data[len(pkt.data) - self.icv_size:] 

727 clone = pkt.copy() 

728 clone.data = clone.data[:len(clone.data) - self.icv_size] 

729 mac.update(bytes(clone)) 

730 if esn_en: 

731 # RFC4303 sect 2.2.1 

732 mac.update(struct.pack('!L', esn)) 

733 

734 elif pkt.haslayer(AH): 

735 if len(pkt[AH].icv) != self.icv_size: 

736 # Fill padding since we know the actual icv_size 

737 pkt[AH].padding = pkt[AH].icv[self.icv_size:] 

738 pkt[AH].icv = pkt[AH].icv[:self.icv_size] 

739 pkt_icv = pkt[AH].icv 

740 clone = zero_mutable_fields(pkt.copy(), sending=False) 

741 mac.update(bytes(clone)) 

742 if esn_en: 

743 # RFC4302 sect 2.5.1 

744 mac.update(struct.pack('!L', esn)) 

745 

746 computed_icv = mac.finalize()[:self.icv_size] 

747 

748 # XXX: Cannot use mac.verify because the ICV can be truncated 

749 if pkt_icv != computed_icv: 

750 raise IPSecIntegrityError('pkt_icv=%r, computed_icv=%r' % 

751 (pkt_icv, computed_icv)) 

752 

753############################################################################### 

754# The names of the integrity algorithms are the same than in scapy.contrib.ikev2 # noqa: E501 

755# see http://www.iana.org/assignments/ikev2-parameters/ikev2-parameters.xhtml 

756 

757 

758AUTH_ALGOS = { 

759 'NULL': AuthAlgo('NULL', mac=None, digestmod=None, icv_size=0), 

760} 

761 

762if HMAC and hashes: 

763 # XXX: NIST has deprecated SHA1 but is required by RFC7321 

764 AUTH_ALGOS['HMAC-SHA1-96'] = AuthAlgo('HMAC-SHA1-96', 

765 mac=HMAC, 

766 digestmod=hashes.SHA1, 

767 icv_size=12) 

768 AUTH_ALGOS['SHA2-256-128'] = AuthAlgo('SHA2-256-128', 

769 mac=HMAC, 

770 digestmod=hashes.SHA256, 

771 icv_size=16) 

772 AUTH_ALGOS['SHA2-384-192'] = AuthAlgo('SHA2-384-192', 

773 mac=HMAC, 

774 digestmod=hashes.SHA384, 

775 icv_size=24) 

776 AUTH_ALGOS['SHA2-512-256'] = AuthAlgo('SHA2-512-256', 

777 mac=HMAC, 

778 digestmod=hashes.SHA512, 

779 icv_size=32) 

780 # XXX:Flagged as deprecated by 'cryptography'. Kept for backward compat 

781 AUTH_ALGOS['HMAC-MD5-96'] = AuthAlgo('HMAC-MD5-96', 

782 mac=HMAC, 

783 digestmod=hashes.MD5, 

784 icv_size=12) 

785if CMAC and algorithms: 

786 AUTH_ALGOS['AES-CMAC-96'] = AuthAlgo('AES-CMAC-96', 

787 mac=CMAC, 

788 digestmod=algorithms.AES, 

789 icv_size=12, 

790 key_size=(16,)) 

791 

792############################################################################### 

793 

794 

795def split_for_transport(orig_pkt, transport_proto): 

796 """ 

797 Split an IP(v6) packet in the correct location to insert an ESP or AH 

798 header. 

799 

800 :param orig_pkt: the packet to split. Must be an IP or IPv6 packet 

801 :param transport_proto: the IPsec protocol number that will be inserted 

802 at the split position. 

803 :returns: a tuple (header, nh, payload) where nh is the protocol number of 

804 payload. 

805 """ 

806 # force resolution of default fields to avoid padding errors 

807 header = orig_pkt.__class__(raw(orig_pkt)) 

808 next_hdr = header.payload 

809 nh = None 

810 

811 if header.version == 4: 

812 nh = header.proto 

813 header.proto = transport_proto 

814 header.remove_payload() 

815 del header.chksum 

816 del header.len 

817 

818 return header, nh, next_hdr 

819 else: 

820 found_rt_hdr = False 

821 prev = header 

822 

823 # Since the RFC 4302 is vague about where the ESP/AH headers should be 

824 # inserted in IPv6, I chose to follow the linux implementation. 

825 while isinstance(next_hdr, (IPv6ExtHdrHopByHop, IPv6ExtHdrRouting, IPv6ExtHdrDestOpt)): # noqa: E501 

826 if isinstance(next_hdr, IPv6ExtHdrHopByHop): 

827 pass 

828 if isinstance(next_hdr, IPv6ExtHdrRouting): 

829 found_rt_hdr = True 

830 elif isinstance(next_hdr, IPv6ExtHdrDestOpt) and found_rt_hdr: 

831 break 

832 

833 prev = next_hdr 

834 next_hdr = next_hdr.payload 

835 

836 nh = prev.nh 

837 prev.nh = transport_proto 

838 prev.remove_payload() 

839 del header.plen 

840 

841 return header, nh, next_hdr 

842 

843 

844############################################################################### 

845# see RFC 4302 - Appendix A. Mutability of IP Options/Extension Headers 

846IMMUTABLE_IPV4_OPTIONS = ( 

847 0, # End Of List 

848 1, # No OPeration 

849 2, # Security 

850 5, # Extended Security 

851 6, # Commercial Security 

852 20, # Router Alert 

853 21, # Sender Directed Multi-Destination Delivery 

854) 

855 

856 

857def zero_mutable_fields(pkt, sending=False): 

858 """ 

859 When using AH, all "mutable" fields must be "zeroed" before calculating 

860 the ICV. See RFC 4302, Section 3.3.3.1. Handling Mutable Fields. 

861 

862 :param pkt: an IP(v6) packet containing an AH layer. 

863 NOTE: The packet will be modified 

864 :param sending: if true, ipv6 routing headers will not be reordered 

865 """ 

866 

867 if pkt.haslayer(AH): 

868 pkt[AH].icv = b"\x00" * len(pkt[AH].icv) 

869 else: 

870 raise TypeError('no AH layer found') 

871 

872 if pkt.version == 4: 

873 # the tos field has been replaced by DSCP and ECN 

874 # Routers may rewrite the DS field as needed to provide a 

875 # desired local or end-to-end service 

876 pkt.tos = 0 

877 # an intermediate router might set the DF bit, even if the source 

878 # did not select it. 

879 pkt.flags = 0 

880 # changed en route as a normal course of processing by routers 

881 pkt.ttl = 0 

882 # will change if any of these other fields change 

883 pkt.chksum = 0 

884 

885 immutable_opts = [] 

886 for opt in pkt.options: 

887 if opt.option in IMMUTABLE_IPV4_OPTIONS: 

888 immutable_opts.append(opt) 

889 else: 

890 immutable_opts.append(Raw(b"\x00" * len(opt))) 

891 pkt.options = immutable_opts 

892 

893 else: 

894 # holds DSCP and ECN 

895 pkt.tc = 0 

896 # The flow label described in AHv1 was mutable, and in RFC 2460 [DH98] 

897 # was potentially mutable. To retain compatibility with existing AH 

898 # implementations, the flow label is not included in the ICV in AHv2. 

899 pkt.fl = 0 

900 # same as ttl 

901 pkt.hlim = 0 

902 

903 next_hdr = pkt.payload 

904 

905 while isinstance(next_hdr, (IPv6ExtHdrHopByHop, IPv6ExtHdrRouting, IPv6ExtHdrDestOpt)): # noqa: E501 

906 if isinstance(next_hdr, (IPv6ExtHdrHopByHop, IPv6ExtHdrDestOpt)): 

907 for opt in next_hdr.options: 

908 if opt.otype & 0x20: 

909 # option data can change en-route and must be zeroed 

910 opt.optdata = b"\x00" * opt.optlen 

911 elif isinstance(next_hdr, IPv6ExtHdrRouting) and sending: 

912 # The sender must order the field so that it appears as it 

913 # will at the receiver, prior to performing the ICV computation. # noqa: E501 

914 next_hdr.segleft = 0 

915 if next_hdr.addresses: 

916 final = next_hdr.addresses.pop() 

917 next_hdr.addresses.insert(0, pkt.dst) 

918 pkt.dst = final 

919 else: 

920 break 

921 

922 next_hdr = next_hdr.payload 

923 

924 return pkt 

925 

926############################################################################### 

927 

928 

929class SecurityAssociation(object): 

930 """ 

931 This class is responsible of "encryption" and "decryption" of IPsec packets. # noqa: E501 

932 """ 

933 

934 SUPPORTED_PROTOS = (IP, IPv6) 

935 

936 def __init__(self, proto, spi, seq_num=1, crypt_algo=None, crypt_key=None, 

937 crypt_icv_size=None, 

938 auth_algo=None, auth_key=None, 

939 tunnel_header=None, nat_t_header=None, esn_en=False, esn=0): 

940 """ 

941 :param proto: the IPsec proto to use (ESP or AH) 

942 :param spi: the Security Parameters Index of this SA 

943 :param seq_num: the initial value for the sequence number on encrypted 

944 packets 

945 :param crypt_algo: the encryption algorithm name (only used with ESP) 

946 :param crypt_key: the encryption key (only used with ESP) 

947 :param crypt_icv_size: change the default size of the crypt_algo 

948 (only used with ESP) 

949 :param auth_algo: the integrity algorithm name 

950 :param auth_key: the integrity key 

951 :param tunnel_header: an instance of a IP(v6) header that will be used 

952 to encapsulate the encrypted packets. 

953 :param nat_t_header: an instance of a UDP header that will be used 

954 for NAT-Traversal. 

955 :param esn_en: extended sequence number enable which allows to use 

956 64-bit sequence number instead of 32-bit when using an 

957 AEAD algorithm 

958 :param esn: extended sequence number (32 MSB) 

959 """ 

960 

961 if proto not in {ESP, AH, ESP.name, AH.name}: 

962 raise ValueError("proto must be either ESP or AH") 

963 if isinstance(proto, str): 

964 self.proto = {ESP.name: ESP, AH.name: AH}[proto] 

965 else: 

966 self.proto = proto 

967 

968 self.spi = spi 

969 self.seq_num = seq_num 

970 self.esn_en = esn_en 

971 # Get Extended Sequence (32 MSB) 

972 self.esn = esn 

973 if crypt_algo: 

974 if crypt_algo not in CRYPT_ALGOS: 

975 raise TypeError('unsupported encryption algo %r, try %r' % 

976 (crypt_algo, list(CRYPT_ALGOS))) 

977 self.crypt_algo = CRYPT_ALGOS[crypt_algo] 

978 

979 if crypt_key: 

980 salt_size = self.crypt_algo.salt_size 

981 self.crypt_key = crypt_key[:len(crypt_key) - salt_size] 

982 self.crypt_salt = crypt_key[len(crypt_key) - salt_size:] 

983 else: 

984 self.crypt_key = None 

985 self.crypt_salt = None 

986 

987 else: 

988 self.crypt_algo = CRYPT_ALGOS['NULL'] 

989 self.crypt_key = None 

990 self.crypt_salt = None 

991 self.crypt_icv_size = crypt_icv_size 

992 

993 if auth_algo: 

994 if auth_algo not in AUTH_ALGOS: 

995 raise TypeError('unsupported integrity algo %r, try %r' % 

996 (auth_algo, list(AUTH_ALGOS))) 

997 self.auth_algo = AUTH_ALGOS[auth_algo] 

998 self.auth_key = auth_key 

999 else: 

1000 self.auth_algo = AUTH_ALGOS['NULL'] 

1001 self.auth_key = None 

1002 

1003 if tunnel_header and not isinstance(tunnel_header, (IP, IPv6)): 

1004 raise TypeError('tunnel_header must be %s or %s' % (IP.name, IPv6.name)) # noqa: E501 

1005 self.tunnel_header = tunnel_header 

1006 

1007 if nat_t_header: 

1008 if proto is not ESP: 

1009 raise TypeError('nat_t_header is only allowed with ESP') 

1010 if not isinstance(nat_t_header, UDP): 

1011 raise TypeError('nat_t_header must be %s' % UDP.name) 

1012 self.nat_t_header = nat_t_header 

1013 

1014 def check_spi(self, pkt): 

1015 if pkt.spi != self.spi: 

1016 raise TypeError('packet spi=0x%x does not match the SA spi=0x%x' % 

1017 (pkt.spi, self.spi)) 

1018 

1019 def _encrypt_esp(self, pkt, seq_num=None, iv=None, esn_en=None, esn=None): 

1020 

1021 if iv is None: 

1022 iv = self.crypt_algo.generate_iv() 

1023 else: 

1024 if len(iv) != self.crypt_algo.iv_size: 

1025 raise TypeError('iv length must be %s' % self.crypt_algo.iv_size) # noqa: E501 

1026 

1027 esp = _ESPPlain(spi=self.spi, seq=seq_num or self.seq_num, iv=iv) 

1028 

1029 if self.tunnel_header: 

1030 tunnel = self.tunnel_header.copy() 

1031 

1032 if tunnel.version == 4: 

1033 del tunnel.proto 

1034 del tunnel.len 

1035 del tunnel.chksum 

1036 else: 

1037 del tunnel.nh 

1038 del tunnel.plen 

1039 

1040 pkt = tunnel.__class__(raw(tunnel / pkt)) 

1041 

1042 ip_header, nh, payload = split_for_transport(pkt, socket.IPPROTO_ESP) 

1043 esp.data = payload 

1044 esp.nh = nh 

1045 

1046 esp = self.crypt_algo.pad(esp) 

1047 esp = self.crypt_algo.encrypt(self, esp, self.crypt_key, 

1048 self.crypt_icv_size, 

1049 esn_en=esn_en or self.esn_en, 

1050 esn=esn or self.esn) 

1051 

1052 self.auth_algo.sign(esp, 

1053 self.auth_key, 

1054 esn_en=esn_en or self.esn_en, 

1055 esn=esn or self.esn) 

1056 

1057 if self.nat_t_header: 

1058 nat_t_header = self.nat_t_header.copy() 

1059 nat_t_header.chksum = 0 

1060 del nat_t_header.len 

1061 if ip_header.version == 4: 

1062 del ip_header.proto 

1063 else: 

1064 del ip_header.nh 

1065 ip_header /= nat_t_header 

1066 

1067 if ip_header.version == 4: 

1068 del ip_header.len 

1069 del ip_header.chksum 

1070 else: 

1071 del ip_header.plen 

1072 

1073 # sequence number must always change, unless specified by the user 

1074 if seq_num is None: 

1075 self.seq_num += 1 

1076 

1077 return ip_header.__class__(raw(ip_header / esp)) 

1078 

1079 def _encrypt_ah(self, pkt, seq_num=None, esn_en=False, esn=0): 

1080 

1081 ah = AH(spi=self.spi, seq=seq_num or self.seq_num, 

1082 icv=b"\x00" * self.auth_algo.icv_size) 

1083 

1084 if self.tunnel_header: 

1085 tunnel = self.tunnel_header.copy() 

1086 

1087 if tunnel.version == 4: 

1088 del tunnel.proto 

1089 del tunnel.len 

1090 del tunnel.chksum 

1091 else: 

1092 del tunnel.nh 

1093 del tunnel.plen 

1094 

1095 pkt = tunnel.__class__(raw(tunnel / pkt)) 

1096 

1097 ip_header, nh, payload = split_for_transport(pkt, socket.IPPROTO_AH) 

1098 ah.nh = nh 

1099 

1100 if ip_header.version == 6 and len(ah) % 8 != 0: 

1101 # For IPv6, the total length of the header must be a multiple of 

1102 # 8-octet units. 

1103 ah.padding = b"\x00" * (-len(ah) % 8) 

1104 elif len(ah) % 4 != 0: 

1105 # For IPv4, the total length of the header must be a multiple of 

1106 # 4-octet units. 

1107 ah.padding = b"\x00" * (-len(ah) % 4) 

1108 

1109 # RFC 4302 - Section 2.2. Payload Length 

1110 # This 8-bit field specifies the length of AH in 32-bit words (4-byte 

1111 # units), minus "2". 

1112 ah.payloadlen = len(ah) // 4 - 2 

1113 

1114 if ip_header.version == 4: 

1115 ip_header.len = len(ip_header) + len(ah) + len(payload) 

1116 del ip_header.chksum 

1117 ip_header = ip_header.__class__(raw(ip_header)) 

1118 else: 

1119 ip_header.plen = len(ip_header.payload) + len(ah) + len(payload) 

1120 

1121 signed_pkt = self.auth_algo.sign(ip_header / ah / payload, 

1122 self.auth_key, 

1123 esn_en=esn_en or self.esn_en, 

1124 esn=esn or self.esn) 

1125 

1126 # sequence number must always change, unless specified by the user 

1127 if seq_num is None: 

1128 self.seq_num += 1 

1129 

1130 return signed_pkt 

1131 

1132 def encrypt(self, pkt, seq_num=None, iv=None, esn_en=None, esn=None): 

1133 """ 

1134 Encrypt (and encapsulate) an IP(v6) packet with ESP or AH according 

1135 to this SecurityAssociation. 

1136 

1137 :param pkt: the packet to encrypt 

1138 :param seq_num: if specified, use this sequence number instead of the 

1139 generated one 

1140 :param esn_en: extended sequence number enable which allows to 

1141 use 64-bit sequence number instead of 32-bit when 

1142 using an AEAD algorithm 

1143 :param esn: extended sequence number (32 MSB) 

1144 :param iv: if specified, use this initialization vector for 

1145 encryption instead of a random one. 

1146 

1147 :returns: the encrypted/encapsulated packet 

1148 """ 

1149 if not isinstance(pkt, self.SUPPORTED_PROTOS): 

1150 raise TypeError('cannot encrypt %s, supported protos are %s' 

1151 % (pkt.__class__, self.SUPPORTED_PROTOS)) 

1152 if self.proto is ESP: 

1153 return self._encrypt_esp(pkt, seq_num=seq_num, 

1154 iv=iv, esn_en=esn_en, 

1155 esn=esn) 

1156 else: 

1157 return self._encrypt_ah(pkt, seq_num=seq_num, 

1158 esn_en=esn_en, esn=esn) 

1159 

1160 def _decrypt_esp(self, pkt, verify=True, esn_en=None, esn=None): 

1161 

1162 encrypted = pkt[ESP] 

1163 

1164 if verify: 

1165 self.check_spi(pkt) 

1166 self.auth_algo.verify(encrypted, self.auth_key, 

1167 esn_en=esn_en or self.esn_en, 

1168 esn=esn or self.esn) 

1169 

1170 esp = self.crypt_algo.decrypt(self, encrypted, self.crypt_key, 

1171 self.crypt_icv_size or 

1172 self.crypt_algo.icv_size or 

1173 self.auth_algo.icv_size, 

1174 esn_en=esn_en or self.esn_en, 

1175 esn=esn or self.esn) 

1176 

1177 if self.tunnel_header: 

1178 # drop the tunnel header and return the payload untouched 

1179 

1180 pkt.remove_payload() 

1181 if pkt.version == 4: 

1182 pkt.proto = esp.nh 

1183 else: 

1184 pkt.nh = esp.nh 

1185 cls = pkt.guess_payload_class(esp.data) 

1186 

1187 return cls(esp.data) 

1188 else: 

1189 ip_header = pkt 

1190 

1191 if ip_header.version == 4: 

1192 ip_header.proto = esp.nh 

1193 del ip_header.chksum 

1194 ip_header.remove_payload() 

1195 ip_header.len = len(ip_header) + len(esp.data) 

1196 # recompute checksum 

1197 ip_header = ip_header.__class__(raw(ip_header)) 

1198 else: 

1199 if self.nat_t_header: 

1200 # drop the UDP header and return the payload untouched 

1201 ip_header.nh = esp.nh 

1202 ip_header.remove_payload() 

1203 else: 

1204 encrypted.underlayer.nh = esp.nh 

1205 encrypted.underlayer.remove_payload() 

1206 ip_header.plen = len(ip_header.payload) + len(esp.data) 

1207 

1208 cls = ip_header.guess_payload_class(esp.data) 

1209 

1210 # reassemble the ip_header with the ESP payload 

1211 return ip_header / cls(esp.data) 

1212 

1213 def _decrypt_ah(self, pkt, verify=True, esn_en=None, esn=None): 

1214 

1215 if verify: 

1216 self.check_spi(pkt) 

1217 self.auth_algo.verify(pkt, self.auth_key, 

1218 esn_en=esn_en or self.esn_en, 

1219 esn=esn or self.esn) 

1220 

1221 ah = pkt[AH] 

1222 payload = ah.payload 

1223 payload.remove_underlayer(None) # useless argument... 

1224 

1225 if self.tunnel_header: 

1226 return payload 

1227 else: 

1228 ip_header = pkt 

1229 

1230 if ip_header.version == 4: 

1231 ip_header.proto = ah.nh 

1232 del ip_header.chksum 

1233 ip_header.remove_payload() 

1234 ip_header.len = len(ip_header) + len(payload) 

1235 # recompute checksum 

1236 ip_header = ip_header.__class__(raw(ip_header)) 

1237 else: 

1238 ah.underlayer.nh = ah.nh 

1239 ah.underlayer.remove_payload() 

1240 ip_header.plen = len(ip_header.payload) + len(payload) 

1241 

1242 # reassemble the ip_header with the AH payload 

1243 return ip_header / payload 

1244 

1245 def decrypt(self, pkt, verify=True, esn_en=None, esn=None): 

1246 """ 

1247 Decrypt (and decapsulate) an IP(v6) packet containing ESP or AH. 

1248 

1249 :param pkt: the packet to decrypt 

1250 :param verify: if False, do not perform the integrity check 

1251 :param esn_en: extended sequence number enable which allows to use 

1252 64-bit sequence number instead of 32-bit when using an 

1253 AEAD algorithm 

1254 :param esn: extended sequence number (32 MSB) 

1255 :returns: the decrypted/decapsulated packet 

1256 :raise scapy.layers.ipsec.IPSecIntegrityError: if the integrity check 

1257 fails 

1258 """ 

1259 if not isinstance(pkt, self.SUPPORTED_PROTOS): 

1260 raise TypeError('cannot decrypt %s, supported protos are %s' 

1261 % (pkt.__class__, self.SUPPORTED_PROTOS)) 

1262 

1263 if self.proto is ESP and pkt.haslayer(ESP): 

1264 return self._decrypt_esp(pkt, verify=verify, 

1265 esn_en=esn_en, esn=esn) 

1266 elif self.proto is AH and pkt.haslayer(AH): 

1267 return self._decrypt_ah(pkt, verify=verify, esn_en=esn_en, esn=esn) 

1268 else: 

1269 raise TypeError('%s has no %s layer' % (pkt, self.proto.name))