spark-streaming 相关问题

Spark Streaming是核心Apache Spark API的扩展,可实现实时数据流的高吞吐量,容错流处理。从版本1.3.0开始,即使遇到故障,它也支持一次性处理语义。

来自 kafka 的 Spark 流

我正在尝试使用Spark从Kafka流式传输数据,我的代码如下: 从 pyspark 导入 SparkConf、SparkContext 从 pyspark.sql 导入 SparkSession 从 pyspark.sql.functions 导入 * 夫...

回答 1 投票 0

自动加载器过滤重复项

我正在处理流数据帧,想知道如何消除重复项并仅选择最新修改的行。 例如。 ID 修改 1 2023年3月8日 1 2023年3月8日 2 2023年2月8日 2 2023年3月8日

回答 1 投票 0

无法使用架构注册表获取正确的架构

我正在使用 PySpark 中的 from_avro 函数以 Avro 格式从 Kafka 读取数据,并利用在模式注册表中注册的模式。但是,我遇到了一个问题,该问题...

回答 1 投票 0

Spark Java 中的列表模式列表

我无法在 java 中为以下创建模式 JavaRDD newRdd = dataset.javaRDD().map(new Function() { @覆盖 public Row call(Row r) 抛出异常...

回答 0 投票 0

在 Azure 事件中心的单个消费者组上是否可以有多个具有自己检查点的读取进程

在我们的场景中——我们正在轮询一个本地数据库,并使用 REST 端点每 2 分钟将数据发布到事件中心 我们正在尝试将多个实体推送到同一个事件中心......

回答 0 投票 0

databticks 中的 Foreachbatch 在第一个微批次完成后引起问题

我正在使用 foreachbatch 将流数据写入多个目标,并且它在第一次微批处理时工作正常。当它尝试运行第二个微批次时,它失败并显示以下错误...

回答 0 投票 0

使用Livy提交spark submit后外部文件的路径是什么?

我正在使用 Livy batch api 提交 spark 作业,如下所示。在这里,我将 .p12 作为文件参数传递,稍后将在应用程序中用于 ssl 通信。 { “类名&

回答 0 投票 0

想使用广播变量来高效地跨多个节点共享只读数据lat long

在这里输入图片描述 假设您有两个大型数据集:第一个数据集包含有关用户活动的信息 在一个网站上。它由 CSV 格式的日志文件组成,其中每一行

回答 0 投票 0

获取数据类型不匹配:列过滤器中的不同类型((数组<string>和字符串))

在尝试过滤列以检查空数据集时,我遇到了以下类型不匹配错误。 由于数据类型不匹配,无法解析 '(`sellers` = '[]')':'(`sellers` 中的类型不同 ...

回答 2 投票 0

Databricks Autoloader 不保存数据

我是 Databricks Autoloader 的新手。我正在尝试摄取一个简单的 csv 文件,其中包含 3 条格式为 [Fname、Lname、age] 的记录。 以下代码在Databricks中运行成功,但是没有数据

回答 0 投票 0

在 Ubuntu VM 上使用 SPARK-SUBMIT 时出错

尝试运行本书中的此命令时:Spark:权威指南。我遇到了一个问题。就我在 Spark 中进行流式传输而言,我需要使用“spark-submit”而不是

回答 0 投票 0

流式表架构更改

是否可以在不影响检查点文件夹的情况下更新 Databricks 中非空表的架构(更改列的数据类型)(由流式自动加载器加载)? 有没有work-ar...

回答 0 投票 0

Instant.now() 传入 spark forEachBatch 函数未更新

输出 .writeStream() *.foreachBatch(新函数(名称, Instant.now()))* .outputMode("追加") .option("checkpointLocation", "/path/") 。开始(); Instant.now() 传入

回答 0 投票 0

WARN CSVHeaderChecker:CSV 标头不符合架构。 - 但标题是正确的

我正在尝试使用 Spark 流式传输 CSV 文件。 我对 https://dzone.com/articles/spark-structured-streaming-using-java 很有启发。 但是我得到了错误: 22/03/07 13:51:52 警告 CSVHeaderChecker:CSV ...

回答 2 投票 0

Spark 最后一个窗口在追加模式下不刷新

问题很简单,当你使用带有追加模式的TUMBLING窗口时,只有当下一条消息到达时窗口才会关闭(+水印逻辑)。 在当前的实现中,如果你停止

回答 1 投票 0

java.lang.NoSuchMethodError: org.apache.hadoop.hive.common.FileUtils.mkdir 试图将表保存到 Hive

我正在尝试读取 kafka 流并将其作为表保存到 Hive。 消费者代码是: 导入 org.apache.spark.sql.{DataFrame, Dataset, SaveMode, SparkSession} 导入 org.apache.spark.sql.functi...

回答 1 投票 0

底层方法抛出异常时的SparkStream

我有一个连续从 Kafka 读取的 spark readStream 函数。我对数据执行了一些操作,并想使用 Spark writeStream 将其批量写入 Cassandra DB。虽然一直

回答 1 投票 0

Spark 流处理不同文件格式的文件夹

我有一个“通用”spark 结构化流作业,它监视顶级文件夹(伞)并遍历所有子文件夹(kafka 主题数据),然后写入每个 Kafka ...

回答 0 投票 0

想要创建持续运行的 Spark 流式查询,该查询从 MemoryStream[String] 读取并输出到控制台

我问的问题——一旦回答——帮助我写一些测试 Spark 结构化流管道,从流源读取和 写入 s3/parquet,但我简化了 ...

回答 0 投票 0

在 ETL 之后写入数据库或 myApp(接收器)之前,Spark 只有窗口结果

输入kafka => ETL spark =>输出到kafka。 假设您要计算每个用户的总观看次数,但这些观看次数可能以百万为单位。如果你只是在数据中使用 KAFKA 接收器......

回答 0 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.