Aggregation of a joined stream


I want to left-join two topic streams and do a window-based aggregation over the joined stream. However, the aggregation counts some messages twice: depending on the delay in the right topic, the join emits some messages twice. The following is the code for the POC.

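        import java.time.Duration;
        import java.util.Date;
        import org.apache.kafka.common.serialization.Serdes;
        import org.apache.kafka.streams.KafkaStreams;
        import org.apache.kafka.streams.StreamsBuilder;
        import org.apache.kafka.streams.kstream.Consumed;
        import org.apache.kafka.streams.kstream.Grouped;
        import org.apache.kafka.streams.kstream.JoinWindows;
        import org.apache.kafka.streams.kstream.Joined;
        import org.apache.kafka.streams.kstream.KStream;
        import org.apache.kafka.streams.kstream.Materialized;
        import org.apache.kafka.streams.kstream.Suppressed;
        import org.apache.kafka.streams.kstream.TimeWindows;

        StreamsBuilder builder = new StreamsBuilder();
        // Read both topics using event-time timestamp extractors
        KStream<Long, BidMessage> bidStream = builder.stream("bid", Consumed.with(Serdes.Long(), new BidMessageSerde()).withTimestampExtractor(new BidMessageTimestampExtractor()));
        KStream<Long, ClickMessage> clickStream = builder.stream("click", Consumed.with(Serdes.Long(), new ClickMessageSerde()).withTimestampExtractor(new ClickMessageTimestampExtractor()));

        // Re-key both streams by requestId so that bids and clicks can be joined on it
        KStream<String, BidMessage> newBidStream = bidStream.selectKey((key, value) -> value.getRequestId());
        KStream<String, ClickMessage> newClickStream = clickStream.selectKey((key, value) -> value.getRequestId());

        // Left join within 30 s: a bid whose click arrives late can be emitted twice,
        // first with a null click and then again once the click arrives
        KStream<String, BidMergedMessage> result = newBidStream.leftJoin(newClickStream,
                getValueJoiner(),
                JoinWindows.of(Duration.ofSeconds(30)).grace(Duration.ofSeconds(0)),
                Joined.with(Serdes.String(), new BidMessageSerde(), new ClickMessageSerde()));

        // Count bids and clicks per clientId in 30 s windows; emit only the final result per window
        result.groupBy((key, value) -> "" + value.getClientId(), Grouped.with(Serdes.String(), new BidMergedSerde()))
                .windowedBy(TimeWindows.of(Duration.ofSeconds(30)).grace(Duration.ofSeconds(40)))
                .aggregate(() -> new AggResult(0, 0), (key, value, aggregate) -> {
                    if (value.getClickId() != null) {
                        aggregate.clicks_++;
                    }
                    aggregate.bids_++;
                    return aggregate;
                }, Materialized.with(Serdes.String(), new AggResultJsonSerde()))
                .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
                .toStream()
                .foreach((key, value) -> {
                    logger.info("{}-{}, clientId : {}, Value: {}", new Date(key.window().start()), new Date(key.window().end()), key.key(), value);
                });

        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();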
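The double count can be reproduced without Kafka. The following is a minimal plain-Java model, assuming the eager left-join behaviour described in the question: an unmatched bid is emitted immediately with a null click, and a click arriving later within the window produces a second result for the same bid. The classes `Event`, `JoinResult` and `EagerLeftJoinModel` are hypothetical and only approximate the real join (grace periods and window expiry are ignored).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical input record: either a bid or a click, keyed by requestId.
final class Event {
    final boolean isBid;
    final String requestId;
    final long timestampMs;

    Event(boolean isBid, String requestId, long timestampMs) {
        this.isBid = isBid;
        this.requestId = requestId;
        this.timestampMs = timestampMs;
    }
}

// Hypothetical join output: clicked == false stands for a null right-hand side.
final class JoinResult {
    final String requestId;
    final boolean clicked;

    JoinResult(String requestId, boolean clicked) {
        this.requestId = requestId;
        this.clicked = clicked;
    }
}

final class EagerLeftJoinModel {
    // Processes events in arrival order and emits join results eagerly.
    static List<JoinResult> leftJoin(List<Event> events, long windowMs) {
        List<Event> seenBids = new ArrayList<>();
        List<Event> seenClicks = new ArrayList<>();
        List<JoinResult> out = new ArrayList<>();
        for (Event e : events) {
            List<Event> others = e.isBid ? seenClicks : seenBids;
            boolean matched = false;
            for (Event o : others) {
                if (o.requestId.equals(e.requestId)
                        && Math.abs(o.timestampMs - e.timestampMs) <= windowMs) {
                    // For a late click this is the second result for an already emitted bid
                    out.add(new JoinResult(e.requestId, true));
                    matched = true;
                }
            }
            if (e.isBid && !matched) {
                // Left side without a match yet: emitted right away with a null click
                out.add(new JoinResult(e.requestId, false));
            }
            if (e.isBid) {
                seenBids.add(e);
            } else {
                seenClicks.add(e);
            }
        }
        return out;
    }

    // Same rule as the aggregator in the question: every record is a bid, a match is a click.
    static int[] count(List<JoinResult> results) {
        int bids = 0;
        int clicks = 0;
        for (JoinResult r : results) {
            if (r.clicked) {
                clicks++;
            }
            bids++;
        }
        return new int[] {bids, clicks};
    }
}
```

With a bid for `r1` at 0 ms, a bid for `r2` at 1000 ms and a click for `r1` at 5000 ms (30-second window), the model emits three results for two bids, so the counting rule reports three bids and one click.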

Can this be fixed so that the duplicates produced by the join are not counted?

apache-kafka apache-kafka-streams
2 Answers

I'm not sure I follow your requirements completely, but it seems you want each bid counted exactly once: either as an unmatched bid (no merge with the RHS) or as a successful join. Because of delays on the RHS topic, however, you occasionally get two results for the same bid: first an unmerged one, followed later by a merged one when the RHS record arrives.

You could add a transform() operator on the result KStream and use a state store to keep track of the incoming records. (Use transform() rather than transformValues(): a ValueTransformer is not allowed to call forward(), which the punctuation described below needs.) When the successful join result for a bid arrives, look in the state store, remove the earlier record with the null RHS if it exists, and then forward the correct join result.

To forward the records that never result in a successful join, you can schedule a punctuation (via the ProcessorContext) that periodically scans the store and emits the records that still have no match and have been in the state store longer than the time within which you would expect a join to have occurred.
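The store-plus-punctuation approach can be modelled without Kafka as follows. This is a minimal sketch, assuming one bid per requestId; `BidResult` and `PendingJoinBuffer` are hypothetical names, the `LinkedHashMap` stands in for the state store, and `punctuate(now)` stands in for a punctuation scheduled with `ProcessorContext.schedule(...)` inside a `Transformer`. Unmatched results are held back, a successful join removes the pending entry and is forwarded immediately, and the punctuation flushes entries that have waited at least `maxWaitMs`.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for one joined record; clicked == false means the RHS was null.
final class BidResult {
    final String requestId;
    final boolean clicked;

    BidResult(String requestId, boolean clicked) {
        this.requestId = requestId;
        this.clicked = clicked;
    }
}

// Plain-Java model of the state store + punctuation idea (assumes one bid per requestId).
final class PendingJoinBuffer {
    private final long maxWaitMs;
    // Stands in for the state store: requestId -> time the unmatched result was first seen.
    private final Map<String, Long> pending = new LinkedHashMap<>();
    // Everything forwarded downstream, in order.
    final List<BidResult> forwarded = new ArrayList<>();

    PendingJoinBuffer(long maxWaitMs) {
        this.maxWaitMs = maxWaitMs;
    }

    // Called for every record coming out of the left join.
    void onJoinResult(BidResult r, long now) {
        if (r.clicked) {
            pending.remove(r.requestId); // drop the earlier null-RHS result, if any
            forwarded.add(r);            // forward the successful join right away
        } else {
            pending.putIfAbsent(r.requestId, now); // hold back: a click may still arrive
        }
    }

    // Called periodically: emit bids that have waited too long without a match.
    void punctuate(long now) {
        Iterator<Map.Entry<String, Long>> it = pending.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> e = it.next();
            if (now - e.getValue() >= maxWaitMs) {
                forwarded.add(new BidResult(e.getKey(), false));
                it.remove();
            }
        }
    }
}
```

Feeding it an unmatched `r1` at 0 ms, an unmatched `r2` at 1000 ms and a matched `r1` at 5000 ms forwards only the matched `r1` at first; with `maxWaitMs` = 30000, `r2` is forwarded as unmatched by the first punctuation at or after 31000 ms. `maxWaitMs` should be no shorter than the join window, otherwise a click arriving after the flush still produces a second result for the same bid.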

This tutorial from Kafka Tutorials may also serve as a guide.



I can think of two options.

  1. The first thing to try would be to add a filter to the KStream<String, BidMergedMessage> result stream. I'm assuming that you can tell from the object returned by the ValueJoiner whether the RHS value is null.

  2. Use an inner join instead: newBidStream.join(newClickStream, ...)

Either way, bids that never receive a matching click are no longer emitted, so bids_ would count only matched bids.
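The filter option can be sketched in plain Java. Assuming `BidMergedMessage` exposes the click through `getClickId()` (as the aggregator in the question implies), the topology change would be `result.filter((key, value) -> value.getClickId() != null)`; the model below applies the same predicate to a list of a hypothetical `MergedBid` type.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical, simplified joined record; clickId == null means the RHS was null.
final class MergedBid {
    final String requestId;
    final String clickId;

    MergedBid(String requestId, String clickId) {
        this.requestId = requestId;
        this.clickId = clickId;
    }
}

final class RhsFilter {
    // Same predicate as result.filter((key, value) -> value.getClickId() != null)
    static List<MergedBid> dropUnmatched(List<MergedBid> joined) {
        return joined.stream()
                .filter(m -> m.clickId != null)
                .collect(Collectors.toList());
    }
}
```

Given the join output `r1/null`, `r1/c1`, `r2/null`, only `r1/c1` survives: the null-RHS duplicate of `r1` is gone, but so is `r2`.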

-Bill
