1# SPDX-License-Identifier: GPL-2.0-or-later
2# This file is part of Scapy
3# See https://scapy.net/ for more information
4# Copyright (C) Gabriel Potter
5
6"""
7EPT map (EndPoinT mapper)
8"""
9
10import uuid
11
12from scapy.config import conf
13from scapy.fields import (
14 ByteEnumField,
15 ConditionalField,
16 FieldLenField,
17 IPField,
18 LEShortField,
19 MultipleTypeField,
20 PacketListField,
21 ShortField,
22 StrLenField,
23 UUIDEnumField,
24)
25from scapy.packet import Packet
26from scapy.layers.dcerpc import (
27 DCE_RPC_INTERFACES_NAMES,
28 DCE_RPC_INTERFACES_NAMES_rev,
29 DCE_RPC_TRANSFER_SYNTAXES,
30)
31
32from scapy.layers.msrpce.raw.ept import * # noqa: F401, F403
33
34
35# [C706] Appendix L
36
37# "For historical reasons, this cannot be done using the standard
38# NDR encoding rules for marshalling and unmarshalling.
39# A special encoding is required." - Appendix L
40
41
42class octet_string_t(Packet):
43 fields_desc = [
44 FieldLenField("count", None, fmt="<H", length_of="value"),
45 StrLenField("value", b"", length_from=lambda pkt: pkt.count),
46 ]
47
48 def default_payload_class(self, _):
49 return conf.padding_layer
50
51
52def _uuid_res(x):
53 # Look in both DCE_RPC_INTERFACES_NAMES and DCE_RPC_TRANSFER_SYNTAXES
54 dct = DCE_RPC_INTERFACES_NAMES.copy()
55 dct.update(DCE_RPC_TRANSFER_SYNTAXES)
56 return dct.get(x)
57
58
59def _uuid_res_rev(x):
60 # Same but reversed
61 dct = DCE_RPC_INTERFACES_NAMES_rev.copy()
62 dct.update({v: k for k, v in DCE_RPC_TRANSFER_SYNTAXES.items()})
63 return dct.get(x)
64
65
66class prot_and_addr_t(Packet):
67 fields_desc = [
68 # --- LHS
69 LEShortField(
70 "lhs_length",
71 0,
72 ),
73 # [C706] Appendix I with names from Appendix B
74 ByteEnumField(
75 "protocol_identifier",
76 0,
77 {
78 0x0: "OSI OID", # Special
79 0x0D: "UUID", # Special
80 # Transports
81 # 0x2: "DNA Session Control",
82 # 0x3: "DNA Session Control V3",
83 # 0x4: "DNA NSP Transport",
84 # 0x5: "OSI TP4",
85 0x06: "NCADG_OSI_CLSN", # [C706]
86 0x07: "NCACN_IP_TCP", # [C706]
87 0x08: "NCADG_IP_UDP", # [C706]
88 0x09: "IP", # [C706]
89 0x0A: "RPC connectionless protocol", # [C706]
90 0x0B: "RPC connection-oriented protocol", # [C706]
91 0x0C: "NCALRPC",
92 0x0F: "NCACN_NP", # [MS-RPCE]
93 0x11: "NCACN_NB", # [C706]
94 0x12: "NCACN_NB_NB", # [MS-RPCE]
95 0x13: "NCACN_SPX", # [C706]
96 0x14: "NCADG_IPX", # [C706]
97 0x16: "NCACN_AT_DSP", # [C706]
98 0x17: "NCADG_AT_DSP", # [C706]
99 0x19: "NCADG_NB", # [C706]
100 0x1A: "NCACN_VNS_SPP", # [C706]
101 0x1B: "NCADG_VNS_IPC", # [C706]
102 0x1F: "NCACN_HTTP", # [MS-RPCE]
103 },
104 ),
105 # 0x0
106 ConditionalField(
107 StrLenField("oid", "", length_from=lambda pkt: pkt.lhs_length - 1),
108 lambda pkt: pkt.protocol_identifier == 0x0,
109 ),
110 # 0xD
111 ConditionalField(
112 UUIDEnumField(
113 "uuid",
114 uuid.UUID("8a885d04-1ceb-11c9-9fe8-08002b104860"),
115 (
116 # Those are dynamic
117 _uuid_res,
118 _uuid_res_rev,
119 ),
120 uuid_fmt=UUIDEnumField.FORMAT_LE,
121 ),
122 lambda pkt: pkt.protocol_identifier == 0xD,
123 ),
124 ConditionalField(
125 LEShortField("version", 0), lambda pkt: pkt.protocol_identifier == 0xD
126 ),
127 # Other
128 ConditionalField(
129 StrLenField("lhs", "", length_from=lambda pkt: pkt.lhs_length - 1),
130 lambda pkt: pkt.protocol_identifier not in [0x0, 0x7, 0xD],
131 ),
132 # --- RHS
133 LEShortField(
134 "rhs_length",
135 None,
136 ),
137 MultipleTypeField(
138 [
139 (
140 # (big-endian)
141 ShortField("rhs", 0),
142 lambda pkt: pkt.protocol_identifier in [0x7, 0x8, 0x1F],
143 "port",
144 ),
145 (
146 # (big-endian)
147 IPField("rhs", 0),
148 lambda pkt: pkt.protocol_identifier == 0x9,
149 "addr",
150 ),
151 (
152 LEShortField("rhs", 5),
153 lambda pkt: pkt.protocol_identifier in [0xA, 0xB, 0xD],
154 "minor version",
155 ),
156 (
157 StrLenField("rhs", "", length_from=lambda pkt: pkt.rhs_length),
158 lambda pkt: pkt.protocol_identifier == 0xF,
159 "named pipe",
160 ),
161 (
162 StrLenField("rhs", "", length_from=lambda pkt: pkt.rhs_length),
163 lambda pkt: pkt.protocol_identifier == 0x11,
164 "netbios name",
165 ),
166 ],
167 StrLenField("rhs", "", length_from=lambda pkt: pkt.rhs_length),
168 ),
169 ]
170
171 def default_payload_class(self, _):
172 return conf.padding_layer
173
174
175class protocol_tower_t(Packet):
176 fields_desc = [
177 FieldLenField("count", None, fmt="<H", count_of="floors"),
178 PacketListField(
179 "floors",
180 [prot_and_addr_t()],
181 prot_and_addr_t,
182 count_from=lambda pkt: pkt.count,
183 ),
184 ]
185
186 def _summary(self):
187 if len(self.floors) < 4:
188 raise ValueError("Malformed protocol_tower_t (not enough floors)")
189 if self.floors[0].protocol_identifier != 0xD:
190 raise ValueError("Malformed protocol_tower_t (bad floor 1)")
191 if self.floors[1].protocol_identifier != 0xD:
192 raise ValueError("Malformed protocol_tower_t (bad floor 2)")
193 if self.floors[2].protocol_identifier in [0xA, 0xB]: # Connection oriented/less
194 endpoint = "%s:%s" % (
195 self.floors[3].sprintf("%protocol_identifier%"),
196 ":".join(
197 x.rhs.decode() if isinstance(x.rhs, bytes) else str(x.rhs)
198 for x in self.floors[3:][::-1]
199 ),
200 )
201 elif self.floors[2].protocol_identifier == 0xC: # NCALRPC
202 endpoint = "%s:%s" % (
203 self.floors[2].sprintf("%protocol_identifier%"),
204 self.floors[3].rhs.decode(),
205 )
206 else:
207 raise ValueError(
208 "Unknown RPC transport: %s" % self.floors[2].protocol_identifier
209 )
210 return (
211 self.floors[0].sprintf("%uuid% (%version%.%r,rhs%)"),
212 endpoint,
213 )
214
215 def mysummary(self):
216 try:
217 return "%s %s" % self._summary()
218 except ValueError as ex:
219 return str(ex)