Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/scapy/layers/snmp.py: 88%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

122 statements  

1# SPDX-License-Identifier: GPL-2.0-only 

2# This file is part of Scapy 

3# See https://scapy.net/ for more information 

4# Copyright (C) Philippe Biondi <phil@secdev.org> 

5 

6""" 

7SNMP (Simple Network Management Protocol). 

8""" 

9 

10from scapy.packet import bind_layers, bind_bottom_up 

11from scapy.asn1packet import ASN1_Packet 

12from scapy.asn1fields import ASN1F_INTEGER, ASN1F_IPADDRESS, ASN1F_OID, \ 

13 ASN1F_SEQUENCE, ASN1F_SEQUENCE_OF, ASN1F_STRING, ASN1F_TIME_TICKS, \ 

14 ASN1F_enum_INTEGER, ASN1F_field, ASN1F_CHOICE, ASN1F_optional, ASN1F_NULL 

15from scapy.asn1.asn1 import ASN1_Class_UNIVERSAL, ASN1_Codecs, ASN1_NULL, \ 

16 ASN1_SEQUENCE 

17from scapy.asn1.ber import BERcodec_SEQUENCE 

18from scapy.sendrecv import sr1 

19from scapy.volatile import RandShort, IntAutoTime 

20from scapy.layers.inet import UDP, IP, ICMP 

21 

22# Import needed to initialize conf.mib 

23from scapy.asn1.mib import conf # noqa: F401 

24 

25########## 

26# SNMP # 

27########## 

28 

29# [ ASN1 class ] # 

30 

31 

32class ASN1_Class_SNMP(ASN1_Class_UNIVERSAL): 

33 name = "SNMP" 

34 PDU_GET = 0xa0 

35 PDU_NEXT = 0xa1 

36 PDU_RESPONSE = 0xa2 

37 PDU_SET = 0xa3 

38 PDU_TRAPv1 = 0xa4 

39 PDU_BULK = 0xa5 

40 PDU_INFORM = 0xa6 

41 PDU_TRAPv2 = 0xa7 

42 

43 

44class ASN1_SNMP_PDU_GET(ASN1_SEQUENCE): 

45 tag = ASN1_Class_SNMP.PDU_GET 

46 

47 

48class ASN1_SNMP_PDU_NEXT(ASN1_SEQUENCE): 

49 tag = ASN1_Class_SNMP.PDU_NEXT 

50 

51 

52class ASN1_SNMP_PDU_RESPONSE(ASN1_SEQUENCE): 

53 tag = ASN1_Class_SNMP.PDU_RESPONSE 

54 

55 

56class ASN1_SNMP_PDU_SET(ASN1_SEQUENCE): 

57 tag = ASN1_Class_SNMP.PDU_SET 

58 

59 

60class ASN1_SNMP_PDU_TRAPv1(ASN1_SEQUENCE): 

61 tag = ASN1_Class_SNMP.PDU_TRAPv1 

62 

63 

64class ASN1_SNMP_PDU_BULK(ASN1_SEQUENCE): 

65 tag = ASN1_Class_SNMP.PDU_BULK 

66 

67 

68class ASN1_SNMP_PDU_INFORM(ASN1_SEQUENCE): 

69 tag = ASN1_Class_SNMP.PDU_INFORM 

70 

71 

72class ASN1_SNMP_PDU_TRAPv2(ASN1_SEQUENCE): 

73 tag = ASN1_Class_SNMP.PDU_TRAPv2 

74 

75 

76# [ BER codecs ] # 

77 

78class BERcodec_SNMP_PDU_GET(BERcodec_SEQUENCE): 

79 tag = ASN1_Class_SNMP.PDU_GET 

80 

81 

82class BERcodec_SNMP_PDU_NEXT(BERcodec_SEQUENCE): 

83 tag = ASN1_Class_SNMP.PDU_NEXT 

84 

85 

86class BERcodec_SNMP_PDU_RESPONSE(BERcodec_SEQUENCE): 

87 tag = ASN1_Class_SNMP.PDU_RESPONSE 

88 

89 

90class BERcodec_SNMP_PDU_SET(BERcodec_SEQUENCE): 

91 tag = ASN1_Class_SNMP.PDU_SET 

92 

93 

94class BERcodec_SNMP_PDU_TRAPv1(BERcodec_SEQUENCE): 

95 tag = ASN1_Class_SNMP.PDU_TRAPv1 

96 

97 

98class BERcodec_SNMP_PDU_BULK(BERcodec_SEQUENCE): 

99 tag = ASN1_Class_SNMP.PDU_BULK 

100 

101 

102class BERcodec_SNMP_PDU_INFORM(BERcodec_SEQUENCE): 

103 tag = ASN1_Class_SNMP.PDU_INFORM 

104 

105 

106class BERcodec_SNMP_PDU_TRAPv2(BERcodec_SEQUENCE): 

107 tag = ASN1_Class_SNMP.PDU_TRAPv2 

108 

109 

110# [ ASN1 fields ] # 

111 

112class ASN1F_SNMP_PDU_GET(ASN1F_SEQUENCE): 

113 ASN1_tag = ASN1_Class_SNMP.PDU_GET 

114 

115 

116class ASN1F_SNMP_PDU_NEXT(ASN1F_SEQUENCE): 

117 ASN1_tag = ASN1_Class_SNMP.PDU_NEXT 

118 

119 

120class ASN1F_SNMP_PDU_RESPONSE(ASN1F_SEQUENCE): 

121 ASN1_tag = ASN1_Class_SNMP.PDU_RESPONSE 

122 

123 

124class ASN1F_SNMP_PDU_SET(ASN1F_SEQUENCE): 

125 ASN1_tag = ASN1_Class_SNMP.PDU_SET 

126 

127 

128class ASN1F_SNMP_PDU_TRAPv1(ASN1F_SEQUENCE): 

129 ASN1_tag = ASN1_Class_SNMP.PDU_TRAPv1 

130 

131 

132class ASN1F_SNMP_PDU_BULK(ASN1F_SEQUENCE): 

133 ASN1_tag = ASN1_Class_SNMP.PDU_BULK 

134 

135 

136class ASN1F_SNMP_PDU_INFORM(ASN1F_SEQUENCE): 

137 ASN1_tag = ASN1_Class_SNMP.PDU_INFORM 

138 

139 

140class ASN1F_SNMP_PDU_TRAPv2(ASN1F_SEQUENCE): 

141 ASN1_tag = ASN1_Class_SNMP.PDU_TRAPv2 

142 

143 

144# [ SNMP Packet ] # 

145 

146 

147SNMP_error = {0: "no_error", 

148 1: "too_big", 

149 2: "no_such_name", 

150 3: "bad_value", 

151 4: "read_only", 

152 5: "generic_error", 

153 6: "no_access", 

154 7: "wrong_type", 

155 8: "wrong_length", 

156 9: "wrong_encoding", 

157 10: "wrong_value", 

158 11: "no_creation", 

159 12: "inconsistent_value", 

160 13: "resource_unavailable", 

161 14: "commit_failed", 

162 15: "undo_failed", 

163 16: "authorization_error", 

164 17: "not_writable", 

165 18: "inconsistent_name", 

166 } 

167 

168SNMP_trap_types = {0: "cold_start", 

169 1: "warm_start", 

170 2: "link_down", 

171 3: "link_up", 

172 4: "auth_failure", 

173 5: "egp_neigh_loss", 

174 6: "enterprise_specific", 

175 } 

176 

177 

178class SNMPvarbind(ASN1_Packet): 

179 ASN1_codec = ASN1_Codecs.BER 

180 ASN1_root = ASN1F_SEQUENCE( 

181 ASN1F_OID("oid", "1.3"), 

182 ASN1F_optional( 

183 ASN1F_field("value", ASN1_NULL(0)) 

184 ), 

185 

186 # exceptions in responses 

187 ASN1F_optional(ASN1F_NULL("noSuchObject", None, implicit_tag=0x80)), 

188 ASN1F_optional(ASN1F_NULL("noSuchInstance", None, implicit_tag=0x81)), 

189 ASN1F_optional(ASN1F_NULL("endOfMibView", None, implicit_tag=0x82)), 

190 ) 

191 

192 

193class SNMPget(ASN1_Packet): 

194 ASN1_codec = ASN1_Codecs.BER 

195 ASN1_root = ASN1F_SNMP_PDU_GET(ASN1F_INTEGER("id", 0), 

196 ASN1F_enum_INTEGER("error", 0, SNMP_error), 

197 ASN1F_INTEGER("error_index", 0), 

198 ASN1F_SEQUENCE_OF("varbindlist", [], SNMPvarbind) # noqa: E501 

199 ) 

200 

201 

202class SNMPnext(ASN1_Packet): 

203 ASN1_codec = ASN1_Codecs.BER 

204 ASN1_root = ASN1F_SNMP_PDU_NEXT(ASN1F_INTEGER("id", 0), 

205 ASN1F_enum_INTEGER("error", 0, SNMP_error), 

206 ASN1F_INTEGER("error_index", 0), 

207 ASN1F_SEQUENCE_OF("varbindlist", [], SNMPvarbind) # noqa: E501 

208 ) 

209 

210 

211class SNMPresponse(ASN1_Packet): 

212 ASN1_codec = ASN1_Codecs.BER 

213 ASN1_root = ASN1F_SNMP_PDU_RESPONSE(ASN1F_INTEGER("id", 0), 

214 ASN1F_enum_INTEGER("error", 0, SNMP_error), # noqa: E501 

215 ASN1F_INTEGER("error_index", 0), 

216 ASN1F_SEQUENCE_OF("varbindlist", [], SNMPvarbind) # noqa: E501 

217 ) 

218 

219 

220class SNMPset(ASN1_Packet): 

221 ASN1_codec = ASN1_Codecs.BER 

222 ASN1_root = ASN1F_SNMP_PDU_SET(ASN1F_INTEGER("id", 0), 

223 ASN1F_enum_INTEGER("error", 0, SNMP_error), 

224 ASN1F_INTEGER("error_index", 0), 

225 ASN1F_SEQUENCE_OF("varbindlist", [], SNMPvarbind) # noqa: E501 

226 ) 

227 

228 

229class SNMPtrapv1(ASN1_Packet): 

230 ASN1_codec = ASN1_Codecs.BER 

231 ASN1_root = ASN1F_SNMP_PDU_TRAPv1(ASN1F_OID("enterprise", "1.3"), 

232 ASN1F_IPADDRESS("agent_addr", "0.0.0.0"), 

233 ASN1F_enum_INTEGER("generic_trap", 0, SNMP_trap_types), # noqa: E501 

234 ASN1F_INTEGER("specific_trap", 0), 

235 ASN1F_TIME_TICKS("time_stamp", IntAutoTime()), # noqa: E501 

236 ASN1F_SEQUENCE_OF("varbindlist", [], SNMPvarbind) # noqa: E501 

237 ) 

238 

239 

240class SNMPbulk(ASN1_Packet): 

241 ASN1_codec = ASN1_Codecs.BER 

242 ASN1_root = ASN1F_SNMP_PDU_BULK(ASN1F_INTEGER("id", 0), 

243 ASN1F_INTEGER("non_repeaters", 0), 

244 ASN1F_INTEGER("max_repetitions", 0), 

245 ASN1F_SEQUENCE_OF("varbindlist", [], SNMPvarbind) # noqa: E501 

246 ) 

247 

248 

249class SNMPinform(ASN1_Packet): 

250 ASN1_codec = ASN1_Codecs.BER 

251 ASN1_root = ASN1F_SNMP_PDU_INFORM(ASN1F_INTEGER("id", 0), 

252 ASN1F_enum_INTEGER("error", 0, SNMP_error), # noqa: E501 

253 ASN1F_INTEGER("error_index", 0), 

254 ASN1F_SEQUENCE_OF("varbindlist", [], SNMPvarbind) # noqa: E501 

255 ) 

256 

257 

258class SNMPtrapv2(ASN1_Packet): 

259 ASN1_codec = ASN1_Codecs.BER 

260 ASN1_root = ASN1F_SNMP_PDU_TRAPv2(ASN1F_INTEGER("id", 0), 

261 ASN1F_enum_INTEGER("error", 0, SNMP_error), # noqa: E501 

262 ASN1F_INTEGER("error_index", 0), 

263 ASN1F_SEQUENCE_OF("varbindlist", [], SNMPvarbind) # noqa: E501 

264 ) 

265 

266 

267class SNMP(ASN1_Packet): 

268 ASN1_codec = ASN1_Codecs.BER 

269 ASN1_root = ASN1F_SEQUENCE( 

270 ASN1F_enum_INTEGER("version", 1, {0: "v1", 1: "v2c", 2: "v2", 3: "v3"}), # noqa: E501 

271 ASN1F_STRING("community", "public"), 

272 ASN1F_CHOICE("PDU", SNMPget(), 

273 SNMPget, SNMPnext, SNMPresponse, SNMPset, 

274 SNMPtrapv1, SNMPbulk, SNMPinform, SNMPtrapv2) 

275 ) 

276 

277 def answers(self, other): 

278 return (isinstance(self.PDU, SNMPresponse) and 

279 isinstance(other.PDU, (SNMPget, SNMPnext, SNMPset)) and 

280 self.PDU.id == other.PDU.id) 

281 

282 

283bind_bottom_up(UDP, SNMP, sport=161) 

284bind_bottom_up(UDP, SNMP, dport=161) 

285bind_bottom_up(UDP, SNMP, sport=162) 

286bind_bottom_up(UDP, SNMP, dport=162) 

287bind_layers(UDP, SNMP, sport=161, dport=161) 

288 

289 

290def snmpwalk(dst, oid="1", community="public"): 

291 try: 

292 while True: 

293 r = sr1(IP(dst=dst) / UDP(sport=RandShort()) / SNMP(community=community, PDU=SNMPnext(varbindlist=[SNMPvarbind(oid=oid)])), timeout=2, chainCC=1, verbose=0, retry=2) # noqa: E501 

294 if r is None: 

295 print("No answers") 

296 break 

297 if ICMP in r: 

298 print(repr(r)) 

299 break 

300 print("%-40s: %r" % (r[SNMPvarbind].oid.val, r[SNMPvarbind].value)) 

301 oid = r[SNMPvarbind].oid 

302 

303 except KeyboardInterrupt: 

304 pass