For a given dataset (originalData), I need to map its values and then build a new dataset enriched with search results from Elasticsearch.
Dataset<Row> originalData = spark
    .readStream()
    .format("kafka")
    .option("kafka.bootstrap.servers", "test")
    .option("subscribe", "test")
    .option("startingOffsets", "latest")
    .load();
Dataset<Row> esData = JavaEsSparkSQL
.esDF(spark.sqlContext(), "spark_correlation/doc");
esData.createOrReplaceTempView("es_correlation");
List<SGEvent> listSGEvent = new ArrayList<>();
originalData.foreach((ForeachFunction<Row>) row -> {
    SGEvent event = new SGEvent();
    String sourceKey = row.get(4).toString();
    String searchQuery = "select id from es_correlation where es_correlation.key='" + sourceKey + "'";
    Dataset<Row> result = spark.sqlContext().sql(searchQuery);
    String id = null;
    if (result != null) {
        result.show();
        id = result.first().toString();
    }
    event.setId(id);
    event.setKey(sourceKey);
    listSGEvent.add(event);
});
Encoder<SGEvent> eventEncoderSG = Encoders.bean(SGEvent.class);
Dataset<Row> finalData = spark.createDataset(listSGEvent, eventEncoderSG).toDF();
finalData
.writeStream()
.outputMode(OutputMode.Append())
.format("org.elasticsearch.spark.sql")
.option("es.mapping.id", "id")
.option("es.write.operation", "upsert")
.option("checkpointLocation","/tmp/checkpoint/sg_event")
.start("spark_index/doc").awaitTermination();
Spark throws the following exception:
org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:389)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:38)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:36)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
at scala.collection.immutable.List.foreach(List.scala:392)
Is my approach of combining Elasticsearch values with the dataset valid? Is there a better solution?
There are a couple of issues here.
As the exception says, originalData is a streaming query (a streaming Dataset), and the only way to execute it is to use writeStream.start(). That's one issue.
You did call writeStream.start(), but on another query, finalData, which is not streaming but batch. That's the other issue.
For an "enrich" case like yours, you could use a streaming join (the Dataset.join operator) or one of DataStreamWriter.foreach and DataStreamWriter.foreachBatch. I think DataStreamWriter.foreachBatch would be more efficient.
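As an aside, if you went the join route, a stream-static join could look roughly like the sketch below (a hedged sketch in Java, matching your code; the column names are assumptions: it presumes the Elasticsearch index exposes key and id columns, as your SQL query implies, and that the lookup key arrives as the Kafka message key):

import static org.apache.spark.sql.functions.col;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Stream-static join: originalData (streaming, from Kafka) enriched with
// esData (static, loaded from Elasticsearch above).
// Assumption: the ES index exposes "key" and "id" columns, as the SQL query implies.
Dataset<Row> enriched = originalData
    .withColumn("sourceKey", col("key").cast("string")) // assumption: the lookup key is the Kafka message key
    .join(esData, col("sourceKey").equalTo(esData.col("key")), "left_outer"); // keep events with no ES match

// enriched is still a streaming Dataset, so it is written out
// with writeStream ... start(), as in your original code.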
public DataStreamWriter<T> foreachBatch(VoidFunction2<Dataset<T>,Long> function)

(Java-specific) Sets the output of the streaming query to be processed using the provided function. This is supported only in the micro-batch execution modes (that is, when the trigger is not continuous). In every micro-batch, the provided function will be called with (i) the output rows as a Dataset and (ii) the batch identifier. The batchId can be used to deduplicate and transactionally write the output (that is, the provided Dataset) to external systems. The output Dataset is guaranteed to be exactly the same for the same batchId (assuming all operations are deterministic in the query).
Not only do you get all the data of a streaming micro-batch in one go (as the first input parameter of type Dataset<T>), but you can also submit another Spark job (across executors) based on that data.
The pseudocode could look as follows (I'm using Scala since I'm more comfortable with the language):
val dsWriter = originalData.writeStream.foreachBatch { case (data, batchId) =>
  // make sure the data is small enough to collect on the driver
  // Otherwise expect OOME
  // It'd also be nice to have a Java bean to convert the rows to proper types and names
  val localData = data.collect

  // Please note that localData is no longer Spark's Dataset
  // It's a local Java collection

  // Use Java Collection API to work with the localData
  // You're mapping over localData (for a single micro-batch)
  // And creating finalData
  // I'm using the same names as your code to be as close to your initial idea as possible
  val finalData = localData.map { row =>
    // row is the old row from your original code
    // do something with it
    // e.g. using Java
    String sourceKey=row.get(4).toString();
    ...
  }

  // Time to save the data processed to ES
  // finalData is a local Java/Scala collection not Spark's DataFrame!
  // Let's convert it to a DataFrame (and leverage the Spark distributed platform)

  // Note that I'm almost using your code, but it's a batch query not a streaming one
  // We're inside foreachBatch
  finalData
    .toDF // Convert a local collection to a Spark DataFrame
    .write // this creates a batch query
    .format("org.elasticsearch.spark.sql")
    .option("es.mapping.id", "id")
    .option("es.write.operation", "upsert")
    .option("checkpointLocation","/tmp/checkpoint/sg_event")
    .save("spark_index/doc") // save (not start) as it's a batch query inside a streaming query
}
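Since your code is Java, a rough Java counterpart of that sketch might look like the following (assuming Spark 2.4+, where the Java-specific foreachBatch overload exists). This is a hedged sketch, not a drop-in solution: the per-key id lookup is left as a comment, and I'm reusing the names from your question (SGEvent, spark, originalData):

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.api.java.function.VoidFunction2;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.streaming.DataStreamWriter;

DataStreamWriter<Row> dsWriter = originalData
    .writeStream()
    // the checkpoint belongs on the streaming writer, not on the inner batch write
    .option("checkpointLocation", "/tmp/checkpoint/sg_event")
    .foreachBatch((VoidFunction2<Dataset<Row>, Long>) (data, batchId) -> {
        // Inside foreachBatch, "data" is a regular batch Dataset,
        // so collecting it and running SQL lookups is allowed here.
        List<Row> localData = data.collectAsList(); // beware of OOME for large micro-batches
        List<SGEvent> listSGEvent = new ArrayList<>();
        for (Row row : localData) {
            SGEvent event = new SGEvent();
            String sourceKey = row.get(4).toString();
            // ... look up the id for sourceKey (e.g. against es_correlation) and set it ...
            event.setKey(sourceKey);
            listSGEvent.add(event);
        }
        // Batch write inside the streaming query: write()/save(), not writeStream()/start()
        spark.createDataset(listSGEvent, Encoders.bean(SGEvent.class))
            .toDF()
            .write()
            .format("org.elasticsearch.spark.sql")
            .option("es.mapping.id", "id")
            .option("es.write.operation", "upsert")
            .save("spark_index/doc");
    });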
dsWriter is a DataStreamWriter that you can now start to start the streaming query.
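For completeness, starting it mirrors the streaming write in your original code:

dsWriter.start().awaitTermination();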