如何一次仅在一个线程中同步一堆效果

问题描述 投票:0回答:1

我有一个小问题。

      for
        map <- kafkaEventHoldLine.get // Ref[F, List[String]]
        key = dr.derived + dr.metricId.toString
        _ <- if !map.contains(key) then
          for
            _ <- addToHoldLine(key)
            process <- outlierProcessor.process(dr).timed
            (timeProcess, ol) = process
            _ <- info("Outlier.process", timeProcess.toMillis.named("duration"), dr.derived.named("sid"), dr.metricId.toString.named("mid"))
            append <- ol.traverse_(odt.append).timed
            (timeAppend, _) = append
            _ <- info("Outlier.append", timeAppend.toMillis.named("duration"), dr.derived.named("sid"), dr.metricId.toString.named("mid"))
            _ <- counters.processOutlier.observeTime(timeProcess + timeAppend)
            _ <- clearHoldLine(key)
          yield ()
        else
          Async[F].sleep(delayHoldLine.milliseconds) >> outlierProcess(dr)
      yield ()

这堆代码是并发运行的。但我想要

      for
        map <- kafkaEventHoldLine.get // Ref[F, List[String]]
        key = dr.derived + dr.metricId.toString
        _ <- if !map.contains(key) then
          for
            _ <- addToHoldLine(key)

线程之间同步。

所以我基本上害怕从另一个线程中的引用中获取过时的数据,并且我想让所有线程等待,直到工作线程完成

addToHoldLine
函数,因为在执行它之后,我改变了引用的状态,使得映射.contains(key) 返回
true

multithreading scala functional-programming scala-cats cats-effect
1个回答
0
投票

从你的问题中不清楚这是否真的需要全部在一个Thread上,或者代码是否足以确保一次只运行1个。

限制并发工作的工具是

Semaphore
。在工厂中为您的类构建一个包含您想要限制的逻辑的工厂,然后在您想要限制的特定工作周围使用
permit

object MyLogic:
  def create: IO[MyLogic] = Semaphore[IO](1).map(new MyLogic(_))

class MyLogic(sem: Semaphore[IO]):
  def doWork =
    sem.permit.surround {
      for
        map <- kafkaEventHoldLine.get
        // ...
    }

如果您需要保证所有工作都发生在同一个线程上,那么构建一个单线程

ExecutionContext
并使用
evalOn
将工作转移到该EC上

object MyLogic:
  def create: Resource[IO, MyLogic] =
    Resource
      .make(IO(Executors.newSingleThreadExecutor))(es => IO(es.shutdown()))
      .map(ExecutionContext.fromExecutorService)
      .map(new MyLogic(_))

class MyLogic(singleThread: ExecutionContext):
  def doWork = {
    for
        map <- kafkaEventHoldLine.get
        //
    yield etc
  }.evalOn(singleThread)
© www.soinside.com 2019 - 2024. All rights reserved.