如何处理组中的groupby子流

问题描述 投票:0回答:1

我一直在玩akka-streams,似乎遇到了一个问题,我找不到一个干净的方法来处理这个问题。

我有来自1 ... *球员的事件,他们在棋盘上有相应的位置。我想检查是否有任何给定时间的球员发生碰撞。为此,我需要在一个动作中处理所有当前连接的玩家的一组事件。我只能想出这样的东西,它可以适用于2个玩家,并且在事件顺序流动的情况下,很可能不需要groupby。

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import akka.testkit.TestKit
import org.scalatest.{MustMatchers, WordSpecLike}

import scala.concurrent.Await
import scala.concurrent.duration._

case class PlayerPosition(x: Int, y: Int)
case class PlayerState(playerName: String, positions: List[PlayerPosition])

class GameLogicSpec
  extends TestKit(ActorSystem("test-filter"))
    with WordSpecLike
    with MustMatchers {


  val psa = PlayerState("A", List(PlayerPosition(0, 1)))
  val psb = PlayerState("B", List(PlayerPosition(0, 1)))

  implicit val materializer = ActorMaterializer()

  "game flow logic" must {
    "returns handles collision" in {

      val flow =
        Flow[PlayerState]
          .groupBy(2, _.playerName)
          .mergeSubstreams
          .sliding(2, 1)
          .map(evts =>
            evts.size > 1 && evts.head.positions == evts.last.positions)

      val gameLogicGraph = Source(List(psa, psb))
        .via(flow)
        .toMat(Sink.seq[Boolean])(Keep.right)

      Await
        .result(gameLogicGraph.run(), 10 seconds) must be(
        List(true)
      )
    }
  }
}

理想情况下,我希望滑动窗口根据当前连接的玩家数量,在不同的组中均匀地发布每个玩家的事件。当然问题是,如果一个玩家比其他玩家更快地生成事件会需要某种限制,但为了教育,我认为可以假设他们以相同的速度发布。

scala akka akka-stream
1个回答
0
投票

假设有n玩家,其中n等于或大于2(游戏只有一个玩家没有意义),并且在每一轮中,player-1player-n都会在下一轮开始之前做出各自的动作,然后使用groupBy是不必要的。换句话说,如果有三个玩家,例如,并且所有三个玩家在他们进行第二次移动之前进行他们的第一次移动,依此类推,那么可以简单地使用sliding方法。该方法的两个参数都等于玩家的数量,每个滑动的“窗口”代表一个回合。球员在每一轮内移动的顺序无关紧要(看起来在你的情景中,player-1player-n按顺序移动,从1n,每轮)。

此外,用于确定窗口中是否存在冲突的功能可以针对两个或更多个玩家进行推广。这是通过获取玩家的位置,将此集合转换为集合,然后检查此集合的大小是否小于玩家的数量来完成的:如果是,则至少有两个玩家持有相同的位置。

val numPlayers = 3

val moves = List(
  PlayerState("A", List(PlayerPosition(0, 1))), // no collision in this window
  PlayerState("B", List(PlayerPosition(1, 1))),
  PlayerState("C", List(PlayerPosition(2, 2))),

  PlayerState("A", List(PlayerPosition(0, 2))), // no collision in this window
  PlayerState("B", List(PlayerPosition(1, 2))),
  PlayerState("C", List(PlayerPosition(2, 1))),

  PlayerState("A", List(PlayerPosition(1, 2))), // collision exists in this window
  PlayerState("B", List(PlayerPosition(1, 2))),
  PlayerState("C", List(PlayerPosition(1, 2))),

  PlayerState("A", List(PlayerPosition(1, 2))), // collision exists in this window
  PlayerState("B", List(PlayerPosition(2, 2))),
  PlayerState("C", List(PlayerPosition(1, 2)))
)

implicit val materializer = ActorMaterializer()

"game flow logic" must {
  "returns handles collision" in {

    val flow =
      Flow[PlayerState]
        .sliding(numPlayers, numPlayers)
        .map(_.map(_.positions).toSet.size < numPlayers)

    val gameLogicGraph =
      Source(moves)
        .via(flow)
        .toMat(Sink.seq[Boolean])(Keep.right)

    Await.result(gameLogicGraph.run(), 10 seconds) must be(
      List(false, false, true, true)
    )
  }
}
© www.soinside.com 2019 - 2024. All rights reserved.