Kafka流:KTable到KStream一对多转换

问题描述 投票:1回答:1

我不确定是否可行,但这是我的情况:

  • 我有一个用Long作为键的KTable和一个作为值的ArrayList的KTable
  • [当KTable中特定键的值(ID的数组列表)发生更改时,我想针对数组列表中每个ID的主题发布消息。
  • 假设我在KTable中有一个记录<1, [2,3]>。如果现在更改为<1, [2,4]>,我想在output_topic-<2, Object>, <4,Object>上发布消息。对象是从KTable值中的其他属性派生的。

为此,我一直在考虑将KTable转换为KStream,方法是为其提供自定义KeyValueMapper,然后将Stream发布到一个主题上。我遇到了一个问题,从网上可以看到KeyValueMapper一次只能转换一条消息,因此它对于一对一映射很有用。在我的方案中,我想将一个传入消息转换为多个传出消息。这是我的代码:

final KeyValueMapper<Long, CustomGroup, KeyValue<Long, CustomCluster>> outputMapper = new KeyValueMapper<Long, CustomGroup,KeyValue<Long, CustomCluster>>() {
            public KeyValue<Long, CustomCluster> apply(Long key, CustomGroup value) {

            ArrayList<KeyValue<Long, CustomCluster>> result = new ArrayList<>(value.compositeIds.size());
            for(Long compositeId : value.compositeIds) {
                CustomCluster c = new CustomCluster(value.appSettingID, value.appSettingName, compositeId, value.user);
                c.clusters = new HashSet<Cluster>();
                c.clusters.add(new Cluster(c.clusterId, c.clusterName));
                result.add(new KeyValue<>(compositeId, c));
            }

            return result;

        }
    };

KStream<Long, CustomCluster> outputStream = cgTable.toStream(outputMapper);

结果是包含我所有消息的列表。这段代码给了我一个语法错误,因为它期望返回一个KeyValue对象而不是一个KeyValue对象的ArrayList。

是否有一种方法可以使outputMapper按照我的设想工作?还有另一种方法可以完成任务。基本上,我需要发送从特定主题的ktable记录值派生的多个消息。

当传入的更改位于KStream而不是KTable中时,我能够完成任务。 KSTream提供了flatMap方法来做到这一点。

final KeyValueMapper<Long, CustomGroup, Iterable<KeyValue<Long, CustomCluster>>> outputMapper = new KeyValueMapper<Long, CustomGroup, Iterable<KeyValue<Long, CustomCluster>>>() {
        public Iterable<KeyValue<Long, CustomCluster>> apply(Long key, CustomGroup value) {

            ArrayList<KeyValue<Long, CustomCluster>> result = new ArrayList<>(value.compositeIds.size());
            for(Long compositeId : value.compositeIds) {
                CustomCluster c = new CustomCluster(value.appSettingID, value.appSettingName, compositeId, value.user);
                c.clusters = new HashSet<Cluster>();
                c.clusters.add(new Cluster(c.clusterId, c.clusterName));
                result.add(new KeyValue<>(compositeId, c));
            }

            return result;

        }
    };

KStream<Long, CustomCluster> outputStream = cgStream.flatMap(outputMapper);

所以问题是KTable是否提供与KStream的flatMap方法等效的方法。我希望我不要太困惑了。感谢您的帮助

java apache-kafka one-to-many apache-kafka-streams
1个回答
0
投票

您是否可以使用KTable完成此操作?

© www.soinside.com 2019 - 2024. All rights reserved.