我有一个流应用程序,它消耗消息的速度通常很快,但有时需要很长时间(最多 30 分钟)。该行为是不确定的,我们在处理消息之前并不知道。我们在 Kubernetes 中部署应用程序,其中 Pod 下降和扩展是正常的。当所有 Pod 长时间陷入处理消息时,任何新的 Pod 扩展或缩减都会启动重新平衡,这需要花费大量时间,有时由于轮询线程被阻塞而永远无法完成。
我正在尝试暂停和恢复 Kafka 流,理论上它不应该阻止轮询,因此它不应该阻止重新平衡。实现以下目标的理想编程模型是什么?我知道使用 Kafka 消费者模型(而不是流)时这是相当容易完成的
我尝试过以下方法。它有多个问题:
''' 公开课测试1 {
static KafkaStreams streams;
public static void main(String[] args) {
// Set up the configuration properties
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);
config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);
config.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 180000)
Topology builder = new Topology();
// Add a source processor node
builder.addSource("Source", "input");
// Add a processor node
builder.addProcessor("Processor", MyProcessor::new, "Source");
builder.addSink("sendto", "output", Serdes.String().serializer(), Serdes.String().serializer(),"Processor");
// Build the topology
streams = new KafkaStreams(builder, config);
// Start the KafkaStreams instance
streams.start();
// Add shutdown hook to gracefully close the KafkaStreams instance
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
static class MyProcessor implements Processor<String, String, String, String> {
private ProcessorContext context;
private ScheduledExecutorService executorService;
@Override
public void init(ProcessorContext context) {
this.context = context;
this.executorService = Executors.newSingleThreadScheduledExecutor();
}
@Override
public void process(Record<String, String> record) {
System.out.println("In processing");
streams.pause();
executorService.schedule(() -> {
System.out.println("Started processing..sleeping");
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("OUT processing...");
context.forward(new Record<>("test1", "value",1l), "sendto");
context.commit();
streams.resume();
}, 0, TimeUnit.SECONDS);
}
'''
Kafka Streams 不够灵活。如果它手头有一条消息并且处理需要很长时间,它不会调用
poll()
。
您可能需要重新使用
KafkaConsumer
来代替,以便您可以更好地控制它。或者您尝试相应地增加 max.poll.timeout.ms
,但实际上我不会推荐它 - 30 分钟将是一个非常高的值,并且可能会导致其他问题。
使用
ScheduledExecutorService
无论如何都是一个非常糟糕的主意,可能会破坏你的应用程序。 Kafka Streams 不允许运行您自己的后台线程。