1# SPDX-License-Identifier: GPL-2.0-or-later
2# This file is part of Scapy
3# See https://scapy.net/ for more information
4# Copyright (C) Gabriel Potter
5
6"""
7EPT map (EndPoinT mapper)
8"""
9
10import uuid
11
12from scapy.config import conf
13from scapy.fields import (
14 ByteEnumField,
15 ConditionalField,
16 FieldLenField,
17 IPField,
18 LEShortField,
19 MultipleTypeField,
20 PacketListField,
21 ShortField,
22 StrLenField,
23 UUIDEnumField,
24)
25from scapy.packet import Packet
26from scapy.layers.dcerpc import (
27 DCE_RPC_INTERFACES_NAMES_rev,
28 DCE_RPC_INTERFACES_NAMES,
29 DCE_RPC_PROTOCOL_IDENTIFIERS,
30 DCE_RPC_TRANSFER_SYNTAXES,
31)
32
33from scapy.layers.msrpce.raw.ept import * # noqa: F401, F403
34
35
36# [C706] Appendix L
37
38# "For historical reasons, this cannot be done using the standard
39# NDR encoding rules for marshalling and unmarshalling.
40# A special encoding is required." - Appendix L
41
42
class octet_string_t(Packet):
    """
    Counted byte string: a 2-byte little-endian ``count`` followed by
    ``count`` bytes stored in ``value``.
    """

    fields_desc = [
        # Length prefix tied to the "value" field below
        FieldLenField("count", None, length_of="value", fmt="<H"),
        # The payload itself; its size on dissection comes from "count"
        StrLenField("value", b"", length_from=lambda p: p.count),
    ]

    def default_payload_class(self, _):
        """Return the configured padding layer, whatever the payload is."""
        padding_cls = conf.padding_layer
        return padding_cls
51
52
def _uuid_res(x):
    """
    Resolve the key ``x`` to a name.

    Looks in both DCE_RPC_TRANSFER_SYNTAXES and DCE_RPC_INTERFACES_NAMES.
    Transfer syntaxes take precedence when a key is present in both.

    :param x: the key to look up
    :returns: the matching value, or None when the key is unknown
    """
    # Query the two tables directly instead of copying and merging them on
    # every call. They are read at call time, so later registrations are seen.
    if x in DCE_RPC_TRANSFER_SYNTAXES:
        return DCE_RPC_TRANSFER_SYNTAXES[x]
    return DCE_RPC_INTERFACES_NAMES.get(x)
58
59
def _uuid_res_rev(x):
    """
    Reverse of _uuid_res: resolve the name ``x`` back to its key.

    Looks in the reversed DCE_RPC_TRANSFER_SYNTAXES first, then in
    DCE_RPC_INTERFACES_NAMES_rev.

    :param x: the name to look up
    :returns: the matching key, or None when the name is unknown
    """
    # Only the transfer-syntax table needs reversing; the interface table
    # already has a reverse mapping, so it is queried without being copied.
    syntaxes_rev = {name: key for key, name in DCE_RPC_TRANSFER_SYNTAXES.items()}
    if x in syntaxes_rev:
        return syntaxes_rev[x]
    return DCE_RPC_INTERFACES_NAMES_rev.get(x)
65
66
def _lhs_remaining(pkt):
    """
    Return how many LHS bytes follow the one-byte protocol identifier.

    The value is ``lhs_length - 1``, clamped to 0: with ``lhs_length == 0``
    (the field default, or a malformed packet) the raw subtraction gives -1,
    which a length-driven string field would use as a negative slice bound.
    """
    return max(pkt.lhs_length - 1, 0)


class prot_and_addr_t(Packet):
    """
    One floor of a protocol tower: a left-hand side (LHS) holding the
    protocol identifier and its data, then a right-hand side (RHS) whose
    interpretation depends on that protocol identifier.
    """

    fields_desc = [
        # --- LHS
        LEShortField(
            "lhs_length",
            0,
        ),
        ByteEnumField(
            "protocol_identifier",
            0,
            DCE_RPC_PROTOCOL_IDENTIFIERS,
        ),
        # 0x0: the LHS data is exposed as "oid"
        ConditionalField(
            StrLenField("oid", "", length_from=_lhs_remaining),
            lambda pkt: pkt.protocol_identifier == 0x0,
        ),
        # 0xD: the LHS data is a little-endian UUID followed by a version
        ConditionalField(
            UUIDEnumField(
                "uuid",
                uuid.UUID("8a885d04-1ceb-11c9-9fe8-08002b104860"),
                (
                    # Those are dynamic
                    _uuid_res,
                    _uuid_res_rev,
                ),
                uuid_fmt=UUIDEnumField.FORMAT_LE,
            ),
            lambda pkt: pkt.protocol_identifier == 0xD,
        ),
        ConditionalField(
            LEShortField("version", 0), lambda pkt: pkt.protocol_identifier == 0xD
        ),
        # Other: the LHS data is kept as raw bytes in "lhs"
        ConditionalField(
            StrLenField("lhs", "", length_from=_lhs_remaining),
            lambda pkt: pkt.protocol_identifier not in [0x0, 0x7, 0xD],
        ),
        # --- RHS
        LEShortField(
            "rhs_length",
            None,
        ),
        # The type of "rhs" is selected by the protocol identifier; when no
        # entry matches, it is read as rhs_length raw bytes.
        MultipleTypeField(
            [
                (
                    # (big-endian)
                    ShortField("rhs", 0),
                    lambda pkt: pkt.protocol_identifier in [0x7, 0x8, 0x1F],
                    "port",
                ),
                (
                    # (big-endian)
                    IPField("rhs", 0),
                    lambda pkt: pkt.protocol_identifier == 0x9,
                    "addr",
                ),
                (
                    LEShortField("rhs", 5),
                    lambda pkt: pkt.protocol_identifier in [0xA, 0xB, 0xD],
                    "minor version",
                ),
                (
                    StrLenField("rhs", "", length_from=lambda pkt: pkt.rhs_length),
                    lambda pkt: pkt.protocol_identifier == 0xF,
                    "named pipe",
                ),
                (
                    StrLenField("rhs", "", length_from=lambda pkt: pkt.rhs_length),
                    lambda pkt: pkt.protocol_identifier == 0x11,
                    "netbios name",
                ),
            ],
            StrLenField("rhs", "", length_from=lambda pkt: pkt.rhs_length),
        ),
    ]

    def default_payload_class(self, _):
        """Return the configured padding layer, whatever the payload is."""
        return conf.padding_layer
147
148
def _floor_rhs_str(floor):
    """Return the RHS of a tower floor as text: bytes are decoded, others str()-ed."""
    rhs = floor.rhs
    if isinstance(rhs, bytes):
        return rhs.decode()
    return str(rhs)


class protocol_tower_t(Packet):
    """
    Protocol tower: a 2-byte little-endian floor count followed by that
    many prot_and_addr_t floors.
    """

    fields_desc = [
        FieldLenField("count", None, fmt="<H", count_of="floors"),
        PacketListField(
            "floors",
            [prot_and_addr_t()],
            prot_and_addr_t,
            count_from=lambda pkt: pkt.count,
        ),
    ]

    def _summary(self):
        """
        Describe the tower as an (interface, endpoint) pair of strings.

        :returns: a tuple (interface description, endpoint description)
        :raises ValueError: if there are fewer than 4 floors, if floor 1 or 2
            does not use protocol identifier 0xD, or if the transport on
            floor 3 is not recognised.
        """
        floors = self.floors
        if len(floors) < 4:
            raise ValueError("Malformed protocol_tower_t (not enough floors)")
        if floors[0].protocol_identifier != 0xD:
            raise ValueError("Malformed protocol_tower_t (bad floor 1)")
        if floors[1].protocol_identifier != 0xD:
            raise ValueError("Malformed protocol_tower_t (bad floor 2)")
        transport = floors[2].protocol_identifier
        if transport in [0xA, 0xB]:  # Connection oriented/less
            # Floors 4 and above, joined from the last floor back to floor 4
            endpoint = "%s:%s" % (
                floors[3].sprintf("%protocol_identifier%"),
                ":".join(_floor_rhs_str(x) for x in floors[3:][::-1]),
            )
        elif transport == 0xC:  # NCALRPC
            # Same bytes/non-bytes handling as the branch above, so a
            # non-bytes rhs no longer raises AttributeError here.
            endpoint = "%s:%s" % (
                floors[2].sprintf("%protocol_identifier%"),
                _floor_rhs_str(floors[3]),
            )
        else:
            raise ValueError("Unknown RPC transport: %s" % transport)
        return (
            floors[0].sprintf("%uuid% (%version%.%r,rhs%)"),
            endpoint,
        )

    def mysummary(self):
        """Return the one-line summary, or the error text if the tower is malformed."""
        try:
            return "%s %s" % self._summary()
        except ValueError as ex:
            return str(ex)