镜像端口上的数据包嗅探 - 当 UDP 消息 > 50kb 时丢失数据包(Python、Scapy)

问题描述 投票:0回答:1

我的网络编程经验很少,并且在镜像端口上嗅探数据包时遇到了问题,其中嗅探器似乎丢失了数据包:

我有两台计算机(A 和 B)通过 UDP 以太网套接字进行通信。计算机 A 向 B 发送一个整数 (n),B 将一个经过腌制的 (n x 3) numpy 数组发送回 A。由于该数组可能会变得很大,因此计算机 A 首先发送传出数组的大小,然后再发送该数组以 1024 字节块为单位。 B 读取传入的数组大小,并继续接收,直到收到完整的消息。这一切都没有问题。

现在介绍计算机C。 C 应该“拦截”数组。我将 A 的端口镜像到 C 的端口,C 正在使用 Scapy 读取数据包。如果数据包是 8 字节,C 假定这是消息大小,然后开始连接后续数据包,直到达到该大小。这适用于最大 2000x3(约 48 kb)的阵列,但此后 C 似乎会丢失数据包。 A 和 B 仍然没有问题,只是 C 没有收集完整的消息。

潜在相关信息:

  • A和B之间的通信没有问题。数据包不会被丢弃。
  • 我的一切都与千兆以太网连接。
  • 我正在使用 Visual Studio Code SSH 从计算机 A 访问所有计算机。
  • 当我在每个 1024 字节块之后放置 time.sleep(0.001) 时,即使对于较大的数组,也不会丢失任何数据包(但是传输太慢)。
  • 托管交换机不会在其日志中报告任何错误或丢失的数据包或任何内容。
  • 对于 2500x3 阵列 C 通常会获取 59 个数据包中的 57 个。对于 5000x3 数组,C 通常会得到 116 中的 61。对于 10000x3,它会得到 235 中的 79。

这是服务器,运行在 B: 上

import numpy as np
import pickle
from struct import pack, unpack
import socket
import time

class TestUDPServer():
    def __init__(self):
        self.server_host = "192.168.0.101"
        self.server_port = 8080
        self.start_server()
        self.main_loop()

    def receive(self):
        data, client_address = self.server_socket.recvfrom(1024)
        data_r = pickle.loads(data)

        return data_r, client_address

    def send(self, var, client_address):
        pickled_data = pickle.dumps(var)

        # Send the size of the message first
        size = len(pickled_data)
        self.server_socket.sendto(pack(">Q", size), client_address)

        # Send message in chunks
        chunk_size = 1024
        for i in range(0, len(pickled_data), chunk_size):
            chunk = pickled_data[i:i + chunk_size]

            self.server_socket.sendto(chunk, client_address)
            #time.sleep(0.001)

    def start_server(self):
        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.server_socket.bind((self.server_host, self.server_port))
        print(f"UDP Server listening on {self.server_host}:{self.server_port}")

    def main_loop(self):
        while True:
            pc_size, client_address = self.receive()
            if pc_size>0:
                break
            pc = np.random.random((pc_size, 3))
            print("received size:", pc_size)
            self.send(pc, client_address)

if __name__ == "__main__":
    TestUDPServer()

这是在 A 上运行的客户端:

import numpy as np
import pickle
import socket
from struct import pack, unpack


class TestUDPClient():
    def __init__(self):
        self.server_host = "192.168.0.101"
        self.server_port = 8080
        self.start_client()
        self.await_trigger()

    def receive(self):
        # Receive the size of the message first
        size_data, _ = self.client_socket.recvfrom(8)
        size = unpack(">Q", size_data)[0]

        received_data = b""
        chunk_size = 1024

        # Receive message in chunks
        n_chunks = 0
        while size > 0:
            n_chunks += 1
            chunk, _ = self.client_socket.recvfrom(min(size, chunk_size))
            if not chunk:
                break
            received_data += chunk
            size -= len(chunk)
        print("n_chunks", n_chunks)
        data_r = pickle.loads(received_data)
        return data_r

    def send(self, var):
        pickled_data = pickle.dumps(var)
        self.client_socket.sendto(pickled_data, (self.server_host, self.server_port))

    def start_client(self):
        self.client_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        print(f"UDP Client connected to {self.server_host}:{self.server_port}")

    def await_trigger(self):
        while True:
            size = int(input("Array length? (n x 3) : ")) # User inputs integer           
            self.send(size)
            rec = self.receive()
            print("received shape:", rec.shape)

        self.client_socket.close()

if __name__ == "__main__":
    TestUDPClient()

这是在 C: 上运行的数据包嗅探器

from scapy.all import sniff, IP, TCP, UDP, ICMP
from struct import pack, unpack
import pickle
import numpy as np
import time

class PortMirror():
    def __init__(self):
        self.source_ip = '192.168.0.101'
        self.size = 0
        self.n_packets = 0
        self.data = b''
        self.array = None

        sniff(iface='enp1s0', prn=self.packet_callback, store=0, filter='udp')


    def packet_callback(self, packet):
        if packet.haslayer('IP') and packet.haslayer('UDP'): # FIlter for IP/UDP
            src_ip = packet['IP'].src
            dst_ip = packet['IP'].dst
            src_port = packet['UDP'].sport
            dst_port = packet['UDP'].dport

            if src_port!=22 and dst_port!=22 and src_ip==self.source_ip: # Filter SSH and source ip


                if len(bytes(packet['Raw'].load))==8:
                    self.size = unpack('>Q', bytes(packet['Raw'].load))[0]
                    self.n_packets = 0
                    self.data = b''

                elif len(self.data) < self.byte_size:
                    self.n_packets += 1
                    print("n_packets", self.n_packets)
                    self.data += bytes(packet['UDP'].payload)
                    
                    if len(self.data) == self.byte_size:
                        self.array = pickle.loads(self.data)
                        print("Success.", self.array.shape)
                        self.n_packets = 0
                        self.byte_size = 0

if __name__ == "__main__":
    PortMirror()
udp ethernet packet-sniffers packet-capture mirroring
1个回答
0
投票

您可能已经自己解决了这个问题,但对于其他人来说,有一个解决方案:UDP 套接字中有不同的缓冲区大小,这是系统在代码处理包之前在内部使用的缓冲区大小。如果数据包莫名其妙地丢失,则该缓冲区可能溢出,因此请尝试以下操作:

self.__server_socket = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM)
#set the overall buffer to 64MB - this should help prevent the droppage of packets
UDP_buffer_size = 1024*1024*64
self.__server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, UDP_buffer_size)

您可以将该缓冲区设置为 64MB 以外的大小,极端情况可能需要更多。

但要注意:您仍然可能会溢出该缓冲区并丢弃数据包 - 这可能只会延迟几秒钟。这实际上是测试情况是否如此的好方法。如果是这样,那么您的数据包接收和处理需要更加高效并且运行得更快;您可能需要使用多重处理,并让一个进程接收数据并执行所需的最少处理,另一个进程执行其余的处理。

© www.soinside.com 2019 - 2024. All rights reserved.