Kafka Streams - 暂停和恢复

问题描述 投票:0回答:1

我有一个流应用程序,它消耗消息的速度通常很快,但有时需要很长时间(最多 30 分钟)。该行为是不确定的,我们在处理消息之前并不知道。我们在 Kubernetes 中部署应用程序,其中 Pod 下降和扩展是正常的。当所有 Pod 长时间陷入处理消息时,任何新的 Pod 扩展或缩减都会启动重新平衡,这需要花费大量时间,有时由于轮询线程被阻塞而永远无法完成。

我正在尝试暂停和恢复 Kafka 流,理论上它不应该阻止轮询,因此它不应该阻止重新平衡。实现以下目标的理想编程模型是什么?我知道使用 Kafka 消费者模型(而不是流)时这是相当容易完成的

  • 开始处理消息
  • 生成一个新线程来处理消息
  • 暂停 kafka 流,这会解锁轮询并且不消耗任何新消息
  • 处理完消息后
    • 将消息转发到接收器(输出主题)
    • 提交偏移量并恢复流

我尝试过以下方法。它有多个问题:

  • 当kafka流暂停时,转发将永远挂起
  • 如果我恢复流然后转发,它不会挂起,但输出主题中没有消息

''' 公开课测试1 {

static KafkaStreams streams;
public static void main(String[] args) {
    // Set up the configuration properties
    Properties config = new Properties();
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);
    config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);
    config.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 180000)

    Topology builder = new Topology();

    // Add a source processor node
    builder.addSource("Source", "input");

    // Add a processor node
    builder.addProcessor("Processor", MyProcessor::new, "Source");

  
    builder.addSink("sendto", "output", Serdes.String().serializer(), Serdes.String().serializer(),"Processor");

    // Build the topology
    streams = new KafkaStreams(builder, config);

    // Start the KafkaStreams instance
    streams.start();

    // Add shutdown hook to gracefully close the KafkaStreams instance
    Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}

static class MyProcessor implements Processor<String, String, String, String> {
    private ProcessorContext context;
    private ScheduledExecutorService executorService;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
        this.executorService = Executors.newSingleThreadScheduledExecutor();
    }


    @Override
    public void process(Record<String, String> record) {
        System.out.println("In processing");

        streams.pause();
        executorService.schedule(() -> {
            System.out.println("Started processing..sleeping");
            try {
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println("OUT processing...");

            context.forward(new Record<>("test1", "value",1l), "sendto");
            context.commit();
            streams.resume();

        }, 0, TimeUnit.SECONDS);
    }

'''

java apache-kafka kafka-consumer-api apache-kafka-streams
1个回答
0
投票

Kafka Streams 不够灵活。如果它手头有一条消息并且处理需要很长时间,它不会调用

poll()

您可能需要重新使用

KafkaConsumer
来代替,以便您可以更好地控制它。或者您尝试相应地增加
max.poll.timeout.ms
,但实际上我不会推荐它 - 30 分钟将是一个非常高的值,并且可能会导致其他问题。

使用

ScheduledExecutorService
无论如何都是一个非常糟糕的主意,可能会破坏你的应用程序。 Kafka Streams 不允许运行您自己的后台线程。

© www.soinside.com 2019 - 2024. All rights reserved.