I get 200 OK in Postman but "data insertion failed" in the Splunk logs — why does this happen?


I have a database framework that is used as a package inside another Flask application. It has several files: db_framework.py (the entry point, containing the functions exposed when you import the package), bios_cluster.py (the SQLAlchemy table/model definitions), and utils.py (utility functions such as insert_data and insert_validated_data). I import this package into my Flask app and call its methods from an endpoint. When I send a POST request with a body to insert some data, Postman (and the Postman console) shows 200 OK and "data inserted successfully" (that line is in the api.py code posted below), but on the DB side the Splunk logs show "Data insertion failed" (one of the logger messages in the utils.py file posted below). I cannot figure out what is going wrong.

db_framework.py:

from sqlalchemy import func
from sqlalchemy.orm import sessionmaker
import pandas as pd
import logging
import datetime
from .bios_cluster import *
from . import utils

logging.basicConfig(format='%(asctime)s %(filename)s %(funcName)s %(lineno)d %(message)s', filename='example.log', level=logging.DEBUG)
class DbPipeline():
    def __init__(self,engine):
        self.engine = engine
        self.session = sessionmaker(bind=self.engine)
    def insert_to_cluster(self, df: pd.DataFrame):
        table_name = BiosCluster
        df_json = df.to_dict(orient='records')
        is_data_valid = validate_insert_data_bios_cluster(df_json, table_name, self.session)
        utils.insert_validated_data(is_data_valid, self.session, table_name, df_json)

bios_cluster.py:

from sqlalchemy import Column, DateTime, Integer, String, Text, Sequence, Float, BigInteger, create_engine, Boolean, func, ForeignKey
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.sql import func
import sqlalchemy as db
import getpass
import logging
from . import constants
logger = logging.getLogger(__name__)
Base = declarative_base()
def get_current_user():
    return getpass.getuser()
class BiosCluster(Base):
    __tablename__ = 'bios_cluster'
    __table_args__ = {"schema": "something"}

    cluster_label_cd = Column(Integer, primary_key=True)
    cluster_name = Column(String)
    cluster_display_name = Column(String)
    client_market = Column(String)
    is_active = Column(Boolean)
    created_dt = Column(DateTime, default=func.now())
    created_by = Column(String, default = get_current_user)
    updated_dt = Column(DateTime)
    updated_by = Column(String)

# Validate data to insert in Table
def validate_insert_data_bios_cluster(data, table_name, session):
    session = session()
    query = db.select([table_name])
    print(f'query is = {query}')
    bios_cluster_data = session.execute(query).fetchall()
    print('bios cluster data fetched successfully')
    # Check for cluster_label_cd unique primary key
    cluster_label_cd = [bios_cluster_row.cluster_label_cd for bios_cluster_row in bios_cluster_data]
    cluster_label_cd = set(cluster_label_cd)
    cluster_label_cd_to_insert = {dd['cluster_label_cd'] for dd in data}
    # Check for cluster name exist with the same name
    cluster_display_name = [
        bio_cluster_row.cluster_display_name
        for bio_cluster_row in bios_cluster_data if bio_cluster_row.is_active is True
        ]
    cluster_display_name = set(cluster_display_name)
    cluster_display_name_to_insert = {dd['cluster_display_name'] for dd in data}
    if cluster_label_cd_to_insert.isdisjoint(cluster_label_cd):
        if cluster_display_name_to_insert.isdisjoint(cluster_display_name):
            return True
        cluster_display_name_exist = cluster_display_name_to_insert.intersection(cluster_display_name)
        logger.error(f"Following cluster display name already exist in the DB: {cluster_display_name_exist}")
        return False
    cluster_label_cd_exist = cluster_label_cd_to_insert.intersection(cluster_label_cd)
    logger.error(f"Following cluster label cd already exist : {cluster_label_cd_exist}")
    return False

utils.py:

from .bios_cluster import *

import logging
logger = logging.getLogger(__name__)
def insert_data(session, table_name, data):
    data_inserted = False
    session = session()
    try:
        with session.begin():
            data_list = []
            for d in data:
                data_list.append(table_name(**d))
            session.add_all(data_list)
        session.commit()
        data_inserted = True
    except Exception as e:
        logger.error(f"Data Insertion failed in table : {table_name} {e}")
    return data_inserted

def insert_validated_data(is_data_valid, session, table_name, df_json):
    if is_data_valid:
        is_data_inserted = insert_data(session, table_name, df_json)
        if is_data_inserted:
            logger.info(f"Data got inserted successfully in the table {table_name}")
        else:
            logger.error(f"Data insertion failed in the table : {table_name}")
    else:
        logger.error(f"Some mismatch in the input data of {table_name}")

Here is the api.py I am using to access the package's functions and utilities:

@restAPI.route("/api/biosdbfw",methods=['POST','PUT'])
def biosdbfw_operations():
   import pandas as pd
   # db_object = DbPipeline()
   try:
      logger.info("bios accessed")
      engine = get_scheduling_db_engine()
      db_object = DbPipeline(engine=engine)
      # db_object.engine = engine.connect()
      logger.info("DB connection line passed")
      data = request.json
      print(data)
      logger.info("Data Printed")
      operation = request.args.get('operation')
      print(operation)
      if operation == 'insertToBiosCluster':
         df = pd.DataFrame([data])
         db_object.insert_to_cluster(df)
         logger.info("Data Inserted...")
         return Response(json.dumps("Data Inserted into bios_cluster successfully"), status=200, mimetype="application/json")
      else:
         return Response(json.dumps("Invalid Operation"), status=400, mimetype="application/json")
         # return jsonify({'error': 'Invalid operation'}), 400
   except Exception as e:
      logger.exception(str(e))
      return Response(json.dumps("Internal Server Error Occurred"), status=500, mimetype="application/json")

python api flask sqlalchemy postman
1 Answer

In utils.py, `insert_data` catches the exception and logs "Data Insertion failed", but the error is never raised or returned upstream: `insert_validated_data` only logs the failure, `insert_to_cluster` discards the return value, and api.py then unconditionally logs "Data Inserted..." and returns 200. So Postman sees success while Splunk records the real failure. Either re-raise the exception from `insert_data`, or return the boolean all the way up the call chain and map a failure to an error response in the endpoint. Also include the logged `{e}` text when debugging — it tells you the actual database error.
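A minimal sketch of the fix, using plain-Python stand-ins for the question's functions so the control flow is visible (`InsertError` and the `fail_on` parameter are hypothetical names introduced here to simulate a DB error; they are not part of the original code):

```python
class InsertError(Exception):
    """Raised when validation or insertion fails, so callers cannot miss it."""

def insert_data(rows, fail_on=None):
    # Stand-in for the SQLAlchemy insert in utils.py. Instead of swallowing
    # the exception and returning False, log it and re-raise.
    for row in rows:
        if row == fail_on:  # simulate the DB rejecting this row
            raise InsertError(f"Data insertion failed for {row!r}")
    return True

def insert_validated_data(is_data_valid, rows, fail_on=None):
    # Propagate the failure instead of only logging it.
    if not is_data_valid:
        raise InsertError("Input data failed validation")
    return insert_data(rows, fail_on=fail_on)

def biosdbfw_operations(rows, fail_on=None):
    # Mirrors the Flask endpoint: only return 200 when the insert really
    # succeeded, and map any InsertError to a 500 the client can see.
    try:
        insert_validated_data(True, rows, fail_on=fail_on)
        return 200, "Data Inserted into bios_cluster successfully"
    except InsertError as exc:
        return 500, str(exc)

print(biosdbfw_operations(["row1", "row2"]))                  # status 200
print(biosdbfw_operations(["row1", "row2"], fail_on="row2"))  # status 500
```

With this shape, the Splunk log entry and the HTTP status can no longer disagree: the same exception that produces the log line also drives the response code.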
