使用 UseCase 将 Rx Observable 转换为 Flow

问题描述 投票:0回答:2

我正在尝试将我的代码转换为 Compose。我有库,但都有 Observable 类型,不能与 Flow 一起使用。我需要 Flow 中的结果。

protected abstract Observable<T> UseCase();



public void execute(Subscriber<T> useCaseSubscriber) {
    mObservable=this.buildUseCaseObservable();
    this.subscription = this.buildUseCaseObservable()
            .subscribeOn(Schedulers.io())
            .doOnError(throwable -> {

            }).doOnNext(t -> {
                ObserverManager.getInstance().remove(useCaseSubscriber);
            }).doOnComplete(() -> {
                ObserverManager.getInstance().remove(useCaseSubscriber);
            })
            .observeOn(postExecutionThread.getScheduler())
            .subscribeWith(useCaseSubscriber);
    if(!(this instanceof EventProcessorUseCase)) {
        ObserverManager.getInstance().cached(useCaseSubscriber);
    }

}

我正在尝试访问上面的方法,例如

public class ForgotPasswordUC extends UseCase<String> {

private String email;

public ForgotPasswordUC(String email,Repository repository, PostExecutionThread postExecutionThread) {
    super(repository, postExecutionThread);
    this.email=email;
}

@Override
protected Observable<String> buildUseCaseObservable() {
    return repository.forgotPassword(email);
}

}

我正在使用 UseCase 调用 API,但在 Compose Access 的情况下无法访问结果。我如何编写上面的用例,它将返回与我们可以收集的流相关的值?

android android-jetpack-compose rx-java
2个回答
0
投票

这是一个原始的方法,但是很有效。

class FlowConvert:Subscriber<String>{
private var _innerFlow:Channel<String> = Channel()
val flow:Flow<String> =_innerFlow.receiveAsFlow()

override fun onSubscribe(s: Subscription?) {

}

override fun onNext(t: String?) {
    _innerFlow.trySend(t.toString())
}

override fun onError(t: Throwable?) {

}

override fun onComplete() {

}}

就个人而言,您的代码看起来很奇怪。为什么不返回 Observable 呢?可以让Rx轻松转换Flow。

例如

fun Observable<String>.convert()= callbackFlow<String> {
val dispose=subscribe {
    trySend("")
}
awaitClose {
    dispose.dispose()
}}

0
投票

如果您希望返回类型为

Flow
而不是
Observable
,请使用带有协程的 Flow 或带有 rx-java 的 Flowable。

public abstract class UseCase<T> {


protected abstract Flowable<T> buildUseCaseFlowable();

public Flowable<T> execute() {
    return buildUseCaseFlowable()
        .subscribeOn(Schedulers.io())
        .doOnError(throwable -> {})
        .doOnComplete(() -> {});
    }
}

实施:

@Override
protected Flowable<String> buildUseCaseFlowable() {
    return repository.forgotPassword(email);
}

用途:

forgotPasswordUC.execute().subscribe(result -> {}, throwable -> {});

您可能需要根据您的班级进行其他更改,但这应该是方法。

© www.soinside.com 2019 - 2024. All rights reserved.