SQLAlchemy + oracledb/cx_Oracle 插入将时间戳截断为秒

问题描述 投票:0回答:1

我需要合并包含通常“pandas._libs.tslibs.timestamps.Timestamp”类型数据的数据帧。 这个df具有以下结构

my_test_date = pd.to_datetime(1674009901958454, unit='us')

df = pd.DataFrame(data = {'payment_id_pay': [1, 2],
                          'DT_START': [my_test_date, my_test_date],
                          'DT_END': [my_test_date, my_test_date]})

df 看起来像

自从我使用 pandas 以来,我使用 sqlalchemy 将其合并到我的 Oracle DB:

from sqlalchemy import create_engine, text

sql = f'''
MERGE INTO mytable
    USING dual
    ON (payment_id_pay = :1)
    WHEN MATCHED THEN UPDATE SET DT_START = :2, DT_END = :3
    WHEN NOT MATCHED THEN INSERT (payment_id_pay, DT_START, DT_END) 
    VALUES (:1, :2, :3)
'''
engine = create_engine(f'oracle+oracledb://login:pass@host:1521/?service_name=service_name')
with engine.connect() as conn:      
    conn.execute(text(sql), [dict(zip([f'{i+1}' for i in range(len(df.columns))], i)) for i in df.values.tolist()])
    conn.commit()
engine.dispose()

列表推导式是执行方法的参数

结果是,在 Oracle DB 中的表中,所有时间戳都被截断为秒,所有毫秒 = 000000

我在cx_Oracle github上发现了这个问题https://github.com/oracle/python-cx_Oracle/issues/161

由于我没有找到任何明确的文档,我可以在其中找到如何使用 SQLAlchemy 使用cursor.setinputsizes(cx_Oracle.TIMESTAMP),所以我按照本示例中的方式进行操作Python cx_Oracle Insert Timestamp With Milliseconds 我尝试了 3 种不同的方法:SQLAlchemy、纯 oracledb 和纯 cx_Oracle

第一个:按照 github 问题中推荐的纯 cx_Oracle:

import cx_Oracle

sql = f'''
MERGE INTO mytable
    USING dual
    ON (payment_id_pay = :1)
    WHEN MATCHED THEN UPDATE SET DT_START = :2, DT_END = :3
    WHEN NOT MATCHED THEN INSERT (payment_id_pay, DT_START, DT_END) 
    VALUES (:1, :2, :3)
'''
conn = cx_Oracle.connect(user=user, password=password, dsn=dsn)
conn.cursor().setinputsizes(None, cx_Oracle.TIMESTAMP, cx_Oracle.TIMESTAMP)
conn.cursor().executemany(sql, [dict(zip([f'{i+1}' for i in range(len(df.columns))], i)) for i in df.values.tolist()])
conn.commit()
conn.close()

第二:我尝试了相同的操作,但使用的是 oracledb 而不是 cx_Oracle:

import oracledb

sql = f'''
MERGE INTO mytable
    USING dual
    ON (payment_id_pay = :1)
    WHEN MATCHED THEN UPDATE SET DT_START = :2, DT_END = :3
    WHEN NOT MATCHED THEN INSERT (payment_id_pay, DT_START, DT_END) 
    VALUES (:1, :2, :3)
'''

conn = oracledb.connect(user=user, password=password, host="host", port=1521, service_name="name")
conn.cursor().setinputsizes(None, oracledb.TIMESTAMP, oracledb.TIMESTAMP)
conn.cursor().executemany(sql, [dict(zip([f'{i+1}' for i in range(len(df.columns))], i)) for i in df.values.tolist()])
conn.commit()
conn.close()

3d:我没有找到如何在SQLAlchemy中使用setinputsizes()方法,但我发现这个类和方法存在“method sqlalchemy.engine.interfaces.DBAPICursor.setinputsizes(sizes: Sequence[Any]) → None” 我尝试了以下方法:

import sqlalchemy
from sqlalchemy.engine.interfaces import DBAPICursor

sql = f'''
MERGE INTO mytable
   USING dual
   ON (payment_id_pay = :1)
   WHEN MATCHED THEN UPDATE SET DT_START = :2, DT_END = :3
   WHEN NOT MATCHED THEN INSERT (payment_id_pay, DT_START, DT_END) 
   VALUES (:1, :2, :3)
'''
engine = sqlalchemy.create_engine(f'oracle+oracledb://login:pass@host:1521/?service_name=service_name')
with engine.connect() as conn:   
   DBAPICursor.setinputsizes(oracledb.TIMESTAMP, oracledb.TIMESTAMP)
   conn.execute(text(sql), [dict(zip([f'{i+1}' for i in range(len(df.columns))], i)) for i in df.values.tolist()])
   conn.commit()
engine.dispose()

上面的所有代码都运行没有问题,并给了我绝对相同的结果:日期时间在我的数据库中被截断为秒

我还尝试将数据从 pandas Timestamp 转换为 datetime.datetime,没有任何改变

我使用:SQLAlchemy 2.0.25, oracledb 1.4.2, CX-Oracle 8.3.0

数据库:Oracle Database 19c 企业版版本 19.0.0.0.0 - 生产

如何在不转换的情况下插入数据? 谢谢!

python sqlalchemy cx-oracle oracle19c python-oracledb
1个回答
0
投票

对于文本 SQL,请使用

  • 参数占位符是有效的 Python 名称,例如
    p1
    而不是
    1
    ,
  • oracledb 游标,以及
  • oracledb 的
    .setinputsizes()
    声明日期时间/时间戳参数
import datetime

import oracledb
import sqlalchemy as sa

engine = sa.create_engine(
    "oracle+oracledb://scott:[email protected]/?service_name=xepdb1"
)

with engine.begin() as conn:
    sql = f"""\
    MERGE INTO mytable
        USING dual
        ON (payment_id_pay = :p1)
        WHEN MATCHED THEN UPDATE SET DT_START = :p2, DT_END = :p3
        WHEN NOT MATCHED THEN INSERT (payment_id_pay, DT_START, DT_END) 
        VALUES (:p1, :p2, :p3)
    """
    test_data = [
        {
            "p1": 1,
            "p2": datetime.datetime(2012, 1, 1, 12, 0, 0, 123000),
            "p3": datetime.datetime(2013, 1, 1, 12, 0, 0, 456000),
        },
        {
            "p1": 2,
            "p2": datetime.datetime(2022, 1, 1, 12, 0, 0, 123000),
            "p3": datetime.datetime(2023, 1, 1, 12, 0, 0, 456000),
        },
    ]
    cursor = conn.connection.cursor()
    cursor.setinputsizes(p2=oracledb.TIMESTAMP, p3=oracledb.TIMESTAMP)
    cursor.executemany(sql, test_data)
© www.soinside.com 2019 - 2024. All rights reserved.