背景:
我有一个mysql版本8的表,如下。
id | field1 | field2 | ...... | field10 | timeStamp
_______________________________________________________________
fieldN 是浮点数据类型。 id 是唯一的,并且有索引。需要进入该表的值由网络上的不同分布式程序节点生成。不同的节点生成一个或多个字段的数据。总共,大约有 35000 个节点每分钟生成数据,因此每秒大约有 600 个输入。 (注意:一开始它是一组小得多的节点,慢慢地不断增长)。目前,对于来自节点的每个请求,我运行类似这样的 UPDATE 查询。
Node 1 eg: UPDATE table set field1=24.5, field6=44.6,timeStamp=now() where id=1
Node 2 eg: UPDATE table set field3=53.3, field9=4.2, field10=98.8, timeStamp=now() where id=2
Node 3 eg: UPDATE table set field1=55.5, field8=7.2, field9=8.8, timeStamp=now() where id=3
等等。因此,更新哪些字段/字段数取决于任何给定节点,如上所示。有些可能有 2 个字段的数据,有些只有 1 个,有些甚至 5 或 6 个,等等。
手头上的任务:
为了减少数据库连接/查询的数量(目前每个节点每分钟一个),我将所有内容放在一起以使用 Kafka 收集所有这些更新,然后批量运行它们而不是单独的查询。
一旦来自各个节点的数据进入kafka,我就不确定最好的处理方法是什么。我读了一些书,发现了以下选项。
1. Transactions,
2. CASE
3. MultiQuery.
我使用码头工人。我根据测试脚本(https://github.com/yanxurui/keepcoding/blob/master/php/mysql/multiple_update.php)运行了测试,发现CASE方法似乎是最快的。虽然 Multiquery 似乎显示此测试脚本的执行时间最短,但看起来它只是将所有这些查询转储到 mysql 中,并且 mysql 实际完成查询需要更长的时间(正如 docker stats 中的信息所观察到的)。而案例方法似乎更快。
我的解决方案
在考虑了各种方法之后,这就是我的想法。
UPDATE table set
field1= CASE when id=1 THEN 24.5
when id=3 THEN 55.5
else field1
END,
field3= CASE when id=2 THEN 53.3
else field3
END,
etc..,
timeStamp=now()
WHERE id IN (1,2,3,...)
这意味着,对于表中的每个字段,我必须迭代节点列表以查看它们是否有要报告给该字段的值,等等。
问题
您是否尝试过类似的操作(使用重复密钥):
INSERT INTO table (id,field,field2,...,field10) VALUES
(1, 24.5,null,null,null,..null),
(2, null,null,null,null,..98.8),
(3, 55.5,null,null,null,..8.8)
ON DUPLICATE KEY UPDATE
field1=IF(VALUES(field1) = NULL, field1, VALUES(field1)),
field2=IF(VALUES(field2) = NULL, field2, VALUES(field2)), ...
field10=IF(VALUES(field10) = NULL, field10, VALUES(field10)),
`timestamp` = NOW();
将 UPDATE 必须使用的数据保存到相同结构的临时表中(如果要更新列,则它包含相应的值,否则为 NULL)。然后使用
UPDATE maintable
JOIN temptable USING (id)
SET maintable.column1 = COALESCE(temptable.column1, maintable.column1),
-- ...
maintable.column10 = COALESCE(temptable.column10, maintable.column10),
`timestamp` = NOW();
此外,我建议您更改表,将
ON UPDATE CURRENT_TIMESTAMP
选项设置为 timestamp
列,并从查询中删除此列的显式更新。