Ksqldb、kafka 流。将主题消息拆分并按条件发布到不同的主题

问题描述 投票:0回答:1

我有一个主题,比如说“topic_soure”。消息采用 json 格式。

所有消息的顶级字段都相同,但“数据”字段可能有不同的模型。

我不知道“数据”字段模型到底是什么。但每个“数据”字段模型在顶层都有一个“优先”字段。

示例:

{
    "id": "39f8582e-91e1-41b0-8590-f83f0a4b756a",
    "time": "2023-12-14T10:45:36.913Z",
    "data": {
        "priority": 1,
        "sub_data":{
            "id":"some id",
            "value": "some value"
        }
    }
}
{
    "id": "39f8582e-91e1-41b0-8590-f83f0a4b756a",
    "time": "2023-12-14T10:45:36.913Z",
    "data": {
        "priority": 2,
        "related_data":{
            "id":"some id2",
            "value": "some value2"
        }
    }
}

据我了解,AVRO 和 SchemaRegistry 无法在我的环境中使用。

我需要通过字段“priority”将“topic_soure”分成两个主题。

为了解决这个问题,我创建了带有支持主题“topic_soure”的流;

create stream soure_stream ( \
   id              varchar,   \
   time            varchar,   \
   data            varchar    \
) with (key_format='kafka', kafka_topic = 'topic_soure', value_format='json');

然后,我为每个优先级创建两个带有接收器主题的流

create stream soure_stream_priority_1 \
with (kafka_topic = 'topic_soure_priority_1', format='JSON')\
as select * from soure_stream \
where extractjsonfield(data, '$.priority') = 1;

create stream soure_stream_priority_2 \
with (kafka_topic = 'topic_soure_priority_2', format='JSON')\
as select * from soure_stream \
where extractjsonfield(data, '$.priority') = '2';

效果很好。

但是,结果主题消息模型发生了变化。

{
  "ID": "39f8582e-91e1-41b0-8590-f83f0a4b756a",
  "TIME": "2023-12-14T10:45:36.913Z",
  "DATA": "{\"priority\":1,\"sub_data\":{\"id\":\"some id\",\"value\":\"some value\"}}"
}

“data”字段变成转义的 json 字符串。 我需要消息保持原样不变。

有什么方法可以转换“数据”字段格式吗? 或者..我可以使用 ksqldb/streams 以另一种方式解决这个问题吗?

apache-kafka apache-kafka-streams ksqldb
1个回答
0
投票

如果您正在寻找使用 kafka-streams java API 解决该问题,答案可能是这样的:

final KStream<String, String> initialStream = streamsBuilder.stream("topic_soure", Consumed.with(Serdes.String(), Serdes.String()));

final KStream<String, String> dataWithPeriority1 =  initialStream.filter((key, value) -> {
            return isPeriorityEqualTo(value, 1);
        });
final KStream<String, String> dataWithPeriority2 =  initialStream.filter((key, value) -> {
            return isPeriorityEqualTo(value, 2);
        });
dataWithPeriority1.to("periority-one-output-topic");
dataWithPeriority1.to("periority-two-output-topic");


private boolean isPeriorityEqualTo(final String message, final int expectedPeriority) {
        final JSONObject messageAsJson = new JSONObject(message);
        final JSONObject data = messageAsJson.getJSONObject("data");
        final String periority = data.getString("periority");
        // do that checking part and enhance it based on what you need 
        int dataPeriority = 0;
        if (periority.contains("1"))
            dataPeriority = 1;
        else
            dataPeriority = 2;

        return dataPeriority==expectedPeriority;
    }

步骤:

  • 从输入主题中消费。
  • 根据您的需要创建两个流。
  • 将每个流中的数据发送到专用的输出主题(应之前创建)。
© www.soinside.com 2019 - 2024. All rights reserved.