有什么方法可以使用kafka-avro-console-consumer消耗一个旧模式的消息?我设法使用我们的模式注册表来消费主题中的消息,但它似乎总是使用最新的oneenter代码,在这里。
./kafka-avro-console-consumer --topic mytopic --partition 14 --offset 791197 --max-messages 1 --bootstrap-server mybootstrapserver.aws.confluent.cloud:9092 --property schema.registry.url=http://schemaregistry.mydomain --consumer.config consumer.properties
有没有办法指定一个特定版本的模式?
生产者指定模式,同时向主题发送消息。您将在消费者中收到完全相同的消息。如果你收到的消息是最新的模式格式,这意味着生产者正在用相同的Avro版本格式生产该消息。
让我们看看下面的例子。
Schema v1:
{
"type": "record",
"namespace": "com.example",
"name": "Customer",
"fields": [
{ "name": "first_name", "type": "string" },
{ "name": "last_name", "type": "string" }
]
}
Schema v2
{
"type": "record",
"namespace": "com.example",
"name": "Customer",
"fields": [
{ "name": "first_name", "type": "string" },
{ "name": "last_name", "type": "string" },
{ "name": "extra", "type": ["null","string"], "default":null }
]
}
而你已经发送了以下3条信息
{
"value": {"first_name": "raja", "last_name": "bhowmick"}
}
与v1
{
"value": {"first_name": "raja", "last_name": "bhowmick", "extra": { "string":"extra" }}
}
与v2
{
"value": {"first_name": "raja", "last_name": "bhowmick"}
}
与v1
然后你会收到这样的信息
[
{
"topic": "test",
"key": null,
"value": {
"first_name": "raja",
"last_name": "bhowmick"
},
"partition": 0,
"offset": 3
},
{
"topic": "test",
"key": null,
"value": {
"first_name": "raja",
"last_name": "bhowmick",
"extra": {
"string": "extra"
}
},
"partition": 0,
"offset": 4
},
{
"topic": "test",
"key": null,
"value": {
"first_name": "raja",
"last_name": "bhowmick"
},
"partition": 0,
"offset": 5
}
]