对 Akka.NET 流进行故障排除:在源-接收器模式中消息未到达远程接收器 Actor

问题描述 投票:0回答:1

背景: 我正在使用 Akka.Streams 处理 Akka.NET 集群,实现源-接收器模式。我的设置涉及两个 Actor,SourceActor 和 SinkActor,其中 SourceActor 通过流向 SinkActor 发送消息。

问题: 从 SourceActor 发送的消息未被远程 SinkActor 接收。 SourceActor 中的本地流引用似乎正在发送消息,但这些消息未到达远程接收器。

代码片段:

来源演员:

// Inside SourceActor
using Akka.Actor;
using Akka.Hosting;
using Akka.Streams;
using Akka.Streams.Dsl;
using Newtonsoft.Json;
using WebAppShared.Messages;

namespace WebAppSource;
public class SourceActor : ReceiveActor {
    protected ILogger _logger;
    protected IActorRegistry _registry;

    private Source<Hi, IActorRef> _streamSource;
    private IActorRef _streamRef;
    public SourceActor(ILogger<SourceActor> logger, IActorRegistry registry) {
        _logger = logger;
        _registry = registry;
        Become(Ready);
        _streamSource = Source.ActorRef<Hi>(bufferSize: 100, OverflowStrategy.DropNew);

        // Define the primary sink for processing each event
        var sink = Sink.ForEach<Hi>(ProcessEvent);

        // I've changed this a few times.
        _streamRef =
            _streamSource
            .To(Sink.ForEach<Hi>(ProcessEvent))
            .Run(Context.Materializer());
    }

    private void Ready() {
        _logger.LogDebug("Source is Ready");
        Receive<Hi>((msg) => {
            _logger.LogDebug("Sink told Source Hi.");

            // This Doesn't make it to Remote Ref, but does pop up in "ProcessEvent(Hi evt) locally
            Context.System.Scheduler.ScheduleTellOnce(TimeSpan.FromMilliseconds(500), _streamRef, msg, Self);
            var sink = StreamRefs.SourceRef<Hi>();
        });
        ReceiveAsync<RequestStreamRef>(HandleStreamRequest);
    }

    private void ProcessEvent(Hi evt) {
        try {
            _logger.LogDebug($"Received Data Event {JsonConvert.SerializeObject(evt)}");
        }
        catch (Exception ex) {
            _logger.LogError(ex.Message);
        }
    }

    private async Task HandleStreamRequest(RequestStreamRef message) {
        await _streamSource
            .RunWith(StreamRefs.SourceRef<Hi>(), Context.System.Materializer())
            .PipeTo(Sender, success: sourceRef => new StreamRefOffered(sourceRef));
    }

    protected override void Unhandled(object message) {

        var theType = message.GetType().FullName;
        _logger.LogCritical($"Unhandled at Source: {theType}");
        base.Unhandled(message);
    }
}

using Akka.Actor;
using Akka.Hosting;
using Akka.Streams;
using WebAppShared.Interfaces;
using WebAppShared.Messages;

namespace WebAppSink;

public class SinkActor : ReceiveActor {
    protected ILogger _logger;
    protected IActorRegistry _registry;
    private bool _subscribed = false;

    public SinkActor(ILogger<SinkActor> logger, IActorRegistry registry) {
        _logger = logger;
        _registry = registry;
        Become(Ready);
        Context.System.Scheduler.ScheduleTellOnce(TimeSpan.FromSeconds(1), Self, new Hi(), Self);
    }

    private async Task Subscribe() {
        var actor = _registry.Get<IAmTheSource>();
        actor.Tell(new RequestStreamRef(), Self);
    }

    private void Ready() {
        _logger.LogDebug("Sink is Ready");
        ReceiveAsync<LogsOffer>(async (m) => {
            await m.SourceRef.Source.RunForeach(Console.WriteLine, Context.System.Materializer());
        });

        Receive<Hi>(async (msg) => {
            var actor = _registry.Get<IAmTheSource>();

            if (!_subscribed) {
                await Subscribe();
                Context.System.Scheduler.ScheduleTellOnce(TimeSpan.FromSeconds(10), Self, msg, Self);
                return;
            }
            _logger.LogDebug("Source told Sink Hi.");
            Context.System.Scheduler.ScheduleTellOnce(TimeSpan.FromSeconds(1), actor, msg, Self);
        });
        Receive<StreamRefOffered>(HandleSourceRefOffered);
        Receive<string>((message) => {
            _logger.LogWarning($"What is this?  {message}");
        });
    }

    private void HandleSourceRefOffered(StreamRefOffered offer) {
        offer
            .SourceRef
            .Source
            .RunForeach(ProcessDataEvent, Context.System.Materializer());
        _subscribed = true;
        Context.System.Scheduler.ScheduleTellOnce(TimeSpan.FromSeconds(5), Self, new Hi(), Self);
    }
    private void ProcessDataEvent(Hi dataEvent) {
        // THIS NEVER FIRES
        _logger.LogInformation($"Sink Received Hi over the Stream.");
    }

    protected override void Unhandled(object message) {
        // NOTHING RECEIVED HERE
        var theType = message.GetType().FullName;
        _logger.LogCritical($"Unhandled at Sink: {theType}");
        base.Unhandled(message);
    }

}

这些位于两个不同的 CSProj 文件中,使用共享类库。

public class StreamRefOffered {
    public StreamRefOffered() {
    }

    public StreamRefOffered(ISourceRef<Hi> sourceRef) {
        SourceRef = sourceRef;
    }
    public ISourceRef<Hi> SourceRef { get; private set; }
}
public class RequestStreamRef {
    
}
public class Hi {
    public string Message { get; set; }
}

生成的日志文件: 来源:

[10:29:10 DBG] Start timer [Akka.Cluster.SBR.SplitBrainResolverBase+Tick] with generation [1]
[10:29:11 DBG] Associated [akka.tcp://drew@localhost:5551] -> akka.tcp://drew@localhost:5550
[10:29:11 DBG] Drained buffer with maxWriteCount: 50, fullBackoffCount: 1,smallBackoffCount: 0, noBackoffCount: 0,adaptiveBackoff: 1000
[10:29:11 INF] Cluster Node [akka.tcp://drew@localhost:5551] - Welcome from [akka.tcp://drew@localhost:5550]
[10:29:11 DBG] SBR add Up [Member(address = akka.tcp://drew@localhost:5550, Uid=1144886708 status = Up, role=[sink], upNumber=1, version=1.0.0)]
[10:29:11 DBG] Creating singleton identification timer...
[10:29:11 DBG] SBR reset stable deadline when members/unreachable changed
[10:29:11 DBG] SBR add Joining/WeaklyUp [Member(address = akka.tcp://drew@localhost:5551, Uid=1545331179 status = Joining, role=[source], upNumber=2147483647, version=1.0.0)]
[10:29:11 DBG] Trying to identify singleton at [akka.tcp://drew@localhost:5550/user/sink/singleton]
[10:29:11 INF] Singleton identified at [akka.tcp://drew@localhost:5550/user/sink/singleton]
[10:29:11 DBG] Watching: [akka://drew/user/sinkProxy -> akka.tcp://drew@localhost:5550/user/sink/singleton]
[10:29:11 DBG] Sending buffered messages to current singleton instance
[10:29:11 DBG] Cluster Node [akka.tcp://drew@localhost:5551] - Receiving gossip from [UniqueAddress: (akka.tcp://drew@localhost:5550, 1144886708)]
[10:29:11 DBG] SBR add Up [Member(address = akka.tcp://drew@localhost:5551, Uid=1545331179 status = Up, role=[source], upNumber=2, version=1.0.0)]
[10:29:11 DBG] Creating singleton identification timer...
[10:29:11 DBG] SBR reset stable deadline when members/unreachable changed
[10:29:11 INF] Singleton manager started singleton actor [akka://drew/user/source/singleton]
[10:29:11 DBG] Trying to identify singleton at [akka.tcp://drew@localhost:5551/user/source/singleton]
[10:29:11 INF] ClusterSingletonManager state change [Start -> Oldest] Akka.Cluster.Tools.Singleton.Uninitialized
[10:29:11 DBG] Source is Ready
[10:29:12 INF] Singleton identified at [akka://drew/user/source/singleton]
[10:29:12 DBG] Sending buffered messages to current singleton instance
[10:29:12 DBG] Created SinkRef, pointing to remote Sink receiver: null, local worker: [akka://drew/user/StreamSupervisor-0/$$a#1501495602]
[10:29:12 DBG] Watching: [akka://drew/user/StreamSupervisor-0/$$a -> akka.tcp://drew@localhost:5550/user/StreamSupervisor-0/$$a]
[10:29:12 DBG] Received cumulative demand [32], consumable demand: [32]
[10:29:18 DBG] Sink told Source Hi.
[10:29:19 DBG] Received Data Event {"Message":null}
[10:29:22 DBG] Sink told Source Hi.
[10:29:23 DBG] Received Data Event {"Message":null}

水槽:

[10:29:10 DBG] Start timer [Akka.Cluster.SBR.SplitBrainResolverBase+Tick] with generation [1]
[10:29:10 INF] Cluster Node [1.0.0] - Node [akka.tcp://drew@localhost:5550] is JOINING itself (with roles [sink], version [1.0.0]) and forming a new cluster
[10:29:10 INF] Cluster Node [akka.tcp://drew@localhost:5550] - is the new leader among reachable nodes (more leaders may exist)
[10:29:10 INF] Cluster Node [akka.tcp://drew@localhost:5550] - Leader is moving node [akka.tcp://drew@localhost:5550] to [Up]
[10:29:10 DBG] Creating singleton identification timer...
[10:29:10 DBG] SBR add Up [Member(address = akka.tcp://drew@localhost:5550, Uid=1144886708 status = Up, role=[sink], upNumber=1, version=1.0.0)]
[10:29:10 DBG] SBR reset stable deadline when members/unreachable changed
[10:29:10 INF] This node is now the leader responsible for taking SBR decisions among the reachable nodes (more leaders may exist).
[10:29:10 DBG] Trying to identify singleton at [akka.tcp://drew@localhost:5550/user/sink/singleton]
[10:29:10 INF] Singleton manager started singleton actor [akka://drew/user/sink/singleton]
[10:29:10 INF] ClusterSingletonManager state change [Start -> Oldest] Akka.Cluster.Tools.Singleton.Uninitialized
[10:29:10 DBG] Sink is Ready
[10:29:11 DBG] Associated [akka.tcp://drew@localhost:5550] <- akka.tcp://drew@localhost:5551
[10:29:11 INF] Cluster Node [akka.tcp://drew@localhost:5550] - Received InitJoin message from [[akka.tcp://drew@localhost:5551/system/cluster/core/daemon/joinSeedNodeProcess-1#1924990792]] to [akka.tcp://drew@localhost:5550]
[10:29:11 INF] Cluster Node [akka.tcp://drew@localhost:5550] - Sending InitJoinAck message from node [akka.tcp://drew@localhost:5550] to [[akka.tcp://drew@localhost:5551/system/cluster/core/daemon/joinSeedNodeProcess-1#1924990792]]
[10:29:11 INF] Cluster Node [1.0.0] - Node [akka.tcp://drew@localhost:5551] is JOINING, roles [source], version [1.0.0]
[10:29:11 DBG] SBR add Joining/WeaklyUp [Member(address = akka.tcp://drew@localhost:5551, Uid=1545331179 status = Joining, role=[source], upNumber=2147483647, version=1.0.0)]
[10:29:11 DBG] Cluster Node [akka.tcp://drew@localhost:5550] - Receiving gossip from [UniqueAddress: (akka.tcp://drew@localhost:5551, 1545331179)]
[10:29:11 INF] Cluster Node [akka.tcp://drew@localhost:5550] - Leader is moving node [akka.tcp://drew@localhost:5551] to [Up]
[10:29:11 DBG] SBR add Up [Member(address = akka.tcp://drew@localhost:5551, Uid=1545331179 status = Up, role=[source], upNumber=2, version=1.0.0)]
[10:29:11 DBG] SBR reset stable deadline when members/unreachable changed
[10:29:11 DBG] Creating singleton identification timer...
[10:29:11 DBG] Trying to identify singleton at [akka.tcp://drew@localhost:5551/user/source/singleton]
[10:29:11 DBG] Trying to identify singleton at [akka.tcp://drew@localhost:5550/user/sink/singleton]
[10:29:11 INF] Singleton identified at [akka://drew/user/sink/singleton]
[10:29:11 DBG] Sending buffered messages to current singleton instance
[10:29:11 DBG] Singleton not available, buffering message type [WebAppShared.Messages.RequestStreamRef]
[10:29:11 DBG] Cluster Node [akka.tcp://drew@localhost:5550] - Receiving gossip from [UniqueAddress: (akka.tcp://drew@localhost:5551, 1545331179)]
[10:29:12 DBG] Trying to identify singleton at [akka.tcp://drew@localhost:5551/user/source/singleton]
[10:29:12 INF] Singleton identified at [akka.tcp://drew@localhost:5551/user/source/singleton]
[10:29:12 DBG] Sending buffered messages to current singleton instance
[10:29:12 DBG] Watching: [akka://drew/user/sourceProxy -> akka.tcp://drew@localhost:5551/user/source/singleton]
[10:29:12 DBG] [SourceRef-0] Allocated receiver: [akka://drew/user/StreamSupervisor-0/$$a#903597150]
[10:29:12 DBG] Received first message from [akka.tcp://drew@localhost:5551/user/StreamSupervisor-0/$$a#1501495602], assuming it to be the remote partner for this stage
[10:29:12 DBG] Watching: [akka://drew/user/StreamSupervisor-0/$$a -> akka.tcp://drew@localhost:5551/user/StreamSupervisor-0/$$a]
[10:29:12 DBG] [SourceRef-0] Demanding until [32] (+32)
[10:29:12 DBG] [SourceRef-0] Received handshake Akka.Streams.Implementation.StreamRef.OnSubscribeHandshake from [akka.tcp://drew@localhost:5551/user/StreamSupervisor-0/$$a#1501495602]
[10:29:13 DBG] [SourceRef-0] Scheduled re-delivery of demand until [32]
[10:29:14 DBG] [SourceRef-0] Scheduled re-delivery of demand until [32]
[10:29:15 DBG] [SourceRef-0] Scheduled re-delivery of demand until [32]
[10:29:16 DBG] [SourceRef-0] Scheduled re-delivery of demand until [32]
[10:29:17 DBG] Source told Sink Hi.
[10:29:17 DBG] [SourceRef-0] Scheduled re-delivery of demand until [32]
[10:29:18 DBG] Forwarding message of type [WebAppShared.Messages.Hi] to current singleton instance at [akka.tcp://drew@localhost:5551/user/source/singleton]
[10:29:18 DBG] [SourceRef-0] Scheduled re-delivery of demand until [32]
[10:29:19 DBG] [SourceRef-0] Scheduled re-delivery of demand until [32]
[10:29:20 DBG] [SourceRef-0] Scheduled re-delivery of demand until [32]
[10:29:21 DBG] Source told Sink Hi.
[10:29:21 DBG] [SourceRef-0] Scheduled re-delivery of demand until [32]
[10:29:22 DBG] Forwarding message of type [WebAppShared.Messages.Hi] to current singleton instance at [akka.tcp://drew@localhost:5551/user/source/singleton]
[10:29:22 DBG] [SourceRef-0] Scheduled re-delivery of demand until [32]
[10:29:23 DBG] [SourceRef-0] Scheduled re-delivery of demand until [32]  
// THIS CONTINUES FOREVER
akka.net akka.net-streams
1个回答
0
投票

StreamRef 未到达 SinkActor:

确保StreamRefOffered消息从SourceActor成功发送到SinkActor。从您的日志来看,源似乎能够将接收器识别为流的接收者,但值得确认这一点。 检查收到StreamRefOffered消息时是否调用了SinkActor中的HandleSourceRefOffered方法。这对于建立连接至关重要。 需求和背压:

Akka.NET Streams 使用背压来调节消息流。确保需求通过流正确传播。 在您的日志中,您会在 SourceRef-0 部分中看到“计划重新交付需求直至 [32]”。这表明需求已发送至源,但可能值得调查需求未得到满足的原因。 物化器配置:

验证物化器设置(尤其是缓冲区大小和超时)是否适合您的用例。如果需要,调整这些设置。 确保 SourceActor 和 SinkActor 中的物化器使用相同的配置。配置不匹配可能会导致意外行为。 ActorSystem 地址匹配:

仔细检查两个 Actor 中的 Actor 系统地址是否与 Akka.NET 节点的实际地址匹配。任何不匹配都可能导致沟通问题。 防火墙和网络配置:

确保不存在防火墙问题阻止源节点和接收节点之间的通信。检查所需端口是否打开。 验证网络配置是否允许节点之间进行通信。 演员消息序列化:

确保 Hi 消息和其他自定义消息类型是可序列化的。 Akka.NET 依赖于消息序列化,任何序列化问题都可能导致消息传递失败。 演员终止:

检查 Actor 是否意外终止。 Actor 终止可能会导致消息丢失。 日志记录和调试:

提高 Akka.NET 中的日志记录级别,以获取有关参与者和流内部发生的情况的更多详细信息。 添加更多调试日志来跟踪消息流并确定发生故障的位置。

© www.soinside.com 2019 - 2024. All rights reserved.