forked from secdev/scapy
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtftp.uts
124 lines (96 loc) · 3.92 KB
/
tftp.uts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
% Regression tests for TFTP
# More information at http://www.secdev.org/projects/UTscapy/
+ Automatons
~ linux
= Utilities
~ linux
# Slot for the selector that was active before patching; the tests below
# assign it back to scapy.automaton.select_objects once they are done.
legacy_select_objects = None

def patch_select_objects(classname):
    """Rebind scapy.automaton.select_objects to a mock-socket-aware wrapper.
    The current ``select_objects`` is remembered in the module-level
    ``legacy_select_objects`` before the wrapper is installed.
    ``classname`` is the mock socket class to intercept; its instances must
    expose a ``packets`` sequence.
    """
    global legacy_select_objects
    legacy_select_objects = select_objects
    def mock_select_objects(inputs, remain):
        """Report mock sockets as ready while their first one has packets."""
        mocked = [sock for sock in inputs if isinstance(sock, classname)]
        if not mocked:
            # Nothing to intercept: defer to the saved selector untouched.
            return legacy_select_objects(inputs, remain)
        if mocked[0].packets:
            # Only the first mock socket is inspected, but all are returned.
            return mocked
        # Scripted packets are exhausted: hide the mock sockets from the
        # saved selector and let it handle whatever else is left.
        others = [sock for sock in inputs if not isinstance(sock, classname)]
        return legacy_select_objects(others, remain)
    scapy.automaton.select_objects = mock_select_objects
class MockTFTPSocket(object):
    """Socket stand-in that replays a scripted list of packets.
    Subclasses override ``packets``; ``recv`` consumes it front to back and
    ``send`` discards whatever it is given.
    """
    # Class-level list that recv() mutates in place, so every instance of a
    # given class drains the same script.
    packets = []
    def recv(self, n):
        """Remove and return the next scripted packet; ``n`` is ignored."""
        return self.packets.pop(0)
    def send(self, *args, **kargs):
        """Accept any arguments and drop them."""
        return None
= TFTP_read() automaton
~ linux
class MockReadSocket(MockTFTPSocket):
    # Scripted replies from 1.2.3.4: one 512-byte DATA block followed by a
    # short second block.
    packets = [
        IP(src="1.2.3.4") / UDP(dport=0x2807) / TFTP_DATA(block=1) / ("P" * 512),
        IP(src="1.2.3.4") / UDP(dport=0x2807) / TFTP_DATA(block=2) / "<3",
    ]

patch_select_objects(MockReadSocket)
tftp_read = TFTP_read(
    "file.txt",
    "1.2.3.4",
    sport=0x2807,
    ll=MockReadSocket,
    recvsock=MockReadSocket,
)
tftp_read.run()
# Put the saved selector back before checking the outcome.
scapy.automaton.select_objects = legacy_select_objects
assert tftp_read.res == b"P" * 512 + b"<3"
= TFTP_write() automaton
~ linux
# Accumulates every Raw payload handed to MockWriteSocket.send().
data_received = b""

class MockWriteSocket(MockTFTPSocket):
    # Scripted replies from 1.2.3.4: ACK for block 0, then ACK for block 1.
    packets = [
        IP(src="1.2.3.4") / UDP(dport=0x2807) / TFTP_ACK(block=0),
        IP(src="1.2.3.4") / UDP(dport=0x2807) / TFTP_ACK(block=1),
    ]
    def send(self, *args, **kargs):
        """Append the Raw payload of the sent packet to ``data_received``."""
        global data_received
        # Calls without a packet, or with a packet carrying no Raw layer,
        # contribute nothing.
        if not args or Raw not in args[0]:
            return
        data_received += args[0][Raw].load

tftp_write = TFTP_write(
    "file.txt",
    "P" * 767 + "Scapy <3",
    "1.2.3.4",
    sport=0x2807,
    ll=MockWriteSocket,
    recvsock=MockWriteSocket,
)
patch_select_objects(MockWriteSocket)
tftp_write.run()
# Put the saved selector back before checking the outcome.
scapy.automaton.select_objects = legacy_select_objects
assert data_received == b"P" * 767 + b"Scapy <3"
= TFTP_WRQ_server() automaton
~ linux
class MockWRQSocket(MockTFTPSocket):
    # Scripted traffic addressed to 1.2.3.4: a write request for
    # "scapy.txt", one 512-byte DATA block, then a short second block.
    packets = [
        IP(dst="1.2.3.4") / UDP(dport=0x2807) / TFTP() / TFTP_WRQ(filename="scapy.txt"),
        IP(dst="1.2.3.4") / UDP(dport=0x2807) / TFTP() / TFTP_DATA(block=1) / ("P" * 512),
        IP(dst="1.2.3.4") / UDP(dport=0x2807) / TFTP() / TFTP_DATA(block=2) / "<3",
    ]

tftp_wrq = TFTP_WRQ_server(
    ip="1.2.3.4",
    sport=0x2807,
    ll=MockWRQSocket,
    recvsock=MockWRQSocket,
)
patch_select_objects(MockWRQSocket)
wrq_result = tftp_wrq.run()
# Restore the saved selector BEFORE asserting, as the other TFTP tests do:
# a failing assertion must not leave the mock selector installed.
scapy.automaton.select_objects = legacy_select_objects
assert wrq_result == (b"scapy.txt", b"P" * 512 + b"<3")
= TFTP_RRQ_server() automaton
~ linux
import os
import tempfile

# 514 characters: one 512-character block plus a 2-character tail.
sent_data = "P" * 512 + "<3"
# Create the file directly inside the directory the server is told to serve
# (dir="/tmp/" below) instead of relying on the platform default temp
# directory. NamedTemporaryFile creates the file atomically, unlike the
# deprecated, race-prone tempfile.mktemp; delete=False keeps it on disk
# after the handle is closed.
with tempfile.NamedTemporaryFile(mode="w", suffix=".txt", dir="/tmp/", delete=False) as fdesc:
    fdesc.write(sent_data)

filename = fdesc.name
# Accumulates the decoded payload of every TFTP_DATA packet the server sends.
received_data = ""

class MockRRQSocket(MockTFTPSocket):
    # Scripted traffic addressed to 1.2.3.4: a read request for "scapy.txt"
    # (presumably absent from /tmp/, exercising the not-found path -- confirm
    # against TFTP_RRQ_server), a read request for the temp file, then ACKs
    # for blocks 1 and 2.
    packets = [
        IP(dst="1.2.3.4") / UDP(dport=0x2807) / TFTP() / TFTP_RRQ(filename="scapy.txt") / TFTP_Options(),
        # basename() yields the name relative to the served directory
        # without hard-coding the length of the "/tmp/" prefix.
        IP(dst="1.2.3.4") / UDP(dport=0x2807) / TFTP() / TFTP_RRQ(filename=os.path.basename(filename)) / TFTP_Options(),
        IP(dst="1.2.3.4") / UDP(dport=0x2807) / TFTP() / TFTP_ACK(block=1),
        IP(dst="1.2.3.4") / UDP(dport=0x2807) / TFTP() / TFTP_ACK(block=2),
    ]
    def send(self, *args, **kargs):
        """Record the payload of each TFTP_DATA packet that is sent."""
        global received_data
        if args:
            pkt = args[0]
            if TFTP_DATA in pkt:
                received_data += pkt[Raw].load.decode("utf-8")

tftp_rrq = TFTP_RRQ_server(
    ip="1.2.3.4",
    sport=0x2807,
    dir="/tmp/",
    serve_one=True,
    ll=MockRRQSocket,
    recvsock=MockRRQSocket,
)
patch_select_objects(MockRRQSocket)
tftp_rrq.run()
scapy.automaton.select_objects = legacy_select_objects
# Remove the temp file before asserting so a failed comparison does not
# leave it behind.
os.unlink(filename)
assert received_data == sent_data