我有一个主题,比如说“topic_soure”。消息采用 json 格式。
所有消息的顶级字段都相同,但“数据”字段可能有不同的模型。
我不知道“数据”字段模型到底是什么。但每个“数据”字段模型在顶层都有一个“优先”字段。
示例:
{
"id": "39f8582e-91e1-41b0-8590-f83f0a4b756a",
"time": "2023-12-14T10:45:36.913Z",
"data": {
"priority": 1,
"sub_data":{
"id":"some id",
"value": "some value"
}
}
}
{
"id": "39f8582e-91e1-41b0-8590-f83f0a4b756a",
"time": "2023-12-14T10:45:36.913Z",
"data": {
"priority": 2,
"related_data":{
"id":"some id2",
"value": "some value2"
}
}
}
据我了解,AVRO 和 SchemaRegistry 无法在我的环境中使用。
我需要通过字段“priority”将“topic_soure”分成两个主题。
为了解决这个问题,我创建了带有支持主题“topic_soure”的流;
create stream soure_stream ( \
id varchar, \
time varchar, \
data varchar \
) with (key_format='kafka', kafka_topic = 'topic_soure', value_format='json');
然后,我为每个优先级创建两个带有接收器主题的流
create stream soure_stream_priority_1 \
with (kafka_topic = 'topic_soure_priority_1', format='JSON')\
as select * from soure_stream \
where extractjsonfield(data, '$.priority') = 1;
create stream soure_stream_priority_2 \
with (kafka_topic = 'topic_soure_priority_2', format='JSON')\
as select * from soure_stream \
where extractjsonfield(data, '$.priority') = '2';
效果很好。
但是,结果主题消息模型发生了变化。
{
"ID": "39f8582e-91e1-41b0-8590-f83f0a4b756a",
"TIME": "2023-12-14T10:45:36.913Z",
"DATA": "{\"priority\":1,\"sub_data\":{\"id\":\"some id\",\"value\":\"some value\"}}"
}
“data”字段变成转义的 json 字符串。 我需要消息保持原样不变。
有什么方法可以转换“数据”字段格式吗? 或者..我可以使用 ksqldb/streams 以另一种方式解决这个问题吗?
如果您正在寻找使用 kafka-streams java API 解决该问题,答案可能是这样的:
final KStream<String, String> initialStream = streamsBuilder.stream("topic_soure", Consumed.with(Serdes.String(), Serdes.String()));
final KStream<String, String> dataWithPeriority1 = initialStream.filter((key, value) -> {
return isPeriorityEqualTo(value, 1);
});
final KStream<String, String> dataWithPeriority2 = initialStream.filter((key, value) -> {
return isPeriorityEqualTo(value, 2);
});
dataWithPeriority1.to("periority-one-output-topic");
dataWithPeriority1.to("periority-two-output-topic");
private boolean isPeriorityEqualTo(final String message, final int expectedPeriority) {
final JSONObject messageAsJson = new JSONObject(message);
final JSONObject data = messageAsJson.getJSONObject("data");
final String periority = data.getString("periority");
// do that checking part and enhance it based on what you need
int dataPeriority = 0;
if (periority.contains("1"))
dataPeriority = 1;
else
dataPeriority = 2;
return dataPeriority==expectedPeriority;
}
步骤: