Flink:如何将DataSet写入变量而不是文件中

问题描述 投票:3回答:2

我有一个使用DataSet API在Scala中编写的flink批处理程序,该程序会生成我感兴趣的最终数据集。我想在程序中获取该数据集作为变量或值(例如,字符串的列表或序列) ,而不必将其写入任何文件。有可能吗?

我已经看到flink允许收集数据接收器以便进行调试(它们的文档中唯一的示例是Java)。但是,这只允许在本地执行,无论如何我都不知道它在Scala中的等效性。我想要的是在整个flink并行执行完成之后,将最终结果数据集写入程序值或变量。

scala apache-flink
2个回答
2
投票

[首先,对于收集数据接收器的scala版本尝试以下操作:导入org.apache.flink.api.scala._导入org.apache.flink.api.java.io.LocalCollectionOutputFormat;

 .
 .
val env = ExecutionEnvironment.getExecutionEnvironment

// Create a DataSet from a list of elements
val words = env.fromElements("w1","w2", "w3")

var outData:java.util.List[String]= new java.util.ArrayList[String]()
words.output(new LocalCollectionOutputFormat(outData))

// execute program
env.execute("Flink Batch Scala")
println(outData)

第二,如果您的数据集适合单机内存,为什么需要使用分布式处理框架?我认为您应该多考虑用例!并尝试在数据集上使用正确的transformations


0
投票

我将flink 1.72与scala 2.12一起使用。这是我使用Model类包装的使用SVM的streaming预测。我认为最正确的答案是使用collect()。它将返回Seq。我搜索了几个小时后得到了这个答案。我从Flink Git - Line 95那里得到了这个主意

var temp_jaringan : DataSet[(Vector,Double)] = model.predict_jaringan(value)
temp_jaringan.print()

var temp_produk : DataSet[(Vector,Double)] = model.predict_produk(value)
temp_produk.print()

var result_jaringan : Seq[(Vector,Double)] = temp_jaringan.collect()
var result_produk : Seq[(Vector,Double)] = temp_produk.collect()

if(result_jaringan(0)._2  == 1.0  && result_produk(0)._2 == 1.0 ){
  println("Keduanya")
}else if(result_jaringan(0)._2  == 1.0  && result_produk(0)._2 == -1.0){
  println("Jaringan")
}else if(result_jaringan(0)._2  == -1.0  && result_produk(0)._2 == 1.0){
  println("Produk")
}else{
  println("Bukan Keduanya")
}

它可能根据其他版本而有所不同。在为我的最终项目使用和搜索像疯狗这样的flink材料长达数周甚至数月才成为毕业要求之后,我知道该项目需要更多的文档和教程,尤其是对于像我这样的初学者。

无论如何,如果我错了,请纠正我。谢谢!

© www.soinside.com 2019 - 2024. All rights reserved.