我有一个小问题。
for
map <- kafkaEventHoldLine.get // Ref[F, List[String]]
key = dr.derived + dr.metricId.toString
_ <- if !map.contains(key) then
for
_ <- addToHoldLine(key)
process <- outlierProcessor.process(dr).timed
(timeProcess, ol) = process
_ <- info("Outlier.process", timeProcess.toMillis.named("duration"), dr.derived.named("sid"), dr.metricId.toString.named("mid"))
append <- ol.traverse_(odt.append).timed
(timeAppend, _) = append
_ <- info("Outlier.append", timeAppend.toMillis.named("duration"), dr.derived.named("sid"), dr.metricId.toString.named("mid"))
_ <- counters.processOutlier.observeTime(timeProcess + timeAppend)
_ <- clearHoldLine(key)
yield ()
else
Async[F].sleep(delayHoldLine.milliseconds) >> outlierProcess(dr)
yield ()
这堆代码是并发运行的。但我想要
for
map <- kafkaEventHoldLine.get // Ref[F, List[String]]
key = dr.derived + dr.metricId.toString
_ <- if !map.contains(key) then
for
_ <- addToHoldLine(key)
线程之间同步。
所以我基本上害怕从另一个线程中的引用中获取过时的数据,并且我想让所有线程等待,直到工作线程完成
addToHoldLine
函数,因为在执行它之后,我改变了引用的状态,使得映射.contains(key) 返回 true
从你的问题中不清楚这是否真的需要全部在一个Thread上,或者代码是否足以确保一次只运行1个。
限制并发工作的工具是
Semaphore
。在工厂中为您的类构建一个包含您想要限制的逻辑的工厂,然后在您想要限制的特定工作周围使用 permit
。
object MyLogic:
def create: IO[MyLogic] = Semaphore[IO](1).map(new MyLogic(_))
class MyLogic(sem: Semaphore[IO]):
def doWork =
sem.permit.surround {
for
map <- kafkaEventHoldLine.get
// ...
}
如果您需要保证所有工作都发生在同一个线程上,那么构建一个单线程
ExecutionContext
并使用evalOn
将工作转移到该EC上
object MyLogic:
def create: Resource[IO, MyLogic] =
Resource
.make(IO(Executors.newSingleThreadExecutor))(es => IO(es.shutdown()))
.map(ExecutionContext.fromExecutorService)
.map(new MyLogic(_))
class MyLogic(singleThread: ExecutionContext):
def doWork = {
for
map <- kafkaEventHoldLine.get
//
yield etc
}.evalOn(singleThread)