我有一个数据库框架,在另一个 Flask 应用程序中用作包。它有几个文件,例如 db_framework.py (这就像入口点,包含可以通过导入此包使用的函数)、包含表/模型(其 SQLAlchemy)的 BiosCluster.py 文件,然后是 utils.py包含实用函数,如 insert_data 和 insert_validated data 。 现在,我尝试通过将其导入到我的 Flask 应用程序中来使用该包,并尝试通过创建端点来使用其中的方法。 现在我尝试插入一些数据,发送 POST 请求和正文,我在邮递员和邮递员控制台中收到 200 OK 以及数据已成功插入(此行位于发布的 api.py 代码中)但是在在 splunk 日志的 db 端,我看到“数据插入失败”(这在我发布的 utils.py 文件中的记录器消息之一中)。我无法弄清楚出了什么问题
db_framework.py:
from sqlalchemy import func
from sqlalchemy.orm import sessionmaker
import pandas as pd
import logging
import datetime
from .bios_cluster import *
from . import utils
logging.basicConfig(format='%(asctime)s %(filename)s %(funcName)s %(lineno)d %(message)s', filename='example.log', level=logging.DEBUG)
class DbPipeline():
def __init__(self,engine):
self.engine = engine
self.session = sessionmaker(bind=self.engine)
def insert_to_cluster(self, df: pd.DataFrame):
table_name = BiosCluster
df_json = df.to_dict(orient='records')
is_data_valid = validate_insert_data_bios_cluster(df_json, table_name, self.session)
utils.insert_validated_data(is_data_valid, self.session, table_name, df_json)
BiosCluster.py:
from sqlalchemy import Column, DateTime, Integer, String, Text, Sequence, Float, BigInteger, create_engine, Boolean, func, ForeignKey
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.sql import func
import sqlalchemy as db
import getpass
import logging
from . import constants
logger = logging.getLogger(__name__)
Base = declarative_base()
def get_current_user():
return getpass.getuser()
class BiosCluster(Base):
__tablename__ = 'bios_cluster'
__table_args__ = {"schema": "something"}
cluster_label_cd = Column(Integer, primary_key=True)
cluster_name = Column(String)
cluster_display_name = Column(String)
client_market = Column(String)
is_active = Column(Boolean)
created_dt = Column(DateTime, default=func.now())
created_by = Column(String, default = get_current_user)
updated_dt = Column(DateTime)
updated_by = Column(String)
# Validate data to insert in Table
def validate_insert_data_bios_cluster(data, table_name, session):
session = session()
query = db.select([table_name])
print(f'query is = {query}')
bios_cluster_data = session.execute(query).fetchall()
print(f'smartbios cluster data is printed successfully')
# Check for cluster_label_cd unique primary key
cluster_label_cd = [smartbio_cluster_row.cluster_label_cd for smartbio_cluster_row in smartbios_cluster_data]
cluster_label_cd = set(cluster_label_cd)
cluster_label_cd_to_insert = {dd['cluster_label_cd'] for dd in data}
# Check for cluster name exist with the same name
cluster_display_name = [
bio_cluster_row.cluster_display_name
for bio_cluster_row in bios_cluster_data if bio_cluster_row.is_active is True
]
cluster_display_name = set(cluster_display_name)
cluster_display_name_to_insert = {dd['cluster_display_name'] for dd in data}
if cluster_label_cd_to_insert.isdisjoint(cluster_label_cd):
if cluster_display_name_to_insert.isdisjoint(cluster_display_name):
return True
cluster_display_name_exist = cluster_display_name_to_insert.intersection(cluster_display_name)
logger.error(f"Following cluster display name already exist in the DB: {cluster_display_name_exist}")
return False
cluster_label_cd_exist = cluster_label_cd_to_insert.intersection(cluster_label_cd)
logger.error(f"Following cluster label cd are already exist : {cluster_label_cd_exist}")
return False
utils.py:
from .bios_cluster import *
import logging
logger = logging.getLogger(__name__)
def insert_data(session, table_name, data):
data_inserted = False
session = session()
try:
with session.begin():
data_list = []
for d in data:
data_list.append(table_name(**d))
session.add_all(data_list)
session.commit()
data_inserted = True
except Exception as e:
logger.error(f"Data Insertion failed in table : {table_name} {e}")
return data_inserted
def insert_validated_data(is_data_valid, session, table_name, df_json):
if is_data_valid:
is_data_inserted = insert_data(session, table_name, df_json)
if is_data_inserted:
logger.info(f"Data got inserted successfully in the table {table_name}")
else:
logger.error(f"Data insertion failed in the table : {table_name}")
else:
logger.error(f"Some mismatch in the input data of {table_name}")
这是我试图用来访问此包的功能和实用程序的 api,api.py
@restAPI.route("/api/biosdbfw",methods=['POST','PUT'])
def biosdbfw_operations():
import pandas as pd
# db_object = DbPipeline()
try:
logger.info("bios accessed")
engine = get_scheduling_db_engine()
db_object = DbPipeline(engine=engine)
# db_object.engine = engine.connect()
logger.info("DB connection line passed")
data = request.json
print(data)
logger.info("Data Printed")
operation = request.args.get('operation')
print(operation)
if operation == 'insertToBiosCluster':
df = pd.DataFrame([data])
db_object.insert_to_bios_cluster(df)
logger.info("Data Inserted...")
return Response(json.dumps("Data Inserted into bios_cluster successfully "), status=200, mimetype="application/json")
else:
return Response(json.dumps("Invalid Operation"), status=400, mimetype="application/json")
# return jsonify({'error': 'Invalid operation'}),400
except Exception as e:
logger.exception(str(e))
return Response(json.dumps("Internal Server Error Occured"), status=500, mimetype="application/json")
在 utils.py 中,记录了异常,但上游没有引发或返回错误。