我正在尝试配置 Monix
Observable
的行为以满足一组特定的要求。
基本上我正在努力做到这一点:
.refCount
)Observable
会再次重新启动(就像冷可观察的)这是我的代码:
import monix.reactive.Observable
import scala.concurrent.duration._
import monix.execution.Scheduler.Implicits.global
object Main {
def main(args: Array[String]): Unit = {
val observable = Observable.interval(1.second).map{i => println("Obs running"); i}.share
val subscription1 = observable.foreach(n => println(s"sub1: $n"))
Thread.sleep(2000)
val subscription2 = observable.foreach(n => println(s"sub2: $n"))
Thread.sleep(5000)
subscription1.cancel()
println("Cancel 1")
Thread.sleep(3000)
subscription2.cancel()
println("Cancel 2")
// All subscribers unsubscribed, observable stops emitting values and is disposed of
println("Sleep")
Thread.sleep(5000)
// After some time, a new subscriber comes along
val subscription3 = observable.foreach(n => println(s"sub3: $n"))
println("Subscribed to 3")
Thread.sleep(7000)
}
}
SBT:
scalaVersion := "3.4.0"
libraryDependencies += "io.monix" %% "monix-reactive" % "3.4.1"
这是我的脚本的输出:
Obs running
sub1: 0
Obs running
sub1: 1
Obs running
sub1: 2
Obs running
sub1: 3
sub2: 3
Obs running
sub1: 4
sub2: 4
Obs running
sub1: 5
sub2: 5
Obs running
sub1: 6
sub2: 6
Obs running
sub1: 7
sub2: 7
Cancel 1
Obs running
sub2: 8
Obs running
sub2: 9
Cancel 2
Sleep
Subscribed to 3
通过使用
.share
,我能够使可观察量进行多播,并在 sub1 和 sub2 取消订阅后停止触发(因为 .share
在幕后使用 .refCount
),但是当 sub3 订阅时,它永远不会收到任何项目。
另请注意,这是一个简化的示例。在我的实际用例中,
Observable
不仅仅是一个整数列表,而是在外部系统上运行查询并返回结果,但问题本质上是相同的。
非常感谢对此的任何帮助。谢谢!
在与 Claude Opus 进行激烈的即兴演奏之后,我们想出了一个解决方案:
import monix.execution.atomic.{Atomic, AtomicInt}
import monix.execution.{Ack, Cancelable, Scheduler}
import monix.reactive.Observable
import monix.reactive.observers.Subscriber
import monix.reactive.subjects.PublishSubject
import scala.concurrent.Future
class RestartableRefCountObservable[A] private (source: Observable[A]) extends Observable[A] {
private var sourceConnection: Cancelable = null
private val subject: PublishSubject[A] = PublishSubject()
private val connectionCount = Atomic(0)
override def unsafeSubscribeFn(subscriber: Subscriber[A]): Cancelable = {
def startConnection(): Unit = {
sourceConnection = source.subscribe(new Subscriber[A] {
override def onNext(elem: A): Future[Ack] = subject.onNext(elem)
override def onError(ex: Throwable): Unit = {
sourceConnection.cancel()
subject.onError(ex)
}
override def onComplete(): Unit = {
sourceConnection.cancel()
subject.onComplete()
}
override implicit def scheduler: Scheduler = subscriber.scheduler
})
}
if (connectionCount.incrementAndGet() == 1) startConnection()
val subjectSubscription = subject.unsafeSubscribeFn(subscriber)
Cancelable { () =>
subjectSubscription.cancel()
if (connectionCount.decrementAndGet() == 0) sourceConnection.cancel()
}
}
}
object RestartableRefCountObservable {
def apply[A](source: Observable[A]): RestartableRefCountObservable[A] =
new RestartableRefCountObservable(source)
}
解决方案是实现一个扩展
Observable
的新类,并在内部跟踪其订阅者的数量。第一次连接时,它将连接到源 Observable,在所有订阅者断开连接后,它将关闭与源的连接。 monix.reactive.observables.RefCountObservable
与此之间的关键区别在于,如果在所有先前订阅者取消订阅后另一个订阅者订阅,则这将重新启动。
这是我之前用来测试的脚本的更新版本:
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable
import scala.concurrent.duration.*
object Main {
def main(args: Array[String]): Unit = {
val source = Observable.interval(1.second).map(i => {
println(s"Obs running: $i")
i
})
val restartable = RestartableRefCountObservable(source)
println("Sub1")
val sub1 = restartable.foreach(i => println(s"Sub1: $i"))
Thread.sleep(3000)
println("Sub2")
val sub2 = restartable.foreach(i => println(s"Sub2: $i"))
Thread.sleep(5000)
sub1.cancel()
println("Sub1 canceled")
Thread.sleep(3000)
sub2.cancel()
println("Sub2 canceled")
Thread.sleep(10000)
println("Sub3")
val sub3 = restartable.foreach(i => println(s"Sub3: $i"))
Thread.sleep(15000)
}
}
这是新的输出:
Sub1
Obs running: 0
Sub1: 0
Obs running: 1
Sub1: 1
Obs running: 2
Sub1: 2
Obs running: 3
Sub1: 3
Sub2
Sub2: 3
Obs running: 4
Sub1: 4
Sub2: 4
Obs running: 5
Sub1: 5
Sub2: 5
Obs running: 6
Sub1: 6
Sub2: 6
Obs running: 7
Sub1: 7
Sub2: 7
Obs running: 8
Sub1: 8
Sub2: 8
Sub1 canceled
Obs running: 9
Sub2: 9
Obs running: 10
Sub2: 10
Obs running: 11
Sub2: 11
Sub2 canceled
Sub3
Obs running: 0
Sub3: 11
Sub3: 0
Obs running: 1
Sub3: 1
Obs running: 2
Sub3: 2
Obs running: 3
Sub3: 3
Obs running: 4
Sub3: 4
Obs running: 5
Sub3: 5
Obs running: 6
Sub3: 6
Obs running: 7
Sub3: 7
Obs running: 8
Sub3: 8
Obs running: 9
Sub3: 9
Obs running: 10
Sub3: 10
Obs running: 11
Sub3: 11
Obs running: 12
Sub3: 12
Obs running: 13
Sub3: 13
Obs running: 14
Sub3: 14