将JSON作为Akka流中的单个实体处理,而不是对其进行迭代

问题描述 投票:-1回答:1

我正在尝试使用Akka流将消费者与生产者联系起来。

while (true) {
            JsonNode msg = producer.getNextDataEnvelope();
            if (msg == null) {
                break;
            }
            System.out.println(msg.toString());
            final Source<JsonNode, NotUsed> source = Source.from(getJSONMessage(msg));
            final Sink<JsonNode, CompletionStage<Done>> sink =
              Sink.foreach(receivedMsg -> consumer.sendJson((ObjectNode) receivedMsg));

            final RunnableGraph<CompletionStage<Done>> runnable = source.toMat(sink, Keep.right());
            final CompletionStage<Done> producerConsumer = runnable.run(system);
            Thread.sleep(1);
        }

        private static ObjectNode getJSONMessage(JsonNode message) {
          JsonNode pipelineMsg = message.get(KEYNAME);
          return (ObjectNode)pipelineMsg;
        }

getJSONMessage之后产生的json是这样的>>

{
  a: {
  },
  b: {
  }
}

When this JSON goes to consumer it is processing it as 

a:{}

首先,然后

b: {
}

我将如何在Akka流中立即处理完整的JSON负载,而不是在JSON负载上进行迭代。

我正在尝试使用Akka流将消费者与生产者联系起来。 while(true){JsonNode msg = producer.getNextDataEnvelope(); if(msg == null){中断; ...

java akka akka-stream
1个回答
0
投票

Source.from方法从Source.from对象创建Akka流Source。由于Iterable实现了ObjectNode,因此通过迭代其子级,您的流在对象中的每个值将具有一个元素。

© www.soinside.com 2019 - 2024. All rights reserved.