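I'm trying to use pyodbc to MERGE a DataFrame into a SQL table.
The MERGE deletes rows from the target (SQL) table that have no match in the source. The problem is that pyodbc seems to apply the MERGE one row at a time. As a result, by the time it reaches the last row, every row that was inserted or updated before it has been deleted, because none of them match the key of the current row.
Setting the fast_executemany flag to True doesn't make it work with executemany either (I hoped it would, since I assumed my DataFrame would then be merged as a whole in one batch).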
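To see why only the last row survives, here is a pure-Python model of the MERGE below. `simulate_merge` is a hypothetical helper, not pyodbc or SQL Server code, and it assumes (as the question observes) that executemany runs the statement once per parameter set. Each execution then has a one-row SOURCE, so the `WHEN NOT MATCHED BY SOURCE THEN DELETE` branch wipes out whatever earlier executions wrote.

```python
# Hypothetical in-memory model of the question's MERGE; keys are (Country, Region).
def simulate_merge(target, source_rows):
    """Return the target table after one MERGE whose SOURCE is source_rows."""
    source = {(c, r): (v, n) for c, r, v, n in source_rows}
    # WHEN NOT MATCHED BY SOURCE THEN DELETE: drop target keys absent from source
    result = {k: v for k, v in target.items() if k in source}
    # WHEN MATCHED THEN UPDATE / WHEN NOT MATCHED BY TARGET THEN INSERT
    result.update(source)
    return result


rows = [
    ("France", "Finistere", 19488788.334505435, 13),
    ("France", "Savoie", 11282506.25, 75),
    ("Maroc", "Casablanca", 12454559.801253136, 15),
    ("Perou", "Quito", 13059125.212926773, 49),
]

# executemany: one MERGE per parameter set, so SOURCE is a single row each time
table = {}
for row in rows:
    table = simulate_merge(table, [row])
print(sorted(table))  # [('Perou', 'Quito')]

# One MERGE whose SOURCE holds every row (what the question expected)
print(len(simulate_merge({}, rows)))  # 4
```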
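I also tried doing the merge with a plain execute, generating as many placeholders as there are cells in the DataFrame. That works, but it is very ugly, and I'm fairly sure there is a limit on how many placeholders a single statement can have (SQL Server caps a request at 2100 parameters).
I wrote a simple piece of code to illustrate this: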
import pandas as pd
import pyodbc
connection = pyodbc.connect(xxxxxxx, autocommit=True)
cursor = connection.cursor()
cursor.fast_executemany = True
cursor.execute("DELETE FROM [dbo].[MyTable]")
merge = """MERGE [dbo].[MyTable] TARGET USING (VALUES (?,?,?,?)) as SOURCE (Country,Region,Volume,NbClients)
ON TARGET.Country = SOURCE.Country AND TARGET.Region = SOURCE.Region
WHEN MATCHED THEN UPDATE SET TARGET.Volume = SOURCE.Volume, TARGET.NbClients = SOURCE.NbClients
WHEN NOT MATCHED BY TARGET THEN INSERT (Country,Region,Volume,NbClients) VALUES (SOURCE.Country, SOURCE.Region, SOURCE.Volume, SOURCE.NbClients)
WHEN NOT MATCHED BY SOURCE THEN DELETE;
"""
res = pd.DataFrame([['France', 'Finistere', 19488788.334505435, 13], ['France', 'Savoie', 11282506.25, 75], ['Maroc', 'Casablanca', 12454559.801253136, 15], ['Perou', 'Quito', 13059125.212926773, 49]])
# Won't work, only the last row of the dataframe will be in the table
cursor.executemany(merge, res.values.tolist())
res = pd.DataFrame([['France', 'Vendee', 1488788.3254, 56], ['Maroc', 'Casablanca', 42454559.801253136, 36]])
# Won't work, only the last row of the dataframe will be in the table
cursor.executemany(merge, res.values.tolist())
merge_flat = """MERGE [dbo].[MyTable] TARGET USING (VALUES (?,?,?,?), (?,?,?,?)) as SOURCE (Country,Region,Volume,NbClients)
ON TARGET.Country = SOURCE.Country AND TARGET.Region = SOURCE.Region
WHEN MATCHED THEN UPDATE SET TARGET.Volume = SOURCE.Volume, TARGET.NbClients = SOURCE.NbClients
WHEN NOT MATCHED BY TARGET THEN INSERT (Country,Region,Volume,NbClients) VALUES (SOURCE.Country, SOURCE.Region, SOURCE.Volume, SOURCE.NbClients)
WHEN NOT MATCHED BY SOURCE THEN DELETE;"""
params = [item for sublist in res.values.tolist() for item in sublist]
# Works, but needs one placeholder per cell
cursor.execute(merge_flat, *params)
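A sketch that generalizes `merge_flat` to any number of rows instead of hard-coding two `VALUES` tuples. `build_flat_merge` and `MAX_PARAMS` are hypothetical names, and the cap of 2000 is a conservative assumption kept below SQL Server's documented 2100-parameter maximum. Splitting the data into several such statements would not get around the cap, because each chunk's MERGE would delete the rows written by the previous chunks.

```python
# Hypothetical helper generalizing merge_flat from the question.
COLUMNS = ["Country", "Region", "Volume", "NbClients"]
MAX_PARAMS = 2000  # assumption: conservative cap below SQL Server's 2100-parameter limit


def build_flat_merge(n_rows):
    """Return a MERGE statement with one (?,?,?,?) VALUES tuple per row."""
    if n_rows < 1:
        raise ValueError("need at least one row")
    n_params = n_rows * len(COLUMNS)
    if n_params > MAX_PARAMS:
        raise ValueError(f"{n_params} parameters exceeds the cap of {MAX_PARAMS}")
    row = "(" + ",".join("?" * len(COLUMNS)) + ")"  # "(?,?,?,?)"
    values = ", ".join([row] * n_rows)
    cols = ",".join(COLUMNS)
    return (
        f"MERGE [dbo].[MyTable] TARGET USING (VALUES {values}) as SOURCE ({cols})\n"
        "ON TARGET.Country = SOURCE.Country AND TARGET.Region = SOURCE.Region\n"
        "WHEN MATCHED THEN UPDATE SET TARGET.Volume = SOURCE.Volume, "
        "TARGET.NbClients = SOURCE.NbClients\n"
        f"WHEN NOT MATCHED BY TARGET THEN INSERT ({cols}) VALUES "
        "(SOURCE.Country, SOURCE.Region, SOURCE.Volume, SOURCE.NbClients)\n"
        "WHEN NOT MATCHED BY SOURCE THEN DELETE;"
    )


rows = [
    ["France", "Vendee", 1488788.3254, 56],
    ["Maroc", "Casablanca", 42454559.801253136, 36],
]
sql = build_flat_merge(len(rows))
params = [value for r in rows for value in r]  # flatten row by row, as in the question
# cursor.execute(sql, *params)  # same call shape as the merge_flat example
```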
The table looks like this:
CREATE TABLE [dbo].[MyTable](
[Country] [varchar](30) NOT NULL,
[Region] [varchar](50) NOT NULL,
[Volume] [numeric](24, 6) NULL,
[NbClients] [numeric](24, 10) NULL,
CONSTRAINT [PK_MyTable] PRIMARY KEY CLUSTERED
(
[Country] ASC,
[Region] ASC
)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
) ON [PRIMARY]
GO
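So far the best workaround seems to be a delete/insert, or perhaps inserting my data into a temporary table and merging from that, but I can't believe that something this simple doesn't work.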
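A minimal sketch of the temporary-table workaround, assuming SQL Server and a pyodbc cursor like the one above. The `#Staging` table name and the `merge_via_staging` helper are hypothetical. The executemany step is a plain INSERT, so running it once per row is harmless; the single MERGE afterwards sees the whole staging table as SOURCE, so the DELETE branch behaves as intended.

```python
# Hypothetical staging-table approach: bulk INSERT row by row, then one set-based MERGE.
STAGING_DROP = "IF OBJECT_ID('tempdb..#Staging') IS NOT NULL DROP TABLE #Staging"

STAGING_DDL = """CREATE TABLE #Staging (
    Country varchar(30) NOT NULL,
    Region varchar(50) NOT NULL,
    Volume numeric(24, 6) NULL,
    NbClients numeric(24, 10) NULL,
    PRIMARY KEY (Country, Region)
)"""

STAGING_INSERT = (
    "INSERT INTO #Staging (Country, Region, Volume, NbClients) VALUES (?, ?, ?, ?)"
)

STAGING_MERGE = """MERGE [dbo].[MyTable] WITH (HOLDLOCK) AS TARGET
USING #Staging AS SOURCE
ON TARGET.Country = SOURCE.Country AND TARGET.Region = SOURCE.Region
WHEN MATCHED THEN UPDATE SET TARGET.Volume = SOURCE.Volume, TARGET.NbClients = SOURCE.NbClients
WHEN NOT MATCHED BY TARGET THEN INSERT (Country, Region, Volume, NbClients)
    VALUES (SOURCE.Country, SOURCE.Region, SOURCE.Volume, SOURCE.NbClients)
WHEN NOT MATCHED BY SOURCE THEN DELETE;"""


def merge_via_staging(cursor, rows):
    """Load rows into #Staging, MERGE them into MyTable once, then drop #Staging."""
    cursor.execute(STAGING_DROP)
    cursor.execute(STAGING_DDL)
    try:
        # One INSERT per parameter set is fine here: nothing gets deleted.
        cursor.executemany(STAGING_INSERT, rows)
        # A single MERGE whose SOURCE is every staged row.
        cursor.execute(STAGING_MERGE)
    finally:
        cursor.execute(STAGING_DROP)


# Usage with the question's objects (not run here):
# merge_via_staging(cursor, res.values.tolist())
```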
Am I missing something?
Thanks for your help.
If you are using SQL Server 2016 or later, you can use OPENJSON, like this:
import pandas as pd
import pyodbc
connection = pyodbc.connect(
"DSN=mssql_199;UID=scott;PWD=tiger^5HHH", autocommit=True
)
cursor = connection.cursor()
# set up test environment
cursor.execute("DROP TABLE IF EXISTS [dbo].[MyTable]")
cursor.execute(
"""\
CREATE TABLE [dbo].[MyTable](
[Country] [varchar](30) NOT NULL,
[Region] [varchar](50) NOT NULL,
[Volume] [numeric](24, 6) NULL,
[NbClients] [numeric](24, 10) NULL,
CONSTRAINT [PK_MyTable] PRIMARY KEY CLUSTERED
(
[Country] ASC,
[Region] ASC
)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
) ON [PRIMARY]
"""
)
# example code starts here
merge = """\
MERGE [dbo].[MyTable] WITH (HOLDLOCK) as TARGET
USING
OPENJSON(?)
WITH (
Country varchar(30) '$.Country',
Region varchar(50) '$.Region',
Volume numeric(24, 6) '$.Volume',
NbClients numeric(24, 10) '$.NbClients'
)
as SOURCE
ON TARGET.Country = SOURCE.Country AND TARGET.Region = SOURCE.Region
WHEN MATCHED THEN
UPDATE SET
TARGET.Volume = SOURCE.Volume,
TARGET.NbClients = SOURCE.NbClients
WHEN NOT MATCHED BY TARGET THEN
INSERT (Country,Region,Volume,NbClients)
VALUES (SOURCE.Country, SOURCE.Region, SOURCE.Volume, SOURCE.NbClients)
WHEN NOT MATCHED BY SOURCE THEN DELETE;
"""
res = pd.DataFrame(
[
("France", "Vendee", 1488788.3254, 56),
("Maroc", "Casablanca", 42454559.801253136, 36),
],
columns=["Country", "Region", "Volume", "NbClients"],
)
cursor.execute(merge, res.to_json(orient="records"))
# check results
print(cursor.execute("SELECT * FROM MyTable").fetchall())
"""
[
('France', 'Vendee', Decimal('1488788.325400'), Decimal('56.0000000000')),
('Maroc', 'Casablanca', Decimal('42454559.801253'), Decimal('36.0000000000'))
]
"""