akka计时器发出消息,取消计时器[Message],生成为[583]

问题描述 投票:0回答:1

我正在使用akka timer

我正在使用twitter streams,并且我试图在5秒内获得tweets的数量这是我的代码

case class PerSecond(tweet:Tweet)
case class TweetPerSecondCount(tweet:Tweet)

class TweetPerSecondActor extends Actor with Timers{
var counter=0
 def receive: PartialFunction[Any, Unit] = {
    case PerSecond(tweet) =>
      log.info("Actor TweetPerSecondActor recevied the message PerSecond")
      timers.startPeriodicTimer("perSecond", TweetPerSecondCount(tweet), 5.second)

    case TweetPerSecondCount(tweet) =>
      log.info("Actor TweetPerSecondActor recevied the message TweetPerSecondCount")
      log.info("got the tweet {}",getCounter+"in 5 seconds")

    case message =>
      log.warn("Actor TweetPerSecondActor: Unhandled message received : {}", message)
      unhandled(message)
  }
}

在游戏框架的控制器Action中,我从twitter流中获取tweets对象(连续的流而不会弯腰)

    class Mycontroller extends Controller {

    val tweetPerSecondActor = system.actorOf......//create actor


    def tweetAveragePerSecond = Action {
        log.debug("in the action tweetAveragePerSecond")

        def getTweet: PartialFunction[StreamingMessage, Unit] = {
          case tweet: Tweet =>
            val future = ask(tweetPerSecondActor, PerSecond(tweet))
        }
        val streaming: Future[TwitterStream] = handleTwitterStreamClient.getStreamingCLient.sampleStatuses(stall_warnings = true)(getTweet)

        Ok("tweet average per second")
      }
    }

当我点击日志时显示的路线

TweetPerSecondActor INFO - Actor TweetPerSecondActor recevied the message PerSecond
16:42:44.335 28939 [ArteciateActorSystem-akka.actor.default-dispatcher-5] TimerScheduler DEBUG - Cancel timer [perSecond] with generation [758]
16:42:44.335 28939 [ArteciateActorSystem-akka.actor.default-dispatcher-5] TimerScheduler DEBUG - Start timer [perSecond] with generation [759]

    6:42:44.335 28939 [TwitterActorSystem-akka.actor.default-dispatcher-6] TweetPerSecondActor INFO - Actor TweetPerSecondActor recevied the message PerSecond
    16:42:44.335 28939 [TwitterActorSystem-akka.actor.default-dispatcher-5] TimerScheduler DEBUG - Cancel timer [perSecond] with generation [758]
    16:42:44.335 28939 [TwitterActorSystem-akka.actor.default-dispatcher-5] TimerScheduler DEBUG - Start timer [perSecond] with generation [759]

并且如果我传递虚拟字符串值而不是运行流并传递twitter对象,则计时器可以正常工作像下面案例类PerSecond(tweet:String)案例类TweetPerSecondCount(tweet:String)

class TweetPerSecondActor extends Actor with Timers{
var counter=0
 def receive: PartialFunction[Any, Unit] = {
    case PerSecond(tweet) =>
      log.info("Actor TweetPerSecondActor recevied the message PerSecond")
      timers.startPeriodicTimer("perSecond", TweetPerSecondCount(tweet), 5.second)

    case TweetPerSecondCount(tweet) =>
      log.info("Actor TweetPerSecondActor recevied the message TweetPerSecondCount")
      log.info("got the tweet {}",getCounter+"in 5 seconds")

    case message =>
      log.warn("Actor TweetPerSecondActor: Unhandled message received : {}", message)
      unhandled(message)
  }
}

    class Mycontroller extends Controller {

    val tweetPerSecondActor = system.actorOf......//create actor


    def tweetAveragePerSecond = Action {
        log.debug("in the action tweetAveragePerSecond")

            val future = ask(tweetPerSecondActor, PerSecond("dummy value"))


        Ok("tweet average per second")
      }
    }
scala timer akka twitter4j twitter4s
1个回答
0
投票

Akka中的计时器是带键的(在您的情况下,键为"perSecond",并且API可以保证]

每个计时器都有一个键,如果启动了一个具有相同键的新计时器,则先前的键将被取消

因此,每次调用getTweet函数(假定它是为了效果而不是为了值,我更喜欢“过程”,但这也许是特质的),计时器将被取消。

取决于您要完成的解决方案,可能包括:

  • 每个请求的演员。您需要让请求返回可以传递给将来的请求的东西,以便从参与者获取信息。演员生命周期管理也可能是理想的。
  • 为整个控制器保留一个演员,但每个请求使用唯一的计时器键。如果这些是定期计时器,则必须跟踪哪些计时器键处于活动状态,并取消不再使用的按键(任意运行多个定期计时器可能最终会降低性能)
© www.soinside.com 2019 - 2024. All rights reserved.