我们有一个Scala应用程序,该程序可从文本文件中读取行并使用Akka Stream处理它们。为了获得更好的性能,我们将并行度设置为5。问题是,如果多行包含同一封电子邮件,我们仅保留其中一行,将其他行视为重复并抛出错误。我尝试使用Java parallelHashMap来检测重复,但是它不起作用,这是我的代码:
allIdentifiers = new ConcurrentHashMap[String, Int]()
Source(rows)
.mapAsync(config.parallelism.value) {
case (dataRow, index) => {
val eventResendResult: EitherT[Future, NonEmptyList[ResendError], ResendResult] =
for {
cleanedRow <- EitherT.cond[Future](
!allIdentifiers.containsKey(dataRow.lift(emailIndex)), {
allIdentifiers.put(dataRow.lift(emailIndex),index)
dataRow
}, {
NonEmptyList.of(
DuplicatedError(
s"Duplicated record at row $index",
List(identifier)
)
)
}
)
_ = logger.debug(
LoggingMessage(
requestId = RequestId(),
message = s"allIdentifiers: $allIdentifiers"
)
)
... more process step ...
} yield foldResponses(sent)
eventResendResult
.leftMap(errors => ResendResult(errors.toList, List.empty))
.merge
}
}
.runWith(Sink.reduce { (result1: ResendResult, result2: ResendResult) =>
ResendResult(
result1.errors ++ result2.errors,
result1.results ++ result2.results
)
})
我们将config.parallelism.value设置为5,这意味着它可以同时最多处理5行。我观察到的是,如果彼此相邻有重复的行,则行不通,例如:
line 0 contains email1
line 1 contains email1
line 2 contains email2
line 3 contains email2
line 4 contains email3
从日志中,我看到并发HashMap充满了条目,但是所有行都通过了重复检测,并移至下一个处理步骤。所以Akka Stream的并行性与java的多线程不是同一回事吗?在这种情况下,如何检测重复的行?
问题在以下代码段中:
cleanedRow <- EitherT.cond[Future](
!allIdentifiers.containsKey(dataRow.lift(emailIndex)), {
allIdentifiers.put(dataRow.lift(emailIndex),index)
dataRow
}, {
NonEmptyList.of(
DuplicatedError(
s"Duplicated record at row $index",
List(identifier)
)
)
}
)
特别是:想象两个线程同时处理应删除重复数据的电子邮件。有可能发生以下情况(按顺序)
containsKey
,发现电子邮件不在地图中containsKey
,发现电子邮件不在地图中换句话说:您需要自动检查地图中的键并更新它。这是很常见的事情,因此,正是ConcurrentHashMap
的put
所做的:它更新键上的值并返回它替换的前一个值(如果有的话)。
我对Cats中的组合器不太熟悉,因此以下内容可能不是惯用的。但是,请注意它是如何在一个原子步骤中插入并检查先前值的。
cleanedRow <- EitherT(Future.successful {
val previous = allIdentifiers.put(dataRow.lift(emailIndex), index)
Either.cond(
previous != null,
dataRow,
NonEmptyList.of(
DuplicatedError(
s"Duplicated record at row $index",
List(identifier)
)
)
)
})