Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/scapy/layers/gssapi.py: 83%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

172 statements  

1# SPDX-License-Identifier: GPL-2.0-only 

2# This file is part of Scapy 

3# See https://scapy.net/ for more information 

4# Copyright (C) Gabriel Potter <gabriel[]potter[]fr> 

5 

6""" 

7Generic Security Services (GSS) API 

8 

9Implements parts of: 

10 

11 - GSSAPI: RFC4121 / RFC2743 

12 - GSSAPI C bindings: RFC2744 

13 

14This is implemented in the following SSPs: 

15 

16 - :class:`~scapy.layers.ntlm.NTLMSSP` 

17 - :class:`~scapy.layers.kerberos.KerberosSSP` 

18 - :class:`~scapy.layers.spnego.SPNEGOSSP` 

19 - :class:`~scapy.layers.msrpce.msnrpc.NetlogonSSP` 

20 

21.. note:: 

22 You will find more complete documentation for this layer over at 

23 `GSSAPI <https://scapy.readthedocs.io/en/latest/layers/gssapi.html>`_ 

24""" 

25 

26import abc 

27 

28from dataclasses import dataclass 

29from enum import IntEnum, IntFlag 

30 

31from scapy.asn1.asn1 import ( 

32 ASN1_SEQUENCE, 

33 ASN1_Class_UNIVERSAL, 

34 ASN1_Codecs, 

35) 

36from scapy.asn1.ber import BERcodec_SEQUENCE 

37from scapy.asn1.mib import conf # loads conf.mib 

38from scapy.asn1fields import ( 

39 ASN1F_OID, 

40 ASN1F_PACKET, 

41 ASN1F_SEQUENCE, 

42) 

43from scapy.asn1packet import ASN1_Packet 

44from scapy.fields import ( 

45 FieldLenField, 

46 LEIntEnumField, 

47 PacketField, 

48 StrLenField, 

49) 

50from scapy.packet import Packet 

51 

52# Type hints 

53from typing import ( 

54 Any, 

55 List, 

56 Optional, 

57 Tuple, 

58) 

59 

60# https://datatracker.ietf.org/doc/html/rfc1508#page-48 

61 

62 

63class ASN1_Class_GSSAPI(ASN1_Class_UNIVERSAL): 

64 name = "GSSAPI" 

65 APPLICATION = 0x60 

66 

67 

68class ASN1_GSSAPI_APPLICATION(ASN1_SEQUENCE): 

69 tag = ASN1_Class_GSSAPI.APPLICATION 

70 

71 

72class BERcodec_GSSAPI_APPLICATION(BERcodec_SEQUENCE): 

73 tag = ASN1_Class_GSSAPI.APPLICATION 

74 

75 

76class ASN1F_GSSAPI_APPLICATION(ASN1F_SEQUENCE): 

77 ASN1_tag = ASN1_Class_GSSAPI.APPLICATION 

78 

79 

80# GSS API Blob 

81# https://datatracker.ietf.org/doc/html/rfc4121 

82 

83# Filled by providers 

84_GSSAPI_OIDS = {} 

85_GSSAPI_SIGNATURE_OIDS = {} 

86 

87# section 4.1 

88 

89 

90class GSSAPI_BLOB(ASN1_Packet): 

91 ASN1_codec = ASN1_Codecs.BER 

92 ASN1_root = ASN1F_GSSAPI_APPLICATION( 

93 ASN1F_OID("MechType", "1.3.6.1.5.5.2"), 

94 ASN1F_PACKET( 

95 "innerToken", 

96 None, 

97 None, 

98 next_cls_cb=lambda pkt: _GSSAPI_OIDS.get(pkt.MechType.val, conf.raw_layer), 

99 ), 

100 ) 

101 

102 @classmethod 

103 def dispatch_hook(cls, _pkt=None, *args, **kargs): 

104 if _pkt and len(_pkt) >= 1: 

105 if ord(_pkt[:1]) & 0xA0 >= 0xA0: 

106 from scapy.layers.spnego import SPNEGO_negToken 

107 

108 # XXX: sometimes the token is raw, we should look from 

109 # the session what to use here. For now: hardcode SPNEGO 

110 # (THIS IS A VERY STRONG ASSUMPTION) 

111 return SPNEGO_negToken 

112 if _pkt[:7] == b"NTLMSSP": 

113 from scapy.layers.ntlm import NTLM_Header 

114 

115 # XXX: if no mechTypes are provided during SPNEGO exchange, 

116 # Windows falls back to a plain NTLM_Header. 

117 return NTLM_Header.dispatch_hook(_pkt=_pkt, *args, **kargs) 

118 return cls 

119 

120 

121# Same but to store the signatures (e.g. DCE/RPC) 

122 

123 

124class GSSAPI_BLOB_SIGNATURE(ASN1_Packet): 

125 ASN1_codec = ASN1_Codecs.BER 

126 ASN1_root = ASN1F_GSSAPI_APPLICATION( 

127 ASN1F_OID("MechType", "1.3.6.1.5.5.2"), 

128 ASN1F_PACKET( 

129 "innerToken", 

130 None, 

131 None, 

132 next_cls_cb=lambda pkt: _GSSAPI_SIGNATURE_OIDS.get( 

133 pkt.MechType.val, conf.raw_layer 

134 ), # noqa: E501 

135 ), 

136 ) 

137 

138 @classmethod 

139 def dispatch_hook(cls, _pkt=None, *args, **kargs): 

140 if _pkt and len(_pkt) >= 2: 

141 # Sometimes the token is raw. Detect that with educated 

142 # heuristics. 

143 if _pkt[:2] in [b"\x04\x04", b"\x05\x04"]: 

144 from scapy.layers.kerberos import KRB_InnerToken 

145 

146 return KRB_InnerToken 

147 elif len(_pkt) >= 4 and _pkt[:4] == b"\x01\x00\x00\x00": 

148 from scapy.layers.ntlm import NTLMSSP_MESSAGE_SIGNATURE 

149 

150 return NTLMSSP_MESSAGE_SIGNATURE 

151 return cls 

152 

153 

154# RFC2744 sect 3.9 - Status Values 

155 

156GSS_S_COMPLETE = 0 

157 

158# These errors are encoded into the 32-bit GSS status code as follows: 

159# MSB LSB 

160# |------------------------------------------------------------| 

161# | Calling Error | Routine Error | Supplementary Info | 

162# |------------------------------------------------------------| 

163# Bit 31 24 23 16 15 0 

164 

165GSS_C_CALLING_ERROR_OFFSET = 24 

166GSS_C_ROUTINE_ERROR_OFFSET = 16 

167GSS_C_SUPPLEMENTARY_OFFSET = 0 

168 

169# Calling errors: 

170 

171GSS_S_CALL_INACCESSIBLE_READ = 1 << GSS_C_CALLING_ERROR_OFFSET 

172GSS_S_CALL_INACCESSIBLE_WRITE = 2 << GSS_C_CALLING_ERROR_OFFSET 

173GSS_S_CALL_BAD_STRUCTURE = 3 << GSS_C_CALLING_ERROR_OFFSET 

174 

175# Routine errors: 

176 

177GSS_S_BAD_MECH = 1 << GSS_C_ROUTINE_ERROR_OFFSET 

178GSS_S_BAD_NAME = 2 << GSS_C_ROUTINE_ERROR_OFFSET 

179GSS_S_BAD_NAMETYPE = 3 << GSS_C_ROUTINE_ERROR_OFFSET 

180GSS_S_BAD_BINDINGS = 4 << GSS_C_ROUTINE_ERROR_OFFSET 

181GSS_S_BAD_STATUS = 5 << GSS_C_ROUTINE_ERROR_OFFSET 

182GSS_S_BAD_SIG = 6 << GSS_C_ROUTINE_ERROR_OFFSET 

183GSS_S_BAD_MIC = GSS_S_BAD_SIG 

184GSS_S_NO_CRED = 7 << GSS_C_ROUTINE_ERROR_OFFSET 

185GSS_S_NO_CONTEXT = 8 << GSS_C_ROUTINE_ERROR_OFFSET 

186GSS_S_DEFECTIVE_TOKEN = 9 << GSS_C_ROUTINE_ERROR_OFFSET 

187GSS_S_DEFECTIVE_CREDENTIAL = 10 << GSS_C_ROUTINE_ERROR_OFFSET 

188GSS_S_CREDENTIALS_EXPIRED = 11 << GSS_C_ROUTINE_ERROR_OFFSET 

189GSS_S_CONTEXT_EXPIRED = 12 << GSS_C_ROUTINE_ERROR_OFFSET 

190GSS_S_FAILURE = 13 << GSS_C_ROUTINE_ERROR_OFFSET 

191GSS_S_BAD_QOP = 14 << GSS_C_ROUTINE_ERROR_OFFSET 

192GSS_S_UNAUTHORIZED = 15 << GSS_C_ROUTINE_ERROR_OFFSET 

193GSS_S_UNAVAILABLE = 16 << GSS_C_ROUTINE_ERROR_OFFSET 

194GSS_S_DUPLICATE_ELEMENT = 17 << GSS_C_ROUTINE_ERROR_OFFSET 

195GSS_S_NAME_NOT_MN = 18 << GSS_C_ROUTINE_ERROR_OFFSET 

196 

197# Supplementary info bits: 

198 

199GSS_S_CONTINUE_NEEDED = 1 << (GSS_C_SUPPLEMENTARY_OFFSET + 0) 

200GSS_S_DUPLICATE_TOKEN = 1 << (GSS_C_SUPPLEMENTARY_OFFSET + 1) 

201GSS_S_OLD_TOKEN = 1 << (GSS_C_SUPPLEMENTARY_OFFSET + 2) 

202GSS_S_UNSEQ_TOKEN = 1 << (GSS_C_SUPPLEMENTARY_OFFSET + 3) 

203GSS_S_GAP_TOKEN = 1 << (GSS_C_SUPPLEMENTARY_OFFSET + 4) 

204 

205# Address families (RFC2744 sect 3.11) 

206 

207_GSS_ADDRTYPE = { 

208 0: "GSS_C_AF_UNSPEC", 

209 1: "GSS_C_AF_LOCAL", 

210 2: "GSS_C_AF_INET", 

211 3: "GSS_C_AF_IMPLINK", 

212 4: "GSS_C_AF_PUP", 

213 5: "GSS_C_AF_CHAOS", 

214 6: "GSS_C_AF_NS", 

215 7: "GSS_C_AF_NBS", 

216 8: "GSS_C_AF_ECMA", 

217 9: "GSS_C_AF_DATAKIT", 

218 10: "GSS_C_AF_CCITT", 

219 11: "GSS_C_AF_SNA", 

220 12: "GSS_C_AF_DECnet", 

221 13: "GSS_C_AF_DLI", 

222 14: "GSS_C_AF_LAT", 

223 15: "GSS_C_AF_HYLINK", 

224 16: "GSS_C_AF_APPLETALK", 

225 17: "GSS_C_AF_BSC", 

226 18: "GSS_C_AF_DSS", 

227 19: "GSS_C_AF_OSI", 

228 21: "GSS_C_AF_X25", 

229 255: "GSS_C_AF_NULLADDR", 

230} 

231 

232 

233# GSS Structures 

234 

235 

236class GssBufferDesc(Packet): 

237 name = "gss_buffer_desc" 

238 fields_desc = [ 

239 FieldLenField("length", None, length_of="value", fmt="<I"), 

240 StrLenField("value", "", length_from=lambda pkt: pkt.length), 

241 ] 

242 

243 def default_payload_class(self, payload): 

244 return conf.padding_layer 

245 

246 

247class GssChannelBindings(Packet): 

248 name = "gss_channel_bindings_struct" 

249 fields_desc = [ 

250 LEIntEnumField("initiator_addrtype", 0, _GSS_ADDRTYPE), 

251 PacketField("initiator_address", GssBufferDesc(), GssBufferDesc), 

252 LEIntEnumField("acceptor_addrtype", 0, _GSS_ADDRTYPE), 

253 PacketField("acceptor_address", GssBufferDesc(), GssBufferDesc), 

254 PacketField("application_data", None, GssBufferDesc), 

255 ] 

256 

257 

258# --- The base GSSAPI SSP base class 

259 

260 

261class GSS_C_FLAGS(IntFlag): 

262 """ 

263 Authenticator Flags per RFC2744 req_flags 

264 """ 

265 

266 GSS_C_DELEG_FLAG = 0x01 

267 GSS_C_MUTUAL_FLAG = 0x02 

268 GSS_C_REPLAY_FLAG = 0x04 

269 GSS_C_SEQUENCE_FLAG = 0x08 

270 GSS_C_CONF_FLAG = 0x10 # confidentiality 

271 GSS_C_INTEG_FLAG = 0x20 # integrity 

272 # RFC4757 

273 GSS_C_DCE_STYLE = 0x1000 

274 GSS_C_IDENTIFY_FLAG = 0x2000 

275 GSS_C_EXTENDED_ERROR_FLAG = 0x4000 

276 

277 

278class SSP: 

279 """ 

280 The general SSP class 

281 """ 

282 

283 auth_type = 0x00 

284 

285 def __init__(self, **kwargs): 

286 if kwargs: 

287 raise ValueError("Unknown SSP parameters: " + ",".join(list(kwargs))) 

288 

289 def __repr__(self): 

290 return "<%s>" % self.__class__.__name__ 

291 

292 class CONTEXT: 

293 """ 

294 A Security context i.e. the 'state' of the secure negotiation 

295 """ 

296 

297 __slots__ = ["state", "flags", "passive"] 

298 

299 def __init__(self, req_flags: Optional[GSS_C_FLAGS] = None): 

300 if req_flags is None: 

301 # Default 

302 req_flags = ( 

303 GSS_C_FLAGS.GSS_C_EXTENDED_ERROR_FLAG 

304 | GSS_C_FLAGS.GSS_C_MUTUAL_FLAG 

305 ) 

306 self.flags = req_flags 

307 self.passive = False 

308 

309 def clifailure(self): 

310 # This allows to reset the client context without discarding it. 

311 pass 

312 

313 def __repr__(self): 

314 return "[Default SSP]" 

315 

316 class STATE(IntEnum): 

317 """ 

318 An Enum that contains the states of an SSP 

319 """ 

320 

321 @abc.abstractmethod 

322 def GSS_Init_sec_context( 

323 self, Context: CONTEXT, val=None, req_flags: Optional[GSS_C_FLAGS] = None 

324 ): 

325 """ 

326 GSS_Init_sec_context: client-side call for the SSP 

327 """ 

328 raise NotImplementedError 

329 

330 @abc.abstractmethod 

331 def GSS_Accept_sec_context(self, Context: CONTEXT, val=None): 

332 """ 

333 GSS_Accept_sec_context: server-side call for the SSP 

334 """ 

335 raise NotImplementedError 

336 

337 # Passive 

338 

339 @abc.abstractmethod 

340 def GSS_Passive(self, Context: CONTEXT, val=None): 

341 """ 

342 GSS_Passive: client/server call for the SSP in passive mode 

343 """ 

344 raise NotImplementedError 

345 

346 def GSS_Passive_set_Direction(self, Context: CONTEXT, IsAcceptor=False): 

347 """ 

348 GSS_Passive_set_Direction: used to swap the direction in passive mode 

349 """ 

350 pass 

351 

352 # MS additions (*Ex functions) 

353 

354 @dataclass 

355 class WRAP_MSG: 

356 conf_req_flag: bool 

357 sign: bool 

358 data: bytes 

359 

360 @abc.abstractmethod 

361 def GSS_WrapEx( 

362 self, Context: CONTEXT, msgs: List[WRAP_MSG], qop_req: int = 0 

363 ) -> Tuple[List[WRAP_MSG], Any]: 

364 """ 

365 GSS_WrapEx 

366 

367 :param Context: the SSP context 

368 :param qop_req: int (0 specifies default QOP) 

369 :param msgs: list of WRAP_MSG 

370 

371 :returns: (data, signature) 

372 """ 

373 raise NotImplementedError 

374 

375 @abc.abstractmethod 

376 def GSS_UnwrapEx( 

377 self, Context: CONTEXT, msgs: List[WRAP_MSG], signature 

378 ) -> List[WRAP_MSG]: 

379 """ 

380 :param Context: the SSP context 

381 :param msgs: list of WRAP_MSG 

382 :param signature: the signature 

383 

384 :raises ValueError: if MIC failure. 

385 :returns: data 

386 """ 

387 raise NotImplementedError 

388 

389 @dataclass 

390 class MIC_MSG: 

391 sign: bool 

392 data: bytes 

393 

394 @abc.abstractmethod 

395 def GSS_GetMICEx( 

396 self, Context: CONTEXT, msgs: List[MIC_MSG], qop_req: int = 0 

397 ) -> Any: 

398 """ 

399 GSS_GetMICEx 

400 

401 :param Context: the SSP context 

402 :param qop_req: int (0 specifies default QOP) 

403 :param msgs: list of VERIF_MSG 

404 

405 :returns: signature 

406 """ 

407 raise NotImplementedError 

408 

409 @abc.abstractmethod 

410 def GSS_VerifyMICEx(self, Context: CONTEXT, msgs: List[MIC_MSG], signature) -> None: 

411 """ 

412 :param Context: the SSP context 

413 :param msgs: list of VERIF_MSG 

414 :param signature: the signature 

415 

416 :raises ValueError: if MIC failure. 

417 """ 

418 raise NotImplementedError 

419 

420 @abc.abstractmethod 

421 def MaximumSignatureLength(self, Context: CONTEXT): 

422 """ 

423 Returns the Maximum Signature length. 

424 

425 This will be used in auth_len in DceRpc5, and is necessary for 

426 PFC_SUPPORT_HEADER_SIGN to work properly. 

427 """ 

428 raise NotImplementedError 

429 

430 # RFC 2743 

431 

432 # sect 2.3.1 

433 

434 def GSS_GetMIC(self, Context: CONTEXT, message: bytes, qop_req: int = 0): 

435 return self.GSS_GetMICEx( 

436 Context, 

437 [ 

438 self.MIC_MSG( 

439 sign=True, 

440 data=message, 

441 ) 

442 ], 

443 qop_req=qop_req, 

444 ) 

445 

446 # sect 2.3.2 

447 

448 def GSS_VerifyMIC(self, Context: CONTEXT, message: bytes, signature): 

449 self.GSS_VerifyMICEx( 

450 Context, 

451 [ 

452 self.MIC_MSG( 

453 sign=True, 

454 data=message, 

455 ) 

456 ], 

457 signature, 

458 ) 

459 

460 # sect 2.3.3 

461 

462 def GSS_Wrap( 

463 self, 

464 Context: CONTEXT, 

465 input_message: bytes, 

466 conf_req_flag: bool, 

467 qop_req: int = 0, 

468 ): 

469 _msgs, signature = self.GSS_WrapEx( 

470 Context, 

471 [ 

472 self.WRAP_MSG( 

473 conf_req_flag=conf_req_flag, 

474 sign=True, 

475 data=input_message, 

476 ) 

477 ], 

478 qop_req=qop_req, 

479 ) 

480 return _msgs[0].data, signature 

481 

482 # sect 2.3.4 

483 

484 def GSS_Unwrap(self, Context: CONTEXT, input_message: bytes, signature): 

485 return self.GSS_UnwrapEx( 

486 Context, 

487 [ 

488 self.WRAP_MSG( 

489 conf_req_flag=True, 

490 sign=True, 

491 data=input_message, 

492 ) 

493 ], 

494 signature, 

495 )[0].data 

496 

497 # MISC 

498 

499 def NegTokenInit2(self): 

500 """ 

501 Server-Initiation 

502 See [MS-SPNG] sect 3.2.5.2 

503 """ 

504 return None, None 

505 

506 def canMechListMIC(self, Context: CONTEXT): 

507 """ 

508 Returns whether or not mechListMIC can be computed 

509 """ 

510 return False 

511 

512 def getMechListMIC(self, Context, input): 

513 """ 

514 Compute mechListMIC 

515 """ 

516 return bytes(self.GSS_GetMIC(Context, input)) 

517 

518 def verifyMechListMIC(self, Context, otherMIC, input): 

519 """ 

520 Verify mechListMIC 

521 """ 

522 return self.GSS_VerifyMIC(Context, input, otherMIC) 

523 

524 def LegsAmount(self, Context: CONTEXT): 

525 """ 

526 Returns the amount of 'legs' (how MS calls it) of the SSP. 

527 

528 i.e. 2 for Kerberos, 3 for NTLM and Netlogon 

529 """ 

530 return 2