我是Spark的新手,目前正在解决在Context时间之后将Spark Stream的结果保存到文件中的问题。因此,问题是:我希望查询运行60秒,并将在该时间内读取的所有输入保存到文件中,并且还能够定义文件名以供将来处理。
起初我以为下面的代码将是正确的方法:
sc.socketTextStream("localhost", 12345)
.foreachRDD(rdd -> {
rdd.saveAsTextFile("./test");
});
但是,运行后,我意识到,它不仅为每个输入读取保存了一个不同的文件-(假设我在该端口上以随机速度生成了随机数),因此,如果在一秒钟内读取1,该文件将包含1个数字,但是如果它读取更多,文件将具有它们,而不是只写一个具有该60s时间范围内所有值的文件-但我也无法命名该文件,因为saveAsTextFile中的参数是所需的目录。
所以我想问一下是否有任何火花本机解决方案,因此我不必通过“ java技巧”来解决它,就像这样:
sc.socketTextStream("localhost", 12345)
.foreachRDD(rdd -> {
PrintWriter out = new PrintWriter("./logs/votes["+dtf.format(LocalDateTime.now().minusMinutes(2))+","+dtf.format(LocalDateTime.now())+"].txt");
List<String> l = rdd.collect();
for(String voto: l)
out.println(voto + " "+dtf.format(LocalDateTime.now()));
out.close();
});
我搜索了类似问题的Spark文档,但找不到解决方法://谢谢您的时间:)
下面是使用新的Spark API使用套接字流数据的模板。
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
object ReadSocket {
def main(args: Array[String]): Unit = {
val spark = Constant.getSparkSess
//Start reading from socket
val dfStream = spark.readStream
.format("socket")
.option("host","127.0.0.1") // Replace it your socket host
.option("port","9090")
.load()
dfStream.writeStream
.trigger(Trigger.ProcessingTime("1 minute")) // Will trigger 1 minute
.outputMode(OutputMode.Append) // Batch will processed for the data arrived in last 1 minute
.foreachBatch((ds,id) => { //
ds.foreach(row => { // Iterdate your data set
//Put around your File generation logic
println(row.getString(0)) // Thats your record
})
}).start().awaitTermination()
}
}
有关代码说明,请阅读阅读代码内联注释
Java版本
import org.apache.spark.api.java.function.VoidFunction2;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.streaming.Trigger;
public class ReadSocketJ {
public static void main(String[] args) throws StreamingQueryException {
SparkSession spark = Constant.getSparkSess();
Dataset<Row> lines = spark
.readStream()
.format("socket")
.option("host", "127.0.0.1") // Replace it your socket host
.option("port", "9090")
.load();
lines.writeStream()
.trigger(Trigger.ProcessingTime("5 seconds"))
.foreachBatch((VoidFunction2<Dataset<Row>, Long>) (v1, v2) -> {
v1.as(Encoders.STRING())
.collectAsList().forEach(System.out::println);
}).start().awaitTermination();
}
}