这是我编写的代码,用于通过 HTTP API 将 SQL 查询结果以 JSON 形式返回:使用 Flask 创建 API,使用 pyodbc 连接 SQL 数据库。API 按预期工作,但当负载增加时,响应时间会变慢到 20-30 秒,而单个请求只需要 250 毫秒。有什么方法可以优化代码,以便在高负载下获得更好的性能?
import pyodbc
import json
import datetime
import ast
from flask import Flask, jsonify
from flask import Flask
from flask_caching import Cache
# Flask application object, plus a Flask-Caching object configured for the
# 'simple' backend.
app = Flask(__name__)
cache = Cache(config={'CACHE_TYPE': 'simple'})

# Database connection parameters.
# NOTE(review): the credentials are hard-coded in this module; consider
# loading them from the environment or a secret store instead.
server = 'fff.database.windows.net'
port = '1433'
database = 'fffff'
username = 'dfdfd'
password = 'dgdgdgdg$#'

# ODBC connection string: ';'-separated KEY=value pairs, with the server
# given as "host,port".
conn_string = ';'.join((
    'DRIVER={ODBC Driver 17 for SQL Server}',
    f'SERVER={server},{port}',
    f'DATABASE={database}',
    f'UID={username}',
    f'PWD={password}',
))
@app.route('/get_data/<Myuser>')
def get_data(Myuser):
    """Return the merged attributes of the user whose ``cn`` is *Myuser* as JSON.

    The user's ``idu`` is looked up in ``indirect.usr``; then the user's row
    and one comma-joined value per multi-valued attribute table are fetched
    and merged into a single flat JSON object. When two queries yield the
    same column name, the later query wins. An attribute table with no rows
    for the user contributes ``null`` for its column.

    Args:
        Myuser: The ``cn`` value taken from the URL path (untrusted input).

    Returns:
        A ``(body, status, headers)`` tuple: the merged JSON object as a
        string, HTTP status 200 and an ``application/json`` content type.

    Raises:
        Exception: If no row in ``indirect.usr`` has ``cn`` equal to *Myuser*.
    """
    # Each of these attributes is stored in its own table
    # [indirect].[usr_<name>] in a column of the same name.
    multi_valued_attributes = (
        'dddDGID',
        'dddKIM',
        'description',
        'MyOrgBrands',
        'MyOrgCustomerPrograms',
        'MyOrgDealerPrograms',
        'MyOrgDealerTypes',
        'MyOrgDistributorPrograms',
        'MyOrgPilotPrograms',
        'MyuserAdditionalBusinessFunctionalRoles',
        'MyuserApplicationRoles',
        'MyuserVisibilities',
        'l',
        'mail',
        'objectClass',
        'postalCode',
        'st',
        'street',
        'telephoneNumber',
    )

    # The attribute names above are fixed identifiers, so formatting them into
    # the SQL text is safe. All *values* are bound through "?" placeholders,
    # which closes the SQL-injection hole and keeps the statement text
    # identical from one request to the next.
    queries = ["SELECT * FROM indirect.usr WHERE idu = ?"]
    queries.extend(
        f"SELECT STRING_AGG({name}, ',') AS {name} "
        f"FROM [indirect].[usr_{name}] WHERE idu = ? GROUP BY idu"
        for name in multi_valued_attributes
    )

    merged_dict = {}
    conn = pyodbc.connect(conn_string)
    try:
        cursor = conn.cursor()

        # NOTE(review): pyodbc binds str parameters as Unicode; if cn is a
        # varchar column, confirm the lookup still uses its index.
        cursor.execute("SELECT idu FROM indirect.usr WHERE cn = ?", Myuser)
        row = cursor.fetchone()
        if row is None:
            raise Exception("No matching idu found for the specified cn.")
        idu = row[0]

        if idu is not None:
            for query in queries:
                cursor.execute(query, idu)
                columns = [column[0] for column in cursor.description]
                rows = cursor.fetchall()
                if not rows:
                    # No data for this attribute: still emit its columns,
                    # with None values.
                    merged_dict.update(dict.fromkeys(columns))
                for record in rows:
                    merged_dict.update(zip(columns, record))
    finally:
        # Always release the connection, including when the lookup fails or
        # a query raises.
        conn.close()

    # datetime values are not JSON-serialisable; emit them as ISO-8601 text.
    serialisable = {
        key: value.isoformat() if isinstance(value, datetime.datetime) else value
        for key, value in merged_dict.items()
    }
    merged_json = json.dumps(serialisable)

    # A Flask view must return a response; the visible original computed
    # merged_json but never returned it.
    return merged_json, 200, {'Content-Type': 'application/json'}