Kafka Streams exception: org.apache.kafka.streams.errors.StreamsException - Deserialization exception handler


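I'm trying to implement a simple counter with Kafka Streams. It takes a key and a value, and whenever the same key arrives again, it adds the new value to that key's running total.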

This is the code I've written so far:

package exercises;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

import java.util.Properties;

public class LiveCounter {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "live-counter-2");
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        properties.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
        properties.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.LongSerde.class.getName());
        properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> textLine = builder.stream("counter-in");

        KStream<String, Long> words = textLine
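                .filter((k, v) -> {
                    // Keep only "<key> <number>" lines; skip null values and lines without a second token
                    if (v == null) {
                        return false;
                    }
                    try {
                        String[] arr = v.split(" ");
                        Long.parseLong(arr[1]);
                        return true;
                    } catch (NumberFormatException | ArrayIndexOutOfBoundsException e) {
                        return false;
                    }
                })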
                .selectKey((k, v) -> v.split(" ")[0])
                .mapValues((k, v) -> Long.parseLong(v.split(" ")[1]))
                .groupByKey()
                .reduce(Long::sum)
                .toStream();
        words.to("counter-out", Produced.with(Serdes.String(), Serdes.Long()));

        KafkaStreams streams = new KafkaStreams(builder.build(), properties);
        streams.start();

        System.out.println(streams.toString());

        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}

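As you can see, the topology is simple. It takes a line of text, changes the key, and finally uses a reduce function to sum all the values for each key.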
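To make the intended semantics concrete, here is a minimal in-memory model of the same steps in plain Java, with no Kafka involved. The class name CounterModel, the methods isValid and updates, and the sample lines such as "apple 3" are hypothetical. The sketch assumes each input value looks like "<key> <number>", ignores repartitioning, and mirrors the fact that with caching disabled every reduce update is forwarded downstream.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of the LiveCounter topology (plain Java, not Kafka Streams).
class CounterModel {
    // Mirrors the filter step: keep only "<key> <number>" lines.
    static boolean isValid(String line) {
        if (line == null) {
            return false;
        }
        String[] parts = line.split(" ");
        if (parts.length < 2) {
            return false;
        }
        try {
            Long.parseLong(parts[1]);
            return true;
        } catch (NumberFormatException e) {
            return false;
        }
    }

    // Mirrors selectKey + mapValues + groupByKey + reduce(Long::sum) + toStream():
    // every valid line produces one "key=runningTotal" update, as if caching were disabled.
    static List<String> updates(List<String> lines) {
        Map<String, Long> totals = new HashMap<>();
        List<String> emitted = new ArrayList<>();
        for (String line : lines) {
            if (!isValid(line)) {
                continue;
            }
            String[] parts = line.split(" ");
            long total = totals.merge(parts[0], Long.parseLong(parts[1]), Long::sum);
            emitted.add(parts[0] + "=" + total);
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Sample lines are made up for illustration.
        System.out.println(updates(List.of("apple 3", "pear x", "apple 4", "pear 2")));
    }
}
```

For the sample input this prints [apple=3, apple=7, pear=2]: "pear x" is dropped by the filter, and each surviving line yields the updated total for its key, which is roughly what you would expect to see on counter-out.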

I tested the stream up to mapValues and it works, but this block causes the problem:

.groupByKey()
.reduce(Long::sum)
.toStream();

Here is the stack trace:

[live-counter-2-9a694aa5-589d-4d2f-8e1c-ff64b6e05b67-StreamThread-1] ERROR org.apache.kafka.streams.errors.LogAndFailExceptionHandler - Exception caught during Deserialization, taskId: 0_0, topic: counter-in, partition: 0, offset: 1
org.apache.kafka.common.errors.SerializationException: Size of data received by LongDeserializer is not 8
[live-counter-2-9a694aa5-589d-4d2f-8e1c-ff64b6e05b67-StreamThread-1] ERROR org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [live-counter-2-9a694aa5-589d-4d2f-8e1c-ff64b6e05b67-StreamThread-1] Encountered the following unexpected Kafka exception during processing, this usually indicate Streams internal errors:
org.apache.kafka.streams.errors.StreamsException: Deserialization exception handler is set to fail upon a deserialization error. If you would rather have the streaming pipeline continue after a deserialization error, please set the default.deserialization.exception.handler appropriately.
    at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:80)
    at org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:175)
    at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:112)
    at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:162)
    at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:765)
    at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:943)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:764)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)
Caused by: org.apache.kafka.common.errors.SerializationException: Size of data received by LongDeserializer is not 8
[live-counter-2-9a694aa5-589d-4d2f-8e1c-ff64b6e05b67-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [live-counter-2-9a694aa5-589d-4d2f-8e1c-ff64b6e05b67-StreamThread-1] State transition from RUNNING to PENDING_SHUTDOWN
[live-counter-2-9a694aa5-589d-4d2f-8e1c-ff64b6e05b67-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [live-counter-2-9a694aa5-589d-4d2f-8e1c-ff64b6e05b67-StreamThread-1] Shutting down
[live-counter-2-9a694aa5-589d-4d2f-8e1c-ff64b6e05b67-StreamThread-1] INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=live-counter-2-9a694aa5-589d-4d2f-8e1c-ff64b6e05b67-StreamThread-1-restore-consumer, groupId=null] Unsubscribed all topics or patterns and assigned partitions
[live-counter-2-9a694aa5-589d-4d2f-8e1c-ff64b6e05b67-StreamThread-1] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=live-counter-2-9a694aa5-589d-4d2f-8e1c-ff64b6e05b67-StreamThread-1-producer] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
[live-counter-2-9a694aa5-589d-4d2f-8e1c-ff64b6e05b67-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [live-counter-2-9a694aa5-589d-4d2f-8e1c-ff64b6e05b67-StreamThread-1] State transition from PENDING_SHUTDOWN to DEAD
[live-counter-2-9a694aa5-589d-4d2f-8e1c-ff64b6e05b67-StreamThread-1] INFO org.apache.kafka.streams.KafkaStreams - stream-client [live-counter-2-9a694aa5-589d-4d2f-8e1c-ff64b6e05b67] State transition from RUNNING to ERROR
[live-counter-2-9a694aa5-589d-4d2f-8e1c-ff64b6e05b67-StreamThread-1] ERROR org.apache.kafka.streams.KafkaStreams - stream-client [live-counter-2-9a694aa5-589d-4d2f-8e1c-ff64b6e05b67] All stream threads have died. The instance will be in error state and should be closed.
[live-counter-2-9a694aa5-589d-4d2f-8e1c-ff64b6e05b67-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [live-counter-2-9a694aa5-589d-4d2f-8e1c-ff64b6e05b67-StreamThread-1] Shutdown complete
Exception in thread "live-counter-2-9a694aa5-589d-4d2f-8e1c-ff64b6e05b67-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Deserialization exception handler is set to fail upon a deserialization error. If you would rather have the streaming pipeline continue after a deserialization error, please set the default.deserialization.exception.handler appropriately.
    at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:80)
    at org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:175)
    at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:112)
    at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:162)
    at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:765)
    at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:943)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:764)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)
Caused by: org.apache.kafka.common.errors.SerializationException: Size of data received by LongDeserializer is not 8
[Thread-1] INFO org.apache.kafka.streams.KafkaStreams - stream-client [live-counter-2-9a694aa5-589d-4d2f-8e1c-ff64b6e05b67] State transition from ERROR to PENDING_SHUTDOWN
[kafka-streams-close-thread] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [live-counter-2-9a694aa5-589d-4d2f-8e1c-ff64b6e05b67-StreamThread-1] Informed to shut down
[kafka-streams-close-thread] INFO org.apache.kafka.streams.KafkaStreams - stream-client [live-counter-2-9a694aa5-589d-4d2f-8e1c-ff64b6e05b67] State transition from PENDING_SHUTDOWN to NOT_RUNNING
[Thread-1] INFO org.apache.kafka.streams.KafkaStreams - stream-client [live-counter-2-9a694aa5-589d-4d2f-8e1c-ff64b6e05b67] Streams client stopped completely
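The root cause in the trace is "Size of data received by LongDeserializer is not 8": a long is always 8 bytes, while a text value such as "apple 3" is encoded as 7 UTF-8 bytes. The sketch below is a hypothetical stand-in for that length check (the class LongBytesCheck and method readLong are illustrative, not the Kafka source) and assumes the producer wrote plain text values.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for the check behind
// "Size of data received by LongDeserializer is not 8"; not the Kafka source.
class LongBytesCheck {
    static long readLong(byte[] data) {
        if (data.length != Long.BYTES) {
            throw new IllegalArgumentException(
                "Size of data received is not 8 (got " + data.length + ")");
        }
        return ByteBuffer.wrap(data).getLong(); // big-endian by default
    }

    public static void main(String[] args) {
        byte[] textValue = "apple 3".getBytes(StandardCharsets.UTF_8);
        System.out.println(textValue.length); // 7 bytes, not 8
        try {
            readLong(textValue);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

One consequence of a pure length check: a text value that happens to be exactly 8 bytes (for example "apple 10") would not fail, but would be decoded into a meaningless number, so the fix is to use the correct serde rather than to skip bad records.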

Any help would be appreciated. Thanks in advance.

java apache-kafka apache-kafka-streams kafka-producer-api confluent-platform
Answer

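Since your input topic, counter-in, contains String messages but your default value serde is LongSerde, the source records are deserialized with LongDeserializer, which rejects any value that is not exactly 8 bytes. You need to tell Kafka Streams to deserialize the values as Strings, as shown below: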

import org.apache.kafka.streams.kstream.Consumed;

KStream<String, String> textLine = builder.stream("counter-in", Consumed.with(Serdes.String(), Serdes.String()));
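With the source serde overridden, the bytes on each side of the topology match their serdes: incoming values are read as UTF-8 text (any length is fine), and the totals written by Produced.with(Serdes.String(), Serdes.Long()) are 8-byte big-endian longs. The byte-level sketch below models that corrected path in plain Java; the class FixedPipelineModel and its methods are hypothetical, and it assumes the same "<key> <number>" input format.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical byte-level model of the corrected pipeline; not Kafka code.
class FixedPipelineModel {
    // Source side: values are decoded as UTF-8 text, the way a String serde would.
    static String decodeValue(byte[] raw) {
        return new String(raw, StandardCharsets.UTF_8);
    }

    // Sink side: a Long serde writes exactly 8 big-endian bytes.
    static byte[] encodeTotal(long total) {
        return ByteBuffer.allocate(Long.BYTES).putLong(total).array();
    }

    // What a consumer of counter-out must do to read the totals back.
    static long decodeTotal(byte[] raw) {
        return ByteBuffer.wrap(raw).getLong();
    }

    public static void main(String[] args) {
        byte[] incoming = "apple 3".getBytes(StandardCharsets.UTF_8);
        String line = decodeValue(incoming);              // no length restriction for text
        long value = Long.parseLong(line.split(" ")[1]);  // what mapValues computes
        byte[] outgoing = encodeTotal(value);
        System.out.println(outgoing.length + " bytes -> " + decodeTotal(outgoing));
    }
}
```

The flip side of this design is that anything reading counter-out has to use a Long deserializer for values; reading that topic as text will show unreadable bytes rather than the totals.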
© www.soinside.com 2019 - 2024. All rights reserved.