我有一个使用DataSet API在Scala中编写的flink批处理程序,该程序会生成我感兴趣的最终数据集。我想在程序中获取该数据集作为变量或值(例如,字符串的列表或序列) ,而不必将其写入任何文件。有可能吗?
我已经看到flink允许收集数据接收器以便进行调试(它们的文档中唯一的示例是Java)。但是,这只允许在本地执行,无论如何我都不知道它在Scala中的等效性。我想要的是在整个flink并行执行完成之后,将最终结果数据集写入程序值或变量。
[首先,对于收集数据接收器的scala版本尝试以下操作:导入org.apache.flink.api.scala._导入org.apache.flink.api.java.io.LocalCollectionOutputFormat;
.
.
val env = ExecutionEnvironment.getExecutionEnvironment
// Create a DataSet from a list of elements
val words = env.fromElements("w1","w2", "w3")
var outData:java.util.List[String]= new java.util.ArrayList[String]()
words.output(new LocalCollectionOutputFormat(outData))
// execute program
env.execute("Flink Batch Scala")
println(outData)
第二,如果您的数据集适合单机内存,为什么需要使用分布式处理框架?我认为您应该多考虑用例!并尝试在数据集上使用正确的transformations。
我将flink 1.72与scala 2.12一起使用。这是我使用Model类包装的使用SVM的streaming预测。我认为最正确的答案是使用collect()
。它将返回Seq。我搜索了几个小时后得到了这个答案。我从Flink Git - Line 95那里得到了这个主意
var temp_jaringan : DataSet[(Vector,Double)] = model.predict_jaringan(value) temp_jaringan.print() var temp_produk : DataSet[(Vector,Double)] = model.predict_produk(value) temp_produk.print() var result_jaringan : Seq[(Vector,Double)] = temp_jaringan.collect() var result_produk : Seq[(Vector,Double)] = temp_produk.collect() if(result_jaringan(0)._2 == 1.0 && result_produk(0)._2 == 1.0 ){ println("Keduanya") }else if(result_jaringan(0)._2 == 1.0 && result_produk(0)._2 == -1.0){ println("Jaringan") }else if(result_jaringan(0)._2 == -1.0 && result_produk(0)._2 == 1.0){ println("Produk") }else{ println("Bukan Keduanya") }
它可能根据其他版本而有所不同。在为我的最终项目使用和搜索像疯狗这样的flink材料长达数周甚至数月才成为毕业要求之后,我知道该项目需要更多的文档和教程,尤其是对于像我这样的初学者。
无论如何,如果我错了,请纠正我。谢谢!