zeromq 运行一段时间后失败

问题描述 投票:0回答:1

我当前运行一个流程以及一个 API。这两者需要进行通信,更具体地说是进程的 API。为此,我使用 Zeromq 的 Python 库。我的代码如下:

server.py

from zmq import POLLIN, ROUTER
from zmq.asyncio import Context, Poller
from zmq.auth.asyncio import AsyncioAuthenticator

    async def start(self):
        """Starts the zmq server asyncronously and handles incoming requests"""
        context = Context()

        auth = AsyncioAuthenticator(context)
        auth.start()
        auth.configure_plain(domain="*", passwords={"user": IPC_TOKEN})
        auth.allow("127.0.0.1")

        socket = context.socket(ROUTER)
        socket.plain_server = True
        socket.bind("tcp://*:5555")

        poller = Poller()
        poller.register(socket, POLLIN)

        while True:
            socks = dict(await poller.poll())

            if socket in socks and socks[socket] == POLLIN:
                message = await socket.recv_multipart()
                identity, request = message
                decoded = loads(request.decode())
                res = await getattr(self, decoded["route"])(decoded["data"])
                if res:
                    await socket.send_multipart([identity, dumps(res).encode()])
                else:
                    await socket.send_multipart([identity, b'{"status":"ok"}'])

client.py

from zmq import DEALER, POLLIN
from zmq.asyncio import Context, Poller

async def make_request(route: str, data: dict) -> dict:
    context = Context.instance()
    socket = context.socket(DEALER)
    socket.identity = uuid.uuid4().hex.encode('utf-8')
    socket.plain_username = b"user"
    socket.plain_password = IPC_TOKEN.encode("UTF-8")
    socket.connect("tcp://localhost:5555")

    request = json.dumps({"route": route, "data": data}).encode('utf-8')
    socket.send(request)

    poller = Poller()
    poller.register(socket, POLLIN)

    while True:
        events = dict(await poller.poll())
        if socket in events and events[socket] == POLLIN:
            multipart = json.loads((await socket.recv_multipart())[0].decode())
            socket.close()
            context.term()
            return multipart

这对于前几个请求来说工作得很好,但是在一定数量的请求之后,此代码会默默地失败(没有错误),并且我没有得到 IPC 响应。当我将使用此代码作为客户端的 API 卷曲到我的第二个进程时,请求超时。

我认为这是因为进程没有正确关闭或堵塞,但我不知道如何解决这个问题。在此代码运行一段时间后停止工作后,我在服务器上运行了 lsof 命令:

如何防止这些连接在一段时间后超时?

python rest zeromq hypercorn
1个回答
0
投票

使用

ROUTER
套接字,身份和实际消息之间有一个空帧:https://zguide.zeromq.org/docs/chapter3/#ROUTER-Broker-and-REQ-Workers

不确定为什么它会在前几次尝试中起作用,但我认为您需要更改代码才能执行以下操作:

identity, empty, request = message
...
if res:
    await socket.send_multipart([identity, b'', dumps(res).encode()])
else:
    await socket.send_multipart([identity, b'', b'{"status":"ok"}'])
© www.soinside.com 2019 - 2024. All rights reserved.