我正在尝试使用Akka流将消费者与生产者联系起来。
while (true) {
JsonNode msg = producer.getNextDataEnvelope();
if (msg == null) {
break;
}
System.out.println(msg.toString());
final Source<JsonNode, NotUsed> source = Source.from(getJSONMessage(msg));
final Sink<JsonNode, CompletionStage<Done>> sink =
Sink.foreach(receivedMsg -> consumer.sendJson((ObjectNode) receivedMsg));
final RunnableGraph<CompletionStage<Done>> runnable = source.toMat(sink, Keep.right());
final CompletionStage<Done> producerConsumer = runnable.run(system);
Thread.sleep(1);
}
private static ObjectNode getJSONMessage(JsonNode message) {
JsonNode pipelineMsg = message.get(KEYNAME);
return (ObjectNode)pipelineMsg;
}
getJSONMessage之后产生的json是这样的>>
{ a: { }, b: { } } When this JSON goes to consumer it is processing it as
a:{}
首先,然后
b: { }
我将如何在Akka流中立即处理完整的JSON负载,而不是在JSON负载上进行迭代。
我正在尝试使用Akka流将消费者与生产者联系起来。 while(true){JsonNode msg = producer.getNextDataEnvelope(); if(msg == null){中断; ...
Source.from
方法从Source.from
对象创建Akka流Source
。由于Iterable
实现了ObjectNode
,因此通过迭代其子级,您的流在对象中的每个值将具有一个元素。