spark-structured-streaming 相关问题

Spark Structured Streaming允许使用unifrom DataFrame和Dataset API处理实时数据流。

如何调用从Spark作业调用的Web服务?

我想调用Web服务来获取Spark结构化流中的一些数据。可能吗?怎么样?

回答 2 投票 1

Spark结构化流式传输未授权访问组

我正在尝试通过Spark结构化流从Kafka读取数据。但是,在Spark 2.4.0。中,您无法设置流的组ID(请参阅如何在...中设置如何在kafka数据源中为使用者组设置group.id ...

回答 1 投票 1

Spark结构化流媒体不支持Kafka选项'group.id'

[似乎,通过新的Spark结构化流,从Kafka读取数据时,我们不能再将组ID作为选项传递。不支持将Kafka选项'group.id'作为用户指定的使用者组...

回答 1 投票 1

Spark结构化流给我带来了错误,因为org.apache.spark.sql.AnalysisException:'foreachBatch'不支持分区;

我已经在Databricks中设计了以下结构化流代码以写入Azure Data Lake:def upsertToDelta(microBatchOutputDF:DataFrame,batchId:Long){microBatchOutputDF ....

回答 1 投票 0

将数据从Spark结构化流保存到Postgresql(Python)

我很麻烦执行以下任务:我通过Spark结构化流媒体从Kafka消费了随机消息(带有温度数字)。以下是spark DataFrame的屏幕截图:...

回答 1 投票 0

结构化流如何执行pandas_udf?

我想了解结构化流处理如何处理新数据。如果更多行同时到达,则将它们附加到输入流数据帧中,对吗?如果我有一个withColumn和...

回答 1 投票 -1

结构化流如何执行pandas_udf?

我想了解结构化流处理如何处理新数据。如果更多行同时到达,则将它们附加到输入流数据帧中,对吗?如果我有一个withColumn和...

回答 1 投票 0

[[Spark Streaming]:将流数据帧写入Postgres

我有一个流数据帧,我正在尝试将其写入数据库。有用于将rdd或df写入Postgres的文档。但是,我找不到有关它的示例或文档...

回答 1 投票 0

如何将流数据与表进行缓慢更新(例如每天一次)连接?

[在结构化流中,我需要将流数据与一些变化缓慢的数据结合在一起。变化缓慢的数据每天更新一次,并且可能不是在固定的时间更新。但是,流数据来自...

回答 1 投票 0

如何将流数据与表进行缓慢更新(例如每天一次)连接?

[在结构化流中,我需要将流数据与一些变化缓慢的数据结合在一起。变化缓慢的数据每天更新一次,并且可能不是在固定的时间更新。但是,流数据来自...

回答 1 投票 0

如何在流批量流连接中定义连接条件?

我在Java 1.8中使用spark-sql-2.4.1v。和kafka版本spark-sql-kafka-0-10_2.11_2.4.3。我正在尝试将静态数据框架(即元数据)与另一个流数据框架合并,如下所示...

回答 1 投票 0

使用结构化流的流静态内部联接的输出聚合

此问题与Spark 2.4.4有关。我正在执行流静态内部联接,并得到以下结果:-val orderDetailsJoined = orderItemsDF.join(ordersDF,Seq(“ CustomerID”),joinType =“ inner”)+ ...

回答 1 投票 0

为什么UDF在流查询中抛出NotSerializableException?

我将Spark 2.4.3用于一个结构化的流应用程序(从Event Hub Azure的readStream /从CosmosDB的writeStream)。有一些数据转换步骤,其中一个步骤是使...

回答 1 投票 0

如何在Spark结构化流中将流数据集转换为JavaRDD

我有一个结构化的流应用程序,它从hdfs路径读取。 structStream = spark.readStream()。format(“ text”)。load(parameters.get(“ input”))); JavaRDD ... ...>

回答 1 投票 2

如何透视pyspark流数据帧

我在pyspark结构化流中接收流数据,我需要对其进行透视,以便可以从该数据中获得一行。进入我的集群的数据结构是:{“ version”:1 ....

回答 1 投票 0

如何通过Spark结构化流媒体在Kafka中以编程方式创建主题

我想在我的Spark结构化流应用程序中创建多个kafka主题运行时。我发现Java API提供了多种方法。但是用Spark我找不到任何东西...

回答 1 投票 -1

如何将流查询的数据写入Hive?

我正在使用Spark结构化流从HDFS读取数据。我想将该dataFrame保存到Hive。我已经这样做了,但是却给出了错误。 totalSalary.write.format(“ csv”)。mode(“ append”)....

回答 2 投票 3

如何在发生故障时管理,修改Spark HDFS检查点

我对Spark Checkpoint存有疑问。我有一个火花流应用程序,我正在使用以下方法来管理Checkpoint n HDFS:-val checkpointDirectory =“ hdfs://192.168.0.1:8020 / ...

回答 1 投票 1

如何为StructuredNetworkWordCountWindowed示例输入数据,以便时间戳不同(自1970年以来)?

我运行名为StructuredNetworkWordCountWindowed的结构化流演示,我感到困惑,为什么时间戳为“ 1970-01-19 13:28:00”。我应该如何输入示例数据?输入:[i @ 15:44:48]〜$ nc -...

回答 1 投票 2

火花水印需要太多时间来进行分组操作?

我尝试进行一些groupby操作时会抛出火花结构化的流,它提供了预期的输出,但是我的问题是这花费了超过10分钟的时间,但是我的水印时间却是“ 30秒” ...

回答 1 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.