如何使用Scala Akka流检测重复的行

问题描述 投票:1回答:1

我们有一个Scala应用程序,该程序可从文本文件中读取行并使用Akka Stream处理它们。为了获得更好的性能,我们将并行度设置为5。问题是,如果多行包含同一封电子邮件,我们仅保留其中一行,将其他行视为重复并抛出错误。我尝试使用Java parallelHashMap来检测重复,但是它不起作用,这是我的代码:

allIdentifiers = new ConcurrentHashMap[String, Int]()   
Source(rows)
  .mapAsync(config.parallelism.value) {
    case (dataRow, index) => {

      val eventResendResult: EitherT[Future, NonEmptyList[ResendError], ResendResult] =
        for {

          cleanedRow <- EitherT.cond[Future](
            !allIdentifiers.containsKey(dataRow.lift(emailIndex)), {
              allIdentifiers.put(dataRow.lift(emailIndex),index)
              dataRow
            }, {
              NonEmptyList.of(
                DuplicatedError(
                  s"Duplicated record at row $index",
                  List(identifier)
                )
              )
            }
          )

          _ = logger.debug(
            LoggingMessage(
              requestId = RequestId(),
              message = s"allIdentifiers: $allIdentifiers"
            )
          )

          ... more process step ...
        } yield foldResponses(sent)

      eventResendResult
        .leftMap(errors => ResendResult(errors.toList, List.empty))
        .merge
    }
  }
  .runWith(Sink.reduce { (result1: ResendResult, result2: ResendResult) =>
    ResendResult(
      result1.errors ++ result2.errors,
      result1.results ++ result2.results
    )
  })

我们将config.parallelism.value设置为5,这意味着它可以同时最多处理5行。我观察到的是,如果彼此相邻有重复的行,则行不通,例如:

line 0 contains email1
line 1 contains email1
line 2 contains email2
line 3 contains email2
line 4 contains email3

从日志中,我看到并发HashMap充满了条目,但是所有行都通过了重复检测,并移至下一个处理步骤。所以Akka Stream的并行性与java的多线程不是同一回事吗?在这种情况下,如何检测重复的行?

scala akka akka-stream
1个回答
0
投票

问题在以下代码段中:

cleanedRow <- EitherT.cond[Future](
  !allIdentifiers.containsKey(dataRow.lift(emailIndex)), {
    allIdentifiers.put(dataRow.lift(emailIndex),index)
    dataRow
  }, {
    NonEmptyList.of(
      DuplicatedError(
        s"Duplicated record at row $index",
        List(identifier)
      )
    )
  }
)

特别是:想象两个线程同时处理应删除重复数据的电子邮件。有可能发生以下情况(按顺序)

  1. 第一个线程检查containsKey,发现电子邮件不在地图中
  2. 第二个线程检查containsKey,发现电子邮件不在地图中
  3. 第一个线程将电子邮件添加到地图(基于步骤1的结果),并将电子邮件传递通过
  4. 第二个线程将电子邮件添加到地图(基于步骤3的结果。并将电子邮件传递通过]

换句话说:您需要自动检查地图中的键更新它。这是很常见的事情,因此,正是ConcurrentHashMapput所做的:它更新键上的值并返回它替换的前一个值(如果有的话)。

我对Cats中的组合器不太熟悉,因此以下内容可能不是惯用的。但是,请注意它是如何在一个原子步骤中插入并检查先前值的。

cleanedRow <- EitherT(Future.successful {
  val previous = allIdentifiers.put(dataRow.lift(emailIndex), index)
  Either.cond(
    previous != null,
    dataRow,
    NonEmptyList.of(
      DuplicatedError(
        s"Duplicated record at row $index",
        List(identifier)
      )
    )
  )
})
© www.soinside.com 2019 - 2024. All rights reserved.