我有两个排序整数的来源:A和B。而且我需要将两个来源压缩为一个限制:仅将这些同时存在于A和B中的项目压缩。
例如:A是:1 2 3 10 11 12 13 14B是:10 12 13 14 15 16反应堆的zip
运算符将生成(1,10),(2,12),(3,13),(10,14),(11,15),(12,16)但我想得到这个来源:(10,10),(12,12),(13,13),(14,14)
我在下面尝试了一种zip
和match
方法,但是失败了,因为第一个完整的源将导致zip
运算符过于完整的结果源,从而使源整数不匹配。
public static Flux<Tuple2<Integer, Integer>> match(Flux<Tuple2<Integer, Integer>> flux) {
Tuple2<Integer, Integer> invalidValue = Tuples.of(0, 0);
return flux.compose(f -> f.map(new Function<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {
private final Queue<Integer> left = new ArrayDeque<>(); // here is a problem: when complete, left or right queue may still have some data.
private final Queue<Integer> right = new ArrayDeque<>();
@Override
public Tuple2<Integer, Integer> apply(Tuple2<Integer, Integer> pair) {
left.add(pair.getT1());
right.add(pair.getT2());
return tryMatch();
}
private Tuple2<Integer, Integer> tryMatch() {
if (left.isEmpty() || right.isEmpty()) {
return invalidValue;
}
Integer t1 = left.peek();
Integer t2 = right.peek();
if (t1.equals(t2)) {
left.poll();
right.poll();
return Tuples.of(t1, t2);
} else if (t1 < t2) {
Integer item = left.poll();
log.warn("not matched: {}", item);
return tryMatch();
} else {
Integer item = right.poll();
log.warn("not matched: {}", item);
return tryMatch();
}
}
})).filter(pair -> pair != invalidValue);
}
有人可以帮我,告诉我我可以使用什么运营商?
不需要自己映射,可以使用联接和过滤器
Flux<Integer> f1 = Flux.just(1,2,3,10,11,12,13,14)
Flux<Integer> f2 = Flux.just(10,12,13,14,15,16)
f1.join(f2,f ->Flux.never(),f-> Flux.never(),Tuples::of)
.filter(t -> t.getT1().equals(t.getT2()));