kotlin,得到了“类型不匹配。必填:一次性?找到:单元“在subscribe中使用observer对象实例时()

问题描述 投票:0回答:1

编辑:

根据Dmitry Ikryanov的建议,使用DisposableObserver会编译,但它会导致崩溃

io.reactivex.exceptions.ProtocolViolationException: It is not allowed to 
subscribe with a(n) com.DataManager$theObserver$1 multiple times. Please 
create a fresh instance of com.DataManager$theObserver$1 and subscribe that 
to the target source instead.

subecribWith()的唯一代码,只被调用一次

fun initSession() {
    if (mDisposable != null && mDisposable!!.isDisposed) {
        mDisposable!!.dispose()
    }

    mDisposable = RxBus.listen(DataEvent::class.java).subscribeWith(theObserver)  <=== crash at here
}

DisposableObserver是类的成员变量:

var theObserver: DisposableObserver<DataEvent> = object : DisposableObserver<DataEvent>() {
    override fun onComplete() {
        Log.e(TAG, "onComplete: All Done!")        }

    override fun onNext(t: DataEvent) {
        Log.e(TAG, "Next: " + t)
        onDataReady(t)        }

    override fun onError(e: Throwable) {
        Log.e(TAG, "onError: ")
    }
}

===

原始问题:

试图在kotlin中使用RxJava subscribe(),得到错误“Type mismatch. Required: Disposable? Found: Unit”,不确定它意味着什么,谁都知道?

class DataEvent {}

使用RxBus

object RxBus {

private val publisher = PublishSubject.create<Any>()

fun publish(event: Any) {
    publisher.onNext(event)
}

// Listen should return an Observable and not the publisher
// Using ofType we filter only events that match that class type
fun <T> listen(eventType: Class<T>): Observable<T> = publisher.ofType(eventType)

}

当这样打电话时,没关系:

mDisposable = RxBus.listen(DataEvent::class.java).subscribe({
        onDataReady(it)
    })

但是当用定义的RxBus.listen(DataEvent::class.java).subscribe(observer)实例调用observer时,它会显示红色下划线:“类型不匹配。必填:一次性?发现:单位“

mDisposable = RxBus.listen(DataEvent::class.java).subscribe(observer)

观察员是:

var observer: Observer<DataEvent> = object : Observer<DataEvent> {
    override fun onSubscribe(d: Disposable) {
        Log.e(TAG, "onSubscribe: ")
    }

    override fun onNext(@NonNull t: DataEvent) {
        Log.e(TAG, "onNext: " + t)
        onDataReady(t)
    }

    override fun onError(e: Throwable) {
        Log.e(TAG, "onError: ")
    }

    override fun onComplete() {
        Log.e(TAG, "onComplete: All Done!")
     }
}
kotlin rx-java2 subscribe
1个回答
2
投票

这是因为在RxJava 2.0方法中subscribe(observer)被改变并且什么也没有返回。

与版本1.x的Observable不同,subscribe(Observer)不允许外部取消订阅,并且Observer实例应该公开此类功能。

你可以使用subscribeWith(observer)。 例:

val disposable = Observable.just("Hello world!")
                .delay(1, TimeUnit.SECONDS)
                .subscribeWith(object : DisposableObserver<String>() {
                    public override fun onStart() {
                        println("Start!")
                    }

                    fun onNext(t: Int?) {
                        println(t)
                    }

                    override fun onError(t: Throwable) {
                        t.printStackTrace()
                    }

                    override fun onComplete() {
                        println("Done!")
                    }
                })
© www.soinside.com 2019 - 2024. All rights reserved.