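I have a Flux coming from a Kafka topic, so I have to deal with a large, infinite hot source. For simplicity, though, I'll reduce it to a Flux of integers.
I want to turn this Flux into an unordered Flux that contains only the values that are not unique in the source Flux.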
For example:
1, 2, 3, 2, 1, 1, 4 -> 2, 2, 1, 1, 1
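To make the expected semantics concrete, here is a minimal sketch of a stateful counting filter in plain Java (no Reactor). It assumes the same rule the attempt below follows: a value is held back on first sight, emitted twice when its second occurrence arrives, and emitted once for every later occurrence. NonUniqueFilter and process are hypothetical names, and the filter is not thread-safe.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Stateful filter that emits only values occurring more than once (single-threaded sketch). */
class NonUniqueFilter {
    // value -> occurrences seen so far, capped at 3 (we only need "once", "twice" or "more")
    private final Map<Integer, Integer> counts = new HashMap<>();

    /** Returns the values to emit downstream when {@code value} arrives. */
    List<Integer> process(int value) {
        int count = counts.merge(value, 1, (old, one) -> Math.min(old + one, 3));
        List<Integer> out = new ArrayList<>(2);
        if (count == 2) {
            out.add(value); // the held-back first occurrence
            out.add(value); // the current, second occurrence
        } else if (count == 3) {
            out.add(value); // every later occurrence passes straight through
        }
        return out; // empty on first sight: the value may still turn out to be unique
    }
}
```

Feeding 1, 2, 3, 2, 1, 1, 4 through process one value at a time yields 2, 2, 1, 1, 1. The state is one small capped counter per distinct value, so memory grows with the number of distinct keys rather than with the number of elements. Wiring it into Reactor with something like flux.concatMapIterable(filter::process) is an untested assumption; because the filter is stateful, each subscription would need its own instance (for example, created inside Flux.defer).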
Here is an attempt that works:
Flux<Integer> flux = Flux.just(1, 2, 3, 4, 1, 1, 1, 2, 4);
flux.groupBy(Function.identity())
    .flatMap(it -> {
        Flux<Integer> shared = it.cache();
        return Flux.concat(
            shared.buffer(2)
                  .take(1)
                  .map(buff -> buff.stream().findFirst().get()),
            shared.skip(1));
    })
    .doOnNext(System.out::println)
    .blockLast();
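But I don't like using Flux.cache inside flatMap, because I'm worried about memory usage.
I'm also not sure how it works with a hot source: if I turn the Flux into a hot one with publish(), nothing happens at all.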
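I've already explained this before.
As I understand the requirement, you want to filter out the duplicates and pass only those downstream.
To filter duplicates, deciding what counts as a duplicate is the crucial part (fast and safe!). The rest can be handled reactively, like this: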
package com.example.demo;

import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.Consumer;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;

import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;

@Slf4j
public class ReactiveDemo {
    // our "test memory"
    private static final Integer BIG_MEM = Integer.MAX_VALUE / 32;
    // here we "simulate" a very fast "indexer"
    static Set<Integer> uniques = new HashSet<>(BIG_MEM); // sized up front for "few resizes"
    // to recognize (and store all repeating) dupes
    static Map<Integer, List<Integer>> dupes = Collections.synchronizedMap(new HashMap<>(BIG_MEM));

    public static void main(String[] args) {
        Flux<Integer> tstInbound = Flux.just(1, 2, 3, 4, 1, 1, 1, 2, 4);
        // for the endless simulation, replace with:
        // Flux.from(source); // kill with Ctrl+C (SIGINT), sorry
        // then just fire:
        tstInbound.subscribe(nxt -> { // subscribe/publish/do something, DON'T block!
            checkDupes(nxt);
        });
        // next level: caching/buffering/throttling/draining :)
    }

    static void checkDupes(Integer nxt) { // here is the main job/business logic
        // do it "thread safe" ..and as fast as you can
        // (HashSet itself is not thread-safe; this relies on Reactor delivering
        // onNext signals to one subscriber sequentially)
        if (uniques.contains(nxt)) { // second occurrence: the first duplicate
            // (as I understood the requirement)...
            uniques.remove(nxt);
            // and:
            dupes.compute(nxt, dupLogic); // (should be fast, too)
            // we did our job (identifying, handling, memoizing dupes), bye-bye dupe
            downStream.accept(nxt); // this can be slow
        } else if (dupes.containsKey(nxt)) { // we have a (repeated) duplicate
            dupes.get(nxt).add(nxt);
            downStream.accept(nxt); // ... we don't halt here
        } else { // first-time encounter
            // don't send downstream, save as "unique"
            uniques.add(nxt);
        }
    }

    // this is the "dup logic"
    private static BiFunction<? super Integer, ? super List<Integer>, ? extends List<Integer>> dupLogic =
            (currKey, currList) -> {
                if (currList == null) { // first-time dupe!
                    currList = new LinkedList<>();
                    // adding the key here is optional; it records this first dupe
                    currList.add(currKey);
                } else { // add to the others
                    currList.add(currKey);
                }
                return currList;
            };

    // unordered(!), infinite(!), (reactive) int (data!) "consumer" (or one of its
    // super/sub classes ;), just logs
    static Consumer<Integer> downStream = i -> {
        try {
            TimeUnit.SECONDS.sleep(1); // <- more simulation
            log.info("dupe received: {}", i);
            log.info(dupes.get(i).toString());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            log.warn("Interrupted downstream");
        }
    };

    // we want duplicates quickly
    private static final Integer KEY_SPACE = BIG_MEM / 8182;
    // (only) test data generator
    static final Random rnd = new Random(0xCAFEBABE); // same seed -> same data!
    // this is our (test) publisher; it deliberately ignores backpressure (request(n))
    private static Publisher<Integer> source = s -> {
        s.onSubscribe(new Subscription() { // <- publisher api (0.): always signalled first
            @Override public void request(long n) { /* unbounded test source, ignored */ }
            @Override public void cancel() { /* ignored: kill with CTRL+C */ }
        });
        while (true) { // kill with CTRL+C, sorry! :)
            try {
                TimeUnit.MICROSECONDS.sleep(1); // <- a little simulation
                s.onNext(rnd.nextInt(KEY_SPACE)); // <- publisher api (1.)
            } catch (InterruptedException e) {
                log.warn("Interrupted upstream");
                s.onError(e); // <- publisher api (2.), a terminal signal
                return; // nothing may be signalled after onError
            }
        }
        // an endless source never reaches s.onComplete() (publisher api (3.))
    };
}
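The checkDupes method above performs a check-then-act sequence (contains, then remove, then compute) on a plain HashSet, which is only safe while a single thread delivers the values. If values could arrive from several threads (for example, after .parallel()), one option is to collapse uniques and dupes into a single ConcurrentHashMap and let merge() do the check and the update in one atomic step. This is a sketch under that assumption: ConcurrentDupeDetector and record are hypothetical names, and it keeps only a capped counter per value instead of the full dupes lists.

```java
import java.util.concurrent.ConcurrentHashMap;

/** Thread-safe duplicate check: one atomic merge() per value instead of contains/remove/compute. */
class ConcurrentDupeDetector {
    // value -> occurrences seen so far, capped at 3 so the counter can never overflow
    private final ConcurrentHashMap<Integer, Integer> counts = new ConcurrentHashMap<>();

    /**
     * Records one occurrence of value and returns the capped count:
     * 1 = first sighting, 2 = first duplicate, 3 = any later duplicate.
     * ConcurrentHashMap.merge runs atomically per key, so among concurrent
     * callers for the same value exactly one sees 1 and exactly one sees 2.
     */
    int record(int value) {
        return counts.merge(value, 1, (old, one) -> Math.min(old + one, 3));
    }
}
```

A caller would forward a value downstream when record returns 2 or 3; the single caller that sees 2 is the natural place to do the "first dupe" bookkeeping that dupLogic handles in the answer.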
For Flux.just(1, 2, 3, 4, 1, 1, 1, 2, 4), we get:
xx:x1:51.028 [main] INFO com.example.demo.ReactiveDemo -- dupe received: 1
xx:x1:51.029 [main] INFO com.example.demo.ReactiveDemo -- [1]
xx:x1:52.042 [main] INFO com.example.demo.ReactiveDemo -- dupe received: 1
xx:x1:52.042 [main] INFO com.example.demo.ReactiveDemo -- [1, 1]
xx:x1:53.055 [main] INFO com.example.demo.ReactiveDemo -- dupe received: 1
xx:x1:53.055 [main] INFO com.example.demo.ReactiveDemo -- [1, 1, 1]
xx:x1:54.062 [main] INFO com.example.demo.ReactiveDemo -- dupe received: 2
xx:x1:54.062 [main] INFO com.example.demo.ReactiveDemo -- [2]
xx:x1:55.073 [main] INFO com.example.demo.ReactiveDemo -- dupe received: 4
xx:x1:55.073 [main] INFO com.example.demo.ReactiveDemo -- [4]
The endless simulation, using KEY_SPACE and the fixed random seed (so every run reproduces the same data), gives:
xx:x0:11.756 [main] INFO com.example.demo.ReactiveDemo -- dupe received: 6751
xx:x0:11.758 [main] INFO com.example.demo.ReactiveDemo -- [6751]
xx:x0:12.941 [main] INFO com.example.demo.ReactiveDemo -- dupe received: 3948
xx:x0:12.941 [main] INFO com.example.demo.ReactiveDemo -- [3948]
xx:x0:13.998 [main] INFO com.example.demo.ReactiveDemo -- dupe received: 838
xx:x0:13.998 [main] INFO com.example.demo.ReactiveDemo -- [838]
xx:x0:15.216 [main] INFO com.example.demo.ReactiveDemo -- dupe received: 3294
xx:x0:15.216 [main] INFO com.example.demo.ReactiveDemo -- [3294]
xx:x0:16.407 [main] INFO com.example.demo.ReactiveDemo -- dupe received: 8139
# ... runs "infinitely" ... until memory is full :)
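Because this setup only stops once memory is full, one way to cap memory, at the price of exactness, is to remember only the most recently seen values. The sketch below is a swapped-in technique, not part of the answer above: it uses a LinkedHashMap in access order with removeEldestEntry as an LRU set. BoundedDupeDetector, seenBefore and maxKeys are hypothetical names. A value whose earlier occurrence has already been evicted is treated as new, so some duplicates are missed.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Approximate duplicate detector whose memory is bounded by maxKeys (not thread-safe). */
class BoundedDupeDetector {
    private final Map<Integer, Boolean> seen;

    BoundedDupeDetector(int maxKeys) {
        // accessOrder = true: iteration order runs from least to most recently accessed (LRU)
        this.seen = new LinkedHashMap<Integer, Boolean>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<Integer, Boolean> eldest) {
                // invoked after a new entry is inserted; drop the least recently seen value
                return size() > maxKeys;
            }
        };
    }

    /** Records value; returns true if it is still remembered from an earlier occurrence. */
    boolean seenBefore(int value) {
        // put() on an existing key counts as an access and refreshes its LRU position
        return seen.put(value, Boolean.TRUE) != null;
    }

    int trackedKeys() {
        return seen.size();
    }
}
```

With maxKeys = 2, the sequence 1, 2, 1, 3, 2 reports only the second 1 as a duplicate: the repeated 1 refreshes its position, so 2 becomes the eldest entry and is evicted when 3 arrives. Like the HashSet in the answer, this relies on values being delivered sequentially.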