用python接收UDP包,丢包

问题描述 投票:0回答:1

我在 python 中使用 UDP 时遇到了很多数据包丢失的情况。我知道如果我不想丢包,我应该使用 TCP,但我对发送方没有(完全)控制权。

这是一台使用 UDP 多播每秒发送 15 张图像的相机。

下面你可以看到我现在编写的代码。 它使用多处理来允许生产者和消费者函数并行工作。生产者函数捕获数据包,消费者函数处理它们并将图像写入 .bmp 文件。

我编写了一个 PacketStream 类,它将字节从包写入 .bmp 文件。

相机发送新图像时,首先发送一个数据包,第一个字节 = 0x01。这包含有关图像的信息。 然后发送 612 个数据包,第一个字节 = 0x02。它们包含图像中的字节(508 字节/包)。

由于每秒发送 15 个图像,因此每秒发送约 9000 个数据包。尽管这种情况发生的速度更快,每个图像的突发速率约为 22 个数据包/毫秒。

我可以使用tcpdump或wireshark完美接收所有数据包。 但使用下面的代码,数据包会丢失。 我的 Windows 7 电脑应该能够处理这个问题吗?我还在树莓派 3 上使用它,并且或多或少丢失了相同数量的数据包。因此我认为这是代码的问题。

我尝试了很多不同的东西,比如用线程代替多处理,用管道代替队列。

我还尝试使用

增加套接字缓冲区
sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 3000000)

无济于事。

这在Python中可能吗?

提前致谢,

import time
from multiprocessing import Process, Queue
import socket
import struct
from PIL import Image


class PacketStream:
    def __init__(self, output_path):
        self.output_path = output_path
        self.data_buffer = ''
        self.img_id = -1  # -1 = waiting for start of new image

    def process(self, data):
        message_id = data[0]
        if message_id == '\x01':
            self.wrap_up_last_image()
            self.img_id = ord(data[3])
            self.data_buffer = ''
        if message_id == '\x02':
            self.data_buffer += data[6:]

    def wrap_up_last_image(self):
        if self.img_id > 0:
            n_bytes = len(self.data_buffer)
            if n_bytes == 307200:
                global i
                write_image(self.output_path + str(i).zfill(7) + '_' + str(self.img_id).zfill(3) + '.bmp',
                            self.data_buffer)
                i += 1
            else:
                print 'Image lost: %s bytes missing.' % (307200 - n_bytes)


def write_image(path, data):
    im = Image.frombuffer('L', (640, 480), bytearray(data), 'raw', 'L', 0, 1)
    im.save(path)
    print time.time(), path


def producer(q):
    # setup socket
    MCAST_GRP = '239.255.83.71'
    MCAST_PORT = 2271
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind(('', MCAST_PORT))
    mreq = struct.pack('4sl', socket.inet_aton(MCAST_GRP), socket.INADDR_ANY)
    sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
    while True:
        q.put(sock.recv(512))


def consumer(q):
    packet_stream = PacketStream('D:/bmpdump/')
    while True:
        data = q.get()
        packet_stream.process(data)

i = 0
if __name__ == '__main__':
    q = Queue()

    t1 = Process(target=producer, args=(q,))
    t1.daemon = True  # so they stop when the main prog stops
    t1.start()
    t2 = Process(target=consumer, args=(q,))
    t2.daemon = True
    t2.start()

    time.sleep(10.0)

    print 'Program finished.'

编辑

感谢所有的建议。

1)我已经尝试过线程+队列,还有''.join(),似乎没有太大区别。我现在很确定问题是生产者线程没有获得足够的优先级。我找不到如何使用 Python 来增加这个?这可能吗?

2) 使用下面的代码我只损失了大约 10%。处理器处于~25%(在树莓派上)关键是在数据包流暂停时消耗数据,即当最后一个数据包到达时

import time
import socket
import struct
from PIL import Image


def write_image(path, data):
    im = Image.frombuffer('L', (640, 480), bytearray(data), 'raw', 'L', 0, 1)
    im.save(path)
    print time.time(), path

def consume(data_buffer):
    img_id = ord(data_buffer[0][1])
    real_data_buffer = [data[6:] for data in data_buffer]
    data_string = ''.join(real_data_buffer)

    global i
    write_image('/media/pi/exthdd_02/bmpdump/' + str(i).zfill(7) + '_' + str(img_id).zfill(3) + '.bmp', data_string)
    i += 1

def producer(sock):
    print 'Producer start'
    data_buffer = []
    while True:
        data = sock.recvfrom(512)[0]
        if data[0] == '\x01':
            data_buffer = []
        else:
            data_buffer.append(data)
        if len(data_buffer) == 612:
            consume(data_buffer)


# image counter
i = 0

# setup socket
MCAST_GRP = '239.255.83.71'
MCAST_PORT = 2271
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind((MCAST_GRP, MCAST_PORT))
mreq = struct.pack("4sl", socket.inet_aton(MCAST_GRP), socket.INADDR_ANY)
sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 30000000)

producer(sock)
python sockets networking udp packet-loss
1个回答
0
投票

一些改进代码的建议,但首先有一个问题:您是否测量过可能会减慢速度的因素?例如,您是否查看过系统的 CPU 使用情况。如果达到 100%,则很可能是丢包的原因。如果它大部分时间都处于空闲状态,则说明还有其他问题发生,并且问题与代码的性能无关。

现在,一些改进代码的建议:

  • 处理 UDP 套接字时,使用
    socket.recvfrom
    代替
    sock.recv
  • 不要将多处理与进程一起使用,如果我们谈论每秒约 9000 次调用,则将数据从一个进程发送到另一个进程所必须发生的序列化很可能会成为性能瓶颈。尝试使用 threads
    threading
    +
    queue
    模块)。但由于您没有提供任何观察到的数字,所以很难说真的。
  • 在接收数据包时不要使用字符串连接来建立接收者的缓冲区。这会创建大量新的临时字符串对象并始终复制数据。相反,将每个数据包附加到一个列表中,当您收到所有数据时,
    "".join(packets)
    最后将它们全部放在一起。
© www.soinside.com 2019 - 2024. All rights reserved.