如何在 Apache Camel 中更新 Kafka 主题 JSON 值?

问题描述 投票:0回答:2

我是新的 Apache Kafka,并从 Kafka 主题中阅读了以下消息:

{
    "ordertime": 1497014222380,
    "orderid": 18,
    "itemid": "Item_184"
}

我只需要更新

ordertime
字段,但不知道如何将消息传递到这个 Kafka 主题。那么,我应该在 Apache Camel 中做这样的事情吗?:

Producer<String, String> producer = null;

    Properties props = new Properties();
    props.put("ordertime", "123245545454");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    producer = new KafkaProducer<>(props);

    try {
        producer = new KafkaProducer<String, String>(props);
    } catch (Exception e) {
        e.printStackTrace();
    }
    
    producer.close();
java apache-kafka apache-camel
2个回答
0
投票

如果您使用Camel,那么您可以使用处理器来拦截和转换数据

https://camel.apache.org/manual/processor.html

您的其他选项在https://camel.apache.org/components/3.21.x/eips/transform-eip.html

中提到

否则,在 Kafka 中,您可以使用 Kafka Streams 执行相同的操作,使用

map()
函数

https://kafka.apache.org/35/documentation/streams/developer-guide/dsl-api.html


0
投票

您通常会创建一个映射到 json 有效负载的 bean

public class Order {
   private Long ordertime;
   private Long orderid;
   private Long itemid;

   public Order(Long ordertime, Long orderid, Long itemid) { ... }

   // getters
}

然后你会使用

Producer<String, Order>
(假设你的键类型是 String)

props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer");
props.put("schema.registry.url", "http://127.0.0.1:8081");
Producer<String, Order> producer = new KafkaProducer<>(props);
...
Order order = new Order(1, 2, 3);
String key = "someString";
producer.send(key, order);

参见 kafka json 序列化器文档

© www.soinside.com 2019 - 2024. All rights reserved.