防止使用Spark Java将重复数据加载到DB2中

问题描述 投票:-2回答:1

[我有一个用Java编写的Spark流传输程序,我试图在其中使用来自Kafka主题的消息,转换为数据框并加载到DB2数据库。

我需要根据数据在DB2中执行某些操作(添加/删除/更新)。例如输入数据框具有:

RegID|Timestamp        |Action|Product
r1111|20200101:22:15:58|ADD   |alpha
r2222|20200101:22:16:58|DELETE|beta
r3333|20200101:22:15:58|UPDATE|coke

假设r1111 RegID已添加到数据库中。现在,相同的RegID可能会以不同的(稍后)时间戳再次到达。

例如

RegID|Timestamp        |Action|Product
r1111|20200101:23:00:00|ADD   |alpha
r4444|20200101:23:01:58|ADD   |beta

如何确保不再对r1111重复该操作?

因此,要求在以后的同一时间RegID中,不应在DB2中重复操作(添加/删除/更新)。

N.B。:这是Kafka Spark流媒体作业

请帮助。

java dataframe apache-spark db2 spark-streaming
1个回答
0
投票

使用MERGE statement代替INSERT

MERGE INTO MYTABLE T USING
(VALUES ('r1111', TIMESTAMP('2020-01-01-23.00.00'), 'alpha')) 
V (RegID, Timestamp, Product) ON V.RegID = T.RegID
WHEN     MATCHED THEN UPDATE SET (RegID, Timestamp, Product) =      (V.RegID, V.Timestamp, V.Product)
WHEN NOT MATCHED THEN INSERT     (RegID, Timestamp, Product) VALUES (V.RegID, V.Timestamp, V.Product)
;

这仅是示例。当然,您应该使用准备好的语句,例如:VALUES (cast(? as varchar(10)), cast(? as timestamp), cast(? as varchar(20)))。数据类型应与相应的表列匹配。

© www.soinside.com 2019 - 2024. All rights reserved.