背景: 我正在使用 Akka.Streams 处理 Akka.NET 集群,实现源-接收器模式。我的设置涉及两个 Actor,SourceActor 和 SinkActor,其中 SourceActor 通过流向 SinkActor 发送消息。
问题: 从 SourceActor 发送的消息未被远程 SinkActor 接收。 SourceActor 中的本地流引用似乎正在发送消息,但这些消息未到达远程接收器。
代码片段:
来源演员:
// Inside SourceActor
using Akka.Actor;
using Akka.Hosting;
using Akka.Streams;
using Akka.Streams.Dsl;
using Newtonsoft.Json;
using WebAppShared.Messages;
namespace WebAppSource;
public class SourceActor : ReceiveActor {
protected ILogger _logger;
protected IActorRegistry _registry;
private Source<Hi, IActorRef> _streamSource;
private IActorRef _streamRef;
public SourceActor(ILogger<SourceActor> logger, IActorRegistry registry) {
_logger = logger;
_registry = registry;
Become(Ready);
_streamSource = Source.ActorRef<Hi>(bufferSize: 100, OverflowStrategy.DropNew);
// Define the primary sink for processing each event
var sink = Sink.ForEach<Hi>(ProcessEvent);
// I've changed this a few times.
_streamRef =
_streamSource
.To(Sink.ForEach<Hi>(ProcessEvent))
.Run(Context.Materializer());
}
private void Ready() {
_logger.LogDebug("Source is Ready");
Receive<Hi>((msg) => {
_logger.LogDebug("Sink told Source Hi.");
// This Doesn't make it to Remote Ref, but does pop up in "ProcessEvent(Hi evt) locally
Context.System.Scheduler.ScheduleTellOnce(TimeSpan.FromMilliseconds(500), _streamRef, msg, Self);
var sink = StreamRefs.SourceRef<Hi>();
});
ReceiveAsync<RequestStreamRef>(HandleStreamRequest);
}
private void ProcessEvent(Hi evt) {
try {
_logger.LogDebug($"Received Data Event {JsonConvert.SerializeObject(evt)}");
}
catch (Exception ex) {
_logger.LogError(ex.Message);
}
}
private async Task HandleStreamRequest(RequestStreamRef message) {
await _streamSource
.RunWith(StreamRefs.SourceRef<Hi>(), Context.System.Materializer())
.PipeTo(Sender, success: sourceRef => new StreamRefOffered(sourceRef));
}
protected override void Unhandled(object message) {
var theType = message.GetType().FullName;
_logger.LogCritical($"Unhandled at Source: {theType}");
base.Unhandled(message);
}
}
using Akka.Actor;
using Akka.Hosting;
using Akka.Streams;
using WebAppShared.Interfaces;
using WebAppShared.Messages;
namespace WebAppSink;
public class SinkActor : ReceiveActor {
protected ILogger _logger;
protected IActorRegistry _registry;
private bool _subscribed = false;
public SinkActor(ILogger<SinkActor> logger, IActorRegistry registry) {
_logger = logger;
_registry = registry;
Become(Ready);
Context.System.Scheduler.ScheduleTellOnce(TimeSpan.FromSeconds(1), Self, new Hi(), Self);
}
private async Task Subscribe() {
var actor = _registry.Get<IAmTheSource>();
actor.Tell(new RequestStreamRef(), Self);
}
private void Ready() {
_logger.LogDebug("Sink is Ready");
ReceiveAsync<LogsOffer>(async (m) => {
await m.SourceRef.Source.RunForeach(Console.WriteLine, Context.System.Materializer());
});
Receive<Hi>(async (msg) => {
var actor = _registry.Get<IAmTheSource>();
if (!_subscribed) {
await Subscribe();
Context.System.Scheduler.ScheduleTellOnce(TimeSpan.FromSeconds(10), Self, msg, Self);
return;
}
_logger.LogDebug("Source told Sink Hi.");
Context.System.Scheduler.ScheduleTellOnce(TimeSpan.FromSeconds(1), actor, msg, Self);
});
Receive<StreamRefOffered>(HandleSourceRefOffered);
Receive<string>((message) => {
_logger.LogWarning($"What is this? {message}");
});
}
private void HandleSourceRefOffered(StreamRefOffered offer) {
offer
.SourceRef
.Source
.RunForeach(ProcessDataEvent, Context.System.Materializer());
_subscribed = true;
Context.System.Scheduler.ScheduleTellOnce(TimeSpan.FromSeconds(5), Self, new Hi(), Self);
}
private void ProcessDataEvent(Hi dataEvent) {
// THIS NEVER FIRES
_logger.LogInformation($"Sink Received Hi over the Stream.");
}
protected override void Unhandled(object message) {
// NOTHING RECEIVED HERE
var theType = message.GetType().FullName;
_logger.LogCritical($"Unhandled at Sink: {theType}");
base.Unhandled(message);
}
}
这些位于两个不同的 CSProj 文件中,使用共享类库。
public class StreamRefOffered {
public StreamRefOffered() {
}
public StreamRefOffered(ISourceRef<Hi> sourceRef) {
SourceRef = sourceRef;
}
public ISourceRef<Hi> SourceRef { get; private set; }
}
public class RequestStreamRef {
}
public class Hi {
public string Message { get; set; }
}
生成的日志文件: 来源:
[10:29:10 DBG] Start timer [Akka.Cluster.SBR.SplitBrainResolverBase+Tick] with generation [1]
[10:29:11 DBG] Associated [akka.tcp://drew@localhost:5551] -> akka.tcp://drew@localhost:5550
[10:29:11 DBG] Drained buffer with maxWriteCount: 50, fullBackoffCount: 1,smallBackoffCount: 0, noBackoffCount: 0,adaptiveBackoff: 1000
[10:29:11 INF] Cluster Node [akka.tcp://drew@localhost:5551] - Welcome from [akka.tcp://drew@localhost:5550]
[10:29:11 DBG] SBR add Up [Member(address = akka.tcp://drew@localhost:5550, Uid=1144886708 status = Up, role=[sink], upNumber=1, version=1.0.0)]
[10:29:11 DBG] Creating singleton identification timer...
[10:29:11 DBG] SBR reset stable deadline when members/unreachable changed
[10:29:11 DBG] SBR add Joining/WeaklyUp [Member(address = akka.tcp://drew@localhost:5551, Uid=1545331179 status = Joining, role=[source], upNumber=2147483647, version=1.0.0)]
[10:29:11 DBG] Trying to identify singleton at [akka.tcp://drew@localhost:5550/user/sink/singleton]
[10:29:11 INF] Singleton identified at [akka.tcp://drew@localhost:5550/user/sink/singleton]
[10:29:11 DBG] Watching: [akka://drew/user/sinkProxy -> akka.tcp://drew@localhost:5550/user/sink/singleton]
[10:29:11 DBG] Sending buffered messages to current singleton instance
[10:29:11 DBG] Cluster Node [akka.tcp://drew@localhost:5551] - Receiving gossip from [UniqueAddress: (akka.tcp://drew@localhost:5550, 1144886708)]
[10:29:11 DBG] SBR add Up [Member(address = akka.tcp://drew@localhost:5551, Uid=1545331179 status = Up, role=[source], upNumber=2, version=1.0.0)]
[10:29:11 DBG] Creating singleton identification timer...
[10:29:11 DBG] SBR reset stable deadline when members/unreachable changed
[10:29:11 INF] Singleton manager started singleton actor [akka://drew/user/source/singleton]
[10:29:11 DBG] Trying to identify singleton at [akka.tcp://drew@localhost:5551/user/source/singleton]
[10:29:11 INF] ClusterSingletonManager state change [Start -> Oldest] Akka.Cluster.Tools.Singleton.Uninitialized
[10:29:11 DBG] Source is Ready
[10:29:12 INF] Singleton identified at [akka://drew/user/source/singleton]
[10:29:12 DBG] Sending buffered messages to current singleton instance
[10:29:12 DBG] Created SinkRef, pointing to remote Sink receiver: null, local worker: [akka://drew/user/StreamSupervisor-0/$$a#1501495602]
[10:29:12 DBG] Watching: [akka://drew/user/StreamSupervisor-0/$$a -> akka.tcp://drew@localhost:5550/user/StreamSupervisor-0/$$a]
[10:29:12 DBG] Received cumulative demand [32], consumable demand: [32]
[10:29:18 DBG] Sink told Source Hi.
[10:29:19 DBG] Received Data Event {"Message":null}
[10:29:22 DBG] Sink told Source Hi.
[10:29:23 DBG] Received Data Event {"Message":null}
水槽:
[10:29:10 DBG] Start timer [Akka.Cluster.SBR.SplitBrainResolverBase+Tick] with generation [1]
[10:29:10 INF] Cluster Node [1.0.0] - Node [akka.tcp://drew@localhost:5550] is JOINING itself (with roles [sink], version [1.0.0]) and forming a new cluster
[10:29:10 INF] Cluster Node [akka.tcp://drew@localhost:5550] - is the new leader among reachable nodes (more leaders may exist)
[10:29:10 INF] Cluster Node [akka.tcp://drew@localhost:5550] - Leader is moving node [akka.tcp://drew@localhost:5550] to [Up]
[10:29:10 DBG] Creating singleton identification timer...
[10:29:10 DBG] SBR add Up [Member(address = akka.tcp://drew@localhost:5550, Uid=1144886708 status = Up, role=[sink], upNumber=1, version=1.0.0)]
[10:29:10 DBG] SBR reset stable deadline when members/unreachable changed
[10:29:10 INF] This node is now the leader responsible for taking SBR decisions among the reachable nodes (more leaders may exist).
[10:29:10 DBG] Trying to identify singleton at [akka.tcp://drew@localhost:5550/user/sink/singleton]
[10:29:10 INF] Singleton manager started singleton actor [akka://drew/user/sink/singleton]
[10:29:10 INF] ClusterSingletonManager state change [Start -> Oldest] Akka.Cluster.Tools.Singleton.Uninitialized
[10:29:10 DBG] Sink is Ready
[10:29:11 DBG] Associated [akka.tcp://drew@localhost:5550] <- akka.tcp://drew@localhost:5551
[10:29:11 INF] Cluster Node [akka.tcp://drew@localhost:5550] - Received InitJoin message from [[akka.tcp://drew@localhost:5551/system/cluster/core/daemon/joinSeedNodeProcess-1#1924990792]] to [akka.tcp://drew@localhost:5550]
[10:29:11 INF] Cluster Node [akka.tcp://drew@localhost:5550] - Sending InitJoinAck message from node [akka.tcp://drew@localhost:5550] to [[akka.tcp://drew@localhost:5551/system/cluster/core/daemon/joinSeedNodeProcess-1#1924990792]]
[10:29:11 INF] Cluster Node [1.0.0] - Node [akka.tcp://drew@localhost:5551] is JOINING, roles [source], version [1.0.0]
[10:29:11 DBG] SBR add Joining/WeaklyUp [Member(address = akka.tcp://drew@localhost:5551, Uid=1545331179 status = Joining, role=[source], upNumber=2147483647, version=1.0.0)]
[10:29:11 DBG] Cluster Node [akka.tcp://drew@localhost:5550] - Receiving gossip from [UniqueAddress: (akka.tcp://drew@localhost:5551, 1545331179)]
[10:29:11 INF] Cluster Node [akka.tcp://drew@localhost:5550] - Leader is moving node [akka.tcp://drew@localhost:5551] to [Up]
[10:29:11 DBG] SBR add Up [Member(address = akka.tcp://drew@localhost:5551, Uid=1545331179 status = Up, role=[source], upNumber=2, version=1.0.0)]
[10:29:11 DBG] SBR reset stable deadline when members/unreachable changed
[10:29:11 DBG] Creating singleton identification timer...
[10:29:11 DBG] Trying to identify singleton at [akka.tcp://drew@localhost:5551/user/source/singleton]
[10:29:11 DBG] Trying to identify singleton at [akka.tcp://drew@localhost:5550/user/sink/singleton]
[10:29:11 INF] Singleton identified at [akka://drew/user/sink/singleton]
[10:29:11 DBG] Sending buffered messages to current singleton instance
[10:29:11 DBG] Singleton not available, buffering message type [WebAppShared.Messages.RequestStreamRef]
[10:29:11 DBG] Cluster Node [akka.tcp://drew@localhost:5550] - Receiving gossip from [UniqueAddress: (akka.tcp://drew@localhost:5551, 1545331179)]
[10:29:12 DBG] Trying to identify singleton at [akka.tcp://drew@localhost:5551/user/source/singleton]
[10:29:12 INF] Singleton identified at [akka.tcp://drew@localhost:5551/user/source/singleton]
[10:29:12 DBG] Sending buffered messages to current singleton instance
[10:29:12 DBG] Watching: [akka://drew/user/sourceProxy -> akka.tcp://drew@localhost:5551/user/source/singleton]
[10:29:12 DBG] [SourceRef-0] Allocated receiver: [akka://drew/user/StreamSupervisor-0/$$a#903597150]
[10:29:12 DBG] Received first message from [akka.tcp://drew@localhost:5551/user/StreamSupervisor-0/$$a#1501495602], assuming it to be the remote partner for this stage
[10:29:12 DBG] Watching: [akka://drew/user/StreamSupervisor-0/$$a -> akka.tcp://drew@localhost:5551/user/StreamSupervisor-0/$$a]
[10:29:12 DBG] [SourceRef-0] Demanding until [32] (+32)
[10:29:12 DBG] [SourceRef-0] Received handshake Akka.Streams.Implementation.StreamRef.OnSubscribeHandshake from [akka.tcp://drew@localhost:5551/user/StreamSupervisor-0/$$a#1501495602]
[10:29:13 DBG] [SourceRef-0] Scheduled re-delivery of demand until [32]
[10:29:14 DBG] [SourceRef-0] Scheduled re-delivery of demand until [32]
[10:29:15 DBG] [SourceRef-0] Scheduled re-delivery of demand until [32]
[10:29:16 DBG] [SourceRef-0] Scheduled re-delivery of demand until [32]
[10:29:17 DBG] Source told Sink Hi.
[10:29:17 DBG] [SourceRef-0] Scheduled re-delivery of demand until [32]
[10:29:18 DBG] Forwarding message of type [WebAppShared.Messages.Hi] to current singleton instance at [akka.tcp://drew@localhost:5551/user/source/singleton]
[10:29:18 DBG] [SourceRef-0] Scheduled re-delivery of demand until [32]
[10:29:19 DBG] [SourceRef-0] Scheduled re-delivery of demand until [32]
[10:29:20 DBG] [SourceRef-0] Scheduled re-delivery of demand until [32]
[10:29:21 DBG] Source told Sink Hi.
[10:29:21 DBG] [SourceRef-0] Scheduled re-delivery of demand until [32]
[10:29:22 DBG] Forwarding message of type [WebAppShared.Messages.Hi] to current singleton instance at [akka.tcp://drew@localhost:5551/user/source/singleton]
[10:29:22 DBG] [SourceRef-0] Scheduled re-delivery of demand until [32]
[10:29:23 DBG] [SourceRef-0] Scheduled re-delivery of demand until [32]
// THIS CONTINUES FOREVER
StreamRef 未到达 SinkActor:
确保StreamRefOffered消息从SourceActor成功发送到SinkActor。从您的日志来看,源似乎能够将接收器识别为流的接收者,但值得确认这一点。 检查收到StreamRefOffered消息时是否调用了SinkActor中的HandleSourceRefOffered方法。这对于建立连接至关重要。 需求和背压:
Akka.NET Streams 使用背压来调节消息流。确保需求通过流正确传播。 在您的日志中,您会在 SourceRef-0 部分中看到“计划重新交付需求直至 [32]”。这表明需求已发送至源,但可能值得调查需求未得到满足的原因。 物化器配置:
验证物化器设置(尤其是缓冲区大小和超时)是否适合您的用例。如果需要,调整这些设置。 确保 SourceActor 和 SinkActor 中的物化器使用相同的配置。配置不匹配可能会导致意外行为。 ActorSystem 地址匹配:
仔细检查两个 Actor 中的 Actor 系统地址是否与 Akka.NET 节点的实际地址匹配。任何不匹配都可能导致沟通问题。 防火墙和网络配置:
确保不存在防火墙问题阻止源节点和接收节点之间的通信。检查所需端口是否打开。 验证网络配置是否允许节点之间进行通信。 演员消息序列化:
确保 Hi 消息和其他自定义消息类型是可序列化的。 Akka.NET 依赖于消息序列化,任何序列化问题都可能导致消息传递失败。 演员终止:
检查 Actor 是否意外终止。 Actor 终止可能会导致消息丢失。 日志记录和调试:
提高 Akka.NET 中的日志记录级别,以获取有关参与者和流内部发生的情况的更多详细信息。 添加更多调试日志来跟踪消息流并确定发生故障的位置。