Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/scapy/layers/tftp.py: 44%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

364 statements  

1# SPDX-License-Identifier: GPL-2.0-only 

2# This file is part of Scapy 

3# See https://scapy.net/ for more information 

4# Copyright (C) Philippe Biondi <phil@secdev.org> 

5 

6""" 

7TFTP (Trivial File Transfer Protocol). 

8 

9This provides TFTP implementation and 4 small automata: 

10 - TFTP_read: read a remote file 

11 - TFTP_RRQ_server: server that answers to read requests 

12 - TFTP_write: write a remote file 

13 - TFTP_WRQ_server: server than accepts write requests 

14""" 

15 

16import os 

17import random 

18 

19from scapy.packet import Packet, bind_layers, split_bottom_up, bind_bottom_up 

20from scapy.fields import ( 

21 PacketListField, 

22 ShortEnumField, 

23 ShortField, 

24 StrNullField, 

25) 

26from scapy.automaton import ATMT, Automaton 

27from scapy.base_classes import Net 

28from scapy.config import conf 

29from scapy.sessions import IPSession 

30from scapy.volatile import RandShort 

31 

32from scapy.layers.inet import UDP, IP 

33 

34TFTP_operations = {1: "RRQ", 2: "WRQ", 3: "DATA", 4: "ACK", 5: "ERROR", 6: "OACK"} # noqa: E501 

35 

36 

37class TFTP(Packet): 

38 name = "TFTP opcode" 

39 fields_desc = [ShortEnumField("op", 1, TFTP_operations), ] 

40 

41 

42class TFTP_RRQ(Packet): 

43 name = "TFTP Read Request" 

44 fields_desc = [StrNullField("filename", ""), 

45 StrNullField("mode", "octet")] 

46 

47 def answers(self, other): 

48 return 0 

49 

50 def mysummary(self): 

51 return self.sprintf("RRQ %filename%"), [UDP] 

52 

53 

54class TFTP_WRQ(Packet): 

55 name = "TFTP Write Request" 

56 fields_desc = [StrNullField("filename", ""), 

57 StrNullField("mode", "octet")] 

58 

59 def answers(self, other): 

60 return 0 

61 

62 def mysummary(self): 

63 return self.sprintf("WRQ %filename%"), [UDP] 

64 

65 

66class TFTP_DATA(Packet): 

67 name = "TFTP Data" 

68 fields_desc = [ShortField("block", 0)] 

69 

70 def answers(self, other): 

71 return self.block == 1 and isinstance(other, TFTP_RRQ) 

72 

73 def mysummary(self): 

74 return self.sprintf("DATA %block%"), [UDP] 

75 

76 

77class TFTP_Option(Packet): 

78 fields_desc = [StrNullField("oname", ""), 

79 StrNullField("value", "")] 

80 

81 def extract_padding(self, pkt): 

82 return "", pkt 

83 

84 

85class TFTP_Options(Packet): 

86 fields_desc = [PacketListField("options", [], TFTP_Option, length_from=lambda x:None)] # noqa: E501 

87 

88 

89class TFTP_ACK(Packet): 

90 name = "TFTP Ack" 

91 fields_desc = [ShortField("block", 0)] 

92 

93 def answers(self, other): 

94 if isinstance(other, TFTP_DATA): 

95 return self.block == other.block 

96 elif isinstance(other, (TFTP_RRQ, TFTP_WRQ, TFTP_OACK)): # noqa: E501 

97 return self.block == 0 

98 return 0 

99 

100 def mysummary(self): 

101 return self.sprintf("ACK %block%"), [UDP] 

102 

103 

104TFTP_Error_Codes = {0: "Not defined", 

105 1: "File not found", 

106 2: "Access violation", 

107 3: "Disk full or allocation exceeded", 

108 4: "Illegal TFTP operation", 

109 5: "Unknown transfer ID", 

110 6: "File already exists", 

111 7: "No such user", 

112 8: "Terminate transfer due to option negotiation", 

113 } 

114 

115 

116class TFTP_ERROR(Packet): 

117 name = "TFTP Error" 

118 fields_desc = [ShortEnumField("errorcode", 0, TFTP_Error_Codes), 

119 StrNullField("errormsg", "")] 

120 

121 def answers(self, other): 

122 return isinstance(other, (TFTP_DATA, TFTP_RRQ, TFTP_WRQ, TFTP_ACK)) 

123 

124 def mysummary(self): 

125 return self.sprintf("ERROR %errorcode%: %errormsg%"), [UDP] 

126 

127 

128class TFTP_OACK(Packet): 

129 name = "TFTP Option Ack" 

130 fields_desc = [] 

131 

132 def answers(self, other): 

133 return isinstance(other, (TFTP_WRQ, TFTP_RRQ)) 

134 

135 

136bind_layers(UDP, TFTP, dport=69) 

137bind_layers(TFTP, TFTP_RRQ, op=1) 

138bind_layers(TFTP, TFTP_WRQ, op=2) 

139bind_layers(TFTP, TFTP_DATA, op=3) 

140bind_layers(TFTP, TFTP_ACK, op=4) 

141bind_layers(TFTP, TFTP_ERROR, op=5) 

142bind_layers(TFTP, TFTP_OACK, op=6) 

143bind_layers(TFTP_RRQ, TFTP_Options) 

144bind_layers(TFTP_WRQ, TFTP_Options) 

145bind_layers(TFTP_OACK, TFTP_Options) 

146 

147 

148# Automatons 

149 

150class TFTP_read(Automaton): 

151 """ 

152 TFTP automaton to read a remote file on a TFTP server. 

153 

154 :param filename: the name of the remote file to read. 

155 :param server: the host on which to read (IP or name). 

156 :param sport: (optional) the source port to use. (default: random) 

157 :param port: (optional) the TFTP port (default: 69) 

158 """ 

159 

160 def parse_args(self, filename, server, sport=None, port=69, **kargs): 

161 if "iface" not in kargs: 

162 server = str(Net(server)) 

163 kargs["iface"] = conf.route.route(server)[0] 

164 Automaton.parse_args(self, **kargs) 

165 self.filename = filename 

166 self.server = server 

167 self.port = port 

168 self.sport = sport 

169 

170 def master_filter(self, pkt): 

171 return (IP in pkt and pkt[IP].src == self.server and UDP in pkt and 

172 pkt[UDP].dport == self.my_tid and 

173 (self.server_tid is None or pkt[UDP].sport == self.server_tid)) 

174 

175 # BEGIN 

176 @ATMT.state(initial=1) 

177 def BEGIN(self): 

178 self.blocksize = 512 

179 self.my_tid = self.sport or RandShort()._fix() 

180 bind_bottom_up(UDP, TFTP, dport=self.my_tid) 

181 self.server_tid = None 

182 self.res = b"" 

183 

184 self.l3 = IP(dst=self.server) / UDP(sport=self.my_tid, dport=self.port) / TFTP() # noqa: E501 

185 self.last_packet = self.l3 / TFTP_RRQ(filename=self.filename, mode="octet") # noqa: E501 

186 self.send(self.last_packet) 

187 self.awaiting = 1 

188 

189 raise self.WAITING() 

190 

191 # WAITING 

192 @ATMT.state() 

193 def WAITING(self): 

194 pass 

195 

196 @ATMT.receive_condition(WAITING) 

197 def receive_data(self, pkt): 

198 if TFTP_DATA in pkt and pkt[TFTP_DATA].block == self.awaiting: 

199 if self.server_tid is None: 

200 self.server_tid = pkt[UDP].sport 

201 self.l3[UDP].dport = self.server_tid 

202 raise self.RECEIVING(pkt) 

203 

204 @ATMT.receive_condition(WAITING, prio=1) 

205 def receive_error(self, pkt): 

206 if TFTP_ERROR in pkt: 

207 raise self.ERROR(pkt) 

208 

209 @ATMT.timeout(WAITING, 3) 

210 def timeout_waiting(self): 

211 raise self.WAITING() 

212 

213 @ATMT.action(timeout_waiting) 

214 def retransmit_last_packet(self): 

215 self.send(self.last_packet) 

216 

217 @ATMT.action(receive_data) 

218# @ATMT.action(receive_error) 

219 def send_ack(self): 

220 self.last_packet = self.l3 / TFTP_ACK(block=self.awaiting) 

221 self.send(self.last_packet) 

222 

223 # RECEIVED 

224 @ATMT.state() 

225 def RECEIVING(self, pkt): 

226 if conf.raw_layer in pkt: 

227 recvd = pkt[conf.raw_layer].load 

228 else: 

229 recvd = b"" 

230 self.res += recvd 

231 self.awaiting += 1 

232 if len(recvd) == self.blocksize: 

233 raise self.WAITING() 

234 raise self.END() 

235 

236 # ERROR 

237 @ATMT.state(error=1) 

238 def ERROR(self, pkt): 

239 split_bottom_up(UDP, TFTP, dport=self.my_tid) 

240 return pkt[TFTP_ERROR].summary() 

241 

242 # END 

243 @ATMT.state(final=1) 

244 def END(self): 

245 split_bottom_up(UDP, TFTP, dport=self.my_tid) 

246 return self.res 

247 

248 

249class TFTP_write(Automaton): 

250 """ 

251 TFTP automaton to write a local file onto a TFTP server. 

252 

253 :param filename: the name of the remote file to write. 

254 :param data: the bytes data to write. 

255 :param server: the host on which to read (IP or name). 

256 :param sport: (optional) the source port to use. (default: random) 

257 :param port: (optional) the TFTP port (default: 69) 

258 """ 

259 

260 def parse_args(self, filename, data, server, sport=None, port=69, **kargs): 

261 if "iface" not in kargs: 

262 server = str(Net(server)) 

263 kargs["iface"] = conf.route.route(server)[0] 

264 Automaton.parse_args(self, **kargs) 

265 self.filename = filename 

266 self.server = server 

267 self.port = port 

268 self.sport = sport 

269 self.blocksize = 512 

270 self.origdata = data 

271 

272 def master_filter(self, pkt): 

273 return (IP in pkt and pkt[IP].src == self.server and UDP in pkt and 

274 pkt[UDP].dport == self.my_tid and 

275 (self.server_tid is None or pkt[UDP].sport == self.server_tid)) 

276 

277 # BEGIN 

278 @ATMT.state(initial=1) 

279 def BEGIN(self): 

280 self.data = [self.origdata[i * self.blocksize:(i + 1) * self.blocksize] 

281 for i in range(len(self.origdata) // self.blocksize + 1)] 

282 self.my_tid = self.sport or RandShort()._fix() 

283 bind_bottom_up(UDP, TFTP, dport=self.my_tid) 

284 self.server_tid = None 

285 

286 self.l3 = IP(dst=self.server) / UDP(sport=self.my_tid, dport=self.port) / TFTP() # noqa: E501 

287 self.last_packet = self.l3 / TFTP_WRQ(filename=self.filename, mode="octet") # noqa: E501 

288 self.send(self.last_packet) 

289 self.res = "" 

290 self.awaiting = 0 

291 

292 raise self.WAITING_ACK() 

293 

294 # WAITING_ACK 

295 @ATMT.state() 

296 def WAITING_ACK(self): 

297 pass 

298 

299 @ATMT.receive_condition(WAITING_ACK) 

300 def received_ack(self, pkt): 

301 if TFTP_ACK in pkt and pkt[TFTP_ACK].block == self.awaiting: 

302 if self.server_tid is None: 

303 self.server_tid = pkt[UDP].sport 

304 self.l3[UDP].dport = self.server_tid 

305 raise self.SEND_DATA() 

306 

307 @ATMT.receive_condition(WAITING_ACK) 

308 def received_error(self, pkt): 

309 if TFTP_ERROR in pkt: 

310 raise self.ERROR(pkt) 

311 

312 @ATMT.timeout(WAITING_ACK, 3) 

313 def timeout_waiting(self): 

314 raise self.WAITING_ACK() 

315 

316 @ATMT.action(timeout_waiting) 

317 def retransmit_last_packet(self): 

318 self.send(self.last_packet) 

319 

320 # SEND_DATA 

321 @ATMT.state() 

322 def SEND_DATA(self): 

323 self.awaiting += 1 

324 self.last_packet = self.l3 / TFTP_DATA(block=self.awaiting) / self.data.pop(0) # noqa: E501 

325 self.send(self.last_packet) 

326 if self.data: 

327 raise self.WAITING_ACK() 

328 raise self.END() 

329 

330 # ERROR 

331 @ATMT.state(error=1) 

332 def ERROR(self, pkt): 

333 split_bottom_up(UDP, TFTP, dport=self.my_tid) 

334 return pkt[TFTP_ERROR].summary() 

335 

336 # END 

337 @ATMT.state(final=1) 

338 def END(self): 

339 split_bottom_up(UDP, TFTP, dport=self.my_tid) 

340 

341 

342class TFTP_WRQ_server(Automaton): 

343 """ 

344 TFTP automaton to wait for incoming files 

345 

346 :param ip: (optional) the local IP to listen on. 

347 :param sport: (optional) the local port (by default: random) 

348 """ 

349 

350 def parse_args(self, ip=None, sport=None, *args, **kargs): 

351 if "iface" not in kargs and ip: 

352 ip = str(Net(ip)) 

353 kargs["iface"] = conf.route.route(ip)[0] 

354 kargs.setdefault("session", IPSession()) 

355 Automaton.parse_args(self, *args, **kargs) 

356 self.ip = ip 

357 self.sport = sport 

358 

359 def master_filter(self, pkt): 

360 return TFTP in pkt and (not self.ip or pkt[IP].dst == self.ip) 

361 

362 @ATMT.state(initial=1) 

363 def BEGIN(self): 

364 self.blksize = 512 

365 self.blk = 1 

366 self.filedata = b"" 

367 self.my_tid = self.sport or random.randint(10000, 65500) 

368 bind_bottom_up(UDP, TFTP, dport=self.my_tid) 

369 

370 @ATMT.receive_condition(BEGIN) 

371 def receive_WRQ(self, pkt): 

372 if TFTP_WRQ in pkt: 

373 raise self.WAIT_DATA().action_parameters(pkt) 

374 

375 @ATMT.action(receive_WRQ) 

376 def ack_WRQ(self, pkt): 

377 ip = pkt[IP] 

378 self.ip = ip.dst 

379 self.dst = ip.src 

380 self.filename = pkt[TFTP_WRQ].filename 

381 options = pkt.getlayer(TFTP_Options) 

382 self.l3 = IP(src=ip.dst, dst=ip.src) / UDP(sport=self.my_tid, dport=pkt.sport) / TFTP() # noqa: E501 

383 if options is None: 

384 self.last_packet = self.l3 / TFTP_ACK(block=0) 

385 self.send(self.last_packet) 

386 else: 

387 opt = [x for x in options.options if x.oname.upper() == b"BLKSIZE"] 

388 if opt: 

389 self.blksize = int(opt[0].value) 

390 self.debug(2, "Negotiated new blksize at %i" % self.blksize) 

391 self.last_packet = self.l3 / TFTP_OACK() / TFTP_Options(options=opt) # noqa: E501 

392 self.send(self.last_packet) 

393 

394 @ATMT.state() 

395 def WAIT_DATA(self): 

396 pass 

397 

398 @ATMT.timeout(WAIT_DATA, 1) 

399 def resend_ack(self): 

400 self.send(self.last_packet) 

401 raise self.WAIT_DATA() 

402 

403 @ATMT.receive_condition(WAIT_DATA) 

404 def receive_data(self, pkt): 

405 if TFTP_DATA in pkt: 

406 data = pkt[TFTP_DATA] 

407 if data.block == self.blk: 

408 raise self.DATA(data) 

409 

410 @ATMT.action(receive_data) 

411 def ack_data(self): 

412 self.last_packet = self.l3 / TFTP_ACK(block=self.blk) 

413 self.send(self.last_packet) 

414 

415 @ATMT.state() 

416 def DATA(self, data): 

417 self.filedata += data.load 

418 if len(data.load) < self.blksize: 

419 raise self.END() 

420 self.blk += 1 

421 raise self.WAIT_DATA() 

422 

423 @ATMT.state(final=1) 

424 def END(self): 

425 split_bottom_up(UDP, TFTP, dport=self.my_tid) 

426 return self.filename, self.filedata 

427 

428 

429class TFTP_RRQ_server(Automaton): 

430 """ 

431 TFTP automaton to serve local files 

432 

433 You can't use 'store' and 'dir' at the same time. 

434 

435 :param store: (optional) a dictionary that contains the file data, like 

436 {"thefile": b"data"}. 

437 :param dir: (optional) a folder that contains the data file data. 

438 :param joker: (optional) data to return when no file/data is found. 

439 :param ip: (optional) the local IP to listen on. 

440 :param sport: (optional) the local port (by default: random) 

441 :param serve_one: (optional) close after serving one client (default: False) 

442 """ 

443 

444 def parse_args(self, store=None, joker=None, dir=None, ip=None, sport=None, serve_one=False, **kargs): # noqa: E501 

445 if "iface" not in kargs and ip: 

446 ip = str(Net(ip)) 

447 kargs["iface"] = conf.route.route(ip)[0] 

448 kargs.setdefault("session", IPSession()) 

449 Automaton.parse_args(self, **kargs) 

450 if store is None: 

451 store = {} 

452 if dir is not None: 

453 self.dir = os.path.join(os.path.abspath(dir), "") 

454 else: 

455 self.dir = None 

456 self.store = store 

457 self.joker = joker 

458 self.ip = ip 

459 self.sport = sport 

460 self.serve_one = serve_one 

461 self.my_tid = self.sport or random.randint(10000, 65500) 

462 bind_bottom_up(UDP, TFTP, dport=self.my_tid) 

463 

464 def master_filter(self, pkt): 

465 return TFTP in pkt and (not self.ip or pkt[IP].dst == self.ip) 

466 

467 @ATMT.state(initial=1) 

468 def WAIT_RRQ(self): 

469 self.blksize = 512 

470 self.blk = 0 

471 

472 @ATMT.receive_condition(WAIT_RRQ) 

473 def receive_rrq(self, pkt): 

474 if TFTP_RRQ in pkt: 

475 raise self.RECEIVED_RRQ(pkt) 

476 

477 @ATMT.state() 

478 def RECEIVED_RRQ(self, pkt): 

479 ip = pkt[IP] 

480 options = pkt.getlayer(TFTP_Options) 

481 self.l3 = IP(src=ip.dst, dst=ip.src) / UDP(sport=self.my_tid, dport=ip.sport) / TFTP() # noqa: E501 

482 self.filename = pkt[TFTP_RRQ].filename.decode("utf-8", "ignore") 

483 self.blk = 1 

484 self.data = None 

485 if self.filename in self.store: 

486 self.data = self.store[self.filename] 

487 elif self.dir is not None: 

488 fn = os.path.abspath(os.path.join(self.dir, self.filename)) 

489 if fn.startswith(self.dir): # Check we're still in the server's directory # noqa: E501 

490 try: 

491 with open(fn) as fd: 

492 self.data = fd.read() 

493 except IOError: 

494 pass 

495 if self.data is None: 

496 self.data = self.joker 

497 

498 if options: 

499 opt = [x for x in options.options if x.oname.upper() == b"BLKSIZE"] 

500 if opt: 

501 self.blksize = int(opt[0].value) 

502 self.debug(2, "Negotiated new blksize at %i" % self.blksize) 

503 self.last_packet = self.l3 / TFTP_OACK() / TFTP_Options(options=opt) # noqa: E501 

504 self.send(self.last_packet) 

505 

506 @ATMT.condition(RECEIVED_RRQ) 

507 def file_in_store(self): 

508 if self.data is not None: 

509 self.blknb = len(self.data) / self.blksize + 1 

510 raise self.SEND_FILE() 

511 

512 @ATMT.condition(RECEIVED_RRQ) 

513 def file_not_found(self): 

514 if self.data is None: 

515 raise self.WAIT_RRQ() 

516 

517 @ATMT.action(file_not_found) 

518 def send_error(self): 

519 self.send(self.l3 / TFTP_ERROR(errorcode=1, errormsg=TFTP_Error_Codes[1])) # noqa: E501 

520 

521 @ATMT.state() 

522 def SEND_FILE(self): 

523 self.send(self.l3 / TFTP_DATA(block=self.blk) / self.data[(self.blk - 1) * self.blksize:self.blk * self.blksize]) # noqa: E501 

524 

525 @ATMT.timeout(SEND_FILE, 3) 

526 def timeout_waiting_ack(self): 

527 raise self.SEND_FILE() 

528 

529 @ATMT.receive_condition(SEND_FILE) 

530 def received_ack(self, pkt): 

531 if TFTP_ACK in pkt and pkt[TFTP_ACK].block == self.blk: 

532 raise self.RECEIVED_ACK() 

533 

534 @ATMT.state() 

535 def RECEIVED_ACK(self): 

536 self.blk += 1 

537 

538 @ATMT.condition(RECEIVED_ACK) 

539 def no_more_data(self): 

540 if self.blk > self.blknb: 

541 if self.serve_one: 

542 raise self.END() 

543 raise self.WAIT_RRQ() 

544 

545 @ATMT.condition(RECEIVED_ACK, prio=2) 

546 def data_remaining(self): 

547 raise self.SEND_FILE() 

548 

549 @ATMT.state(final=1) 

550 def END(self): 

551 split_bottom_up(UDP, TFTP, dport=self.my_tid)