Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/scapy/layers/msrpce/ept.py: 43%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

44 statements  

1# SPDX-License-Identifier: GPL-2.0-or-later 

2# This file is part of Scapy 

3# See https://scapy.net/ for more information 

4# Copyright (C) Gabriel Potter 

5 

6""" 

7EPT map (EndPoinT mapper) 

8""" 

9 

10import uuid 

11 

12from scapy.config import conf 

13from scapy.fields import ( 

14 ByteEnumField, 

15 ConditionalField, 

16 FieldLenField, 

17 IPField, 

18 LEShortField, 

19 MultipleTypeField, 

20 PacketListField, 

21 ShortField, 

22 StrLenField, 

23 UUIDEnumField, 

24) 

25from scapy.packet import Packet 

26from scapy.layers.dcerpc import ( 

27 DCE_RPC_INTERFACES_NAMES_rev, 

28 DCE_RPC_INTERFACES_NAMES, 

29 DCE_RPC_PROTOCOL_IDENTIFIERS, 

30 DCE_RPC_TRANSFER_SYNTAXES, 

31) 

32 

33from scapy.layers.msrpce.raw.ept import * # noqa: F401, F403 

34 

35 

36# [C706] Appendix L 

37 

38# "For historical reasons, this cannot be done using the standard 

39# NDR encoding rules for marshalling and unmarshalling. 

40# A special encoding is required." - Appendix L 

41 

42 

class octet_string_t(Packet):
    """
    Counted octet string.

    Wire layout, as described by the fields below: a little-endian 16-bit
    ``count`` followed by ``count`` bytes of ``value``.
    """

    fields_desc = [
        # Length prefix, tied to the size of 'value'
        FieldLenField("count", None, length_of="value", fmt="<H"),
        # The raw bytes; their number is read back from 'count'
        StrLenField("value", b"", length_from=lambda p: p.count),
    ]

    def default_payload_class(self, _):
        """Return the configured padding layer for whatever follows."""
        padding_cls = conf.padding_layer
        return padding_cls

51 

52 

def _uuid_res(x):
    """
    Look x up among the known interface names and transfer syntaxes.

    Both tables are merged before the lookup; when a key is present in
    both, the DCE_RPC_TRANSFER_SYNTAXES entry takes precedence.
    Returns None if x is in neither table.
    """
    merged = {**DCE_RPC_INTERFACES_NAMES, **DCE_RPC_TRANSFER_SYNTAXES}
    return merged.get(x)

58 

59 

def _uuid_res_rev(x):
    """
    Reverse lookup matching _uuid_res.

    Starts from DCE_RPC_INTERFACES_NAMES_rev, then overlays the inverted
    DCE_RPC_TRANSFER_SYNTAXES table (value -> key), which wins on clashes.
    Returns None if x is not found.
    """
    merged = dict(DCE_RPC_INTERFACES_NAMES_rev)
    for key, name in DCE_RPC_TRANSFER_SYNTAXES.items():
        merged[name] = key
    return merged.get(x)

65 

66 

class prot_and_addr_t(Packet):
    """
    One protocol tower floor: a left-hand side (LHS) followed by a
    right-hand side (RHS), each prefixed by a little-endian 16-bit length.

    - LHS: a one-byte ``protocol_identifier`` plus identifier-specific data
      (``oid`` for 0x0, ``uuid`` + ``version`` for 0xD, nothing for 0x7,
      raw ``lhs`` bytes otherwise).
    - RHS: a single ``rhs`` field whose type depends on
      ``protocol_identifier`` (port, IP address, minor version, or bytes).

    The variable-length LHS fields read ``lhs_length - 1`` bytes, i.e. the
    LHS length minus the protocol identifier byte.
    """

    fields_desc = [
        # --- LHS
        LEShortField(
            "lhs_length",
            0,
        ),
        ByteEnumField(
            "protocol_identifier",
            0,
            DCE_RPC_PROTOCOL_IDENTIFIERS,
        ),
        # 0x0
        ConditionalField(
            StrLenField(
                "oid",
                "",
                # Clamp to 0: with lhs_length == 0 (malformed input, and the
                # default value) the length would be -1, which would be
                # taken as a slice from the end of the remaining buffer.
                length_from=lambda pkt: max(pkt.lhs_length - 1, 0),
            ),
            lambda pkt: pkt.protocol_identifier == 0x0,
        ),
        # 0xD
        ConditionalField(
            UUIDEnumField(
                "uuid",
                uuid.UUID("8a885d04-1ceb-11c9-9fe8-08002b104860"),
                (
                    # Those are dynamic
                    _uuid_res,
                    _uuid_res_rev,
                ),
                uuid_fmt=UUIDEnumField.FORMAT_LE,
            ),
            lambda pkt: pkt.protocol_identifier == 0xD,
        ),
        ConditionalField(
            LEShortField("version", 0), lambda pkt: pkt.protocol_identifier == 0xD
        ),
        # Other
        ConditionalField(
            StrLenField(
                "lhs",
                "",
                # Same clamp as for 'oid': never hand out a negative length.
                length_from=lambda pkt: max(pkt.lhs_length - 1, 0),
            ),
            lambda pkt: pkt.protocol_identifier not in [0x0, 0x7, 0xD],
        ),
        # --- RHS
        LEShortField(
            "rhs_length",
            None,
        ),
        MultipleTypeField(
            [
                (
                    # (big-endian)
                    ShortField("rhs", 0),
                    lambda pkt: pkt.protocol_identifier in [0x7, 0x8, 0x1F],
                    "port",
                ),
                (
                    # (big-endian)
                    IPField("rhs", 0),
                    lambda pkt: pkt.protocol_identifier == 0x9,
                    "addr",
                ),
                (
                    LEShortField("rhs", 5),
                    lambda pkt: pkt.protocol_identifier in [0xA, 0xB, 0xD],
                    "minor version",
                ),
                (
                    StrLenField("rhs", "", length_from=lambda pkt: pkt.rhs_length),
                    lambda pkt: pkt.protocol_identifier == 0xF,
                    "named pipe",
                ),
                (
                    StrLenField("rhs", "", length_from=lambda pkt: pkt.rhs_length),
                    lambda pkt: pkt.protocol_identifier == 0x11,
                    "netbios name",
                ),
            ],
            # Fallback for every other identifier: raw bytes of rhs_length
            StrLenField("rhs", "", length_from=lambda pkt: pkt.rhs_length),
        ),
    ]

    def default_payload_class(self, _):
        """Return the configured padding layer for whatever follows."""
        return conf.padding_layer

147 

148 

class protocol_tower_t(Packet):
    """
    Protocol tower: a little-endian 16-bit floor ``count`` followed by that
    many ``prot_and_addr_t`` floors.
    """

    fields_desc = [
        FieldLenField("count", None, fmt="<H", count_of="floors"),
        PacketListField(
            "floors",
            [prot_and_addr_t()],
            prot_and_addr_t,
            count_from=lambda pkt: pkt.count,
        ),
    ]

    @staticmethod
    def _rhs_text(floor):
        """Render a floor's ``rhs`` as text, whether it is bytes or not."""
        rhs = floor.rhs
        return rhs.decode() if isinstance(rhs, bytes) else str(rhs)

    def _summary(self):
        """
        Describe the tower as an ``(interface, endpoint)`` tuple of strings.

        :raises ValueError: if there are fewer than 4 floors, if floor 1 or
            floor 2 is not a UUID floor (0xD), or if the transport found on
            floor 3 is not handled. A ``UnicodeDecodeError`` raised while
            decoding an ``rhs`` is a ``ValueError`` subclass as well.
        """
        floors = self.floors
        if len(floors) < 4:
            raise ValueError("Malformed protocol_tower_t (not enough floors)")
        if floors[0].protocol_identifier != 0xD:
            raise ValueError("Malformed protocol_tower_t (bad floor 1)")
        if floors[1].protocol_identifier != 0xD:
            raise ValueError("Malformed protocol_tower_t (bad floor 2)")

        transport = floors[2].protocol_identifier
        if transport in [0xA, 0xB]:  # Connection oriented/less
            # Floors 4..n are joined last-to-first
            endpoint = "%s:%s" % (
                floors[3].sprintf("%protocol_identifier%"),
                ":".join(self._rhs_text(x) for x in floors[3:][::-1]),
            )
        elif transport == 0xC:  # NCALRPC
            # Use the same bytes-or-str rendering as above: a malformed
            # tower may carry a non-bytes rhs here, and calling .decode()
            # on it would raise AttributeError, which mysummary() does not
            # catch.
            endpoint = "%s:%s" % (
                floors[2].sprintf("%protocol_identifier%"),
                self._rhs_text(floors[3]),
            )
        else:
            raise ValueError("Unknown RPC transport: %s" % transport)
        return (
            floors[0].sprintf("%uuid% (%version%.%r,rhs%)"),
            endpoint,
        )

    def mysummary(self):
        """Return the one-line summary, or the error text for a bad tower."""
        try:
            return "%s %s" % self._summary()
        except ValueError as ex:
            return str(ex)