手动确认Kafka Batch监听器。多线程我做错了什么?

问题描述 投票:0回答:1

我正在创建一个应用程序,该应用程序使用来自kafka的数据并对其进行一些处理。由于此应用程序的工作规​​模很大,因此我将Java 8 Executor服务与并发批处理侦听器(max.poll.records = 20)一起使用。以下是带有注释的代码片段:

    //Below method is invoked by spring kafka concurrent batch listener
public void onMessage(List<ConsumerRecord<String,String>> records , Acknowledgement ack)
{
    CompletableFuture.runAsync(() -> records.stream().forEach(record -> 
    doSomeProcessingAsynchronously()),new ThreadPoolExecutor(10,10,0,TimeUnit.MILLISECONDS. new 
    LinkedBlockingQueue<>(20)));
    //I want to execute all consumed records first only then flow should come after this line
    //and acknowledge complete batch
    ack.acknowledge();
}

关于如何完成此工作的任何建议?在此先感谢

spring-boot apache-kafka kafka-consumer-api executorservice spring-kafka
1个回答
0
投票

添加

CountDownLatch latch = new CountDownLatch(records.size());

然后在lambda中为每个已完成的任务将其递减。

确认boolean ok = latch.await(60, TimeUnit.SECONDS);之前。

((检查超时并采取适当措施)。

records.stream().forEach(...

就是说,您不是并行运行处理;您只需在不同的线程上依次运行所有20个。

您需要将forEach移到外面。

© www.soinside.com 2019 - 2024. All rights reserved.