Java Akka Actor 当消息数较多时停止接收消息

问题描述 投票:0回答:1

在我们的 Java Spring 应用程序中,我们使用 Scala Akka 消息传递库来发送和接收消息。
在消息中,我们传递要处理的对象。

向接收者发送对象的情况经常每 5 分钟发生一次。我们有一个 cron 作业来获取要处理的对象列表。
然后我们将这些对象发送给接收者参与者。一段时间后,我们看到接收者停止处理发送消息。

当我们重新启动应用程序时,只有它会选择最后发送的消息并开始处理。

日志中没有任何内容。

您能否告诉我们为什么它在中间停止?

akka-版本 - 2.3.9
akka-actor_2.11-2.3.9.jar
akka-testkit_2.11-2.3.9.jar

我们在发送消息时使用tell()方法
this.actorRef.tell(new Work(消息), null);

我们使用ask()方法来获取响应
this.ref.tell(new Work(消息), null);

我想知道为什么它在中间停止了。我需要修改什么才能处理大量消息吗?

public class WorkGroupClient <T>
{
    private final static Log log = LogFactory.getLog(WorkGroupClient.class);
    private final ActorRef ref;
    
    public WorkGroupClient(ActorRef ref)
    {
        this.ref = ref;
    }

    public void send(T message)
    {
        if (message!=null) {
            log.info("STEP: Send Akka message for class : "+message.getClass());
        }
        this.ref.tell(new Work(message), null);
    }
    
    public void sendOnCommit(final T message)
    {
        if (message!=null) {
            log.info("STEP: Send Akka message : "+message.getClass());
        }
        TxUtil.runOnCommit(new Runnable()
        {
            @Override
            public void run()
            {
                send(message);
            }
        });
    }
    
    public Object ask(T message, long timeout, TimeUnit unit)
    {
        if (message!=null) {
            log.info("STEP: ask message for class : "+message.getClass() ");
        }
        return AkkaUtil.ask(this.ref, new Work(message), timeout, unit);
    }
}


@Bean
    public ActorSystem actorSystem()
    {
        ConfigBuilder builder = new ConfigBuilder();
        Map<String,String> external = Maps.newHashMap();
        Iterator<String> keys = config.getKeys();
        builder.withValues(external);
        if( config.getBoolean(DEBUG_ALL, false) )
        {
            builder.withDebugAll();
        }
        Config config = builder.build();
        ActorSystem system = ActorSystem.create("myActorSystem", config);
        return system;
    }
    

public abstract class WorkGroupWorker <T> extends UntypedActor
{
    private final static Log log = LogFactory.getLog(WorkGroupWorker.class);

    private final FailureCounter COUNTER = new FailureCounter();
    private volatile T work;
    private final Class<T> type;
    private final String metric;
    
    private volatile boolean complete = false;
    
      @Override
    public final void onReceive(Object msg) throws Exception
    {
        log.info("STEP: onReceive method BEGIN");
        Scope scope = Profiler.start(this.metric);
        try
        {
            if(  msg instanceof Work )
            {
                log.info("STEP: msg is instance of Work");
                Work work = (Work) msg;
                this.work = this.type.cast(work.getTarget());
                log.info("STEP: work = "+work);
                workAssigned();
            } 
            else if ( work != null )
            {
                log.info("STEP: Executing onReceive elseIf for msg class = "+msg.getClass()+", msg = "+msg);
                onMessage(msg); 
            }
            else
            {
                log.info("STEP: Message is null");
                unhandled(msg);
            }
        }
        catch (Exception e)
        {
            log.error("error in processing " + this.metric + " : "  +msg, e);
            COUNTER.count(this.metric, e);
            throw e;
        }
        // treat as non-fatal, normally akka will shutdown with any thread error
        catch ( Error e)
        {
            log.error("critical error in processing " + this.metric + " : "  +msg, e);
            throw new CriticalWorkerFailure("critical error in processing " + this.metric + " : "  +msg, e);
        }
        finally
        {
           Profiler.stop(scope); 
        }
        log.info("STEP: onReceive method    End");
    }
    
    protected void onMessage(Object msg) 
    {
        log.info("STEP: onMessage method BEGIN");
        unhandled(msg);
        log.info("STEP: onMessage method END");
    }

    protected abstract void workAssigned() throws Exception;
    
    protected void workComplete()
    {
        log.info("STEP: workComplete method BEGIN");
        this.work = null;
        getContext().stop(getSelf());
        this.complete = true;
        log.info("STEP: workComplete method END");
    }
    
    public boolean isComplete()
    {
        return complete;
    }
    
    protected T getWork()
    {
        return work;
    }
}

java akka
1个回答
0
投票

我们使用ask()方法来获取响应

ask
方法发送消息然后等待响应。因此,如果您拨打
send
那么
ask
您将发送两条消息,并且只等待其中一条消息。

参与者会覆盖每条引入竞争条件的消息上的

work
字段,这一事实加剧了这种情况。

看起来您需要删除

send
方法并仅使用
ask
这将确保参与者一次只处理一条消息。

© www.soinside.com 2019 - 2024. All rights reserved.