Spark流媒体实时处理

问题描述 投票:0回答:2

我需要一个应用程序来实现工人之间的通信。比方说,工人1正在进行工作1,它会产生一个数据输出,而其他的工作则依赖这个数据输出,这个过程要重复很多次,也就是说每当工人1产生一个新的数据集时,其他的工人就要开始输入这个数据集并进行工作。此外,这个过程要重复很多次,也就是说,每当工人1产生一个新的数据集,其他工人就要开始输入这个数据集,并做他们的工作。spark能做到这一点吗?到目前为止,我已经看到spark的流式实时处理,但工人之间似乎没有发生流式通信?任何方向或建议将被感激。

apache-spark streaming
2个回答
0
投票

你必须在1个单一的Spark Streaming Job中一个接一个地定义所需的操作。

虽然我没有尝试过,但你也可以尝试使用一些工作流组件,比如Oozie来配置你的标准Spark Batch Job(非流式)。

最近Spring XD也引入了与Spark Job的集成。这可能也是可行的--。http:/www.slideshare.netmark_fisherspark-meets-spring.


-1
投票
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.types.StructType;
import org.json4s.jackson.Json;

public class lotWeather {
    public static void main(String[] args) throws StreamingQueryException {
        System.setProperty("hadoop.home.dir", "C:\\hadoop-common-2.2.0-bin-master");
        SparkSession sparkSession = SparkSession.builder().appName("SparkStreamingMessageListener").master("local").getOrCreate();
    enter code here
        StructType weatherType= new StructType().add("quarter","String").add("heatType", "string").add("heat","integer")
                .add("windType","string").add("wind","integer");

        Dataset<Row> rawData = sparkSession.readStream().schema(weatherType).option("sep", ",")
                .csv("C:\\Users\\sorun\\OneDrive\\Masaüstü\\bigdata\\sparkstreaming\\*");

        Dataset<Row> heatData = rawData.select("quarter", "heat").where("heat>29");


        StreamingQuery start = heatData.writeStream().outputMode("append").format("console").start();
        start.awaitTermination();




    }
}
© www.soinside.com 2019 - 2024. All rights reserved.