[我有一个用Java编写的Spark流传输程序,我试图在其中使用来自Kafka主题的消息,转换为数据框并加载到DB2数据库。
我需要根据数据在DB2中执行某些操作(添加/删除/更新)。例如输入数据框具有:
RegID|Timestamp |Action|Product
r1111|20200101:22:15:58|ADD |alpha
r2222|20200101:22:16:58|DELETE|beta
r3333|20200101:22:15:58|UPDATE|coke
假设r1111
RegID
已添加到数据库中。现在,相同的RegID
可能会以不同的(稍后)时间戳再次到达。
例如
RegID|Timestamp |Action|Product
r1111|20200101:23:00:00|ADD |alpha
r4444|20200101:23:01:58|ADD |beta
如何确保不再对r1111
重复该操作?
因此,要求在以后的同一时间RegID
中,不应在DB2中重复操作(添加/删除/更新)。
N.B。:这是Kafka Spark流媒体作业
请帮助。
使用MERGE statement代替INSERT
。
MERGE INTO MYTABLE T USING
(VALUES ('r1111', TIMESTAMP('2020-01-01-23.00.00'), 'alpha'))
V (RegID, Timestamp, Product) ON V.RegID = T.RegID
WHEN MATCHED THEN UPDATE SET (RegID, Timestamp, Product) = (V.RegID, V.Timestamp, V.Product)
WHEN NOT MATCHED THEN INSERT (RegID, Timestamp, Product) VALUES (V.RegID, V.Timestamp, V.Product)
;
这仅是示例。当然,您应该使用准备好的语句,例如:VALUES (cast(? as varchar(10)), cast(? as timestamp), cast(? as varchar(20)))
。数据类型应与相应的表列匹配。