创建多播 Monix Observable,当新订阅者订阅时可以重新启动

问题描述 投票:0回答:1

我正在尝试配置 Monix

Observable
的行为以满足一组特定的要求。

基本上我正在努力做到这一点:

  • 可观察量是多播的(就像热可观察量)
  • 当所有订阅者取消订阅时,可观察对象将停止运行(如
    .refCount
  • 但是,如果有新订阅者订阅,则
    Observable
    会再次重新启动(就像冷可观察的)

这是我的代码:

import monix.reactive.Observable
import scala.concurrent.duration._
import monix.execution.Scheduler.Implicits.global

object Main {
  def main(args: Array[String]): Unit = {
    val observable = Observable.interval(1.second).map{i => println("Obs running"); i}.share

    val subscription1 = observable.foreach(n => println(s"sub1: $n"))

    Thread.sleep(2000)

    val subscription2 = observable.foreach(n => println(s"sub2: $n"))

    Thread.sleep(5000)

    subscription1.cancel()
    println("Cancel 1")

    Thread.sleep(3000)

    subscription2.cancel()
    println("Cancel 2")
    // All subscribers unsubscribed, observable stops emitting values and is disposed of

    println("Sleep")
    Thread.sleep(5000)
    // After some time, a new subscriber comes along
    val subscription3 = observable.foreach(n => println(s"sub3: $n"))
    println("Subscribed to 3")
    Thread.sleep(7000)
  }
}

SBT:

scalaVersion := "3.4.0"
libraryDependencies += "io.monix" %% "monix-reactive" % "3.4.1"

这是我的脚本的输出:

Obs running
sub1: 0
Obs running
sub1: 1
Obs running
sub1: 2
Obs running
sub1: 3
sub2: 3
Obs running
sub1: 4
sub2: 4
Obs running
sub1: 5
sub2: 5
Obs running
sub1: 6
sub2: 6
Obs running
sub1: 7
sub2: 7
Cancel 1
Obs running
sub2: 8
Obs running
sub2: 9
Cancel 2
Sleep
Subscribed to 3

通过使用

.share
,我能够使可观察量进行多播,并在 sub1 和 sub2 取消订阅后停止触发(因为
.share
在幕后使用
.refCount
),但是当 sub3 订阅时,它永远不会收到任何项目。

另请注意,这是一个简化的示例。在我的实际用例中,

Observable
不仅仅是一个整数列表,而是在外部系统上运行查询并返回结果,但问题本质上是相同的。

非常感谢对此的任何帮助。谢谢!

scala observable reactive-programming scala-3 monix
1个回答
0
投票

在与 Claude Opus 进行激烈的即兴演奏之后,我们想出了一个解决方案:

import monix.execution.atomic.{Atomic, AtomicInt}
import monix.execution.{Ack, Cancelable, Scheduler}
import monix.reactive.Observable
import monix.reactive.observers.Subscriber
import monix.reactive.subjects.PublishSubject

import scala.concurrent.Future

class RestartableRefCountObservable[A] private (source: Observable[A]) extends Observable[A] {
  private var sourceConnection: Cancelable = null
  private val subject: PublishSubject[A] = PublishSubject()
  private val connectionCount = Atomic(0)

  override def unsafeSubscribeFn(subscriber: Subscriber[A]): Cancelable = {

    def startConnection(): Unit = {
      sourceConnection = source.subscribe(new Subscriber[A] {
        override def onNext(elem: A): Future[Ack] = subject.onNext(elem)

        override def onError(ex: Throwable): Unit = {
          sourceConnection.cancel()
          subject.onError(ex)
        }

        override def onComplete(): Unit = {
          sourceConnection.cancel()
          subject.onComplete()
        }

        override implicit def scheduler: Scheduler = subscriber.scheduler
      })
    }

    if (connectionCount.incrementAndGet() == 1) startConnection()

    val subjectSubscription = subject.unsafeSubscribeFn(subscriber)

    Cancelable { () =>
      subjectSubscription.cancel()
      if (connectionCount.decrementAndGet() == 0) sourceConnection.cancel()
    }
  }
}

object RestartableRefCountObservable {
  def apply[A](source: Observable[A]): RestartableRefCountObservable[A] =
    new RestartableRefCountObservable(source)
}

解决方案是实现一个扩展

Observable
的新类,并在内部跟踪其订阅者的数量。第一次连接时,它将连接到源 Observable,在所有订阅者断开连接后,它将关闭与源的连接。
monix.reactive.observables.RefCountObservable
与此之间的关键区别在于,如果在所有先前订阅者取消订阅后另一个订阅者订阅,则这将重新启动。

这是我之前用来测试的脚本的更新版本:

import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable

import scala.concurrent.duration.*

object Main {
  def main(args: Array[String]): Unit = {
    val source = Observable.interval(1.second).map(i => {
      println(s"Obs running: $i")
      i
    })

    val restartable = RestartableRefCountObservable(source)

    println("Sub1")
    val sub1 = restartable.foreach(i => println(s"Sub1: $i"))
    Thread.sleep(3000)
    println("Sub2")
    val sub2 = restartable.foreach(i => println(s"Sub2: $i"))

    Thread.sleep(5000)

    sub1.cancel()
    println("Sub1 canceled")

    Thread.sleep(3000)
    sub2.cancel()
    println("Sub2 canceled")
    Thread.sleep(10000)

    println("Sub3")
    val sub3 = restartable.foreach(i => println(s"Sub3: $i"))

    Thread.sleep(15000)
  }
}

这是新的输出:

Sub1
Obs running: 0
Sub1: 0
Obs running: 1
Sub1: 1
Obs running: 2
Sub1: 2
Obs running: 3
Sub1: 3
Sub2
Sub2: 3
Obs running: 4
Sub1: 4
Sub2: 4
Obs running: 5
Sub1: 5
Sub2: 5
Obs running: 6
Sub1: 6
Sub2: 6
Obs running: 7
Sub1: 7
Sub2: 7
Obs running: 8
Sub1: 8
Sub2: 8
Sub1 canceled
Obs running: 9
Sub2: 9
Obs running: 10
Sub2: 10
Obs running: 11
Sub2: 11
Sub2 canceled
Sub3
Obs running: 0
Sub3: 11
Sub3: 0
Obs running: 1
Sub3: 1
Obs running: 2
Sub3: 2
Obs running: 3
Sub3: 3
Obs running: 4
Sub3: 4
Obs running: 5
Sub3: 5
Obs running: 6
Sub3: 6
Obs running: 7
Sub3: 7
Obs running: 8
Sub3: 8
Obs running: 9
Sub3: 9
Obs running: 10
Sub3: 10
Obs running: 11
Sub3: 11
Obs running: 12
Sub3: 12
Obs running: 13
Sub3: 13
Obs running: 14
Sub3: 14
© www.soinside.com 2019 - 2024. All rights reserved.