我正在尝试将数据从 C++ 代码连续发送到 Python 代码。我使用udp套接字发送数据。由于它是简单的传感器代码,因此发送速率比接收速率更快。所以发送的数据会累积在socket中。当我尝试读取数据时,它返回旧数据。如何从socket读取最新数据或者在发送新数据时删除旧数据?
如何从socket读取最新数据或删除旧数据 新数据何时发送?
从套接字读取一个数据包并将其放入缓冲区。继续从套接字读取数据包,每次将每个数据包放入缓冲区(替换之前缓冲区中的任何数据包数据),直到没有更多数据可供读取 - 非阻塞 I/O 模式对于以下情况很有用这是因为,当套接字的传入数据缓冲区中的数据用完时,非阻塞
recv()
将抛出一个带有代码 EWOULDBLOCK
的 socket.error 异常。读取所有数据后,缓冲区中剩下的都是最新数据,因此请继续使用该数据。
草图/示例代码如下(未经测试,可能包含错误):
sock = socket.socket(family, socket.SOCK_DGRAM)
[... bind socket, etc... ]
# Receive-UDP-data event-loop begins here
sock.setblocking(False)
while True:
newestData = None
keepReceiving = True
while keepReceiving:
try:
data, fromAddr = sock.recvfrom(2048)
if data:
newestData = data
except socket.error as why:
if why.args[0] == EWOULDBLOCK:
keepReceiving = False
else:
raise why
if (newestData):
# code to handle/parse (newestData) here
来自发布者的数据进入缓冲区,较慢的订阅者以先进先出的方式逐条读取缓冲区。要使其充当后进先出(也许这个描述不准确,因为在您的情况下我们只关心最后的数据),您可以使用
asyncio
create_datagram_endpoint()
修改 UDP 协议,以便订阅者清除当前缓冲区队列并从新队列接收最新数据。
以下是一个示例,我在 macOS 11.4 上使用 Python 3.9.6 进行了测试。
UdpProtocol.py是自定义的UDP协议对象。
import asyncio
class UdpProtocol:
def __init__(self):
self.packets = asyncio.Queue()
def connection_made(self, transport):
print("connection made")
def datagram_received(self, data, addr):
# clear the current queue and the accumulated data
self.packets._queue.clear()
# put latest data to the queue
self.packets.put_nowait((data, addr))
def connection_lost(self, transport):
print("connection lost")
def error_received(self, exc):
pass
async def recvfrom(self):
# get data from the queue
return await self.packets.get()
这是出版商。
import asyncio
from UdpProtocol import UdpProtocol
async def main():
server_address = ("127.0.0.1", 8000)
loop = asyncio.get_running_loop()
transport, protocol = await loop.create_datagram_endpoint(
UdpProtocol, local_addr=None, remote_addr=server_address)
idx = 0
while True:
transport.sendto(str(idx).encode(), server_address)
print(idx)
idx += 1
await asyncio.sleep(0.1)
if __name__ == "__main__":
asyncio.run(main())
这是订阅者。
import asyncio
from UdpProtocol import UdpProtocol
async def main():
server_address = ("127.0.0.1", 8000)
loop = asyncio.get_running_loop()
transport, protocol = await loop.create_datagram_endpoint(
UdpProtocol, local_addr=server_address, remote_addr=None)
while True:
data, addr = await protocol.recvfrom()
print(data, addr)
await asyncio.sleep(1)
if __name__ == "__main__":
asyncio.run(main())
我使用了[这里]的想法(在Python中读取UDP数据包的缓冲区大小)并调整python套接字中的接收缓冲区。还有一些需要注意的其他要点也在线程中进行了讨论。
sock = socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
这对我的申请很有效。至于我的应用程序,我将数据从LabVIEW发送到python,并且数据不应该累积。就我而言,丢失数据包不会影响我的应用程序,因此我基本上减少了接收缓冲区并使套接字超时以执行此操作。
收到数据包后,请阅读该数据包并发回确认。让发送方等到收到确认后才发送下一个数据包。这样就可以防止接收端数据包的堆积。