spark-structured-streaming 相关问题

Spark Structured Streaming允许使用unifrom DataFrame和Dataset API处理实时数据流。

Spark Streaming df.writeStream不生成任何输出

我正在使用hdp沙盒2.6.4,并且在本地计算机(主机)上配置了spark。我已经使用shell登录到docker镜像,并开始了一个简单的控制台使用者。我是...

回答 1 投票 0

火花流媒体停止停止

我正在尝试从Spark kafka流中读取消息。但是它停止运行并出现以下错误20/02/14 08:12:33 INFO SparkContext:从关机钩子调用stop()20/02/14 08:12:33 INFO SparkUI:...

回答 1 投票 0

在写S3时,为什么会出现FileNotFoundException

我在我的项目中使用Spark-SQL-2.3.1,Kafka,Java 8,并且希望使用AWS-S3作为野蛮存储。我正在将来自Kafka主题的已消费数据写入/存储到S3存储桶中,如下所示:ds ....

回答 1 投票 0


Spark结构化流Kafka集成-流查询

我正在开发将连接到Kafka源并且在同一源上的应用程序,我想创建具有不同过滤条件的多个流式查询。每个查询都会...

回答 1 投票 0

Spark:使用Spark Scala从Kafka读取Avro消息

我正在尝试使用Spark 2.4.3中的以下代码来读取来自kafka的Avro消息。当数据在kafka上发布时,架构存储在融合架构注册表中。我一直在尝试一些解决方案...

回答 1 投票 1

Zeppelin Spark解释器:从Spark结构化流写入的HDFS数据读取时禁用_spark_metadata

[我们有一个通过Spark结构化流实现的流,它写入HDFS文件夹,并因此创建_spark_metadata子文件夹,以便在写入...时获得一次准确的保证]]

回答 1 投票 1

Scala / Spark Streaming Store将kafka消息转换为Hive

作为数据源,我正在使用kafka流来消费tweet。我编写了一个简单的Spark Streaming应用程序。我可以使用这些推文,并且可以将记录转换为我自己的情况...

回答 1 投票 0

Apache Kafka(v2.4.0)-Python中的Spark(v2.4.4)流集成

所以我在不同的ubuntu机器上安装了Apache Spark 2.4.4和Kafka_2.12-2.4.0。我想从IOT设备中获取所有数据,馈入kafka,然后使用spark-streaming进入spark。我...

回答 1 投票 0

Spark结构化流foreachBatch和UPSERT(合并):保留还是不保留?

如果在具有foreachBatch的结构化流中进行有状态聚合(任意)以将更新合并到增量表中的情况下,是否应该在升级之前将批处理数据帧保留在foreachBatch中? ...

回答 1 投票 0

在Spark中查询流数据集

我有一个流式数据集,其列为:bag_id,ball_color。我想找到每个袋子最受欢迎的颜色。因此,我尝试了以下方法:dataset.groupBy(“ bag_id”,“ color”)#第一个聚合.agg(count(“ ...

回答 2 投票 1

两个Spark结构化的流作业无法写入相同的基本路径

Spark结构化流传输不允许两个结构化流作业将数据写入同一基本目录,而使用dstream则可以。由于_spark_metadata目录将由...

回答 1 投票 -1

正在寻找将我的Spark Scala流作业容器化的Docker映像

我是Spark(3.0.0_preview)和Scala(SBT)的新手。我编写了一个火花流作业,可以从我的IDE在本地成功运行。现在,我正在寻找一种方法来对代码进行泊坞处理,以便... ...>

回答 1 投票 0

如何转义'是保留关键字,不能用作字段名称'在Spark SQL和结构化流中出错?

[当前,当我使用结构化流v2.1.0 + Kafka v0.10进行实时日志处理时,线程“ main” java.lang.UnsupportedOperationException中出现异常:`package`是保留关键字...

回答 1 投票 0

Spark结构化流redis接收器执行不理想

我使用了Spark结构化的流式消费kafka消息,并将数据保存到Redis。通过扩展ForeachWriter [org.apache.spark.sql.Row],我使用了redis接收器来保存数据。代码运行良好,但是...

回答 1 投票 0

Pyspark结构化流解析嵌套Json

[我的项目是,将json写入Kafka主题,然后从kafka主题中读取json,最终导致csv崩溃。一切都很好。但是有些键是嵌套的json。我如何解析json中的列表?示例Json:{“ a”:“ test”,...

回答 1 投票 0

火花结构流Pyspark接收器Csv不附加

将json写入Kafka主题,并从kafka主题中读取json。实际上,我订阅主题并逐行编写控制台。但是我必须下沉/写入文件csv。但是我不能。我曾经写过一次csv,但是没有...

回答 1 投票 1

Scala 2.12对Spark 2.4.2的Elasticsearch支持

我无法找到任何支持Scala 2.12的Spark 2.4.2的ES 6.7.1支持jar在maven回购中,jar仅支持scala 2.11和2.10。 org ....

回答 1 投票 0

如何根据列值选择列?

我的数据框架构如下:根|-值:结构(nullable = true)| |-之前:struct(nullable = true)| | |-id:长(可空=假) | |-名称:字符串(可空= ...

回答 2 投票 1

根据列值火花选择列

我的数据框架构如下:根|-值:结构(nullable = true)| |-之前:struct(nullable = true)| | |-id:长(可空=假) | |-名称:字符串(可空= ...

回答 2 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.