我刚刚添加
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-rx2:1.3.3"
到项目。我在A类中有suspend fun foo(): Flow<Bar>
(来自外部包装)。
我需要获取Flowable<Bar>
才能在Java中使用。如果可能的话,我想使用扩展fun A.fooRx(): Flowable<Bar>
。
您必须从Kotlin的协程中偷偷返回Foo<Bar>
:
// SomeSuspendAPI.kt
// -----------------
// the method to convert
suspend fun <T> Flow<T>.foo() : Flow<Int> {
return flow { emit(0) }
}
@ExperimentalCoroutinesApi
fun <T> Flow<T>.fooRx() : CompletableFuture<Flowable<Int>> {
val self = this
val future = CompletableFuture<Flowable<Int>>()
GlobalScope.launch {
try {
future.complete(self.foo().asFlowable())
} catch (ex: Throwable) {
future.completeExceptionally(ex);
}
}
return future
}
// Demo purposes
fun <T> just(v: T) = flow { emit(v) }
然后您可以在Java中使用它:
public class UseFoo {
public static void main(String[] args) throws Exception {
SomeSuspendAPIKt.fooRx(
SomeSuspendAPIKt.just(1)
)
.thenAccept(flowable -> flowable.subscribe(System.out::println))
.join();
}
}
编辑1:
您当然可以将一些代码移回Kotlin一侧:
fun <T> Flow<T>.fooRx2() : Flowable<Int> {
val self = this
val subject = SingleSubject.create<Flowable<Int>>()
GlobalScope.launch {
try {
subject.onSuccess(self.foo().asFlowable())
} catch (ex: Throwable) {
subject.onError(ex)
}
}
return subject.flatMapPublisher { it }
}
然后
public class UseFoo {
public static void main(String[] args) throws Exception {
SomeSuspendAPIKt.fooRx2(SomeSuspendAPIKt.just(1))
.blockingSubscribe(System.out::println);
}
}
编辑2:
您可以通过在Kotlin端使用转换来对此进行概括,该转换使您可以连续传递对象:
fun <T, R: Any> Flow<T>.transformAsync(fn: suspend (t: Flow<T>) -> Flow<R>) : Flowable<R> {
val self = this
val subject = SingleSubject.create<Flowable<R>>()
GlobalScope.launch {
try {
val r = fn(self).asFlowable();
subject.onSuccess(r)
} catch (ex: Throwable) {
subject.onError(ex)
}
}
return subject.flatMapPublisher { it }
}
public class UseFoo {
public static void main(String[] args) throws Exception {
SomeSuspendAPIKt.transformAsync(
SomeSuspendAPIKt.just(1),
(source, cont) -> SomeSuspendAPIKt.foo(source, cont)
)
.blockingSubscribe(System.out::println);
}
}