如何有效地将 Python FastAPI 与 Amazon Neptune 数据库的 Gremlin Python 集成?

问题描述 投票:0回答:1

具体来说,我正在寻找有关管理连接的指导,例如是否为每个请求打开和关闭连接,或者为整个应用程序使用单个连接。我知道,如果 WebSocket 连接未正确关闭,Neptune 将在 20-25 分钟后终止空闲连接。 此外,达到并发 WebSocket 连接限制会导致 HTTP 429 错误。我可以采取哪些策略来处理连接并避免这些问题?

或者 替代策略是使用 Lambda AWS Lambda 函数,每个函数专用于不同的任务和 API,使用 Gremlin Python 连接 Amazon Neptune 数据库。

我创建了一个示例应用程序

from fastapi import FastAPI, BackgroundTasks, Depends
import asyncio
import websockets
from gremlin_python.structure.graph import Graph
from gremlin_python.driver.driver_remote_connection import DriverRemoteConnection
import queue
import time

app = FastAPI()

# Neptune Configuration
neptune_endpoint = 'your-neptune-endpoint'
neptune_port = 8182

# Define the minimum and maximum number of connections in the pool
min_connections = 5
max_connections = 20

# Connection timeout in seconds
connection_timeout = 15 * 60

# Create a queue object to store the connections
connection_pool = queue.Queue(maxsize=max_connections)

# Dictionary to track the usage status of connections
connection_usage = {}

# Get an event loop object
loop = asyncio.get_event_loop()

# Dependency to get a connection from the pool or create a new one if needed
async def get_connection():
    try:
        # Try to get a connection from the queue
        ws = connection_pool.get(block=False)
        connection_usage[ws] = True  # Mark the connection as in use
        return ws
    except queue.Empty:
        # If the queue is empty, create a new connection
        ws = await create_connection()
        connection_usage[ws] = True  # Mark the connection as in use
        return ws

# Dependency to return a connection to the pool or mark it as available
async def return_connection(ws):
    if ws in connection_usage:
        connection_usage[ws] = False  # Mark the connection as available
        try:
            # Put the connection back in the queue without blocking
            connection_pool.put(ws, block=False)
        except queue.Full:
            # If the queue is full, close the connection
            await close_connection(ws)

# Dependency to close a connection
async def close_connection(ws):
    if ws in connection_usage:
        await ws.close()
        del connection_usage[ws]

# Define a function to create a websocket connection with Neptune
async def create_connection():
    ws = await asyncio.wait_for(websockets.connect(f'wss://{neptune_endpoint}:{neptune_port}/gremlin'), timeout=15)
    connection_usage[ws] = True  # Mark the connection as in use
    return ws

# Define a function to execute a Gremlin query
async def execute_query(query: str):
    ws = await get_connection()
    remote_connection = DriverRemoteConnection(ws, 'g')
    try:
        g = Graph().traversal().withRemote(remote_connection)
        result = g.V().hasLabel(query).toList()  # Modify the query as needed
        return result
    finally:
        await return_connection(ws)

# Background task to periodically check and replace connections
async def check_connection_timeouts():
    while True:
        await asyncio.sleep(60)  # Check every 1 minute
        current_time = time.time()
        for ws, in_use in connection_usage.items():
            if not in_use and current_time - ws.creation_time > connection_timeout:
                await close_connection(ws)
                new_ws = await create_connection()
                connection_pool.put(new_ws)
                new_ws.creation_time = current_time

@app.on_event("startup")
async def startup_event():
    # Fill the connection pool with the minimum number of connections
    await fill_connection_pool()
    # Start the background task to check connection timeouts
    loop.create_task(check_connection_timeouts())

@app.get('/')
async def execute_gremlin_query(query: str):
    result = await execute_query(query)
    return {"result": result}

python amazon-web-services gremlin graph-databases amazon-neptune
1个回答
0
投票

一般来说,建议在应用程序中的线程之间共享连接池(本质上是

g
),而不是每次都打开和关闭新连接。您确实可以控制创建的连接池的大小以及分配给它的工作线程的数量。

正如您所提到的,在使用 Amazon Neptune 时,有时您可能需要重新创建连接;例如由于空闲而关闭,或者发生故障转移并且服务器的 IP 地址发生变化。

如果您使用 AWS Lambda 函数,每个 Lambda 函数都需要自己的连接,但可以在多次调用中维护该连接。

© www.soinside.com 2019 - 2024. All rights reserved.