How to filter a DataFrame with the filter command using information from another DataFrame

Problem description

I have a large DataFrame that contains a lot of information from different devices, together with their IDs. I want to filter this DataFrame using the IDs found in another DataFrame. I know I could easily do this with a join, but I would like to try it with filter.

Moreover, I am trying this because I have been told that filter is more efficient than a join. Could someone shed some light on this?
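For context, a minimal sketch of the join approach mentioned above could look like this (assuming both DF1 and DF2 have an Id column; a left-semi join keeps only the rows and columns of DF1):

// Keep only rows of DF1 whose Id also appears in DF2.
val kept = DF1.join(DF2.select("Id"), Seq("Id"), "left_semi")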

Thanks

This is what I have tried:

val DfFiltered = DF1.filter(col("Id").isin(DF2.rdd.map(r => r(0)).collect()))

But I get the following error:

Exception in thread "main" org.apache.spark.sql.streaming.StreamingQueryException: Unsupported component type class java.lang.Object in arrays;
=== Streaming Query ===
Identifier: [id = 0d89d684-d794-407d-a03c-feb3ad6a78c2, runId = b7b774c0-ce83-461e-ac26-7535d6d2b0ac]
Current Committed Offsets: {KafkaV2[Subscribe[MeterEEM]]: {"MeterEEM":{"0":270902}}}
Current Available Offsets: {KafkaV2[Subscribe[MeterEEM]]: {"MeterEEM":{"0":271296}}}

Current State: ACTIVE
Thread State: RUNNABLE

Logical Plan:
Project [value2#21.meterid AS meterid#23]
+- Project [jsontostructs(StructField(meterid,StringType,true), cast(value#8 as string), Some(Europe/Paris)) AS value2#21]
   +- StreamingExecutionRelation KafkaV2[Subscribe[MeterEEM]], [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13]

    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:295)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
Caused by: org.apache.spark.sql.AnalysisException: Unsupported component type class java.lang.Object in arrays;
    at org.apache.spark.sql.catalyst.expressions.Literal$.componentTypeToDataType(literals.scala:117)
    at org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:70)
    at org.apache.spark.sql.catalyst.expressions.Literal$$anonfun$create$2.apply(literals.scala:164)
    at org.apache.spark.sql.catalyst.expressions.Literal$$anonfun$create$2.apply(literals.scala:164)
    at scala.util.Try.getOrElse(Try.scala:79)
    at org.apache.spark.sql.catalyst.expressions.Literal$.create(literals.scala:163)
    at org.apache.spark.sql.functions$.typedLit(functions.scala:127)
    at org.apache.spark.sql.functions$.lit(functions.scala:110)
    at org.apache.spark.sql.Column$$anonfun$isin$1.apply(Column.scala:796)
    at org.apache.spark.sql.Column$$anonfun$isin$1.apply(Column.scala:796)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.spark.sql.Column.isin(Column.scala:796)
    at StreamingProcedure.MetersEEM$.meterEemCalcs(MetersEEM.scala:28)
    at LaunchFunctions.LaunchMeterEEM$$anonfun$1.apply(LaunchMeterEEM.scala:23)
    at LaunchFunctions.LaunchMeterEEM$$anonfun$1.apply(LaunchMeterEEM.scala:15)
    at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:35)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5$$anonfun$apply$17.apply(MicroBatchExecution.scala:534)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5.apply(MicroBatchExecution.scala:532)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:531)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:198)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
    ... 1 more
scala dataframe apache-spark
1 Answer
0 votes

I have assumed that the data in the Id column is of Integer type.

// .as[Int] needs the SparkSession implicits in scope for the Int encoder
import spark.implicits._

val list = DF2.select("Id").as[Int].collect()

val DfFiltered = DF1.filter($"Id".isin(list: _*))

DfFiltered.collect()
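Since the stack trace shows the filter being applied inside a streaming query (ForeachBatchSink), here is a minimal sketch of how the fix above could be used per micro-batch; streamingDF, the batch handling, and the assumption that DF2 is a static DataFrame are illustrative, not taken from the question:

import org.apache.spark.sql.DataFrame
import spark.implicits._

// Collect the reference Ids once from the static DataFrame.
val ids = DF2.select("Id").as[Int].collect()

streamingDF.writeStream
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    // Filter every micro-batch against the collected Ids.
    val filtered = batchDF.filter($"Id".isin(ids: _*))
    // ... write or further process `filtered` here
  }
  .start()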

0 votes

The book High Performance Spark explains it as follows:

Joining data is an important part of many of our pipelines, and both Spark Core and Spark SQL support the same fundamental types of joins. While joins are very common and powerful, they warrant special performance consideration, as they may require large network transfers and can even create datasets beyond our capability to handle. In core Spark it can be more important to think about the ordering of operations, since the DAG optimizer, unlike the SQL optimizer, cannot re-order or push down filters.

So choosing a filter instead of a join seems like a good option.
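For comparison, when DF2 is small enough to broadcast, a broadcast left-semi join is a common alternative to collecting the Ids and filtering with isin; a minimal sketch under that assumption:

import org.apache.spark.sql.functions.broadcast

// Broadcasting the small Id DataFrame avoids shuffling DF1,
// similar in effect to collecting the Ids and using isin,
// but without pulling them through the driver.
val filtered = DF1.join(broadcast(DF2.select("Id")), Seq("Id"), "left_semi")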
