Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/scapy/layers/snmp.py: 88%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# SPDX-License-Identifier: GPL-2.0-only
2# This file is part of Scapy
3# See https://scapy.net/ for more information
4# Copyright (C) Philippe Biondi <phil@secdev.org>
6"""
7SNMP (Simple Network Management Protocol).
8"""
10from scapy.packet import bind_layers, bind_bottom_up
11from scapy.asn1packet import ASN1_Packet
12from scapy.asn1fields import ASN1F_INTEGER, ASN1F_IPADDRESS, ASN1F_OID, \
13 ASN1F_SEQUENCE, ASN1F_SEQUENCE_OF, ASN1F_STRING, ASN1F_TIME_TICKS, \
14 ASN1F_enum_INTEGER, ASN1F_field, ASN1F_CHOICE, ASN1F_optional, ASN1F_NULL
15from scapy.asn1.asn1 import ASN1_Class_UNIVERSAL, ASN1_Codecs, ASN1_NULL, \
16 ASN1_SEQUENCE
17from scapy.asn1.ber import BERcodec_SEQUENCE
18from scapy.sendrecv import sr1
19from scapy.volatile import RandShort, IntAutoTime
20from scapy.layers.inet import UDP, IP, ICMP
22# Import needed to initialize conf.mib
23from scapy.asn1.mib import conf # noqa: F401
25##########
26# SNMP #
27##########
29# [ ASN1 class ] #
class ASN1_Class_SNMP(ASN1_Class_UNIVERSAL):
    # Extends ASN1_Class_UNIVERSAL with one tag per SNMP PDU type.
    name = "SNMP"
    # Each value is 0xA0 + n: as a BER identifier octet that is the
    # context-specific class, constructed form, tag number n.
    PDU_GET = 0xA0
    PDU_NEXT = 0xA1
    PDU_RESPONSE = 0xA2
    PDU_SET = 0xA3
    PDU_TRAPv1 = 0xA4
    PDU_BULK = 0xA5
    PDU_INFORM = 0xA6
    PDU_TRAPv2 = 0xA7
class ASN1_SNMP_PDU_GET(ASN1_SEQUENCE):
    """ASN.1 SEQUENCE object tagged with the SNMP PDU_GET tag."""
    tag = ASN1_Class_SNMP.PDU_GET
class ASN1_SNMP_PDU_NEXT(ASN1_SEQUENCE):
    """ASN.1 SEQUENCE object tagged with the SNMP PDU_NEXT tag."""
    tag = ASN1_Class_SNMP.PDU_NEXT
class ASN1_SNMP_PDU_RESPONSE(ASN1_SEQUENCE):
    """ASN.1 SEQUENCE object tagged with the SNMP PDU_RESPONSE tag."""
    tag = ASN1_Class_SNMP.PDU_RESPONSE
class ASN1_SNMP_PDU_SET(ASN1_SEQUENCE):
    """ASN.1 SEQUENCE object tagged with the SNMP PDU_SET tag."""
    tag = ASN1_Class_SNMP.PDU_SET
class ASN1_SNMP_PDU_TRAPv1(ASN1_SEQUENCE):
    """ASN.1 SEQUENCE object tagged with the SNMP PDU_TRAPv1 tag."""
    tag = ASN1_Class_SNMP.PDU_TRAPv1
class ASN1_SNMP_PDU_BULK(ASN1_SEQUENCE):
    """ASN.1 SEQUENCE object tagged with the SNMP PDU_BULK tag."""
    tag = ASN1_Class_SNMP.PDU_BULK
class ASN1_SNMP_PDU_INFORM(ASN1_SEQUENCE):
    """ASN.1 SEQUENCE object tagged with the SNMP PDU_INFORM tag."""
    tag = ASN1_Class_SNMP.PDU_INFORM
class ASN1_SNMP_PDU_TRAPv2(ASN1_SEQUENCE):
    """ASN.1 SEQUENCE object tagged with the SNMP PDU_TRAPv2 tag."""
    tag = ASN1_Class_SNMP.PDU_TRAPv2
76# [ BER codecs ] #
class BERcodec_SNMP_PDU_GET(BERcodec_SEQUENCE):
    """BER SEQUENCE codec associated with the SNMP PDU_GET tag."""
    tag = ASN1_Class_SNMP.PDU_GET
class BERcodec_SNMP_PDU_NEXT(BERcodec_SEQUENCE):
    """BER SEQUENCE codec associated with the SNMP PDU_NEXT tag."""
    tag = ASN1_Class_SNMP.PDU_NEXT
class BERcodec_SNMP_PDU_RESPONSE(BERcodec_SEQUENCE):
    """BER SEQUENCE codec associated with the SNMP PDU_RESPONSE tag."""
    tag = ASN1_Class_SNMP.PDU_RESPONSE
class BERcodec_SNMP_PDU_SET(BERcodec_SEQUENCE):
    """BER SEQUENCE codec associated with the SNMP PDU_SET tag."""
    tag = ASN1_Class_SNMP.PDU_SET
class BERcodec_SNMP_PDU_TRAPv1(BERcodec_SEQUENCE):
    """BER SEQUENCE codec associated with the SNMP PDU_TRAPv1 tag."""
    tag = ASN1_Class_SNMP.PDU_TRAPv1
class BERcodec_SNMP_PDU_BULK(BERcodec_SEQUENCE):
    """BER SEQUENCE codec associated with the SNMP PDU_BULK tag."""
    tag = ASN1_Class_SNMP.PDU_BULK
class BERcodec_SNMP_PDU_INFORM(BERcodec_SEQUENCE):
    """BER SEQUENCE codec associated with the SNMP PDU_INFORM tag."""
    tag = ASN1_Class_SNMP.PDU_INFORM
class BERcodec_SNMP_PDU_TRAPv2(BERcodec_SEQUENCE):
    """BER SEQUENCE codec associated with the SNMP PDU_TRAPv2 tag."""
    tag = ASN1_Class_SNMP.PDU_TRAPv2
110# [ ASN1 fields ] #
class ASN1F_SNMP_PDU_GET(ASN1F_SEQUENCE):
    """SEQUENCE field whose ASN.1 tag is the SNMP PDU_GET tag."""
    ASN1_tag = ASN1_Class_SNMP.PDU_GET
class ASN1F_SNMP_PDU_NEXT(ASN1F_SEQUENCE):
    """SEQUENCE field whose ASN.1 tag is the SNMP PDU_NEXT tag."""
    ASN1_tag = ASN1_Class_SNMP.PDU_NEXT
class ASN1F_SNMP_PDU_RESPONSE(ASN1F_SEQUENCE):
    """SEQUENCE field whose ASN.1 tag is the SNMP PDU_RESPONSE tag."""
    ASN1_tag = ASN1_Class_SNMP.PDU_RESPONSE
class ASN1F_SNMP_PDU_SET(ASN1F_SEQUENCE):
    """SEQUENCE field whose ASN.1 tag is the SNMP PDU_SET tag."""
    ASN1_tag = ASN1_Class_SNMP.PDU_SET
class ASN1F_SNMP_PDU_TRAPv1(ASN1F_SEQUENCE):
    """SEQUENCE field whose ASN.1 tag is the SNMP PDU_TRAPv1 tag."""
    ASN1_tag = ASN1_Class_SNMP.PDU_TRAPv1
class ASN1F_SNMP_PDU_BULK(ASN1F_SEQUENCE):
    """SEQUENCE field whose ASN.1 tag is the SNMP PDU_BULK tag."""
    ASN1_tag = ASN1_Class_SNMP.PDU_BULK
class ASN1F_SNMP_PDU_INFORM(ASN1F_SEQUENCE):
    """SEQUENCE field whose ASN.1 tag is the SNMP PDU_INFORM tag."""
    ASN1_tag = ASN1_Class_SNMP.PDU_INFORM
class ASN1F_SNMP_PDU_TRAPv2(ASN1F_SEQUENCE):
    """SEQUENCE field whose ASN.1 tag is the SNMP PDU_TRAPv2 tag."""
    ASN1_tag = ASN1_Class_SNMP.PDU_TRAPv2
144# [ SNMP Packet ] #
# Display names for the PDU "error" field, keyed by their integer code.
# The position of each name in the tuple is its code (0 through 18).
SNMP_error = dict(enumerate((
    "no_error",
    "too_big",
    "no_such_name",
    "bad_value",
    "read_only",
    "generic_error",
    "no_access",
    "wrong_type",
    "wrong_length",
    "wrong_encoding",
    "wrong_value",
    "no_creation",
    "inconsistent_value",
    "resource_unavailable",
    "commit_failed",
    "undo_failed",
    "authorization_error",
    "not_writable",
    "inconsistent_name",
)))
# Display names for the SNMPtrapv1 "generic_trap" field, keyed by their
# integer code. The position of each name in the tuple is its code (0 to 6).
SNMP_trap_types = dict(enumerate((
    "cold_start",
    "warm_start",
    "link_down",
    "link_up",
    "auth_failure",
    "egp_neigh_loss",
    "enterprise_specific",
)))
class SNMPvarbind(ASN1_Packet):
    # One variable binding: a SEQUENCE holding an OID followed by optional
    # members (a value, or one of three NULL markers with implicit tags).
    ASN1_codec = ASN1_Codecs.BER
    ASN1_root = ASN1F_SEQUENCE(
        ASN1F_OID("oid", "1.3"),
        ASN1F_optional(ASN1F_field("value", ASN1_NULL(0))),

        # exceptions in responses
        ASN1F_optional(
            ASN1F_NULL("noSuchObject", None, implicit_tag=0x80)
        ),
        ASN1F_optional(
            ASN1F_NULL("noSuchInstance", None, implicit_tag=0x81)
        ),
        ASN1F_optional(
            ASN1F_NULL("endOfMibView", None, implicit_tag=0x82)
        ),
    )
class SNMPget(ASN1_Packet):
    # PDU tagged PDU_GET: request id, error code, error index and a list of
    # SNMPvarbind entries.
    ASN1_codec = ASN1_Codecs.BER
    ASN1_root = ASN1F_SNMP_PDU_GET(
        ASN1F_INTEGER("id", 0),
        ASN1F_enum_INTEGER("error", 0, SNMP_error),
        ASN1F_INTEGER("error_index", 0),
        ASN1F_SEQUENCE_OF("varbindlist", [], SNMPvarbind),
    )
class SNMPnext(ASN1_Packet):
    # PDU tagged PDU_NEXT: request id, error code, error index and a list of
    # SNMPvarbind entries.
    ASN1_codec = ASN1_Codecs.BER
    ASN1_root = ASN1F_SNMP_PDU_NEXT(
        ASN1F_INTEGER("id", 0),
        ASN1F_enum_INTEGER("error", 0, SNMP_error),
        ASN1F_INTEGER("error_index", 0),
        ASN1F_SEQUENCE_OF("varbindlist", [], SNMPvarbind),
    )
class SNMPresponse(ASN1_Packet):
    # PDU tagged PDU_RESPONSE: request id, error code, error index and a
    # list of SNMPvarbind entries.
    ASN1_codec = ASN1_Codecs.BER
    ASN1_root = ASN1F_SNMP_PDU_RESPONSE(
        ASN1F_INTEGER("id", 0),
        ASN1F_enum_INTEGER("error", 0, SNMP_error),
        ASN1F_INTEGER("error_index", 0),
        ASN1F_SEQUENCE_OF("varbindlist", [], SNMPvarbind),
    )
class SNMPset(ASN1_Packet):
    # PDU tagged PDU_SET: request id, error code, error index and a list of
    # SNMPvarbind entries.
    ASN1_codec = ASN1_Codecs.BER
    ASN1_root = ASN1F_SNMP_PDU_SET(
        ASN1F_INTEGER("id", 0),
        ASN1F_enum_INTEGER("error", 0, SNMP_error),
        ASN1F_INTEGER("error_index", 0),
        ASN1F_SEQUENCE_OF("varbindlist", [], SNMPvarbind),
    )
class SNMPtrapv1(ASN1_Packet):
    # PDU tagged PDU_TRAPv1. Unlike the request/response PDUs it has no
    # id/error fields: it carries an enterprise OID, an agent address, the
    # generic and specific trap codes, a time stamp and the varbind list.
    ASN1_codec = ASN1_Codecs.BER
    ASN1_root = ASN1F_SNMP_PDU_TRAPv1(
        ASN1F_OID("enterprise", "1.3"),
        ASN1F_IPADDRESS("agent_addr", "0.0.0.0"),
        ASN1F_enum_INTEGER("generic_trap", 0, SNMP_trap_types),
        ASN1F_INTEGER("specific_trap", 0),
        # The default is a volatile IntAutoTime() value, not a fixed integer.
        ASN1F_TIME_TICKS("time_stamp", IntAutoTime()),
        ASN1F_SEQUENCE_OF("varbindlist", [], SNMPvarbind),
    )
class SNMPbulk(ASN1_Packet):
    # PDU tagged PDU_BULK: request id, non_repeaters and max_repetitions
    # counters (in place of error / error_index) and the varbind list.
    ASN1_codec = ASN1_Codecs.BER
    ASN1_root = ASN1F_SNMP_PDU_BULK(
        ASN1F_INTEGER("id", 0),
        ASN1F_INTEGER("non_repeaters", 0),
        ASN1F_INTEGER("max_repetitions", 0),
        ASN1F_SEQUENCE_OF("varbindlist", [], SNMPvarbind),
    )
class SNMPinform(ASN1_Packet):
    # PDU tagged PDU_INFORM: request id, error code, error index and a list
    # of SNMPvarbind entries.
    ASN1_codec = ASN1_Codecs.BER
    ASN1_root = ASN1F_SNMP_PDU_INFORM(
        ASN1F_INTEGER("id", 0),
        ASN1F_enum_INTEGER("error", 0, SNMP_error),
        ASN1F_INTEGER("error_index", 0),
        ASN1F_SEQUENCE_OF("varbindlist", [], SNMPvarbind),
    )
class SNMPtrapv2(ASN1_Packet):
    # PDU tagged PDU_TRAPv2: request id, error code, error index and a list
    # of SNMPvarbind entries.
    ASN1_codec = ASN1_Codecs.BER
    ASN1_root = ASN1F_SNMP_PDU_TRAPv2(
        ASN1F_INTEGER("id", 0),
        ASN1F_enum_INTEGER("error", 0, SNMP_error),
        ASN1F_INTEGER("error_index", 0),
        ASN1F_SEQUENCE_OF("varbindlist", [], SNMPvarbind),
    )
class SNMP(ASN1_Packet):
    # Top-level SNMP message: a SEQUENCE of version, community string and
    # exactly one PDU chosen among the PDU packet classes listed below.
    ASN1_codec = ASN1_Codecs.BER
    ASN1_root = ASN1F_SEQUENCE(
        ASN1F_enum_INTEGER("version", 1, {0: "v1", 1: "v2c", 2: "v2", 3: "v3"}),  # noqa: E501
        ASN1F_STRING("community", "public"),
        ASN1F_CHOICE("PDU", SNMPget(),
                     SNMPget, SNMPnext, SNMPresponse, SNMPset,
                     SNMPtrapv1, SNMPbulk, SNMPinform, SNMPtrapv2)
    )

    def answers(self, other):
        """Tell whether this message is the response to ``other``.

        True only when ``other`` is an SNMP message, this message carries an
        SNMPresponse PDU, ``other`` carries a PDU type that is answered by a
        response (get, next, set, bulk or inform) and both PDUs have the
        same ``id``.

        :param other: the packet that was sent.
        :returns: truthy if ``self`` answers ``other``, falsy otherwise.
        """
        # Guard first: a non-SNMP packet has no ``PDU`` field to inspect.
        if not isinstance(other, SNMP):
            return False
        # GetBulk and Inform requests are acknowledged with a Response PDU
        # just like get/next/set, so they must be accepted as requests too.
        return (isinstance(self.PDU, SNMPresponse) and
                isinstance(other.PDU, (SNMPget, SNMPnext, SNMPset,
                                       SNMPbulk, SNMPinform)) and
                self.PDU.id == other.PDU.id)
# Register SNMP as the payload guessed for UDP when either the source or the
# destination port is 161 or 162 (same call order as four explicit calls).
for _snmp_port in (161, 162):
    bind_bottom_up(UDP, SNMP, sport=_snmp_port)
    bind_bottom_up(UDP, SNMP, dport=_snmp_port)
del _snmp_port  # do not leave the loop variable in the module namespace
# Full binding for the case where both ports are 161.
bind_layers(UDP, SNMP, sport=161, dport=161)
def snmpwalk(dst, oid="1", community="public"):
    """Walk an SNMP agent with successive get-next requests, printing results.

    Starting from ``oid``, sends an SNMPnext request to ``dst``, prints the
    OID and value of the first varbind in the reply, then repeats from the
    returned OID. The walk stops when:

    - no reply is received (prints "No answers");
    - the reply contains an ICMP layer (the reply is printed);
    - the reply carries no varbind (the reply is printed);
    - the agent flags ``endOfMibView`` or returns the very OID that was
      requested, i.e. the walk can make no further progress;
    - the user interrupts with Ctrl-C.

    :param dst: destination passed to ``IP(dst=...)``.
    :param oid: OID to start walking from.
    :param community: SNMP community string.
    :returns: None; results are written to standard output.
    """
    try:
        while True:
            request = (
                IP(dst=dst) /
                UDP(sport=RandShort()) /
                SNMP(community=community,
                     PDU=SNMPnext(varbindlist=[SNMPvarbind(oid=oid)]))
            )
            r = sr1(request, timeout=2, chainCC=1, verbose=0, retry=2)
            if r is None:
                print("No answers")
                break
            if ICMP in r:
                print(repr(r))
                break
            if SNMPvarbind not in r:
                # Nothing to print or to continue from; indexing the reply
                # with SNMPvarbind would raise instead of ending the walk.
                print(repr(r))
                break
            varbind = r[SNMPvarbind]
            print("%-40s: %r" % (varbind.oid.val, varbind.value))
            # ``oid`` is a plain string on the first pass and the OID object
            # taken from the previous reply afterwards.
            requested = getattr(oid, "val", oid)
            if varbind.endOfMibView is not None:
                # Explicit end-of-MIB marker (assumes the dissector fills
                # this optional field -- the no-progress check below is the
                # fallback if it does not).
                break
            if varbind.oid.val == requested:
                # The agent echoed the requested OID (e.g. an error reply at
                # the end of the MIB): asking again would loop forever.
                break
            oid = varbind.oid

    except KeyboardInterrupt:
        pass