使用Akka.Net Graphs时如何访问Source.Queue的OfferAsync方法?

问题描述 投票:0回答:1

我正在开发一个使用 Graph API 的 Akka.net Streams 应用程序。我想通过 Source.Queue 的 OfferAsync 方法提供源数据。

创建图表后如何访问 ISourceQueueWithComplete.OfferAsync 方法以将数据添加到流中?

这是我用来创建图表的代码:


    // Create a graph
    var runnableGraph = RunnableGraph.FromGraph(GraphDsl.Create(
        builder =>
    {
       // create source
       var sourceQueue = Source.Queue<int>(100, OverflowStrategy.Fail)
       
       // use builder to configure the graph
       ...
    }

    // run the graph
    runnableGraph.Run(materializer);

这是我想用来获取流数据的代码:

while (true)
{
   var events = GetEventsFromExternalSource();
   foreach(var singleEvent in events)
   {
      sourceQueueWithComplete.OfferAsync(singleEvent);
   }
}
   
c# akka.net akka.net-streams
1个回答
0
投票

当您最初调用

Source.Queue<T>
时,您只需将
GraphDsl
作为输入变量传递 - 这将允许图形具有物化类型,您可以在编译图形后访问该类型:

var actorSystem = ActorSystem.Create("Test");

// create source
var sourceQueue = Source.Queue<int>(100, OverflowStrategy.Fail);

var graph = GraphDsl.Create(sourceQueue, (builder, source) =>
{
    // connected shapes
    var flow = builder.Add(Flow.Create<int>().Select(i =i * 10));
    var sink = builder.Add(Sink.ForEach<int>(i =Console.WriteLine(i)));
    
    builder.From(source).To(flow);
    builder.From(flow).To(sink);
    
    return ClosedShape.Instance;
});

ISourceQueueWithComplete<intqueueSource = actorSystem.Materializer().Materialize(graph);

foreach(var i in Enumerable.Range(0, 10)){
    await queueSource.OfferAsync(i);
}

执行该程序将导致:

0
10
20
30
40
50
60
70
80
90
© www.soinside.com 2019 - 2024. All rights reserved.