Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/scapy/layers/msrpce/ept.py: 43%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

44 statements  

1# SPDX-License-Identifier: GPL-2.0-or-later 

2# This file is part of Scapy 

3# See https://scapy.net/ for more information 

4# Copyright (C) Gabriel Potter 

5 

6""" 

7EPT map (EndPoinT mapper) 

8""" 

9 

10import uuid 

11 

12from scapy.config import conf 

13from scapy.fields import ( 

14 ByteEnumField, 

15 ConditionalField, 

16 FieldLenField, 

17 IPField, 

18 LEShortField, 

19 MultipleTypeField, 

20 PacketListField, 

21 ShortField, 

22 StrLenField, 

23 UUIDEnumField, 

24) 

25from scapy.packet import Packet 

26from scapy.layers.dcerpc import ( 

27 DCE_RPC_INTERFACES_NAMES, 

28 DCE_RPC_INTERFACES_NAMES_rev, 

29 DCE_RPC_TRANSFER_SYNTAXES, 

30) 

31 

32from scapy.layers.msrpce.raw.ept import * # noqa: F401, F403 

33 

34 

35# [C706] Appendix L 

36 

37# "For historical reasons, this cannot be done using the standard 

38# NDR encoding rules for marshalling and unmarshalling. 

39# A special encoding is required." - Appendix L 

40 

41 

class octet_string_t(Packet):
    """Byte string prefixed by a little-endian 16-bit ``count``."""

    fields_desc = [
        # "count" is tied to the length of "value" (length_of) ...
        FieldLenField("count", None, length_of="value", fmt="<H"),
        # ... and "value" reads exactly "count" bytes.
        StrLenField("value", b"", length_from=lambda p: p.count),
    ]

    def default_payload_class(self, _):
        """Return ``conf.padding_layer`` as the payload class."""
        return conf.padding_layer

50 

51 

def _uuid_res(x):
    """Look up ``x`` in the transfer syntaxes, then in the interface names.

    Returns the mapped value, or None when ``x`` is in neither table.
    A transfer-syntax entry takes precedence over an interface-name entry
    for the same key.
    """
    # Query both tables directly instead of copying and merging them on
    # every call; the precedence is the same as a merged dict would give.
    if x in DCE_RPC_TRANSFER_SYNTAXES:
        return DCE_RPC_TRANSFER_SYNTAXES[x]
    return DCE_RPC_INTERFACES_NAMES.get(x)

57 

58 

def _uuid_res_rev(x):
    """Reverse lookup of ``x``: transfer syntaxes first, then interfaces.

    Returns the mapped key, or None when ``x`` is in neither table.
    A reversed transfer-syntax entry takes precedence over an entry of
    DCE_RPC_INTERFACES_NAMES_rev for the same key.
    """
    # Only the transfer-syntax table needs reversing; the reverse interface
    # table is queried in place rather than copied on every call.
    reversed_syntaxes = {v: k for k, v in DCE_RPC_TRANSFER_SYNTAXES.items()}
    if x in reversed_syntaxes:
        return reversed_syntaxes[x]
    return DCE_RPC_INTERFACES_NAMES_rev.get(x)

64 

65 

class prot_and_addr_t(Packet):
    """One floor, as listed in ``protocol_tower_t.floors``.

    The LHS holds the protocol identifier followed by an OID, a UUID and
    version, or raw bytes, depending on that identifier. The RHS field type
    is likewise selected from the protocol identifier.
    """

    fields_desc = [
        # --- LHS
        LEShortField("lhs_length", 0),
        # [C706] Appendix I with names from Appendix B
        ByteEnumField(
            "protocol_identifier",
            0,
            {
                0x0: "OSI OID",  # Special
                0x0D: "UUID",  # Special
                # Transports
                # 0x2: "DNA Session Control",
                # 0x3: "DNA Session Control V3",
                # 0x4: "DNA NSP Transport",
                # 0x5: "OSI TP4",
                0x06: "NCADG_OSI_CLSN",  # [C706]
                0x07: "NCACN_IP_TCP",  # [C706]
                0x08: "NCADG_IP_UDP",  # [C706]
                0x09: "IP",  # [C706]
                0x0A: "RPC connectionless protocol",  # [C706]
                0x0B: "RPC connection-oriented protocol",  # [C706]
                0x0C: "NCALRPC",
                0x0F: "NCACN_NP",  # [MS-RPCE]
                0x11: "NCACN_NB",  # [C706]
                0x12: "NCACN_NB_NB",  # [MS-RPCE]
                0x13: "NCACN_SPX",  # [C706]
                0x14: "NCADG_IPX",  # [C706]
                0x16: "NCACN_AT_DSP",  # [C706]
                0x17: "NCADG_AT_DSP",  # [C706]
                0x19: "NCADG_NB",  # [C706]
                0x1A: "NCACN_VNS_SPP",  # [C706]
                0x1B: "NCADG_VNS_IPC",  # [C706]
                0x1F: "NCACN_HTTP",  # [MS-RPCE]
            },
        ),
        # 0x0: the LHS data is an OID (lhs_length minus the identifier byte)
        ConditionalField(
            StrLenField("oid", "", length_from=lambda p: p.lhs_length - 1),
            lambda p: p.protocol_identifier == 0x0,
        ),
        # 0xD: the LHS data is a little-endian UUID followed by a version
        ConditionalField(
            UUIDEnumField(
                "uuid",
                uuid.UUID("8a885d04-1ceb-11c9-9fe8-08002b104860"),
                # Those are dynamic
                (_uuid_res, _uuid_res_rev),
                uuid_fmt=UUIDEnumField.FORMAT_LE,
            ),
            lambda p: p.protocol_identifier == 0xD,
        ),
        ConditionalField(
            LEShortField("version", 0),
            lambda p: p.protocol_identifier == 0xD,
        ),
        # Other
        ConditionalField(
            StrLenField("lhs", "", length_from=lambda p: p.lhs_length - 1),
            lambda p: p.protocol_identifier not in (0x0, 0x7, 0xD),
        ),
        # --- RHS
        LEShortField("rhs_length", None),
        MultipleTypeField(
            [
                # (big-endian)
                (
                    ShortField("rhs", 0),
                    lambda p: p.protocol_identifier in (0x7, 0x8, 0x1F),
                    "port",
                ),
                # (big-endian)
                (
                    IPField("rhs", 0),
                    lambda p: p.protocol_identifier == 0x9,
                    "addr",
                ),
                (
                    LEShortField("rhs", 5),
                    lambda p: p.protocol_identifier in (0xA, 0xB, 0xD),
                    "minor version",
                ),
                (
                    StrLenField("rhs", "", length_from=lambda p: p.rhs_length),
                    lambda p: p.protocol_identifier == 0xF,
                    "named pipe",
                ),
                (
                    StrLenField("rhs", "", length_from=lambda p: p.rhs_length),
                    lambda p: p.protocol_identifier == 0x11,
                    "netbios name",
                ),
            ],
            # Fallback for every other identifier: rhs_length raw bytes
            StrLenField("rhs", "", length_from=lambda p: p.rhs_length),
        ),
    ]

    def default_payload_class(self, _):
        """Return ``conf.padding_layer`` as the payload class."""
        return conf.padding_layer

173 

174 

class protocol_tower_t(Packet):
    """A counted list of ``prot_and_addr_t`` floors."""

    fields_desc = [
        FieldLenField("count", None, fmt="<H", count_of="floors"),
        PacketListField(
            "floors",
            [prot_and_addr_t()],
            prot_and_addr_t,
            count_from=lambda pkt: pkt.count,
        ),
    ]

    def _summary(self):
        """Return an ``(interface, endpoint)`` pair of strings.

        The interface string is formatted from floor 1 and the endpoint
        string from floors 3 and up.

        :raises ValueError: if there are fewer than four floors, if floor 1
            or floor 2 is not a UUID floor, or if floor 3 is not a known
            RPC transport.
        """

        def _rhs_text(floor):
            # "rhs" is bytes for string-typed floors but an int or str for
            # others (port, address, minor version). Only bytes can be
            # decoded; everything else is stringified. A failed decode
            # raises UnicodeDecodeError, which is a ValueError.
            rhs = floor.rhs
            return rhs.decode() if isinstance(rhs, bytes) else str(rhs)

        if len(self.floors) < 4:
            raise ValueError("Malformed protocol_tower_t (not enough floors)")
        if self.floors[0].protocol_identifier != 0xD:
            raise ValueError("Malformed protocol_tower_t (bad floor 1)")
        if self.floors[1].protocol_identifier != 0xD:
            raise ValueError("Malformed protocol_tower_t (bad floor 2)")
        if self.floors[2].protocol_identifier in [0xA, 0xB]:  # Connection oriented/less
            # Join the RHS of floors 4..n, last floor first.
            endpoint = "%s:%s" % (
                self.floors[3].sprintf("%protocol_identifier%"),
                ":".join(_rhs_text(x) for x in self.floors[3:][::-1]),
            )
        elif self.floors[2].protocol_identifier == 0xC:  # NCALRPC
            # Floor 4 may not carry a bytes RHS in a malformed tower, so it
            # is converted like in the other branch instead of calling
            # .decode() unconditionally (which raised AttributeError).
            endpoint = "%s:%s" % (
                self.floors[2].sprintf("%protocol_identifier%"),
                _rhs_text(self.floors[3]),
            )
        else:
            raise ValueError(
                "Unknown RPC transport: %s" % self.floors[2].protocol_identifier
            )
        return (
            self.floors[0].sprintf("%uuid% (%version%.%r,rhs%)"),
            endpoint,
        )

    def mysummary(self):
        """Return "<interface> <endpoint>", or the error text if malformed."""
        try:
            return "%s %s" % self._summary()
        except ValueError as ex:
            return str(ex)

219 return str(ex)