JSON对象的Kafka流使用者:如何映射

问题描述 投票:0回答:1

我是Kafka / Kafka Stream的新手。我正在使用最新 Kafka / kafka-stream和kafka-client和openjdk11。我的生产者正在生产看起来像的json对象(其中keyname

{"Name":"John", "amount":123, "time":2019-10-03T05:24:52" }

生产者代码以便更好地理解:

public static ProducerRecord<String, String> newRandomTransaction(String name) {
    // creates an empty json {}
    ObjectNode transaction = JsonNodeFactory.instance.objectNode();

    Integer amount = ThreadLocalRandom.current().nextInt(0, 100);

    // Instant.now() is to get the current time
    Instant now = Instant.now();

    // we write the data to the json document
    transaction.put("name", name);
    transaction.put("amount", amount);
    transaction.put("time", now.toString());
    return new ProducerRecord<>("bank-transactions", name, transaction.toString());
}

现在,我正在尝试编写消耗交易并计算该人余额中总金额的应用程序。

FYI:我使用的是旧代码,并试图使其正常运行。]

已使用GroupBYKey,因为主题已具有右键。然后aggregate计算总余额我在哪里挣扎。

此刻的应用程序(注释掉的部分是我试图使其在下一行中运行的旧代码):

公共类BankBalanceExactlyOnceApp {私有静态ObjectMapper映射器= new ObjectMapper();

public static void main(String[] args) {
    Properties config = new Properties();

    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "bank-balance-application");
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    // we disable the cache to demonstrate all the "steps" involved in the transformation - not recommended in prod
    config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "0");

    // Exactly once processing!!
    config.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);

    // json Serde
    final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
    final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
    final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);


    StreamsBuilder builder = new StreamsBuilder();

    KStream<String, JsonNode> bankTransactions =
            builder.stream( "bank-transactions", Materialized.with(Serdes.String(), jsonSerde);


    // create the initial json object for balances
    ObjectNode initialBalance = JsonNodeFactory.instance.objectNode();
    initialBalance.put("count", 0);
    initialBalance.put("balance", 0);
    initialBalance.put("time", Instant.ofEpochMilli(0L).toString());

    /*KTable<String, JsonNode> bankBalance = bankTransactions
            .groupByKey(Serdes.String(), jsonSerde)
            .aggregate(
                    () -> initialBalance,
                    (key, transaction, balance) -> newBalance(transaction, balance),
                    jsonSerde,
                    "bank-balance-agg"
            );*/

    KTable<String, JsonNode> bankBalance = bankTransactions
            .groupByKey(Serialized.with(Serdes.String(), jsonSerde))
            .aggregate(
                    () -> initialBalance,
                    (key, transaction, balance) -> {
                        //String t = transaction.toString();
                        newBalance(transaction, balance);
                    },
                    Materialized.with(Serdes.String(), jsonSerde),
                    "bank-balance-agg"
            );

    bankBalance.toStream().to("bank-balance-exactly-once", Produced.with(Serdes.String(), jsonSerde));

    KafkaStreams streams = new KafkaStreams(builder.build(), config);
    streams.cleanUp();
    streams.start();

    // print the topology
    System.out.println(streams.toString());

    // shutdown hook to correctly close the streams application
    Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}

private static JsonNode newBalance(JsonNode transaction, JsonNode balance) {
    // create a new balance json object
    ObjectNode newBalance = JsonNodeFactory.instance.objectNode();
    newBalance.put("count", balance.get("count").asInt() + 1);
    newBalance.put("balance", balance.get("balance").asInt() + transaction.get("amount").asInt());

    Long balanceEpoch = Instant.parse(balance.get("time").asText()).toEpochMilli();
    Long transactionEpoch = Instant.parse(transaction.get("time").asText()).toEpochMilli();
    Instant newBalanceInstant = Instant.ofEpochMilli(Math.max(balanceEpoch, transactionEpoch));
    newBalance.put("time", newBalanceInstant.toString());
    return newBalance;
}

}

问题是:当我尝试在该行中调用newBalance(transaction,balance)时:

aggregate(
                    () -> initialBalance,
                    (key, transaction, balance) -> newBalance(transaction, balance),
                    jsonSerde,
                    "bank-balance-agg"
            )

并通过msg查看编译器错误

newBalance(JsonNode, JsonNode) can not be applied to (<lambda parameter>,<lambda parameter>)

我尝试将其读取为字符串,将参数类型从JsonNode更改为Object。但是,无法修复。

我可以对如何解决提出任何建议吗?

java lambda apache-kafka apache-kafka-streams
1个回答
0
投票

Kafka Streams 2.3中的[KGroupedStream没有具有以下签名的方法:

<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
                             final Aggregator<? super K, ? super V, VR> aggregator,
                             final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized,
                             String aggregateName);

有两个重载方法aggregate

<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
                             final Aggregator<? super K, ? super V, VR> aggregator);

<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
                             final Aggregator<? super K, ? super V, VR> aggregator,
                             final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);

您应该使用第二个,您的代码应类似于:

KTable<String, JsonNode> bankBalance = input
        .groupByKey(Grouped.with(Serdes.String(), jsonSerde))
        .aggregate(
                () -> initialBalance,
                (key, transaction, balance) -> newBalance(transaction, balance),
                Materialized.with(Serdes.String(), jsonSerde)
        );
© www.soinside.com 2019 - 2024. All rights reserved.