反应器有条件的两个来源

问题描述 投票:0回答:1

我有两个排序整数的来源:A和B。而且我需要将两个来源压缩为一个限制:仅将这些同时存在于A和B中的项目压缩。

例如:A是:1 2 3 10 11 12 13 14B是:10 12 13 14 15 16反应堆的zip运算符将生成(1,10),(2,12),(3,13),(10,14),(11,15),(12,16)但我想得到这个来源:(10,10),(12,12),(13,13),(14,14)

我在下面尝试了一种zipmatch方法,但是失败了,因为第一个完整的源将导致zip运算符过于完整的结果源,从而使源整数不匹配。

public static Flux<Tuple2<Integer, Integer>> match(Flux<Tuple2<Integer, Integer>> flux) {
        Tuple2<Integer, Integer> invalidValue = Tuples.of(0, 0);
        return flux.compose(f -> f.map(new Function<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {
            private final Queue<Integer> left = new ArrayDeque<>(); // here is a problem: when complete, left or right queue may still have some data.
            private final Queue<Integer> right = new ArrayDeque<>();

            @Override
            public Tuple2<Integer, Integer> apply(Tuple2<Integer, Integer> pair) {
                left.add(pair.getT1());
                right.add(pair.getT2());

                return tryMatch();
            }

            private Tuple2<Integer, Integer> tryMatch() {
                if (left.isEmpty() || right.isEmpty()) {
                    return invalidValue;
                }
                Integer t1 = left.peek();
                Integer t2 = right.peek();
                if (t1.equals(t2)) {
                    left.poll();
                    right.poll();
                    return Tuples.of(t1, t2);
                } else if (t1 < t2) {
                    Integer item = left.poll();
                    log.warn("not matched: {}", item);
                    return tryMatch();
                } else {
                    Integer item = right.poll();
                    log.warn("not matched: {}", item);
                    return tryMatch();
                }
            }
        })).filter(pair -> pair != invalidValue);
    }

有人可以帮我,告诉我我可以使用什么运营商?

java project-reactor
1个回答
0
投票

不需要自己映射,可以使用联接和过滤器

Flux<Integer> f1 = Flux.just(1,2,3,10,11,12,13,14)
Flux<Integer> f2 = Flux.just(10,12,13,14,15,16)

f1.join(f2,f ->Flux.never(),f-> Flux.never(),Tuples::of)
                .filter(t -> t.getT1().equals(t.getT2()));
© www.soinside.com 2019 - 2024. All rights reserved.