我想要执行的是使用链接到某个数据源的 int-jdbc:inbound-channel-adapter 来消费消息,然后更新 consumer_table (或偏移表),该表将针对特定键保留,查询的具体日期
<int-jdbc:inbound-channel-adapter
...
query="someSelect"
update="someUpdate"
update-sql-parameter-source-factory="someCustomSqlParameterSourceFactory">
</int-jdbc:inbound-channel-adapter>
组ID | 最新_咨询 |
---|---|
一些组ID | 2023-11-01 19:30 |
一些其他组ID | 2023-12-04 19:30 |
id | 优先 | 有效负载 | 创建日期 |
---|---|---|---|
1 | 空 | [斑点] | 2023-12-01 18:20:00.000 |
2 | 1 | [斑点] | 2023-12-05 18:30:00.000 |
3 | 1 | [斑点] | 2023-12-10 18:27:00.000 |
基本上,我会有2个疑问:
SELECT NOW() as now_,id,payload,priority
FROM myDb.messages m
WHERE m.creation_date >
(select o.latest_consultation from myDb.offsets o where o.group_id = 'someGroupId')
并且
REPLACE INTO myDb.offsets (group_id,latest_consultation) values ('someGroupId', :now_)
这里的问题是 :now_ 表达式将被评估为集合
here 对此进行了很好的解释,并从解决方案开始。
但通过上述链接的技巧,它将指向一个静态方法,该方法将无法访问输入 data ,因此无法访问 now_ 值(或者不是以我理解的方式)。
诚然,这样我可以提供运行时计算的时间戳,但它可能与 now_ 值略有不同。它可能很短,但每小时有数十亿条消息,这有时可能会导致同时丢失一些消息。因此,我需要它是now_
因此,我试图找到一种方法将 select 查询的确切时间作为单个元素(而不是集合)传递到 update 查询
有什么(新)想法吗?
编辑1 我尝试了 Artem Bilan 的解决方案:使用 update-per-row="true" 我必须将 replace into 替换为 update where... 因为 now_ 未被识别为时间戳,而是被识别为对象。基本上,我有:
PreparedStatementCallback; SQL 的未分类 SQLException [替换为 moba.offsets (group_id,latest_consultation) 值 ('瓦尔基米尔', ?) ]; SQL状态[空];错误代码[0];类型 java.lang.Object 不支持的类型
使用 update 子句,结合 update-per-row="true",就可以达到目的。然而,我比较了有和没有 update-per-row 的性能,对于 3000 条记录,我从 5 分钟缩短到 1.5 分钟......这是消息速率的一个小降级。如果可能的话,我宁愿找到一种侵入性较小的方法。
(尽管如此,我的用例实际上并不现实:我一次性使用了它们,而轮询模式将更接近于 update-per-row="true" 用例)
当您的
SELECT
返回多行并且您打算对所有行使用单个 UPDATE
时,那么您需要考虑在 in(:now_)
中使用 UPDATE
子句:https://docs.spring。 io/spring-integration/reference/jdbc/inbound-channel-adapter.html
更新查询中的参数是通过参数名称的冒号 (:) 前缀来指定的(在前面的示例中,该参数是要应用于轮询结果集中的每一行的表达式)。这是 Spring JDBC 中命名参数 JDBC 支持的标准功能,与 Spring Integration 中采用的约定(投影到轮询结果列表上)相结合。
里面的逻辑是这样的:
"#root.![" + paramName + "]"
这是一个
collection projection
:构建一个新集合,选择原始集合元素的特定属性:https://docs.spring.io/spring-framework/reference/core/expressions/language-ref/collection-projection .html
如果您仍然无法使用它,还有一个类似
update-per-row="true"
的选项。请参阅有关 JDBC 入站通道适配器的相同文档。
如果逻辑得出结论:只能从
max-rows="1"
返回一项,您也可以考虑使用 SELECT
。
另一种解决方案是使用存储过程在数据库端执行所有操作:https://docs.spring.io/spring-integration/reference/jdbc/stored-procedures.html