在我们的 Java Spring 应用程序中,我们使用 Scala Akka 消息传递库来发送和接收消息。
在消息中,我们传递要处理的对象。
向接收者发送对象的情况经常每 5 分钟发生一次。我们有一个 cron 作业来获取要处理的对象列表。
然后我们将这些对象发送给接收者参与者。一段时间后,我们看到接收者停止处理发送消息。
当我们重新启动应用程序时,只有它会选择最后发送的消息并开始处理。
日志中没有任何内容。
您能否告诉我们为什么它在中间停止?
akka-版本 - 2.3.9
akka-actor_2.11-2.3.9.jar
akka-testkit_2.11-2.3.9.jar
我们在发送消息时使用tell()方法
this.actorRef.tell(new Work(消息), null);
我们使用ask()方法来获取响应
this.ref.tell(new Work(消息), null);
我想知道为什么它在中间停止了。我需要修改什么才能处理大量消息吗?
public class WorkGroupClient <T>
{
private final static Log log = LogFactory.getLog(WorkGroupClient.class);
private final ActorRef ref;
public WorkGroupClient(ActorRef ref)
{
this.ref = ref;
}
public void send(T message)
{
if (message!=null) {
log.info("STEP: Send Akka message for class : "+message.getClass());
}
this.ref.tell(new Work(message), null);
}
public void sendOnCommit(final T message)
{
if (message!=null) {
log.info("STEP: Send Akka message : "+message.getClass());
}
TxUtil.runOnCommit(new Runnable()
{
@Override
public void run()
{
send(message);
}
});
}
public Object ask(T message, long timeout, TimeUnit unit)
{
if (message!=null) {
log.info("STEP: ask message for class : "+message.getClass() ");
}
return AkkaUtil.ask(this.ref, new Work(message), timeout, unit);
}
}
@Bean
public ActorSystem actorSystem()
{
ConfigBuilder builder = new ConfigBuilder();
Map<String,String> external = Maps.newHashMap();
Iterator<String> keys = config.getKeys();
builder.withValues(external);
if( config.getBoolean(DEBUG_ALL, false) )
{
builder.withDebugAll();
}
Config config = builder.build();
ActorSystem system = ActorSystem.create("myActorSystem", config);
return system;
}
public abstract class WorkGroupWorker <T> extends UntypedActor
{
private final static Log log = LogFactory.getLog(WorkGroupWorker.class);
private final FailureCounter COUNTER = new FailureCounter();
private volatile T work;
private final Class<T> type;
private final String metric;
private volatile boolean complete = false;
@Override
public final void onReceive(Object msg) throws Exception
{
log.info("STEP: onReceive method BEGIN");
Scope scope = Profiler.start(this.metric);
try
{
if( msg instanceof Work )
{
log.info("STEP: msg is instance of Work");
Work work = (Work) msg;
this.work = this.type.cast(work.getTarget());
log.info("STEP: work = "+work);
workAssigned();
}
else if ( work != null )
{
log.info("STEP: Executing onReceive elseIf for msg class = "+msg.getClass()+", msg = "+msg);
onMessage(msg);
}
else
{
log.info("STEP: Message is null");
unhandled(msg);
}
}
catch (Exception e)
{
log.error("error in processing " + this.metric + " : " +msg, e);
COUNTER.count(this.metric, e);
throw e;
}
// treat as non-fatal, normally akka will shutdown with any thread error
catch ( Error e)
{
log.error("critical error in processing " + this.metric + " : " +msg, e);
throw new CriticalWorkerFailure("critical error in processing " + this.metric + " : " +msg, e);
}
finally
{
Profiler.stop(scope);
}
log.info("STEP: onReceive method End");
}
protected void onMessage(Object msg)
{
log.info("STEP: onMessage method BEGIN");
unhandled(msg);
log.info("STEP: onMessage method END");
}
protected abstract void workAssigned() throws Exception;
protected void workComplete()
{
log.info("STEP: workComplete method BEGIN");
this.work = null;
getContext().stop(getSelf());
this.complete = true;
log.info("STEP: workComplete method END");
}
public boolean isComplete()
{
return complete;
}
protected T getWork()
{
return work;
}
}
我们使用ask()方法来获取响应
ask
方法发送消息然后等待响应。因此,如果您拨打 send
那么 ask
您将发送两条消息,并且只等待其中一条消息。
参与者会覆盖每条引入竞争条件的消息上的
work
字段,这一事实加剧了这种情况。
看起来您需要删除
send
方法并仅使用 ask
这将确保参与者一次只处理一条消息。