我正在创建一个应用程序,该应用程序使用来自kafka的数据并对其进行一些处理。由于此应用程序的工作规模很大,因此我将Java 8 Executor服务与并发批处理侦听器(max.poll.records = 20)一起使用。以下是带有注释的代码片段:
//Below method is invoked by spring kafka concurrent batch listener
public void onMessage(List<ConsumerRecord<String,String>> records , Acknowledgement ack)
{
CompletableFuture.runAsync(() -> records.stream().forEach(record ->
doSomeProcessingAsynchronously()),new ThreadPoolExecutor(10,10,0,TimeUnit.MILLISECONDS. new
LinkedBlockingQueue<>(20)));
//I want to execute all consumed records first only then flow should come after this line
//and acknowledge complete batch
ack.acknowledge();
}
关于如何完成此工作的任何建议?在此先感谢
添加
CountDownLatch latch = new CountDownLatch(records.size());
然后在lambda中为每个已完成的任务将其递减。
确认boolean ok = latch.await(60, TimeUnit.SECONDS);
之前。
((检查超时并采取适当措施)。
records.stream().forEach(...
就是说,您不是并行运行处理;您只需在不同的线程上依次运行所有20个。
您需要将forEach移到外面。