I need to merge a DataFrame whose timestamp columns hold the usual pandas._libs.tslibs.timestamps.Timestamp type. The df has the following structure:
import pandas as pd
my_test_date = pd.to_datetime(1674009901958454, unit='us')
df = pd.DataFrame(data={'payment_id_pay': [1, 2],
                        'DT_START': [my_test_date, my_test_date],
                        'DT_END': [my_test_date, my_test_date]})
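As a quick cross-check of the sample value, the same epoch can be decoded with the standard library alone. This is only a sketch: the names MICROS_SINCE_EPOCH, EPOCH and decoded are illustrative and not part of the original code. It shows that the test timestamp carries non-zero fractional seconds, which is exactly what should survive the round trip to Oracle.

```python
import datetime

# Same microsecond epoch value that is passed to pd.to_datetime above.
MICROS_SINCE_EPOCH = 1674009901958454

# Decode it without pandas: naive UTC datetime = epoch + offset in microseconds.
EPOCH = datetime.datetime(1970, 1, 1)
decoded = EPOCH + datetime.timedelta(microseconds=MICROS_SINCE_EPOCH)

print(decoded.isoformat(sep=" "))  # 2023-01-18 02:45:01.958454
print(decoded.microsecond)         # 958454
```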
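The df has two rows, and both carry the timestamp 2023-01-18 02:45:01.958454 in DT_START and DT_END.
Since I am working with pandas, I use SQLAlchemy to merge it into my Oracle DB: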
from sqlalchemy import create_engine, text
sql = f'''
    MERGE INTO mytable
    USING dual
    ON (payment_id_pay = :1)
    WHEN MATCHED THEN UPDATE SET DT_START = :2, DT_END = :3
    WHEN NOT MATCHED THEN INSERT (payment_id_pay, DT_START, DT_END)
    VALUES (:1, :2, :3)
'''
engine = create_engine(f'oracle+oracledb://login:pass@host:1521/?service_name=service_name')
with engine.connect() as conn:
    conn.execute(text(sql), [dict(zip([f'{i+1}' for i in range(len(df.columns))], i)) for i in df.values.tolist()])
    conn.commit()
engine.dispose()
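The list comprehension is the parameter argument of the execute method.
As a result, all timestamps in the Oracle table are truncated to seconds: the fractional part is always 000000.
I found this issue on the cx_Oracle GitHub: https://github.com/oracle/python-cx_Oracle/issues/161
I could not find any clear documentation on how to use cursor.setinputsizes(cx_Oracle.TIMESTAMP) with SQLAlchemy, so I followed the approach from this example: Python cx_Oracle Insert Timestamp With Milliseconds. I tried three different approaches: SQLAlchemy, pure oracledb, and pure cx_Oracle.
First: pure cx_Oracle, as recommended in the GitHub issue: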
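To make the shape of that argument concrete, here is a small standalone sketch of what the comprehension builds. The helper name rows_to_bind_dicts and the plain rows list (a stand-in for df.values.tolist()) are assumptions for illustration only.

```python
import datetime


def rows_to_bind_dicts(n_columns, rows):
    """Mirror the comprehension above: key each row's values by '1', '2', ..."""
    names = [f"{i + 1}" for i in range(n_columns)]
    return [dict(zip(names, row)) for row in rows]


ts = datetime.datetime(2023, 1, 18, 2, 45, 1, 958454)
rows = [[1, ts, ts], [2, ts, ts]]  # stand-in for df.values.tolist()
params = rows_to_bind_dicts(3, rows)

# One dict per row, with the bind names '1', '2', '3' as keys.
print(params[0])
```

Each row becomes one dictionary, so the statement is executed once per row with the values bound by name.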
import cx_Oracle
sql = f'''
    MERGE INTO mytable
    USING dual
    ON (payment_id_pay = :1)
    WHEN MATCHED THEN UPDATE SET DT_START = :2, DT_END = :3
    WHEN NOT MATCHED THEN INSERT (payment_id_pay, DT_START, DT_END)
    VALUES (:1, :2, :3)
'''
conn = cx_Oracle.connect(user=user, password=password, dsn=dsn)
conn.cursor().setinputsizes(None, cx_Oracle.TIMESTAMP, cx_Oracle.TIMESTAMP)
conn.cursor().executemany(sql, [dict(zip([f'{i+1}' for i in range(len(df.columns))], i)) for i in df.values.tolist()])
conn.commit()
conn.close()
Second: I tried the same thing, but with oracledb instead of cx_Oracle:
import oracledb
sql = f'''
    MERGE INTO mytable
    USING dual
    ON (payment_id_pay = :1)
    WHEN MATCHED THEN UPDATE SET DT_START = :2, DT_END = :3
    WHEN NOT MATCHED THEN INSERT (payment_id_pay, DT_START, DT_END)
    VALUES (:1, :2, :3)
'''
conn = oracledb.connect(user=user, password=password, host="host", port=1521, service_name="name")
conn.cursor().setinputsizes(None, oracledb.TIMESTAMP, oracledb.TIMESTAMP)
conn.cursor().executemany(sql, [dict(zip([f'{i+1}' for i in range(len(df.columns))], i)) for i in df.values.tolist()])
conn.commit()
conn.close()
Third: I could not find out how to use the setinputsizes() method in SQLAlchemy, but I found that this class and method exist: "method sqlalchemy.engine.interfaces.DBAPICursor.setinputsizes(sizes: Sequence[Any]) → None". I tried the following:
import oracledb
import sqlalchemy
from sqlalchemy import text
from sqlalchemy.engine.interfaces import DBAPICursor
sql = f'''
    MERGE INTO mytable
    USING dual
    ON (payment_id_pay = :1)
    WHEN MATCHED THEN UPDATE SET DT_START = :2, DT_END = :3
    WHEN NOT MATCHED THEN INSERT (payment_id_pay, DT_START, DT_END)
    VALUES (:1, :2, :3)
'''
engine = sqlalchemy.create_engine(f'oracle+oracledb://login:pass@host:1521/?service_name=service_name')
with engine.connect() as conn:
    DBAPICursor.setinputsizes(oracledb.TIMESTAMP, oracledb.TIMESTAMP)
    conn.execute(text(sql), [dict(zip([f'{i+1}' for i in range(len(df.columns))], i)) for i in df.values.tolist()])
    conn.commit()
engine.dispose()
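All of the code above runs without errors and gives exactly the same result: the datetimes are truncated to seconds in my DB.
I also tried converting the data from pandas Timestamp to datetime.datetime, but nothing changed.
I use: SQLAlchemy 2.0.25, oracledb 1.4.2, cx_Oracle 8.3.0
Database: Oracle Database 19c Enterprise Edition Release 19.0.0.0.0 - Production
How can I insert the data without conversion? Thanks!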
For textual SQL, use bind names such as p1 instead of 1, and use .setinputsizes() to declare the datetime/timestamp parameters:
import datetime
import oracledb
import sqlalchemy as sa
engine = sa.create_engine(
    "oracle+oracledb://scott:[email protected]/?service_name=xepdb1"
)
with engine.begin() as conn:
    sql = f"""\
    MERGE INTO mytable
    USING dual
    ON (payment_id_pay = :p1)
    WHEN MATCHED THEN UPDATE SET DT_START = :p2, DT_END = :p3
    WHEN NOT MATCHED THEN INSERT (payment_id_pay, DT_START, DT_END)
    VALUES (:p1, :p2, :p3)
    """
    test_data = [
        {
            "p1": 1,
            "p2": datetime.datetime(2012, 1, 1, 12, 0, 0, 123000),
            "p3": datetime.datetime(2013, 1, 1, 12, 0, 0, 456000),
        },
        {
            "p1": 2,
            "p2": datetime.datetime(2022, 1, 1, 12, 0, 0, 123000),
            "p3": datetime.datetime(2023, 1, 1, 12, 0, 0, 456000),
        },
    ]
    # Work on the raw DBAPI cursor underneath the SQLAlchemy connection.
    cursor = conn.connection.cursor()
    # Declare p2 and p3 as TIMESTAMP on the same cursor that runs the statement.
    cursor.setinputsizes(p2=oracledb.TIMESTAMP, p3=oracledb.TIMESTAMP)
    cursor.executemany(sql, test_data)
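To apply the same pattern to rows taken from a DataFrame, something along these lines might work. It is a sketch under assumptions, not a tested recipe against Oracle: to_named_binds and merge_rows are hypothetical helper names, and StubCursor / StubConnection exist only so the example runs without a database. The point it illustrates is that setinputsizes() and executemany() are called on the same cursor object. Every call to conn.cursor() returns a new cursor, which is one likely reason the attempts in the question had no effect.

```python
import datetime


def to_named_binds(rows, prefix="p"):
    """Turn [[v1, v2, ...], ...] into [{'p1': v1, 'p2': v2, ...}, ...]."""
    return [
        {f"{prefix}{i}": value for i, value in enumerate(row, start=1)}
        for row in rows
    ]


def merge_rows(dbapi_conn, sql, rows, timestamp_type):
    """Declare p2/p3 as timestamps, then run executemany on the SAME cursor."""
    cursor = dbapi_conn.cursor()
    try:
        cursor.setinputsizes(p2=timestamp_type, p3=timestamp_type)
        cursor.executemany(sql, to_named_binds(rows))
    finally:
        cursor.close()


# --- Hypothetical stand-ins so the sketch runs without a database ---
class StubCursor:
    def __init__(self, calls):
        self.calls = calls

    def setinputsizes(self, **sizes):
        self.calls.append(("setinputsizes", id(self), sizes))

    def executemany(self, sql, params):
        self.calls.append(("executemany", id(self), params))

    def close(self):
        pass


class StubConnection:
    def __init__(self):
        self.calls = []

    def cursor(self):
        return StubCursor(self.calls)


ts = datetime.datetime(2023, 1, 18, 2, 45, 1, 958454)
stub = StubConnection()
merge_rows(stub, "MERGE ...", [[1, ts, ts], [2, ts, ts]], "TIMESTAMP")

# Both recorded calls carry the same cursor id.
print(stub.calls[0][1] == stub.calls[1][1])
```

With a real engine, the call would presumably look like merge_rows(conn.connection, sql, df.values.tolist(), oracledb.TIMESTAMP) inside the with engine.begin() as conn: block, using the :p1, :p2, :p3 statement shown above.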