使用RxJava进行线程流水处理

问题描述 投票:0回答:1

RxJava专家,这是您发光的机会!

您能否仅通过更改main()方法中以Flowable.generate()开头的RxJava管道,来确保以下程序不会抛出IllegalStateException?

/**
 * Demonstrates a three-stage RxJava pipeline in which every stage is pinned to its own thread:
 * items are generated on "genThread", mapped on "mapThread" and consumed on the thread that
 * built the pipeline ("main"). Each stage verifies the thread it runs on and throws
 * {@link IllegalStateException} if it finds itself on an unexpected one.
 */
class ExportJob {
    /**
     * Creates a scheduler backed by exactly one daemon thread.
     *
     * @param threadName name given to the single worker thread; the stage checks below
     *                   identify threads by this name
     * @return a scheduler that runs all of its work on that one thread
     */
    private static Scheduler singleThread(String threadName) {
        return Schedulers.from(newFixedThreadPool(1, r -> {
            Thread t = new Thread(r, threadName);
            // Daemon so that the worker thread alone does not keep the JVM alive.
            t.setDaemon(true);
            return t;
        }));
    }

    /**
     * Builds the pipeline and blocks the calling thread while consuming it. The generator never
     * signals completion, so this call only returns by way of an error.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Scheduler genSched = singleThread("genThread");
        Scheduler mapSched = singleThread("mapThread");
        // execute on "genThread"
        Flowable.generate(ExportJob::infiniteGenerator)
                // requestOn MUST be true: the generator is invoked by whichever thread calls
                // request(n). With requestOn == false, the replenishing requests issued by
                // observeOn() arrive on "mapThread" and the generator would run there, tripping
                // the thread check. With true, every request is rerouted to genSched.
                .subscribeOn(genSched, true)
                // execute on "mapThread" (second argument is delayError, not a threading flag)
                .observeOn(mapSched, false)
                .concatMapMaybe(ExportJob::mapping)
                // execute on the thread that creates the pipeline, block it until finished
                .blockingForEach(ExportJob::terminal);
    }

    /** Next value to emit; only touched from {@link #infiniteGenerator(Emitter)}. */
    private static int nb;

    /**
     * Emits the next integer of an endless, increasing sequence. Must execute on "genThread" thread.
     *
     * @param emitter sink receiving exactly one item per invocation
     * @throws IllegalStateException if invoked on a thread other than "genThread"
     */
    private static void infiniteGenerator(Emitter<Integer> emitter) {
        print(nb, "infiniteGenerator");
        emitter.onNext(nb++);
        checkCurrentThread("genThread");
    }

    /**
     * Identity mapping stage. Must execute on "mapThread" thread.
     *
     * @param s the item to map
     * @return a {@code Maybe} that immediately yields {@code s}
     * @throws IllegalStateException if invoked on a thread other than "mapThread"
     */
    private static Maybe<Integer> mapping(Integer s) {
        print(s, "mapping");
        checkCurrentThread("mapThread");
        return Maybe.just(s);
    }

    /**
     * Terminal consumer. Must execute on the "main" thread, i.e. the one blocked in
     * {@link #main(String[])}.
     *
     * @param s the item to consume
     * @throws IllegalStateException if invoked on a thread other than "main"
     */
    private static void terminal(Integer s) {
        print(s, "terminal");
        checkCurrentThread("main");
    }

    /**
     * Prints a trace line of the form {@code <item> - <current thread name> - <method>()}.
     *
     * @param item   the item being processed
     * @param method name of the stage method doing the processing
     */
    private static void print(int item, String method) {
        System.out.format("%d - %s - %s()%n", item, Thread.currentThread().getName(), method);
    }

    /**
     * Verifies that the calling thread carries the expected name.
     *
     * @param expectedThreadName name the current thread is required to have
     * @throws IllegalStateException if the current thread's name differs
     */
    private static void checkCurrentThread(String expectedThreadName) throws IllegalStateException {
        String name = Thread.currentThread().getName();
        if (!name.equals(expectedThreadName)) {
            throw new IllegalStateException("Thread changed from '" + expectedThreadName + "' to '" + name + "'");
        }
    }
}
multithreading rx-java2 scheduling
1个回答
© www.soinside.com 2019 - 2024. All rights reserved.