我正在使用akka timer
我正在使用twitter streams,并且我试图在5秒内获得tweets的数量这是我的代码
case class PerSecond(tweet:Tweet)
case class TweetPerSecondCount(tweet:Tweet)
class TweetPerSecondActor extends Actor with Timers{
var counter=0
def receive: PartialFunction[Any, Unit] = {
case PerSecond(tweet) =>
log.info("Actor TweetPerSecondActor recevied the message PerSecond")
timers.startPeriodicTimer("perSecond", TweetPerSecondCount(tweet), 5.second)
case TweetPerSecondCount(tweet) =>
log.info("Actor TweetPerSecondActor recevied the message TweetPerSecondCount")
log.info("got the tweet {}",getCounter+"in 5 seconds")
case message =>
log.warn("Actor TweetPerSecondActor: Unhandled message received : {}", message)
unhandled(message)
}
}
在游戏框架的控制器Action中,我从twitter流中获取tweets对象(连续的流而不会弯腰)
class Mycontroller extends Controller {
val tweetPerSecondActor = system.actorOf......//create actor
def tweetAveragePerSecond = Action {
log.debug("in the action tweetAveragePerSecond")
def getTweet: PartialFunction[StreamingMessage, Unit] = {
case tweet: Tweet =>
val future = ask(tweetPerSecondActor, PerSecond(tweet))
}
val streaming: Future[TwitterStream] = handleTwitterStreamClient.getStreamingCLient.sampleStatuses(stall_warnings = true)(getTweet)
Ok("tweet average per second")
}
}
当我点击日志时显示的路线
TweetPerSecondActor INFO - Actor TweetPerSecondActor recevied the message PerSecond
16:42:44.335 28939 [ArteciateActorSystem-akka.actor.default-dispatcher-5] TimerScheduler DEBUG - Cancel timer [perSecond] with generation [758]
16:42:44.335 28939 [ArteciateActorSystem-akka.actor.default-dispatcher-5] TimerScheduler DEBUG - Start timer [perSecond] with generation [759]
6:42:44.335 28939 [TwitterActorSystem-akka.actor.default-dispatcher-6] TweetPerSecondActor INFO - Actor TweetPerSecondActor recevied the message PerSecond
16:42:44.335 28939 [TwitterActorSystem-akka.actor.default-dispatcher-5] TimerScheduler DEBUG - Cancel timer [perSecond] with generation [758]
16:42:44.335 28939 [TwitterActorSystem-akka.actor.default-dispatcher-5] TimerScheduler DEBUG - Start timer [perSecond] with generation [759]
并且如果我传递虚拟字符串值而不是运行流并传递twitter对象,则计时器可以正常工作像下面案例类PerSecond(tweet:String)案例类TweetPerSecondCount(tweet:String)
class TweetPerSecondActor extends Actor with Timers{
var counter=0
def receive: PartialFunction[Any, Unit] = {
case PerSecond(tweet) =>
log.info("Actor TweetPerSecondActor recevied the message PerSecond")
timers.startPeriodicTimer("perSecond", TweetPerSecondCount(tweet), 5.second)
case TweetPerSecondCount(tweet) =>
log.info("Actor TweetPerSecondActor recevied the message TweetPerSecondCount")
log.info("got the tweet {}",getCounter+"in 5 seconds")
case message =>
log.warn("Actor TweetPerSecondActor: Unhandled message received : {}", message)
unhandled(message)
}
}
class Mycontroller extends Controller {
val tweetPerSecondActor = system.actorOf......//create actor
def tweetAveragePerSecond = Action {
log.debug("in the action tweetAveragePerSecond")
val future = ask(tweetPerSecondActor, PerSecond("dummy value"))
Ok("tweet average per second")
}
}
Akka中的计时器是带键的(在您的情况下,键为"perSecond"
,并且API可以保证]
每个计时器都有一个键,如果启动了一个具有相同键的新计时器,则先前的键将被取消
因此,每次调用getTweet
函数(假定它是为了效果而不是为了值,我更喜欢“过程”,但这也许是特质的),计时器将被取消。
取决于您要完成的解决方案,可能包括: