How to enrich the data of a streaming query and write the result to Elasticsearch?


For a given Dataset (originalData) I need to map its values and prepare a new Dataset, combining it with lookup results from Elasticsearch.

Dataset<Row> originalData = spark
  .readStream()
  .format("kafka")
  .option("kafka.bootstrap.servers","test")
  .option("subscribe", "test")
  .option("startingOffsets", "latest")
  .load();

Dataset<Row> esData = JavaEsSparkSQL
  .esDF(spark.sqlContext(), "spark_correlation/doc");

esData.createOrReplaceTempView("es_correlation");
List<SGEvent> listSGEvent = new ArrayList<>();

originalData.foreach((ForeachFunction<Row>) row -> {
 SGEvent event = new SGEvent();
 String sourceKey=row.get(4).toString();
 String searchQuery = "select id from es_correlation where es_correlation.key='"+sourceKey+"'";
 Dataset<Row> result = spark.sqlContext().sql(searchQuery);
 String id = null;
 if (result != null) {
    result.show();
    id = result.first().toString();
  }
 event.setId(id);
 event.setKey(sourceKey);
 listSGEvent.add(event);
});
Encoder<SGEvent> eventEncoderSG = Encoders.bean(SGEvent.class);
Dataset<Row> finalData = spark.createDataset(listSGEvent, eventEncoderSG).toDF();

finalData
  .writeStream()
  .outputMode(OutputMode.Append())
  .format("org.elasticsearch.spark.sql")
  .option("es.mapping.id", "id")
  .option("es.write.operation", "upsert")
  .option("checkpointLocation","/tmp/checkpoint/sg_event")
  .start("spark_index/doc").awaitTermination();

Spark throws the following exception:

org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:389)
  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:38)
  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:36)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
  at scala.collection.immutable.List.foreach(List.scala:392)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
  at scala.collection.immutable.List.foreach(List.scala:392)

Is my approach of combining Elasticsearch values with the Dataset valid? Is there a better solution for this?

apache-spark elasticsearch spark-structured-streaming elasticsearch-hadoop
1 Answer

There are a couple of issues here.

As the exception says, originalData is a streaming query (a streaming Dataset) and the only way to execute it is to use writeStream.start(). That's one issue.

You did use writeStream.start(), but on another query, finalData, which is not streaming but a batch query. That's another issue.

For an "enrichment" case like yours you could use a stream join (the Dataset.join operator) or one of DataStreamWriter.foreach and DataStreamWriter.foreachBatch. I think DataStreamWriter.foreachBatch would be more efficient.
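For reference, the join route could look roughly like this in Java. It is only a sketch: the CAST(key AS STRING) AS sourceKey expression is a placeholder for however you actually derive the lookup key (your code reads row.get(4)), and it assumes es_correlation exposes id and key columns, as your SQL query suggests.

// Hypothetical key extraction; replace with your real parsing of the Kafka record
Dataset<Row> keyed = originalData.selectExpr("CAST(key AS STRING) AS sourceKey");

// Only the columns needed for the lookup (assumed to exist in spark_correlation/doc)
Dataset<Row> esKeys = esData.selectExpr("id", "key");

// Stream-static join: every micro-batch of the stream is enriched with the
// matching Elasticsearch rows. Unmatched rows get a null id with left_outer;
// switch to "inner" if you only want matched records.
Dataset<Row> enriched = keyed.join(
    esKeys,
    keyed.col("sourceKey").equalTo(esKeys.col("key")),
    "left_outer");

enriched
  .writeStream()
  .outputMode(OutputMode.Append())
  .format("org.elasticsearch.spark.sql")
  .option("es.mapping.id", "id")
  .option("es.write.operation", "upsert")
  .option("checkpointLocation", "/tmp/checkpoint/sg_event")
  .start("spark_index/doc")
  .awaitTermination();

A stream-static join like this stays fully distributed and needs no collect on the driver. If you need arbitrary per-batch logic instead, DataStreamWriter.foreachBatch is the tool; its (Java-specific) contract is: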

public DataStreamWriter<T> foreachBatch(VoidFunction2<Dataset<T>,Long> function)

(Java-specific) Sets the output of the streaming query to be processed using the provided function. This is supported only in the micro-batch execution modes (that is, when the trigger is not continuous). In every micro-batch, the provided function will be called with (i) the output rows as a Dataset and (ii) the batch identifier. The batchId can be used to deduplicate and transactionally write the output (that is, the provided Dataset) to external systems. The output Dataset is guaranteed to be exactly the same for the same batchId (assuming all operations are deterministic in the query).

Not only do you get all the data of a streaming micro-batch in one go (as the first input argument of type Dataset<T>), but also a way to submit another Spark job (across executors) based on that data.

The pseudo-code could look as follows (I'm using Scala as I'm more comfortable with the language):

val dsWriter = originalData.writeStream.foreachBatch { (data, batchId) =>
  // make sure the data is small enough to collect on the driver
  // Otherwise expect OOME
  // It'd also be nice to have a Java bean to convert the rows to proper types and names
  val localData = data.collect

  // Please note that localData is no longer Spark's Dataset
  // It's a local Java collection

  // Use Java Collection API to work with the localData
  // e.g. using Scala
  // You're mapping over localData (for a single micro-batch)
  // And creating finalData
  // I'm using the same names as your code to be as close to your initial idea as possible
  val finalData = localData.map { row =>
    // row is the old row from your original code
    // do something with it
    // e.g. using Java
    String sourceKey=row.get(4).toString();
    ...
  }

  // Time to save the data processed to ES
  // finalData is a local Java/Scala collection not Spark's DataFrame!
  // Let's convert it to a DataFrame (and leverage the Spark distributed platform)

  // Note that I'm almost using your code, but it's a batch query not a streaming one
  // We're inside foreachBatch
  finalData
    .toDF // Convert a local collection to a Spark DataFrame (needs import spark.implicits._ in scope)
    .write  // this creates a batch query
    .format("org.elasticsearch.spark.sql")
    .option("es.mapping.id", "id")
    .option("es.write.operation", "upsert")
    .option("checkpointLocation","/tmp/checkpoint/sg_event")
    .save("spark_index/doc") // save (not start) as it's a batch query inside a streaming query
}

dsWriter is a DataStreamWriter that you can now start() to kick off the streaming query.
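Since your original code is Java, a rough Java counterpart of the foreachBatch route could look like the following. Again just a sketch: it reuses the hypothetical sourceKey extraction from above and, instead of collecting to the driver, enriches each micro-batch with a regular batch join against esData, which side-steps the OOME concern mentioned in the pseudo-code.

// Needs org.apache.spark.api.java.function.VoidFunction2 and org.apache.spark.sql.SaveMode
originalData
  .selectExpr("CAST(key AS STRING) AS sourceKey")   // hypothetical key extraction
  .writeStream()
  .outputMode(OutputMode.Append())
  .option("checkpointLocation", "/tmp/checkpoint/sg_event")
  .foreachBatch((VoidFunction2<Dataset<Row>, Long>) (batch, batchId) -> {
    // Inside foreachBatch, "batch" is a plain (non-streaming) Dataset,
    // so batch operations such as join and write are allowed here.
    Dataset<Row> esKeys = esData.selectExpr("id", "key");
    Dataset<Row> enriched = batch.join(
        esKeys,
        batch.col("sourceKey").equalTo(esKeys.col("key")),
        "left_outer");

    // Batch write (write/save), not a streaming one (writeStream/start);
    // batchId could additionally be used to deduplicate replayed batches.
    enriched
      .write()
      .format("org.elasticsearch.spark.sql")
      .option("es.mapping.id", "id")
      .option("es.write.operation", "upsert")
      .mode(SaveMode.Append)
      .save("spark_index/doc");
  })
  .start()
  .awaitTermination();

Note that the checkpointLocation stays on the streaming writer, while the Elasticsearch options move to the batch write inside the function.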
