如何调试kafkastreams代码?

问题描述 投票:0回答:3

有人设法使用 IntelliJ IDEA 调试用 Java 8 编写的 kafkastreams 代码吗?我正在运行一个简单的linesplit.java代码,它从一个主题获取流并将其拆分并将其发送到另一个主题,但我不知道在哪里保留调试指针来调试流经linesplit.java的每条消息。

Linesplit.java

public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-linesplit");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

    final StreamsBuilder builder = new StreamsBuilder();



    // ------- use the code below for Java 8 and uncomment the above ---

    builder.stream("streams-input")
           .flatMapValues(value -> Arrays.asList(value.toString().split("\\W+")))
           .to("streams-output");

     //  -----------------------------------------------------------------

    final Topology topology = builder.build();
    final KafkaStreams streams = new KafkaStreams(topology, props);
    final CountDownLatch latch = new CountDownLatch(1);

    // attach shutdown handler to catch control-c
    Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
        @Override
        public void run() {
            streams.close();
            latch.countDown();
        }
    });

    try {
        streams.start();
        latch.await();
    } catch (Throwable e) {
        System.exit(1);
    }
    System.exit(0);
}
java apache-kafka apache-kafka-streams
3个回答
3
投票

你尝试过偷看吗?

您的示例如下(使用 peek 函数):

builder
       .stream("streams-input")
       .peek((k, v) -> log.info("Observed event: {}", v))
       .flatMapValues(value -> Arrays.asList(value.toString().split("\\W+"))).
       .peek((k, v) -> log.info("Transformed event: {}", v))
       .to("streams-output");

我没有运行代码,但这就是我通常的做法。


0
投票

对于此类函数,IntelliJ 建议设置内部断点,

让我给你举个例子:

[![如何在 lambda 上设置内部断点][1]] [1]:https://i.stack.imgur.com/m7d7b.png


0
投票

断点应该在 StreamThread/StreamTask 上

© www.soinside.com 2019 - 2024. All rights reserved.