创建 Kafka Stream 以根据值计算金额

问题描述 投票:0回答:3

我正在生成如下数据:

Key: "Mike", value: {"amount":46,"time":"2021-11-05T07:53:32.005751Z"}
Key: "John", value: {"amount":46,"time":"2021-11-05T07:53:32.005751Z"}
Key: "Mike", value: {"amount":50,"time":"2021-11-05T07:53:32.005751Z"}

Key 是字符串(像 Alice、John 这样的名字......)。 例如我需要结果:

{"Mike": 2}
{"John": 1}

{"key":"Mike", "count": 2}
{"key":"John", "count": 1}

我接下来尝试:

   public Topology createTopology(){
    StreamsBuilder builder = new StreamsBuilder();
    // json Serde
    final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
    final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
    final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);

    KStream<String, JsonNode> textLines = builder.stream("bank-transactions", Consumed.with(Serdes.String(), jsonSerde));
    KTable<String, Long> wordCounts = textLines
            .map((k, v) -> new KeyValue<>(k, v.get("amount").asInt()))
            .groupByKey(Serialized.with(Serdes.String(), Serdes.Integer()))
            .count();

    wordCounts.toStream().to("person-transaction-frequency", Produced.with(Serdes.String(), Serdes.Long()));

    return builder.build();
}

public static void main(String[] args) {
    Properties config = new Properties();
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "bank-favorite-amount-application");
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:29092");
    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

    Mc4CalculateFavoriteAmount wordCountApp = new Mc4CalculateFavoriteAmount();

    KafkaStreams streams = new KafkaStreams(wordCountApp.createTopology(), config);
    streams.start();

    // shutdown hook to correctly close the streams application
    Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}

我正在尝试按名称对消息进行计数。但我在主题中得到了文物:

java apache-kafka apache-kafka-streams
3个回答
0
投票

如果您只是想计算键的数量,那么您可以丢弃整个值并将其替换为

1
对于每个看到的键。

KStream<String, Bytes> textLines = builder.stream("bank-transactions", Consumed.with(Serdes.String(), Serdes.Bytes()));
KTable<String, Long> wordCounts = textLines
        .mapValues(v -> 1L)
        .groupByKey(Serialized.with(Serdes.String(), Serdes.Long()))
        .count();

wordCounts.toStream().to("person-transaction-frequency", Produced.with(Serdes.String(), Serdes.Long()));

0
投票

您可以根据您的用例改编此官方 Confluence 示例。这个例子与你所问的非常相似。

为了进一步解释这一点,您需要创建一个流应用程序,在其中将数据从主题读取到

KStream

中。您尚未提供有关您的密钥的信息。在 Confluence 示例中,记录使用 
map()
 方法显式重新分区,为每个记录创建一个新的 
KeyValue
 实例(您可以执行类似使用 
amount
 作为键的操作)。然后事件按键分组并进行计数。


0
投票
Count 使用长 Serdes 写入值,因此当您使用 Conduktor 等工具查看它时,请确保使用正确的值格式(在本例中为长格式)。

© www.soinside.com 2019 - 2024. All rights reserved.