Kafka消费者应用程序具有严重的延迟(在高峰时段没有足够快地消耗kafka事件)。 kafka主题有120个分区,而消费者组总共有30个主机,每个主机有两个消费者,因此每个消费者使用2个kafka分区。我们使用的主机是具有32个核心的AWS C5.9xlarge实例。每个使用者都被放入一个java.lang.Thread中,并且在每个线程中,使用250个线程创建一个ThreadPool。
我们已经验证了没有CPU /内存/ IO是瓶颈。然后我们将250名工人增加到500名工人,但延迟时间仍然存在。然后我们改回250名工作人员,但每个主机从2增加到4个消费者。结果,每个消费者消耗一个kafka分区。现在问题解决了,延迟降到很低。
我的问题是,为什么在Threadpool中从250增加到500并没有帮助,但是每个主机从2个增加到4个消费者有帮助吗?
private class ConsumerThread extends Thread {
public ConsumerThread(StremProcessor processor) {
this.processor = processor;
this.consumer = new KafkaConsumer()
}
@Override
public void run() {
ExecutorService executor = Executors.newFixedThreadPool(250);
while (true) {
Data data = consumer.poll()
executor.invokeAll(getTasks(data, processor)); //processor is
}
}
}
首先:你应该在每个循环之间的while循环中加入一些延迟,以防止你的应用程序泛滥你的内存。
基本上ExecutorService.invokeAll()
方法返回Future
s列表。您可以使用它们来“控制”您的线程。
ThreadPool中的线程与java.lang.Thread有何不同?
它们没有区别,但你得到一个包装器(Future
),它可以让你在执行时控制线程。底层的Thread
就像通常的Java线程一样工作。
是因为ThreadPool中的所有线程都使用单个处理器核心吗?
没有
线程池只是reusable
池的java.lang.Thread
。通常,线程池有一个queue of tasks
,如果线程池中的任何线程是空闲的,它可以执行任务,当任务完成时,线程返回池并尝试查找是否有任何其他任务在队列中等待。
ThreadPool中的线程与java.lang.Thread有何不同?
没有区别。只有使用上的差异。
是因为ThreadPool中的所有线程都使用单个处理器核心吗?
不,它可以使用任意数量的可用处理器。
我记得ExecutorPool中的默认线程是每个处理器250个,这是否意味着ExecutorPool不够智能将250个线程分配到16个核心?
从哪里可以获得“ExecutorPool每处理器250个”的信息?我完全不明白你的问题。线程池的线程可以作为普通线程在任何核心上执行,对线程池的线程没有任何限制。