Kafka Streams List Serdes: list is always empty

Question (votes: 0, answers: 2)

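What am I doing wrong?

I am writing a ProcessorSupplier that aggregates n records into one. To do that, I am using the List Serde ...

My problem is that the ArrayList is always empty.

I am using Java 21 and Kafka Streams 3.7.0.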

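import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.Topology;

public class KafkaTheBatcherProcessorApiApplication {

  public static void main(String[] args) {

    // Assumed declarations (not shown in the original post): serdes for the
    // String keys/values read from the source and the List<String> values sent to the sink
    final Serde<String> stringSerde = Serdes.String();
    final Serde<List<String>> listSerde = Serdes.ListSerde(ArrayList.class, Serdes.String());

    final Topology topology = new Topology();

    topology.addSource("source-node", stringSerde.deserializer(), stringSerde.deserializer(), "inputTopic");

    topology.addProcessor("aggregate-records",
      new BatchProcessorSupplierPersistedStore(),
      "source-node");

    topology.addSink("sink-node", "outputTopic", stringSerde.serializer(), listSerde.serializer(), "aggregate-records");

    // Configuration (application.id, bootstrap.servers, ...) is omitted in the post
    Properties properties = new Properties();

    try (KafkaStreams kafkaStreams = new KafkaStreams(topology, properties)) {
        kafkaStreams.start();
    }
  }

}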

And here is my supplier:

class BatchProcessorSupplierPersistedStore implements ProcessorSupplier<String, String, String, List<String>> {

  @Override
  public Set<StoreBuilder<?>> stores() {
    return Set.of(
      Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("batch-store"), Serdes.String(), Serdes.ListSerde(ArrayList.class, Serdes.String()))
    );
  }

  @Override
  public Processor<String, String, String, List<String>> get() {
    return new Processor<>() {

      private ProcessorContext<String, List<String>> context;
      private KeyValueStore<String, List<String>> storeList;  

      @Override
      public void init(ProcessorContext<String, List<String>> context) {
        this.context = context;
        storeList = context.getStateStore("batch-store");        
        this.context.schedule(Duration.ofSeconds(60), PunctuationType.STREAM_TIME, this::forwardAll);
      }

      private void forwardAll(final long timestamp) {
          storeList.all().forEachRemaining(entry -> {
            if (!entry.value.isEmpty()) {
              context.forward(...
            }
          });
      }

      @Override
      public void process(Record<String, String> record) {
          if (storeList.get(record.key()) == null) {
            storeList.put(record.key(), new ArrayList<>(10000));
          }
          storeList.get(record.key()).add(record.value());
         
      }
    };
  }
}

I also added a counter store:

Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("batch-store-counter"), Serdes.String(), Serdes.Integer())

and that one works as expected.

java apache-kafka apache-kafka-streams
2 Answers
0
votes

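You put the freshly initialized list into the state store, but you never put the updated list back. storeList.get(record.key()) returns a deserialized copy, so calling add on it does not change what the store holds.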
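To see why the in-place add is lost, here is a minimal sketch that does not use Kafka Streams at all. CopyingStore is a hypothetical stand-in that, like a serde-backed store, keeps only serialized bytes and builds a fresh list on every get. The newline-joined encoding is an assumption chosen for brevity; it is not how ListSerde encodes lists.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CopyingStoreDemo {

  // Hypothetical stand-in for a serde-backed store: values are held as bytes only.
  static class CopyingStore {
    private final Map<String, byte[]> bytes = new HashMap<>();

    void put(String key, List<String> value) {
      // "Serialize": join the elements with newlines (illustrative encoding only).
      bytes.put(key, String.join("\n", value).getBytes(StandardCharsets.UTF_8));
    }

    List<String> get(String key) {
      byte[] raw = bytes.get(key);
      if (raw == null) {
        return null;
      }
      // "Deserialize": each call builds a new list that the store does not track.
      String text = new String(raw, StandardCharsets.UTF_8);
      return text.isEmpty()
          ? new ArrayList<>()
          : new ArrayList<>(Arrays.asList(text.split("\n")));
    }
  }

  // Mirrors process() from the question: mutate the result of get(), never put() it back.
  static int sizeAfterInPlaceAdd() {
    CopyingStore store = new CopyingStore();
    store.put("k", new ArrayList<>());
    store.get("k").add("v1");      // changes a temporary copy only
    return store.get("k").size();  // the stored bytes still describe an empty list
  }

  public static void main(String[] args) {
    System.out.println(sizeAfterInPlaceAdd()); // prints 0
  }
}
```

The stored value only changes when put is called again with the modified list.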


0
votes

As @melomane71 suggested, the list in the state store was never being updated.

I solved it by overwriting the key, like this:

      @Override
      public void process(Record<String, String> record) {

          List<String> list = storeList.get(record.key());

          // Copy (or create) the list, append the new value, and write the result back under the same key
          List<String> updated = (list == null) ? new ArrayList<>() : new ArrayList<>(list);
          updated.add(record.value());
          storeList.put(record.key(), updated);
      }
    };
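If it helps, the same read, copy, append, write-back step can be pulled into a small helper that is testable without a running topology. This is only a sketch: BatchAppend and append are hypothetical names, and the map-backed get/put pair in demo() is an assumed stand-in for a store that copies on read and write.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Function;

public class BatchAppend {

  // Read the current batch, append to a private copy, and write the copy back.
  static List<String> append(Function<String, List<String>> get,
                             BiConsumer<String, List<String>> put,
                             String key, String value) {
    List<String> current = get.apply(key);
    List<String> updated = (current == null) ? new ArrayList<>() : new ArrayList<>(current);
    updated.add(value);
    put.accept(key, updated); // without this call the stored value never changes
    return updated;
  }

  // Exercises append() against a stand-in that hands out copies, as a serializing store would.
  static List<String> demo() {
    Map<String, List<String>> backing = new HashMap<>();
    Function<String, List<String>> get =
        k -> backing.containsKey(k) ? new ArrayList<>(backing.get(k)) : null;
    BiConsumer<String, List<String>> put = (k, v) -> backing.put(k, List.copyOf(v));

    append(get, put, "k", "a");
    append(get, put, "k", "b");
    return backing.get("k");
  }

  public static void main(String[] args) {
    System.out.println(demo()); // prints [a, b]
  }
}
```

Inside the processor, the call would presumably look like append(storeList::get, storeList::put, record.key(), record.value()), since KeyValueStore exposes get(key) and put(key, value).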