如何将Spark Stream数据保存到文件中

问题描述 投票:0回答:1

我是Spark的新手,目前正在解决在Context时间之后将Spark Stream的结果保存到文件中的问题。因此,问题是:我希望查询运行60秒,并将在该时间内读取的所有输入保存到文件中,并且还能够定义文件名以供将来处理。

起初我以为下面的代码将是正确的方法:

sc.socketTextStream("localhost", 12345)
                .foreachRDD(rdd -> {
                    rdd.saveAsTextFile("./test");
                });

但是,运行后,我意识到,它不仅为每个输入读取保存了一个不同的文件-(假设我在该端口上以随机速度生成了随机数),因此,如果在一秒钟内读取1,该文件将包含1个数字,但是如果它读取更多,文件将具有它们,而不是只写一个具有该60s时间范围内所有值的文件-但我也无法命名该文件,因为saveAsTextFile中的参数是所需的目录。

所以我想问一下是否有任何火花本机解决方案,因此我不必通过“ java技巧”来解决它,就像这样:

sc.socketTextStream("localhost", 12345)
                .foreachRDD(rdd -> {
                    PrintWriter out = new PrintWriter("./logs/votes["+dtf.format(LocalDateTime.now().minusMinutes(2))+","+dtf.format(LocalDateTime.now())+"].txt");
                    List<String> l = rdd.collect();
                    for(String voto: l)
                        out.println(voto + "    "+dtf.format(LocalDateTime.now()));
                    out.close();
                });

我搜索了类似问题的Spark文档,但找不到解决方法://谢谢您的时间:)

apache-spark apache-spark-sql spark-streaming
1个回答
0
投票

下面是使用新的Spark API使用套接字流数据的模板。

import org.apache.spark.sql.streaming.{OutputMode, Trigger}

object ReadSocket {

  def main(args: Array[String]): Unit = {
    val spark = Constant.getSparkSess

    //Start reading from socket
    val dfStream = spark.readStream
      .format("socket")
      .option("host","127.0.0.1") // Replace it your socket host
      .option("port","9090")
      .load()

    dfStream.writeStream
      .trigger(Trigger.ProcessingTime("1 minute")) // Will trigger 1 minute
      .outputMode(OutputMode.Append) // Batch will processed for the data arrived in last 1 minute
      .foreachBatch((ds,id) => { //
        ds.foreach(row => { // Iterdate your data set
          //Put around your File generation logic
          println(row.getString(0)) // Thats your record
        })
      }).start().awaitTermination()
  }

}

有关代码说明,请阅读阅读代码内联注释

Java版本

import org.apache.spark.api.java.function.VoidFunction2;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.streaming.Trigger;

public class ReadSocketJ {

    public static void main(String[] args) throws StreamingQueryException {
        SparkSession spark = Constant.getSparkSess();


        Dataset<Row> lines = spark
                .readStream()
                .format("socket")
                .option("host", "127.0.0.1") // Replace it your socket host
                .option("port", "9090")
                .load();

        lines.writeStream()
                .trigger(Trigger.ProcessingTime("5 seconds"))
                .foreachBatch((VoidFunction2<Dataset<Row>, Long>) (v1, v2) -> {
                    v1.as(Encoders.STRING())
                            .collectAsList().forEach(System.out::println);
                }).start().awaitTermination();


    }
}
© www.soinside.com 2019 - 2024. All rights reserved.