有关RxJava和PublishSubject的初学者问题

问题描述 投票:0回答:1

我对PublishSubject中的RxJava有疑问。我创建了一个虚拟的PublishSubject,它发出一些对象。这是我的代码:

override fun generate(exportRequest: ExportRequest): Observable<Report> {
        val faker = Faker()
        val dummyPublisher = PublishSubject.create<Report>()
        for(x in 1..1_000){
            val dataToExport = DataToExport(UUID.randomUUID(), faker.company().buzzword(), faker.company().name())
            val report = Report(dataToExport)
            sddPublisher.onNext(report)
            Thread.sleep(1)
        }
        dummyPublisher.onComplete()
        return dummyPublisher
    }

订阅时,不会发射任何物体。例如,什么都没有打印:

... // somewhere in the code
reportStrategy.generate(exportRequest).subscribe { report: Report? ->
     println(report)
 }

也许我缺少一些东西。任何帮助将不胜感激

java kotlin observable reactive-programming rx-java2
1个回答
0
投票

如注释中@akarnokd所述,您创建的PublishSubject立即发出通过其onNext方法传递给它的任何值。无论当前是否有任何订阅,都会发生这种情况。它的主要目的是帮助弥合命令式或基于回调的代码与反应式代码之间的鸿沟。

您似乎想要的是Observable,一旦有人订阅它,它便开始执行一些同步代码。 Observable.create是创建此类实例的一种方法,但是正确使用它可能很麻烦。

创建所需内容的一种更方便的方法是Observable.fromPublisher。它以Publisher作为参数。 Publisher本身就是一个函数,只要Subscriber订阅由Observer创建的Observable并允许您直接向该fromPublisher发送事件,就将其传递给Observer实例。

您想要的代码如下所示:

fun generateReportStream(genFakeReport: () -> Report): Observable<Report> {
    return Observable.fromPublisher { subscriber ->
        for (x in 1..1_000) {
            val fakeReport = genFakeReport()
            subscriber.onNext(fakeReport)
            Thread.sleep(1)
        }
        subscriber.onComplete()
    }
}

fun main() {
    /** supply whatever logic you want to generate a fake [Report] */
    fun genFakeReport(): Report = TODO()
    val subscription = generateReportStream(::genFakeReport).subscribe(::println)
}

一旦订阅了Observable返回的generateReportStream实例,它将正确发出值。另外,可以对同一实例进行更多预订,并且每个预订将使用相同的逻辑发出新的序列值。

© www.soinside.com 2019 - 2024. All rights reserved.