Apache Storm:聚合 Bolt 无法同时接收来自多个 Bolt 的元组

问题描述 投票:0回答:1

我正在创建一个拓扑:它读取一段示例视频,对其做一些转换,然后把结果保存为输出;我使用 Apache Storm 来同时应用不同的滤镜。具体来说,读取视频之后,我把每一帧发送给 2 个 Bolt:一个应用高斯模糊效果,另一个对收到的每一帧进行锐化。锐化 Bolt 和高斯模糊 Bolt 都能成功地把各自的元组发送到同一个目的地。现在我想把这两个 Bolt 发出的结果帧合并起来,但聚合 Bolt 每次只能收到其中一个 Bolt 发来的元组。我该如何解决?

Topology.java:

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;
import java.io.*;

/**
 * Entry point that wires the video-processing topology and runs it on an in-process
 * {@link LocalCluster}.
 *
 * <p>Data flow: the spout feeds both the frame analyzer (whose output goes to the analysis saver)
 * and the image processor. The image processor fans out to the Gaussian-blur and sharpener bolts,
 * both of which feed the frame aggregator, which in turn feeds the output generator.
 */
public class Topology {
   /**
    * Builds the topology, submits it to a local cluster and keeps it running for 100 seconds.
    * While the topology runs, stdout and stderr are redirected to {@code topology.log}.
    *
    * @param args command-line arguments (unused)
    * @throws Exception if the log file cannot be opened, the cluster fails to start or stop,
    *         or the waiting thread is interrupted
    */
   public static void main(String[] args) throws Exception {
      // Create a log file for stdout and stderr
      File logFile = new File("topology.log");

      // Remember the real console streams so they can be put back once the log file is closed;
      // otherwise System.out/System.err would keep pointing at a closed stream and silently
      // drop everything written afterwards.
      PrintStream originalOut = System.out;
      PrintStream originalErr = System.err;

      // Redirect stdout and stderr to the log file
      try (PrintStream printStream = new PrintStream(new FileOutputStream(logFile))) {
         System.setOut(printStream);
         System.setErr(printStream);

         Config config = new Config();                      // Cluster configuration
         config.setDebug(true);
         TopologyBuilder builder = new TopologyBuilder();

         // Instantiate the spout and bolts of the topology
         Spout spout = new Spout();
         BoltFrameAnalyzer boltFrameAnalyzer = new BoltFrameAnalyzer();
         BoltAnalysisSaver boltAnalysisSaver = new BoltAnalysisSaver();
         BoltImageProcessor boltImageProcessor = new BoltImageProcessor();
         BoltGaussianBlur boltGaussianBlur = new BoltGaussianBlur();
         BoltSharpener boltSharpener = new BoltSharpener();
         BoltFrameAggregator boltFrameAggregator = new BoltFrameAggregator();
         BoltOutputGenerator boltOutputGenerator = new BoltOutputGenerator();

         // Define the data flow by connecting the spout and bolts
         builder.setSpout("spout", spout, 1);
         builder.setBolt("bolt-frame-analyzer", boltFrameAnalyzer, 1).shuffleGrouping("spout");
         builder.setBolt("bolt-analysis-saver", boltAnalysisSaver, 1).shuffleGrouping("bolt-frame-analyzer");
         builder.setBolt("bolt-image-processor", boltImageProcessor, 1).shuffleGrouping("spout");
         builder.setBolt("bolt-gaussian-blur", boltGaussianBlur, 1).shuffleGrouping("bolt-image-processor");
         builder.setBolt("bolt-sharpener", boltSharpener, 1).shuffleGrouping("bolt-image-processor");
         // The aggregator subscribes to BOTH filter bolts; it receives their tuples one at a
         // time, each carrying only the fields declared by the bolt that emitted it.
         builder.setBolt("bolt-frame-aggregator", boltFrameAggregator, 1)
               .shuffleGrouping("bolt-sharpener")
               .shuffleGrouping("bolt-gaussian-blur");
         builder.setBolt("bolt-output-generator", boltOutputGenerator, 1).shuffleGrouping("bolt-frame-aggregator");

         // The cluster is shut down automatically when this try block exits
         try (LocalCluster cluster = new LocalCluster()) {
            cluster.submitTopology("Topology", config, builder.createTopology());
            Thread.sleep(100000);  // Adjust sleep time as needed
         }
      } finally {
         // Runs after the log stream has been closed by try-with-resources
         System.setOut(originalOut);
         System.setErr(originalErr);
      }
   }
}

BoltGaussianBlur.java:

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.opencv.core.Mat;
import org.opencv.core.Size;
import org.opencv.imgcodecs.Imgcodecs;
import org.opencv.imgproc.Imgproc;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;

public class BoltGaussianBlur extends BaseBasicBolt {
    private final String framesGaussianBlurFilePath;

    public BoltGaussianBlur() {
        try {
            Properties prop = new Properties();
            prop.load(new FileInputStream("config.ini"));
            framesGaussianBlurFilePath = prop.getProperty("framesGaussianBlurFilePath");
        } catch (IOException e) {
            LOG.error("BoltImageProcessor: Error occurred while reading config.ini file", e);
            throw new RuntimeException(e);
        }
    }

    private static final Logger LOG = LoggerFactory.getLogger(BoltGaussianBlur.class);
    public void execute(Tuple input, BasicOutputCollector collector) {
        int gaussianBlurFrameNumber = input.getIntegerByField("frameNumber");
        LOG.info("BoltGaussianBlur: Frame #" + gaussianBlurFrameNumber + " has been received successfully.");
        Mat receivedFrame = (Mat) input.getValueByField("resizedFrame");
        Mat gaussianBlurFrame = new Mat();
        Imgproc.GaussianBlur(receivedFrame, gaussianBlurFrame, new Size(9, 9), 2, 2);
        String gaussianBlurFileName = framesGaussianBlurFilePath + "/frame_" + gaussianBlurFrameNumber + "_gaussian_blur.png";
        Imgcodecs.imwrite(gaussianBlurFileName, gaussianBlurFrame);
        LOG.info("BoltGaussianBlur: Frame #" + gaussianBlurFrameNumber + " has been converted to Gaussian blur successfully.");
        collector.emit(new Values(gaussianBlurFrame, gaussianBlurFrameNumber));
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("gaussianBlurFrame", "gaussianBlurFrameNumber"));
    }
}

BoltSharpener.java:

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.opencv.core.Core;
import org.opencv.core.Mat;
import org.opencv.core.Size;
import org.opencv.imgproc.Imgproc;
import org.opencv.imgcodecs.Imgcodecs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;

public class BoltSharpener extends BaseBasicBolt {
    private final String framesSharpenedFilePath;

    public BoltSharpener() {
        try {
            Properties prop = new Properties();
            prop.load(new FileInputStream("config.ini"));
            framesSharpenedFilePath = prop.getProperty("framesSharpenedFilePath");
        } catch (IOException e) {
            LOG.error("BoltSharpener: Error occurred while reading config.ini file", e);
            throw new RuntimeException(e);
        }
    }

    private static final Logger LOG = LoggerFactory.getLogger(BoltSharpener.class);
    public void execute(Tuple input, BasicOutputCollector collector) {
        int sharpenedFrameNumber = input.getIntegerByField("frameNumber");
        LOG.info("BoltSharpener: Frame #" + sharpenedFrameNumber + " has been received successfully.");
        Mat receivedFrame = (Mat) input.getValueByField("resizedFrame");
        Mat sharpenedFrame = new Mat();
        Imgproc.GaussianBlur(receivedFrame, sharpenedFrame, new Size(0, 0), 10);
        Core.addWeighted(receivedFrame, 1.5, sharpenedFrame, -0.5, 0, sharpenedFrame);
        String sharpenedFileName = framesSharpenedFilePath + "/frame_" + sharpenedFrameNumber + "_sharpened.png";
        Imgcodecs.imwrite(sharpenedFileName, sharpenedFrame);
        LOG.info("BoltSharpener: Frame #" + sharpenedFrameNumber + " has been sharpened successfully.");
        collector.emit(new Values(sharpenedFrame, sharpenedFrameNumber));
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sharpenedFrame", "sharpenedFrameNumber"));
    }
}

BoltFrameAggregator.java:

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.opencv.core.Core;
import org.opencv.core.Mat;
import org.opencv.imgcodecs.Imgcodecs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class BoltFrameAggregator extends BaseBasicBolt {
    private final String framesAggregated;
    public BoltFrameAggregator() {
        try {
            Properties prop = new Properties();
            prop.load(new FileInputStream("config.ini"));
            framesAggregated = prop.getProperty("framesAggregatedFilePath");
        } catch (IOException e) {
            LOG.error("BoltAggregator: Error occurred while reading config.ini file", e);
            throw new RuntimeException(e);
        }
    }

    private static final Logger LOG = LoggerFactory.getLogger(BoltFrameAggregator.class);
    public void execute(Tuple input, BasicOutputCollector collector) {
        Mat receivedGaussianBlurFrame = (Mat) input.getValueByField("gaussianBlurFrame");
        int gaussianBlurFrameNumber = input.getIntegerByField("gaussianBlurFrameNumber");
        Mat receivedSharpenedFrame = (Mat) input.getValueByField("sharpenedFrame");
        int sharpenedFrameNumber = input.getIntegerByField("sharpenedFrameNumber");
        int aggregatedFrameNumber = input.getIntegerByField("sharpenedFrameNumber");
        Mat aggregatedFrame = new Mat();
        if (sharpenedFrameNumber == gaussianBlurFrameNumber) {
            Core.addWeighted(receivedSharpenedFrame, 1, receivedGaussianBlurFrame, 1, 0, aggregatedFrame);
            String aggregatedFileName = framesAggregated + "/frame_" + sharpenedFrameNumber + "_aggregated.png";
            Imgcodecs.imwrite(aggregatedFileName, aggregatedFrame);
            LOG.info("BoltAggregator: Frame #" + sharpenedFrameNumber + " has been aggregated successfully.");
            collector.emit(new Values(aggregatedFrame, aggregatedFrameNumber));
        } else {
            LOG.warn("BoltAggregator: Frame numbers for Gaussian blur and sharpened frames do not match.");
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("aggregatedFrame", "aggregatedFrameNumber"));
    }
}

我检查了所有拼写错误,也尝试过为高斯模糊 Bolt 和锐化 Bolt 的输出流指定名称。我甚至尝试把收到的元组先存起来,看看在收到高斯模糊 Bolt 的前 4、5 个元组之后,锐化 Bolt 的元组是否会到达,但都不起作用。

错误日志:

[Thread-42-bolt-frame-aggregator-executor[3, 3]] INFO  o.a.s.e.Executor - Processing received TUPLE: source: bolt-gaussian-blur:5, stream: default, id: {}, [Mat [ 720*1280*CV_8UC1, isCont=true, isSubmat=false, nativeObj=0x1fc231bd4a0, dataAddr=0x1fc258f0f60 ], 0] PROC_START_TIME(sampled): null EXEC_START_TIME(sampled): null for TASK: 3 
...
[Thread-42-bolt-frame-aggregator-executor[3, 3]] ERROR o.a.s.u.Utils - Async loop died!
java.lang.RuntimeException: java.lang.IllegalArgumentException: sharpenedFrame does not exist
    at org.apache.storm.executor.Executor.accept(Executor.java:301) ~[storm-client-2.6.0.jar:2.6.0]
    at org.apache.storm.utils.JCQueue.consumeImpl(JCQueue.java:113) ~[storm-client-2.6.0.jar:2.6.0]
    at org.apache.storm.utils.JCQueue.consume(JCQueue.java:89) ~[storm-client-2.6.0.jar:2.6.0]
    at org.apache.storm.executor.bolt.BoltExecutor$1.call(BoltExecutor.java:154) ~[storm-client-2.6.0.jar:2.6.0]
    at org.apache.storm.executor.bolt.BoltExecutor$1.call(BoltExecutor.java:140) ~[storm-client-2.6.0.jar:2.6.0]
    at org.apache.storm.utils.Utils$1.run(Utils.java:398) ~[storm-client-2.6.0.jar:2.6.0]
    at java.base/java.lang.Thread.run(Thread.java:1583) [?:?]
Caused by: java.lang.IllegalArgumentException: sharpenedFrame does not exist
    at org.apache.storm.tuple.Fields.fieldIndex(Fields.java:98) ~[storm-client-2.6.0.jar:2.6.0]
    at org.apache.storm.tuple.TupleImpl.fieldIndex(TupleImpl.java:101) ~[storm-client-2.6.0.jar:2.6.0]
    at org.apache.storm.tuple.TupleImpl.getValueByField(TupleImpl.java:161) ~[storm-client-2.6.0.jar:2.6.0]
    at BoltFrameAggregator.execute(BoltFrameAggregator.java:33) ~[classes/:?]
    at org.apache.storm.topology.BasicBoltExecutor.execute(BasicBoltExecutor.java:48) ~[storm-client-2.6.0.jar:2.6.0]
    at org.apache.storm.executor.bolt.BoltExecutor.tupleActionFn(BoltExecutor.java:212) ~[storm-client-2.6.0.jar:2.6.0]
    at org.apache.storm.executor.Executor.accept(Executor.java:294) ~[storm-client-2.6.0.jar:2.6.0]
    ... 6 more
java opencv apache-storm
1个回答
0
投票

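如错误消息所示,收到的元组中似乎不存在名为 `sharpenedFrame` 的字段(日志显示出错的元组来自 bolt-gaussian-blur,而该 Bolt 只声明了 gaussianBlurFrame 和 gaussianBlurFrameNumber 两个字段)。那么您可以仔细检查它是否被正确声明和发出吗?您可能需要添加更详尽的日志记录,或者在本地计算机上使用伪集群,这会让调试变得更加容易。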

© www.soinside.com 2019 - 2024. All rights reserved.