我正在尝试构建一个在 ICMP Echo 数据包的数据部分中隐藏秘密消息的网络客户端,以及一个捕获 Echo 数据包并解密/打印秘密消息的网络服务器。
我的服务器/侦听器似乎在我的 Windows 本地计算机上接收并正确打印了从客户端发送的消息,但它确实在我的 Linux 虚拟机上运行,我不确定为什么。另外,当我在wireshark上过滤icmp时,我看不到任何流量
以下是我的客户代码:
import scapy.all as scapy
from scapy.layers.inet import ICMP, IP
import random
# from cryptography.fernet import Fernet
# from encryption import KEY # shared key for decryption
from encryption import load_key
from cryptography.fernet import Fernet
key = 'ZmDfcTF7_60GrrY167zsiPd67pEvs0aGOv2oasOM1Pg='
# load_key()
f = Fernet(key)
class CovertHeader(scapy.Packet):
'''
This class defines a header for use with your encrypted message.
You may define additional header fields if you want or need to. The seqNum
field is used to help reorder messages, and len is the length of data
following this header.
'''
name = "Covert Comms Channel Header"
fields_desc = [scapy.ByteField("ID", 0), # ID used to pair packets
scapy.ByteField("len", 0), # length of data
scapy.BitField("final", 0, 1), # bit set to 1 for final packet
scapy.BitField("seqNum", 0, 15)] # sequence number of current message
def sendPacket(msg, seqNum, id, msgLength, payload):
'''
Send packet is called to send message.
msg: 100 byte chunks of the message
seqNum: the current sequence number
id: identifier used to group packets at receiver
msgLength: used to determine when the last packet is sent
payload: actual size of payload - padding
'''
pkt = IP(src=IP_SRC, dst=IP_DST, proto=PROTO_ICMP)
pkt = pkt / ICMP(type=ECHO_ICMP_TYPE, code=ECHO_ICMP_CODE)
pkt = pkt / CovertHeader()
pkt = pkt / msg
print('The current sequence number is: ', seqNum)
pkt[CovertHeader].seqNum = seqNum
pkt[CovertHeader].ID = id
pkt[CovertHeader].len = 128 + payload
if msgLength == 0:
pkt[CovertHeader].final = 1
sendSock.send(pkt)
print(f'Sent packet with payload length: {payload} \nID: {id}')
# IPs, protocol, type, and code
IP_SRC = '127.0.0.1'
IP_DST = input("Enter Destination Address [Recommend 127.0.0.1]: ")
PROTO_ICMP = 1
ECHO_ICMP_TYPE = 8
ECHO_ICMP_CODE = 0
scapy.conf.L3socket = scapy.conf.L3socket()
sendSock = scapy.conf.L3socket
msg = input('Enter your message: ')
# f = Fernet(KEY)
encMsg = msg.encode()
print('message length: ', len(encMsg), 'bytes')
msgList = []
done = False
chunkStart = 0
chunkEnd = 80
while not done:
# split into 80 byte chunks
#msgChunk = encMsg[chunkStart:chunkEnd]
#chunkStart = chunkStart + 80
#chunkEnd = chunkEnd + 80
#msgList.append(msgChunk)
#if chunkStart > len(encMsg):
# done = True
# break
#elif chunkEnd > len(encMsg):
# msgList.append(encMsg[chunkStart:])
# done = True
# apply padding if not 80 bytes
while not done:
msgChunk = encMsg[chunkStart:chunkEnd]
chunkStart = chunkStart + 80
chunkEnd = chunkEnd + 80
# Check if the chunk is less than 80 bytes and pad it if necessary
if len(msgChunk) < 80:
msgChunk = msgChunk.ljust(80, b'\x00')
msgList.append(msgChunk)
if chunkStart > len(encMsg):
done = True
msgLength = len(encMsg)
seqNum = random.randint(0, 32766)
id = random.randint(1, 255)
for i in range(len(msgList)):
curMsg = msgList[i]
szPayload = len(msgList[i])
msgLength = 1 if i < len(msgList) - 1 else 0
sendPacket(curMsg, seqNum, id, msgLength, szPayload)
# Function to receive and process packets
def receivePackets():
def process_packet(pkt):
if pkt.haslayer(CovertHeader):
print("Received response: ", pkt[CovertHeader].load.decode())
scapy.sniff(filter="icmp", prn=process_packet, count=5, timeout=10)
# Function to handle image data
def handle_image(image_path):
with open(image_path, "rb") as image_file:
image_data = image_file.read()
chunks = [image_data[i:i+80] for i in range(0, len(image_data), 80)]
return chunks
# Main function to send messages or images
def main():
choice = input("Send message (M) or image (I)? ")
if choice.lower() == 'm':
msg = input('Enter your message: ')
encMsg = f.encrypt(msg.encode())
chunkStart, chunkEnd = 0, 80
seqNum = random.randint(0, 32766)
id = random.randint(1, 255)
while chunkStart < len(encMsg):
curMsg = encMsg[chunkStart:chunkEnd]
szPayload = len(curMsg)
msgLength = 0 if chunkEnd >= len(encMsg) else 1
sendPacket(curMsg, seqNum, id, msgLength, szPayload)
chunkStart += 80
chunkEnd += 80
seqNum += 1 # Increment sequence number for each packet
elif choice.lower() == 'i':
image_path = input('Enter [ABSOLUTE]: ')
image_chunks = handle_image(image_path)
seqNum = random.randint(0, 32766)
id = random.randint(1, 255)
for i, chunk in enumerate(image_chunks):
szPayload = len(chunk)
msgLength = 0 if i == len(image_chunks) - 1 else 1
sendPacket(chunk, seqNum, id, msgLength, szPayload)
seqNum += 1 # Increment sequence number for each packet
if __name__ == "__main__":
main()
receivePackets()
下面是我的监听器/服务器代码:
import scapy.all as scapy
from scapy.layers.inet import ICMP, IP
# from cryptography.fernet import Fernet
# from encryption import KEY
from encryption import load_key
from cryptography.fernet import Fernet
#key = 'tkXHvFr4PCJGFfbPPxStZ6MYUj30D1x5aMOrIkvt2tk'
#key = Fernet.generate_key()
key = 'ZmDfcTF7_60GrrY167zsiPd67pEvs0aGOv2oasOM1Pg='
f = Fernet(key)
class CovertHeader(scapy.Packet):
'''
This class defines a header for use with your encrypted message.
'''
name = "Covert Comms Channel Header"
fields_desc = [scapy.ByteField("ID", 0),
scapy.ByteField("len", 0),
scapy.BitField("final", 0, 1),
scapy.BitField("seqNum", 0, 15)]
finalMsg = ""
# f = Fernet(KEY)
def process_icmp_only(pkt):
'''
Filter the packets one at a time as received.
'''
if pkt.haslayer(ICMP): # Updated to use the directly imported ICMP
if pkt[ICMP].type == 8:
pkt[ICMP].decode_payload_as(CovertHeader)
print('Received covert ICMP packet, processing')
global finalMsg
finalMsg = finalMsg + pkt[CovertHeader].load.decode()
if pkt[CovertHeader].final == 1:
print(finalMsg)
finalMsg = ""
scapy.conf.L3socket = scapy.conf.L3socket()
rcvSock = scapy.conf.L3socket
print("The Server is currently running...")
while True:
try:
rcvSock.sniff(count=1, prn=process_icmp_only)
except KeyboardInterrupt as e:
exit()
except:
continue
# Function to send a response back to the client
def sendResponse(response, dest_ip):
response_pkt = IP(dst=dest_ip) / ICMP() / CovertHeader() / response.encode() # Using the directly imported ICMP
scapy.send(response_pkt)
# Main function to process and respond to messages or images
def main():
def process_packet(pkt):
if pkt.haslayer(ICMP) and pkt.haslayer(CovertHeader): # Using the directly imported ICMP
print("Received message: ", pkt[CovertHeader].load.decode())
sendResponse("Ack: Message received", pkt[IP].src)
print("Listener Running...")
scapy.sniff(filter="icmp", prn=process_packet, count=10, timeout=20)
if __name__ == "__main__":
main()
Wireshark 中的 ICMP 过滤: 在 Wireshark 中看不到 ICMP 流量的问题可能是由于 ICMP 数据包经常被防火墙阻止所致。确保 Linux 虚拟机上的防火墙没有阻止 ICMP 流量。
ICMP ID 字段: 在 sendPacket 函数中,将 CovertHeader 的 ID 字段设置为随机值。但是,在服务器端的 process_icmp_only 函数中,您没有检查接收到的数据包是否具有预期的 ID。您可能需要添加对 ID 字段的检查,以确保您正在处理正确的数据包。
在 process_icmp_only 函数中添加此检查:
def process_icmp_only(pkt):
if pkt.haslayer(ICMP) and pkt.haslayer(CovertHeader):
if pkt[ICMP].type == 8 and pkt[CovertHeader].ID == expected_id:
# ... rest of the code
嗅探超时:在服务器的主要功能中,您使用的 scapy.sniff 超时为 20 秒,计数为 10。这意味着它最多只能捕获 10 个数据包或在退出前等待 20 秒。根据您的要求调整超时和计数值。
更改此行:
scapy.sniff(filter="icmp", prn=process_packet, count=10, timeout=20)
对于这样的事情:
scapy.sniff(filter="icmp", prn=process_packet, count=0, timeout=None)
This will make it wait indefinitely until interrupted.
数据包发送:客户端发送带有有效负载的数据包,但服务器发送回空有效负载作为确认。如果您打算确认收到的消息,则应在响应中包含确认消息。
修改 sendResponse 函数以包含有意义的确认消息:
def sendResponse(response, dest_ip):
response_pkt = IP(dst=dest_ip) / ICMP() / CovertHeader() / response.encode()
scapy.send(response_pkt)
然后,在 process_packet 函数中,您可以打印确认消息:
def process_packet(pkt):
if pkt.haslayer(ICMP) and pkt.haslayer(CovertHeader):
if pkt[ICMP].type == 8 and pkt[CovertHeader].ID == expected_id:
print("Received message: ", pkt[CovertHeader].load.decode())
sendResponse("Ack: Message received", pkt[IP].src)
确保解决这些问题,这应该有助于解决您面临的问题。此外,请检查在 Linux 虚拟机上执行期间可能引发的任何错误消息或异常。