如何编写Observable 使用Vert.x泵进行归档?

问题描述 投票:1回答:1

我想使用Vert.x 3.5.0 Pump(RxJava2变体)将rs Observable的所有发出的Buffers流式传输到一个文件中。以下示例在某种程度上不起作用,我不知道为什么。

import static org.junit.Assert.assertEquals;

import java.io.File;
import java.io.IOException;

import org.apache.commons.io.FileUtils;
import org.junit.Test;

import io.reactivex.Observable;
import io.vertx.core.file.OpenOptions;
import io.vertx.reactivex.core.Vertx;
import io.vertx.reactivex.core.buffer.Buffer;
import io.vertx.reactivex.core.file.AsyncFile;
import io.vertx.reactivex.core.streams.Pump;

public class PumpTest {

    @Test
    public void testPump() throws IOException {
        Vertx vertx = Vertx.vertx();
        File file = new File("target/test");
        file.delete();

        Observable<Buffer> rs = Observable.just(Buffer.buffer("test123"));
        vertx.fileSystem().rxOpen(file.getAbsolutePath(), new OpenOptions()).map(f -> {
            Pump pump = Pump.pump(rs, f);
            pump.start();
            return f;
        }).map(AsyncFile::flush).subscribe(AsyncFile::close);
        assertEquals("test123", FileUtils.readFileToString(file));
    }
}
rx-java2 vert.x
1个回答
0
投票

查看ReadStream<T>Pump.pump的第一个参数)的文档指向ReadStreamSubscriber类。哪个可以将可观察量变成读取流。

而不是rsReadStreamSubscriber.asReadStream(rs, Function.identity())应该在提供的代码中工作。

© www.soinside.com 2019 - 2024. All rights reserved.