我是新的 Apache Kafka,并从 Kafka 主题中阅读了以下消息:
{
"ordertime": 1497014222380,
"orderid": 18,
"itemid": "Item_184"
}
我只需要更新
ordertime
字段,但不知道如何将消息传递到这个 Kafka 主题。那么,我应该在 Apache Camel 中做这样的事情吗?:
Producer<String, String> producer = null;
Properties props = new Properties();
props.put("ordertime", "123245545454");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producer = new KafkaProducer<>(props);
try {
producer = new KafkaProducer<String, String>(props);
} catch (Exception e) {
e.printStackTrace();
}
producer.close();
如果您使用Camel,那么您可以使用处理器来拦截和转换数据
https://camel.apache.org/manual/processor.html
您的其他选项在https://camel.apache.org/components/3.21.x/eips/transform-eip.html
中提到否则,在 Kafka 中,您可以使用 Kafka Streams 执行相同的操作,使用
map()
函数
https://kafka.apache.org/35/documentation/streams/developer-guide/dsl-api.html
您通常会创建一个映射到 json 有效负载的 bean
public class Order {
private Long ordertime;
private Long orderid;
private Long itemid;
public Order(Long ordertime, Long orderid, Long itemid) { ... }
// getters
}
然后你会使用
Producer<String, Order>
(假设你的键类型是 String)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer");
props.put("schema.registry.url", "http://127.0.0.1:8081");
Producer<String, Order> producer = new KafkaProducer<>(props);
...
Order order = new Order(1, 2, 3);
String key = "someString";
producer.send(key, order);