Buffer unique values from a stream until they are no longer unique - using cache inside groupBy

Problem description · votes: 0 · answers: 1

I have a Flux from a Kafka topic, so I am dealing with a large, infinite hot source. For simplicity, though, I will treat it as a Flux of integers.

I want to turn this Flux into an unordered Flux that contains only the values that are not unique in the source.

例如

1, 2, 3, 2, 1, 1, 4 -> 2, 2, 1, 1, 1

Here is a working attempt:

    Flux<Integer> flux = Flux.just(1, 2, 3, 4, 1, 1, 1, 2, 4);

    flux.groupBy(Function.identity())
        .flatMap(
                it -> {
                    // cache() replays the group's elements to both subscribers below
                    Flux<Integer> shared = it.cache();
                    return Flux.concat(
                            // emit the first element only once a second one arrives,
                            // i.e. once the value is known to be a duplicate
                            shared.buffer(2)
                                    .take(1)
                                    .map(buff -> buff.stream().findFirst().get()),
                            // then pass through every occurrence after the first
                            shared.skip(1));
                }
        )
        .doOnNext(System.out::println)
        .blockLast();

But I don't like using Flux.cache inside flatMap, because I worry about memory usage.

I am also not sure how this behaves with a hot source: if I turn the Flux into a hot one with publish(), nothing is emitted at all.
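If the concern is that cache() retains every element of each group, one cache-free alternative is to keep a single counter per distinct key: hold back the first occurrence, emit the value twice on the second sighting (retroactively covering the held-back first one), and once per sighting after that. The sketch below uses hypothetical names (DupFilter, onNext) and plain Java so the emit logic stands on its own; in Reactor it could be plugged in via something like flux.concatMap(v -> Flux.fromIterable(DupFilter.onNext(counts, v))).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DupFilter {

    // What should be emitted downstream for value v, given occurrence counts so far:
    // 1st sighting -> nothing (hold it back), 2nd -> the value twice (the held-back
    // first occurrence plus the current one), 3rd and later -> the value once.
    static List<Integer> onNext(Map<Integer, Integer> counts, int v) {
        int seen = counts.merge(v, 1, Integer::sum);
        if (seen == 2) return List.of(v, v);
        if (seen > 2) return List.of(v);
        return List.of();
    }

    public static void main(String[] args) {
        Map<Integer, Integer> counts = new HashMap<>();
        List<Integer> out = new ArrayList<>();
        for (int v : new int[] {1, 2, 3, 2, 1, 1, 4}) {
            out.addAll(onNext(counts, v));
        }
        System.out.println(out); // [2, 2, 1, 1, 1]
    }
}
```

Memory stays proportional to the number of distinct keys (one int each) instead of the number of elements, and the same map works for a hot source because nothing depends on replaying earlier signals.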

I have explained this before.

java project-reactor
1 Answer

0 votes

As I understand the requirement, you want to

  • filter duplicates
  • from a live stream.

To filter duplicates, it is crucial to decide what identifies a duplicate (fast and safe!). The rest can be handled reactively, like this:

package com.example.demo;

import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.Consumer;

import org.reactivestreams.Publisher;

import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;

@Slf4j
public class ReactiveDemo {

    // our "test memory"
    private static final Integer BIG_MEM = Integer.MAX_VALUE / 32;
    // here we "simulate" a very fast "indexer"
    static Set<Integer> uniques = new HashSet<>(BIG_MEM); // optimized for "few resizes"
    // to recognize (and store all repeating) dups
    static Map<Integer, List<Integer>> dupes = Collections.synchronizedMap(new HashMap<>(BIG_MEM));

    public static void main(String[] args) {
        Flux<Integer> tstInbound = Flux.just(1, 2, 3, 4, 1, 1, 1, 2, 4);
        // for an endless simulation, replace with:
        // Flux.from(source); // kill with Ctrl+C (SIGINT), sorry!

        // then, just fire:
        tstInbound.subscribe((nxt) -> { // subscribe/publish/do something, DON'T block!
            checkDupes(nxt);
        });
        // next level: caching/buffering/throttling/draining :)
    }

    static void checkDupes(Integer nxt) { // here is the main job/business logic
        // do it thread-safe ..and as fast as you can
        if (uniques.contains(nxt)) { // we have a (first-time) duplicate
            // (as I understood the requirement)...
            uniques.remove(nxt);
            // and:
            dupes.compute(nxt, dupLogic); // (should be fast, too)

            // we did our job (identifying, handling, memoizing dupes), bye-bye dupe
            downStream.accept(nxt); // this can be slow
        } else if (dupes.containsKey(nxt)) { // we have a (repeated) duplicate
            dupes.get(nxt).add(nxt);
            downStream.accept(nxt); // ... we don't halt here
        } else { // first-time encounter
            // don't send downstream, save as "unique"
            uniques.add(nxt);
        }
    }
    }

    // this is the "dup logic"
    private static BiFunction<? super Integer, ? super List<Integer>, ? extends List<Integer>> dupLogic = (currKey,
            currList) -> {
        if (currList == null) { // first-time dupe!
            currList = new LinkedList<>();
            // we can record the key here, but don't have to ..you see:
            currList.add(currKey);
        } else { // add to the others
            currList.add(currKey);
        }
        return currList;
    };

    // unordered(!), infinite(!), (reactive) int (data!) "consumer" (or one of its
    // super/sub types;), just logs
    static Consumer<Integer> downStream = (i) -> {
        try {
            TimeUnit.SECONDS.sleep(1); // <- more simulate

            log.info("dupe received: {}", i);
            log.info(dupes.get(i).toString());
        } catch (InterruptedException e) {
            log.warn("Interrupted downstream");
        }
    };

    // we want duplicates quickly
    private static final Integer KEY_SPACE = BIG_MEM / 8182;
    // (only) test data generator
    static final Random rnd = new Random(0xCAFEBABE); // same seed-> same data!
    // this is our (test) publisher:
    private static Publisher<? extends Integer> source = (s) -> {
        while (true) { // endless; kill with Ctrl+C, sorry! :)
            try {
                TimeUnit.MICROSECONDS.sleep(1); // <- little simulate
                s.onNext(rnd.nextInt(KEY_SPACE)); // <- publisher api (1.)
            } catch (InterruptedException e) {
                log.warn("Interrupted upstream");
                s.onError(e); // <- publisher api (2.)
                return; // after onError, no further signals are allowed
            }
            // note: an endless source never reaches s.onComplete(); // <- publisher api (3.)
        }
    };

}

For

Flux.just(1, 2, 3, 4, 1, 1, 1, 2, 4);

we get:

xx:x1:51.028 [main] INFO com.example.demo.ReactiveDemo -- dupe received: 1
xx:x1:51.029 [main] INFO com.example.demo.ReactiveDemo -- [1]
xx:x1:52.042 [main] INFO com.example.demo.ReactiveDemo -- dupe received: 1
xx:x1:52.042 [main] INFO com.example.demo.ReactiveDemo -- [1, 1]
xx:x1:53.055 [main] INFO com.example.demo.ReactiveDemo -- dupe received: 1
xx:x1:53.055 [main] INFO com.example.demo.ReactiveDemo -- [1, 1, 1]
xx:x1:54.062 [main] INFO com.example.demo.ReactiveDemo -- dupe received: 2
xx:x1:54.062 [main] INFO com.example.demo.ReactiveDemo -- [2]
xx:x1:55.073 [main] INFO com.example.demo.ReactiveDemo -- dupe received: 4
xx:x1:55.073 [main] INFO com.example.demo.ReactiveDemo -- [4]

The simulated source (with KEY_SPACE and the fixed random "seed") reproducibly yields:

xx:x0:11.756 [main] INFO com.example.demo.ReactiveDemo -- dupe received: 6751
xx:x0:11.758 [main] INFO com.example.demo.ReactiveDemo -- [6751]
xx:x0:12.941 [main] INFO com.example.demo.ReactiveDemo -- dupe received: 3948
xx:x0:12.941 [main] INFO com.example.demo.ReactiveDemo -- [3948]
xx:x0:13.998 [main] INFO com.example.demo.ReactiveDemo -- dupe received: 838
xx:x0:13.998 [main] INFO com.example.demo.ReactiveDemo -- [838]
xx:x0:15.216 [main] INFO com.example.demo.ReactiveDemo -- dupe received: 3294
xx:x0:15.216 [main] INFO com.example.demo.ReactiveDemo -- [3294]
xx:x0:16.407 [main] INFO com.example.demo.ReactiveDemo -- dupe received: 8139
# .. runs "infinitely" ..until memory filled :)
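The dupes map above stores every repeated element in a list, so a truly endless run really does fill memory eventually. If only the duplicate decision is needed (not the history of repeats), a count per key bounds memory by the number of distinct keys. A minimal sketch with hypothetical names (BoundedDupes, isDuplicate):

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class BoundedDupes {

    // One long per distinct key: memory is O(#distinct keys), not O(#elements),
    // so a long-running stream no longer grows the heap without bound.
    static final Map<Integer, Long> dupeCounts = new ConcurrentHashMap<>();

    // merge() atomically increments the count; anything past the first
    // sighting is a duplicate.
    static boolean isDuplicate(int v) {
        return dupeCounts.merge(v, 1L, Long::sum) > 1;
    }

    public static void main(String[] args) {
        for (int v : List.of(1, 2, 3, 4, 1, 1, 1, 2, 4)) {
            if (isDuplicate(v)) System.out.println("dupe received: " + v);
        }
        // prints dupe lines for 1, 1, 1, 2, 4 -- same order as the demo log above
    }
}
```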