spark-streaming 相关问题

Spark Streaming是核心Apache Spark API的扩展,可实现实时数据流的高吞吐量,容错流处理。从版本1.3.0开始,即使遇到故障,它也支持一次性处理语义。

如何在每一行调用外部服务并保存到jdbc

如何在数据框中的每一行上调用外部服务并将每个结果保存到 jdbc。 df2 = df.map(row -> externalservice.call(row)).cache() df2.save().format('...') ... - 在 ALL 'd...之后执行

回答 0 投票 0

java.lang.ClassCastException:kafka.cluster.BrokerEndPoint 无法转换为 kafka.cluster.Broker:kafka streaming 和 spark

我在用 卡夫卡 0.8.2.1 火花 2.1.2 我试图运行一个代码,它将数据从 kafka 流式传输到 spark bu 我收到这个错误 文件“c:/Users/anish/OneDrive/Desktop/major project/

回答 0 投票 0

爆炸包含 str 格式的字典列表的火花列

How to convert this streaming dataframe in pyspark, +--------------------+------+-------------------- --------------------------+ |时间戳|偏移量|字符串解码(值,UTF-8)...

回答 1 投票 0

包含多个连接的 Spark 流式查询没有输出

我有一个连接查询,它有另一个连接查询作为子查询,但该查询没有输出。我单独运行子查询来找出问题所在,并且它按预期工作。 我正在尝试...

回答 0 投票 0

如何刷新一个表并发执行?

我正在使用 Spark Streaming 2.1。我想定期刷新一些缓存表(由 spark 提供的 DataSource 加载,如 parquet、MySQL 或用户定义的数据源)。 如何刷新表格?

回答 3 投票 0

在使用 PySpark 消费来自 Kafka 的消息时处理架构演变

我是卡夫卡的新手。目前我正在处理一个要求 - 用例: 我正在使用来自 Kafka 的消息(消息由上游团队在 Kafka 中生成)。上游团队不维护...

回答 0 投票 0

pyspark 中是否有“执行日期”或运行日期参数?

我想为历史数据运行一个脚本(2022 data_ 以日期目录格式 yyyy/mm/dd 存储在 azure data lake gen 2 中) 步骤:1 对于 2022 年的每个日期,我都想为......提取数据

回答 1 投票 0

将 nlp 应用于抓取的文本和流数据的 Python 应用程序

我正在尝试为我正在构建的应用程序选择合适的工具。 我想抓取多个平台并将抓取的数据保存在某个地方,然后我想清理并应用 nlp (nltk) 和

回答 0 投票 0

在 Spark Streaming 中使用 UDF 读取大量 XML 到 Delta 表非常慢

我们有一个输入文件的存储库,如 �3 \*\*Events*.xml => 这表示需要在 Spark Structured Streaming 中读取的输入 XML 文件的路径,以便...

回答 0 投票 0

如何连接到 jdbc 作为 Databricks 中的流源

使用 https://github.com/sutugin/spark-streaming-jdbc-source 中的示例 我试图连接到 Postgres 数据库作为 AWS Databricks 中的流媒体源。 我有一个正在运行的集群: 11....

回答 1 投票 0

Spark 没有给所有执行者同等的任务

我正在阅读有 5 个分区的 kafka 主题。由于 5 个内核不足以处理负载,我正在将输入重新分区为 30 个。我用 6

回答 0 投票 0

使用 toTable 在 Databricks 中写入流不会执行 foreachBatch

下面的代码正常工作,即将数据写入输出表,并可在 10 秒内从表中选择。问题是 foreachBatch 没有被执行。 当我有...

回答 1 投票 0

pyspark-如何并行提交Spark SQL?

您好,我有超过 1200 多个 SQL 查询,我想并行提交多个 SQL 查询并将每个查询存储到 CSV 文件中, 由于python有GIL限制,如何并行提交, 我看过其他

回答 0 投票 0

在spark结构化流作业中,我如何从每个微批中的相同起始偏移量读取?

我使用的是spark结构化流。能否在每个批次执行后重置Kafka偏移量,使每个批次从相同的起始偏移量读取,而不是只读取新发现的事件?...

回答 1 投票 0

将Spark Streaming数据发送回客户端

我是Apache Spark Streaming的新手。我正在开发一个spark流媒体应用程序,以找到最短的路径,并再次发送路径回到客户端。我已经写了代码来获取数据和...

回答 1 投票 0

错误 java.lang.NoSuchFieldError。NO_INTS

当运行spark流媒体应用程序从kinesis中获取数据时,出现以下错误。在线程 "Kinesis Receiver 0 "中出现异常 java.lang.NoSuchFieldError: NO_INTS at com.fasterxml......。

回答 1 投票 0

Spark Scala中DataSet[Row]和sql.DataFrame类型的区别[重复]

我对DataSet[Row]和sql.DataFrame这两个数据类型感到困惑。在各种文档等中都提到DataFrame就是DataSet[Row]。那么什么是sql.DataFrame。下面是代码...

回答 1 投票 0

在 "main "线程中出现错误。在线程 "main "中出现异常 java.lang.NoClassDefFoundError: orgapachesparkSparkConf

我正在使用Kafka Spark Streaming。IDLE没有显示任何错误,程序也能成功构建,但我得到了这个错误。Exception in thread "main" java.lang.......

回答 1 投票 0

强行删除火花塞驱动器时,该驱动器无法重新启动。

我有一个火花流作业,我正试图由火花-k8-操作员提交。我一直保持重启策略为始终。然而,在手动删除的驱动程序是没有得到......

回答 1 投票 0

在Spark Straming中使用DStream API从Kafka读取时间戳。

我想用Python用Spark流读取一个Kafka主题的值。我正在使用DStream API,使用spark-streaming-kafka-0-8支持(虽然已经废弃)。我的代码如下。...

回答 1 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.