如何在同一主题上使用globalKtable和StateStore?

问题描述 投票:0回答:1

为了澄清,我是Kafka的新手,很抱歉,如果我的问题似乎没有记载,我正在阅读教程,文档以及我能理解的一切。

我正在尝试从GlobalStore读取所有值以更新其值,然后使用已经存在的StateStore来放置这些新的更新值。

我正在尝试这样做,因为当我这样做时:

this.stateStore.all();

我只有1/10的数据,如果我理解正确,这是因为我有10个分区,而ss只读取一个(尽管我不完全理解为什么)]

这是我的globalTable:

    public StreamsBuilder declareTopology(StreamsBuilder builder) {

        logger.debug("Building topology : input topic ~ {} ; output topics ~ {}, {}",
                getInputTopic(),
                getDataTopic(),
                getToEsTopic());

        builder.globalTable(
                getDataTopic(),
                Consumed.with(Serdes.String(), fooSerdes)
                        .withOffsetResetPolicy(Topology.AutoOffsetReset.EARLIEST),
                Materialized.<String, Foo, KeyValueStore<Bytes, byte[]>>as(
                        "foosktable")
                        .withKeySerde(Serdes.String())
                        .withValueSerde(fooSerdes)
                        .withLoggingEnabled(new HashMap<>()));
    ...

这是addStateStore,我无法删除,因为它已在代码的其他地方使用:

       ...

       builder.addStateStore(
            Stores.keyValueStoreBuilder(
                    Stores.persistentKeyValueStore("foosktable"),
                    Serdes.String(),
                    fooSerdes));
    ...

    return builder;
}

因此,从理论上讲,我在想的是删除也使用相同主题的StateStore,并使用我的data.process主题之一放置数据,问题是此处理器对该StateStore进行其他处理,所以我不能uke它。

我在这里迷路了,任何光线都会大有帮助。谢谢!

java apache-kafka apache-kafka-streams kafka-producer-api
1个回答
0
投票

尚不清楚您实际上要实现什么。但是,一些高级解释:

GlobalKTable仅具有一个目的:不经主题修改即读取数据,以允许进行KStream-GlobalKTable联接或通过“交互式查询”查询存储。

因此,您无法真正做您想做的事情,因为无法按照您的意图将数据从全局存储复制到另一个存储。您将需要复制输入主题并阅读两次:(1)以GlobalKTable的身份和(2)以常规KStream的身份来修改数据,然后再将其放入存储中。对于(2),您可以使用transform()

希望这会有所帮助。

© www.soinside.com 2019 - 2024. All rights reserved.