Kafka。在消费者中手动生成记录会触发异常

问题描述 投票:0回答:1

我有一个消费者,从多个主题中进行投票。到目前为止,我只生产记录到这些主题与Java和一切工作正常。

我使用avro的confulent工具。

现在我试着通过终端手动生成一个主题。

我创建了一个avro-producer,它与我的其他生产者使用的模式相同。

# Produce a record with one field
kafka-avro-console-producer \
  --broker-list 127.0.0.1:9092 --topic order_created-in \
  --property schema.registry.url=http://127.0.0.1:8081 \
  --property value.schema='{"type":"record","name":"test","fields":[{"name":"name","type":"string"},{"name":"APropertie","type":{"type":"array","items":{"type":"record","name":"APropertie","fields":[{"name":"key","type":"string"},{"name":"name","type":"string"},{"name":"date","type":"string"}]}}}]}'

然后,我按照指定的模式生成一条记录。

{"name": "order_created", "APropertie": [{"key": "1", "name": "testname", "date": "testdate"}]}

该记录被正确地生成到主题中。但是......我尝试着用我的另一个生产者所使用的模式创建一个avro-producer:之后,我按照指定的模式生成一条记录:该记录被正确地生成到主题中。 我的 AvroConsumer 抛出了一个异常。

Polling
Polling
Polling
Polling
Polling
Polling
Exception in thread "main" org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition order_created-in-0 at offset 1. If needed, please seek past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 61
Caused by: org.apache.kafka.common.errors.SerializationException: Could not find class test specified in writer's schema whilst finding reader's schema for a SpecificRecord.

Process finished with exit code 1

有什么提示吗?谢谢!我有一个消费者,可以从多个主题中进行投票。

java apache-kafka avro
1个回答
0
投票

这与生产者消费者的配置有关。

正常的生产者有一个这样的配置。

        // normal producer
        properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
        properties.setProperty("acks", "all");
        properties.setProperty("retries", "10");

Avro通常会添加以下属性

        // avro part
        properties.setProperty("key.serializer", StringSerializer.class.getName());
        properties.setProperty("value.serializer", KafkaAvroSerializer.class.getName());
        properties.setProperty("schema.registry.url", "http://127.0.0.1:8081");
        properties.setProperty("confluent.value.schema.validation", "true");
        properties.setProperty("confluent.key.schema.validation", "true");

这些属性必须包含在控制台生产者中。

© www.soinside.com 2019 - 2024. All rights reserved.