我正在开发一个使用 Graph API 的 Akka.net Streams 应用程序。我想通过 Source.Queue 的 OfferAsync 方法提供源数据。
创建图表后如何访问 ISourceQueueWithComplete.OfferAsync 方法以将数据添加到流中?
这是我用来创建图表的代码:
// Create a graph
var runnableGraph = RunnableGraph.FromGraph(GraphDsl.Create(
builder =>
{
// create source
var sourceQueue = Source.Queue<int>(100, OverflowStrategy.Fail)
// use builder to configure the graph
...
}
// run the graph
runnableGraph.Run(materializer);
这是我想用来获取流数据的代码:
while (true)
{
var events = GetEventsFromExternalSource();
foreach(var singleEvent in events)
{
sourceQueueWithComplete.OfferAsync(singleEvent);
}
}
当您最初调用
Source.Queue<T>
时,您只需将 GraphDsl
作为输入变量传递 - 这将允许图形具有物化类型,您可以在编译图形后访问该类型:
var actorSystem = ActorSystem.Create("Test");
// create source
var sourceQueue = Source.Queue<int>(100, OverflowStrategy.Fail);
var graph = GraphDsl.Create(sourceQueue, (builder, source) =>
{
// connected shapes
var flow = builder.Add(Flow.Create<int>().Select(i =i * 10));
var sink = builder.Add(Sink.ForEach<int>(i =Console.WriteLine(i)));
builder.From(source).To(flow);
builder.From(flow).To(sink);
return ClosedShape.Instance;
});
ISourceQueueWithComplete<intqueueSource = actorSystem.Materializer().Materialize(graph);
foreach(var i in Enumerable.Range(0, 10)){
await queueSource.OfferAsync(i);
}
执行该程序将导致:
0
10
20
30
40
50
60
70
80
90