在 Python 中将 WebSocket 与 Celery 和 FastAPI 集成

问题描述 投票:0回答:1

我正在将 WebSocket 与 Python 中的 Celery 和 FastAPI 集成,但面临 AttributeError 和 EncodeError 等问题。需要帮助解决这些错误以实现无缝集成。感谢任何建议或指导。

@user_router.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()

    connection = await connect_robust("amqp://guest:guest@localhost//")
    async with connection:
        channel = await connection.channel()
        queue = await channel.declare_queue('websocket_queue', durable=True)

        async for message in queue:
            data = message.body.decode('utf-8')
            print("Received message:", data)
            await websocket.send_text(data)
            await message.ack()
def sync_send_websocket_message(message):
    try:
        print("Sending message:", message)
        connection = asyncio.get_event_loop().run_until_complete(connect_robust("amqp://guest:guest@localhost//"))
        channel = asyncio.get_event_loop().run_until_complete(connection.channel())
        asyncio.get_event_loop().run_until_complete(channel.default_exchange.publish(
            message=message,
            routing_key='websocket_queue'
        ))
    except Exception as e:
        print("Error sending WebSocket message:", e)
        raise e

@celery.task
def send_websocket_message(message):
    sync_send_websocket_message(message)
@user_router.post("/testing_websocket")
def websocket_testing_route(input: dict):
    try:
        task.send_websocket_message.delay("Hello from Celery!")

        return {"message": input}
    except Exception as e:
        logger.error("Error occurred in websocket_testing_route: %s", e)
        raise HTTPException(status_code=500, detail="Internal Server Error")
websocket rabbitmq celery python-asyncio fastapi
1个回答
0
投票
def sync_send_websocket_message(message):
    try:
        print("Sending message:", message)
        connection = asyncio.get_event_loop().run_until_complete(connect_robust("amqp://guest:guest@localhost//"))
        channel = asyncio.get_event_loop().run_until_complete(connection.channel())
        message_body = json.dumps({"message": message})  # Convert message to JSON string
        asyncio.get_event_loop().run_until_complete(channel.default_exchange.publish(
            Message(body=message_body.encode()),  # Ensure message has a body attribute
            routing_key='websocket_queue'
        ))
    except Exception as e:
        print("Error sending WebSocket message:", e)
        raise e

@celery.task
def send_websocket_message(message):
    sync_send_websocket_message(message)

@user_router.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()

    connection = await connect_robust("amqp://guest:guest@localhost//")
    async with connection:
        channel = await connection.channel()
        queue = await channel.declare_queue('websocket_queue', durable=True)

        async for message in queue:
            data = message.body.decode('utf-8')
            print("Received message:", data)
            await websocket.send_text(data)
            await message.ack()


@user_router.post("/testing_websocket")
def websocket_testing_route(input: dict):
    try:
        task.send_websocket_message.delay("Hello from Celery!")

        return {"message": input}
    except Exception as e:
        logger.error("Error occurred in websocket_testing_route: %s", e)
        raise HTTPException(status_code=500, detail="Internal Server Error")

补充说明:

  • 确保 Celery 工作环境可以通过网络访问 WebSocket 服务器。
  • 验证 WebSocket 服务器正在运行并且可以从 Celery 工作环境访问。
  • 检查 WebSocket 服务器日志是否有任何指示连接问题的错误或警告。

通过遵循这些准则并进行必要的调整,您可以在 Python 应用程序中将 WebSocket 通信与 Celery 和 FastAPI 无缝集成。

© www.soinside.com 2019 - 2024. All rights reserved.