我做错了什么?
我正在编写一个 ProcessorSupplier,用于将 n 条记录聚合为一条。为此,我正在使用 List Serdes ...
我的问题是ArrayList总是空的。
使用 Java 21 和 Kafka 流 3.7.0
public class KafkaTheBatcherProcessorApiApplication {
public static void main(String[] args) {
final Topology topology = new Topology();
topology.addSource( "source-node", stringSerde.deserializer(), stringSerde.deserializer(), "inputTopic");
topology.addProcessor("aggregate-records",
new BatchProcessorSupplierPersistedStore(),
"source-node");
topology.addSink( "sink-node", "outputTopic", stringSerde.serializer(), listSerde.serializer(), "aggregate-records");
Properties properties = new Properties();
try (KafkaStreams kafkaStreams = new KafkaStreams(topology, properties)) {
kafkaStreams.start();
}
}
然后是我的供应商
class BatchProcessorSupplierPersistedStore implements ProcessorSupplier<String, String, String, List<String>> {
@Override
public Set<StoreBuilder<?>> stores() {
return Set.of(
Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName), Serdes.String(), Serdes.ListSerde(ArrayList.class, Serdes.String()))
);
}
@Override
public Processor<String, String, String, List<String>> get() {
return new Processor<>() {
private ProcessorContext<String, List<String>> context;
private KeyValueStore<String, List<String>> storeList;
@Override
public void init(ProcessorContext<String, List<String>> context) {
this.context = context;
storeList = context.getStateStore("batch-store");
this.context.schedule(Duration.ofSeconds(60), PunctuationType.STREAM_TIME, this::forwardAll);
}
private void forwardAll(final long timestamp) {
storeList.all().forEachRemaining(entry -> {
if (!entry.value.isEmpty()) {
context.forward(...
});
}
@Override
public void process(Record<String, String> record) {
if (storeList.get(record.key()) == null) {
storeList.put(record.key(), new ArrayList<>(10000));
}
storeList.get(record.key()).add(record.value());
}
};
}
}
我添加了一个商店柜台:
Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("batch-store-counter"), Serdes.String(), Serdes.Integer())
并且这个按预期工作。
您初始化的列表已放入状态存储中,但您从未将更新后的列表放入状态存储中
正如@melomane71建议的那样,无法更新状态列表。
解决了覆盖密钥的问题,这样做
@Override
public void process(Record<String, String> record) {
var list = storeList.get(record.key());
if (list == null) {
storeList.put(record.key(), List.of(record.value()));
} else {
storeList.put(record.key(), new ArrayList<>(list) {{
add(record.value());
}});
}
}
};