避免udp套接字中积累数据或从udp套接字读取最新数据

问题描述 投票:0回答:4

我正在尝试将数据从 C++ 代码连续发送到 Python 代码。我使用udp套接字发送数据。由于它是简单的传感器代码,因此发送速率比接收速率更快。所以发送的数据会累积在socket中。当我尝试读取数据时,它返回旧数据。如何从socket读取最新数据或者在发送新数据时删除旧数据?

python sockets udp recvfrom
4个回答
2
投票

如何从socket读取最新数据或删除旧数据 新数据何时发送?

从套接字读取一个数据包并将其放入缓冲区。继续从套接字读取数据包,每次将每个数据包放入缓冲区(替换之前缓冲区中的任何数据包数据),直到没有更多数据可供读取 - 非阻塞 I/O 模式对于以下情况很有用这是因为,当套接字的传入数据缓冲区中的数据用完时,非阻塞

recv()
将抛出一个带有代码
EWOULDBLOCK
的 socket.error 异常。读取所有数据后,缓冲区中剩下的都是最新数据,因此请继续使用该数据。

草图/示例代码如下(未经测试,可能包含错误):

  sock = socket.socket(family, socket.SOCK_DGRAM)

  [... bind socket, etc... ]

  # Receive-UDP-data event-loop begins here
  sock.setblocking(False)
  while True:
     newestData = None

     keepReceiving = True
     while keepReceiving:
        try:
           data, fromAddr = sock.recvfrom(2048)
           if data:
              newestData = data
        except socket.error as why:
           if why.args[0] == EWOULDBLOCK:
              keepReceiving = False
           else:
              raise why

     if (newestData):
        # code to handle/parse (newestData) here

1
投票

来自发布者的数据进入缓冲区,较慢的订阅者以先进先出的方式逐条读取缓冲区。要使其充当后进先出(也许这个描述不准确,因为在您的情况下我们只关心最后的数据),您可以使用

asyncio
create_datagram_endpoint()
修改 UDP 协议,以便订阅者清除当前缓冲区队列并从新队列接收最新数据。

以下是一个示例,我在 macOS 11.4 上使用 Python 3.9.6 进行了测试。

UdpProtocol.py是自定义的UDP协议对象。

import asyncio
class UdpProtocol:
    def __init__(self):
        self.packets = asyncio.Queue()

    def connection_made(self, transport):
        print("connection made")

    def datagram_received(self, data, addr):
        # clear the current queue and the accumulated data
        self.packets._queue.clear()
        # put latest data to the queue
        self.packets.put_nowait((data, addr))

    def connection_lost(self, transport):
        print("connection lost")

    def error_received(self, exc):
        pass

    async def recvfrom(self):
        # get data from the queue
        return await self.packets.get()

这是出版商。

import asyncio
from UdpProtocol import UdpProtocol

async def main():
    server_address = ("127.0.0.1", 8000)
    loop = asyncio.get_running_loop()
    transport, protocol = await loop.create_datagram_endpoint(
        UdpProtocol, local_addr=None, remote_addr=server_address)

    idx = 0
    while True:
        transport.sendto(str(idx).encode(), server_address)
        print(idx)
        idx += 1
        await asyncio.sleep(0.1)

if __name__ == "__main__":
    asyncio.run(main())

这是订阅者。

import asyncio
from UdpProtocol import UdpProtocol

async def main():
    server_address = ("127.0.0.1", 8000)
    loop = asyncio.get_running_loop()
    transport, protocol = await loop.create_datagram_endpoint(
        UdpProtocol, local_addr=server_address, remote_addr=None)
    while True:
        data, addr = await protocol.recvfrom()
        print(data, addr)
        await asyncio.sleep(1)

if __name__ == "__main__":
    asyncio.run(main())

0
投票

我使用了[这里]的想法(在Python中读取UDP数据包的缓冲区大小)并调整python套接字中的接收缓冲区。还有一些需要注意的其他要点也在线程中进行了讨论。

sock = socket.socket(socket.AF_INET,socket.SOCK_DGRAM)

这对我的申请很有效。至于我的应用程序,我将数据从LabVIEW发送到python,并且数据不应该累积。就我而言,丢失数据包不会影响我的应用程序,因此我基本上减少了接收缓冲区并使套接字超时以执行此操作。


0
投票

收到数据包后,请阅读该数据包并发回确认。让发送方等到收到确认后才发送下一个数据包。这样就可以防止接收端数据包的堆积。

© www.soinside.com 2019 - 2024. All rights reserved.