我需要知道如何管理多个数据库连接。我正在制作一个黑客马拉松聊天机器人:首先由 Dialogflow 检测意图,然后把请求发送到 FastAPI 后端,后端再向我返回 fulfillmentText(FastAPI 后端会向 MySQL 发送查询以获取一些信息)。下面是我的代码的一部分:

    elif intent == 'isHackathonPaid':
        # drill_name='WUEBOTDEMODRILL'
        try:
            conn = connect_to_sql()
            cursor = conn.cursor()

            query_paid_status = "SELECT is_drill_paid FROM view_drills WHERE drill_cust_url = %s"
            cursor.execute(query_paid_status, (drill_cust_url,))
            paid_status = cursor.fetchone()

            if not paid_status:
                return JSONResponse(content={'fulfillmentText': "Sorry, this information is not mentioned by the organizers"})

            paid_status = paid_status[0]

            if paid_status == 1:
                return JSONResponse(content={'fulfillmentText': f"Yes,{drill_cust_url} is paid."})
            elif paid_status == 0:
                return JSONResponse(content={'fulfillmentText': f"No, {drill_cust_url} is not paid."})
                return JSONResponse(content={'fulfillmentText': "Sorry, this information is not present in db."})

        except Exception as e:
            return JSONResponse(content={'fulfillmentText': f"Error: {e}"})
    elif intent == 'phase_info':
            phases, error = get_phases(drill_cust_url)

            if error:
                return JSONResponse(content={'fulfillmentText': error})
            if not phases:
                return JSONResponse(content={'fulfillmentText': f"No phases found for hackathon {drill_cust_url}"})
            phase_info = ""
            for phase in phases:
                phase_name, phase_desc, phase_mode, phase_type, phase_start_dt, phase_end_dt, phase_judgement_start_dt, phase_judgement_end_dt = phase
                phase_info += f"Phase Name: {phase_name}\n"
                if phase_desc is not None:
                    phase_info += f"Description: {phase_desc}\n"
                if phase_mode is not None:
                    phase_info += f"Mode: {phase_mode}\n"
                if phase_type is not None:
                    phase_info += f"Type: {phase_type}\n"
                if phase_start_dt is not None:
                    phase_info += f"Start Date: {phase_start_dt}\n"
                if phase_end_dt is not None:
                    phase_info += f"End Date: {phase_end_dt}\n"
                if phase_judgement_start_dt is not None:
                    phase_info += f"Judgement Start Date: {phase_judgement_start_dt}\n"
                if phase_judgement_end_dt is not None:
                    phase_info += f"Judgement End Date: {phase_judgement_end_dt}\n\n"
            # Remove the last two newline characters from the response if they exist
            phase_info = phase_info.rstrip('\n')
            phase_info = phase_info[:-2]

            return JSONResponse(content={'fulfillmentText': f"The phases for {drill_cust_url} hackathon are: \n{phase_info}"})

        except Exception as e:
            return JSONResponse(content={'fulfillmentText': f"An error occurred while fetching phase information: {str(e)}"})

    elif intent == 'team_Size':
            # drill_name = payload['queryResult']['parameters']['hackathon_name']
            # drill_name='WUEBOTDEMODRILL'
            team_size, error = get_team_size(drill_cust_url)

            if error:
                return JSONResponse(content={'fulfillmentText': error})
            if not team_size:
                return JSONResponse(content={'fulfillmentText': f"Team size information not available for hackathon {drill_cust_url}"})
            min_team_size = team_size.get('min')
            max_team_size = team_size.get('max')

            return JSONResponse(content={'fulfillmentText': f"The team size for {drill_cust_url} hackathon is from {min_team_size} to {max_team_size} members."})

        except Exception as e:
            return JSONResponse(content={'fulfillmentText': f"An error occurred while fetching team size information: {str(e)}"})
    elif intent == 'collaboratorsInfo':
        from bs4 import BeautifulSoup
            collaborators, error = get_collaborators(drill_cust_url)

            if error:
                return JSONResponse(content={'fulfillmentText': error})
            if not collaborators:
                return JSONResponse(content={'fulfillmentText': f"No collaborators found for hackathon {drill_cust_url}"})
            collaborator_info = ""
            for collaborator in collaborators:
                collaborator_name, collaborator_type, collaborator_desc, collaborator_email, collaborator_social_links = collaborator
                # Removing HTML tags from collaborator description
                collaborator_desc_text = BeautifulSoup(collaborator_desc, "html.parser").get_text()
                collaborator_info += f"Name: {collaborator_name}\n"
                collaborator_info += f"Type: {collaborator_type}\n"
                collaborator_info += f"Description: {collaborator_desc_text}\n"
                collaborator_info += f"Email: {collaborator_email}\n"
                collaborator_info += f"Social Links: {collaborator_social_links}\n\n"
            # Remove the last two newline characters and add proper spacing
            collaborator_info = collaborator_info.rstrip('\n\n')
            collaborator_info = collaborator_info.replace('\n\n', '\n\n\n')

            return JSONResponse(content={'fulfillmentText': f"The collaborators for {drill_cust_url} hackathon are: \n\n{collaborator_info}"})

        except Exception as e:
            return JSONResponse(content={'fulfillmentText': f"An error occurred while fetching collaborator information: {str(e)}"})

我想知道是否有更好的方法,来代替每次请求都打开和关闭一个新的 SQL 连接。

我期待一种比每次都打开和关闭连接更好的方法。我读到过连接池(pooling)的相关内容,也读到需要指定连接池的大小,但我对此了解不多。有人可以帮我讲解一下连接池、连接池大小之类的东西吗?另外,应该使用哪个库:sqlconnector、sqlalchemy 还是 aiomysql?请参考生产就绪的代码,因为我需要部署它。

database-connection fastapi dialogflow-es python-sql


您可以使用 sqlalchemy


以下是如何使用 sqlalchemy 实现连接池的基本示例:

from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker

# Create ONE engine per process (module level) and reuse it for every request;
# the engine owns the connection pool, so re-creating it defeats pooling.
engine = create_engine(
    'mysql://user:password@host/database',
    pool_size=10,        # connections kept open in the pool
    max_overflow=20,     # extra connections allowed when the pool is exhausted
    pool_pre_ping=True,  # test a connection on checkout; replace it if stale
    pool_recycle=3600,   # proactively replace connections older than 1 hour,
                         # since MySQL drops idle connections (wait_timeout)
)

# Session factory bound to the pooled engine; call it once per unit of work.
Session = sessionmaker(bind=engine)

session = Session()
try:
    # Raw SQL strings must be wrapped in text(); plain strings are rejected by
    # SQLAlchemy 2.0. The query is a placeholder -- substitute a real table.
    result = session.execute(text("SELECT * FROM table"))
    # Read the rows while the connection is still checked out.
    rows = result.fetchall()
finally:
    # Always close the session so its connection is returned to the pool;
    # otherwise the pool is exhausted after pool_size + max_overflow requests.
    session.close()


在此示例中,pool_size 指定池中保留的连接数,max_overflow 指定池耗尽时可以创建的最大附加连接数。

