How do I convert a Flow to a Flowable?

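I just added

implementation "org.jetbrains.kotlinx:kotlinx-coroutines-rx2:1.3.3"

to my project. Class A (from an external package) has suspend fun foo(): Flow<Bar>.

I need a Flowable<Bar> so that I can use it from Java. If possible, I would like to use an extension fun A.fooRx(): Flowable<Bar>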

kotlin rx-java2 kotlin-coroutines
1 answer

You have to sneak the returned Flow<Bar> out of the coroutine world on the Kotlin side:

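// SomeSuspendAPI.kt
// -----------------

import io.reactivex.Flowable
import java.util.concurrent.CompletableFuture
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.launch
import kotlinx.coroutines.rx2.asFlowable

// the method to convert
suspend fun <T> Flow<T>.foo(): Flow<Int> {
    return flow { emit(0) }
}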

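@ExperimentalCoroutinesApi
fun <T> Flow<T>.fooRx(): CompletableFuture<Flowable<Int>> {
    // Inside launch, `this` is the CoroutineScope, so capture the Flow receiver first
    val self = this
    val future = CompletableFuture<Flowable<Int>>()
    // Call the suspending foo() from a coroutine and hand the result to the future
    GlobalScope.launch {
        try {
            future.complete(self.foo().asFlowable())
        } catch (ex: Throwable) {
            future.completeExceptionally(ex)
        }
    }
    return future
}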

// Demo purposes
fun <T> just(v: T) = flow { emit(v) }

Then you can use it from Java:

public class UseFoo {
    public static void main(String[] args) throws Exception {
        SomeSuspendAPIKt.fooRx(
                SomeSuspendAPIKt.just(1)
        )
        .thenAccept(flowable -> flowable.subscribe(System.out::println))
        .join();
    }
}

Edit 1:

You can, of course, move some of the code back to the Kotlin side:

// Additionally requires: import io.reactivex.subjects.SingleSubject
fun <T> Flow<T>.fooRx2(): Flowable<Int> {
    val self = this
    val subject = SingleSubject.create<Flowable<Int>>()
    GlobalScope.launch {
        try {
            subject.onSuccess(self.foo().asFlowable())
        } catch (ex: Throwable) {
            subject.onError(ex)
        }
    }
    return subject.flatMapPublisher { it }
}

Then:

public class UseFoo {
    public static void main(String[] args) throws Exception {
        SomeSuspendAPIKt.fooRx2(SomeSuspendAPIKt.just(1))
                .blockingSubscribe(System.out::println);
    }
}

Edit 2:

You can generalize this with a transformation on the Kotlin side, which gives you a continuation object to pass along:

fun <T, R: Any> Flow<T>.transformAsync(fn: suspend (t: Flow<T>) -> Flow<R>) : Flowable<R> {
    val self = this
    val subject = SingleSubject.create<Flowable<R>>()
    GlobalScope.launch {
        try {
            val r = fn(self).asFlowable()
            subject.onSuccess(r)
        } catch (ex: Throwable) {
            subject.onError(ex)
        }
    }
    return subject.flatMapPublisher { it }
}

Then, in Java:

public class UseFoo {
    public static void main(String[] args) throws Exception {

        SomeSuspendAPIKt.transformAsync(
                SomeSuspendAPIKt.just(1),
                (source, cont) -> SomeSuspendAPIKt.foo(source, cont)
        )
        .blockingSubscribe(System.out::println);
    }
}
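
A shorter variant that avoids GlobalScope may also work here. This is only a sketch, assuming that the flow builder, emitAll and asFlowable() behave in kotlinx-coroutines 1.3.3 as they do in later releases. The classes A and Bar below are hypothetical stand-ins for the external types from the question. Because the block passed to flow is itself suspending, the suspend call to foo() can be made inside it and is deferred until the Flowable is subscribed to:

```kotlin
import io.reactivex.Flowable
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.rx2.asFlowable

// Hypothetical stand-ins for the external types named in the question
data class Bar(val id: Int)

class A {
    suspend fun foo(): Flow<Bar> = flow {
        emit(Bar(1))
        emit(Bar(2))
    }
}

// Wrap the suspend call in a flow builder: foo() runs when the flow is
// collected, and emitAll forwards every element of the inner Flow.
@ExperimentalCoroutinesApi
fun A.fooRx(): Flowable<Bar> = flow<Bar> { emitAll(foo()) }.asFlowable()

fun main() {
    // Blocks the calling thread until the Flowable completes
    A().fooRx().blockingSubscribe { println(it) }
}
```

From Java this would be called as a static method on the generated file class, in the same way as fooRx2 above. One difference from the SingleSubject versions: nothing is launched until someone subscribes, and each subscriber triggers its own call to foo().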