spark-streaming 相关问题

Spark Streaming是核心Apache Spark API的扩展,可实现实时数据流的高吞吐量,容错流处理。从版本1.3.0开始,即使遇到故障,它也支持一次性处理语义。

根据给定的操作列创建一个新的数据集。

我使用spark-sql-2.3.1v有如下场景。给定数据集 val ds = Seq( (1, "x1", "y1", "0.1992019"), (2, null, "y2", "2.2500000"), (3, "x3", null, "15.34567"), (4, null, "y4", ...

回答 1 投票 0

根据给定的操作列创建一个新的数据集。

我使用的是spark-sql-2.3.1v,有如下方案。给定一个数据集: val ds = Seq( (1,"x1","y1","0.1992019"),(2,null,"y2","2.2500000"),(3,"x3",null,"15.34567"),(4,null,"y4"...。

回答 1 投票 1

在Spark中运行现有的生产型Java应用

我一直在阅读Spark,并且对在可扩展计算集群上分配计算的能力非常感兴趣。我们有生产流处理代码(5K行,用Java 9写的) ...

回答 1 投票 0

访问Spark流数据管道。什么方案最有效?

我正在寻找从Spark数据管道访问数据的最佳方案。场景如下。我正在从Kafka主题中读取数据,创建一个流式数据框架,然后对其进行清理和... ...

回答 1 投票 -1

从Spark Streaming中获取异常 "没有注册输出操作,所以没有执行"。

package com.scala.sparkStreaming import org.apache.spark._ import org.apache.spark.streaming._ object Demo1 { def main(assdf:Array[String]){ val sc=new SparkContext("local", "Stream") ....

回答 1 投票 0

Spark *Structured* Streaming中的RecordTooLargeException异常

我一直收到这个错误信息。当序列化时,消息是1169350字节,这比你在max.request.size配置中配置的最大请求大小要大。由于...

回答 1 投票 0

在pyspark中从本地文本文件流

conf = SparkConf().setMaster("spark:/antonis-dell:7077").setAppName("Kafka_Spark") sc = SparkContext(conf=conf) # .getOrCreate() sc.setLogLevel("WARN") ....

回答 2 投票 0

为什么Spark结构化流作业在引发异常后仍未终止?

我在我的结构化流作业中引发了一个自定义异常来测试失败,如下所示。我看到查询被终止,但不能理解为什么驱动脚本没有以非零的方式失败......。

回答 1 投票 0

使用SparkScala用JSON字段过滤RDD的csv。

我正在研究sparkscala,我需要通过一列的特定字段来过滤一个RDD,在这种情况下,用户。我想返回一个包含用户["Joe", "Plank", "Willy"]的RDD,但似乎想不通......。

回答 1 投票 0

附表火花结构化流媒体

它是以某种方式可能安排一个火花流作业只运行在特定的时间吗说从8AM到8PM?集群是在夜间运行,造成不必要的成本。我怎么能重新初始化......。

回答 1 投票 0

spark writeStream into kafka - awaitTermination()与 awaitAnyTermination()之间的区别。

根据官方文档,我使用下面的代码段写入kafka主题,但它没有写入kafka。 finalStream = final.writeStream \ .format("kafka") \ .option("kafka......")。

回答 1 投票 0

如何在控制台中看到数据框架(相当于结构化流的.show())?

我试图看到什么是作为我的数据框架... 这里是火花代码从pyspark.sql导入SparkSession导入pyspark.sql.functions作为psf导入logging导入时间火花=SparkSession ....

回答 1 投票 0

如何在Pyspark中计算或管理流数据?

我想从流媒体数据中提取数据,然后发送到网页上。例如:我想从流式数据中计算出总销售栏的总和,然后发送到网页上。我将计算流数据中TotalSales列的总和。但是在summary = dataStream.select('TotalSales')时出错......。

回答 1 投票 0

错误。在kafka中使用Spark结构化流来读写数据到另一个主题。

我正在做一个小任务,使用kafka主题读取access_logs文件,然后我计算状态,并将状态的计数发送到另一个kafka主题。但是我一直收到错误信息,比如,当我使用no ...

回答 1 投票 0

spark - 方法 错误。匿名函数的参数类型必须是完全已知的。

我知道有很多问题,但我创建了一个简单的例子,我认为应该可以工作,但仍然不行,我不确定我是否理解为什么 def main(args: Array[String]) ...

回答 1 投票 0

Spark Kafka Producer抛出太多打开的文件 Exception

我正在尝试运行一个用Java编写的Spark Kafka Job,以产生大约10K记录,每批到一个Kafka Topic。这是一个Spark批处理作业,它读取100个(共100万条记录)hdfs部分文件... ...

回答 1 投票 0

什么时候Kafka连接器比Spark流媒体解决方案更受欢迎?

通过Spark流,我可以读取Kafka消息,并将数据写入不同类型的表,例如HBase、Hive和Kudu。但这也可以通过在这些表上使用Kafka连接器来实现。我的...

回答 1 投票 0


在构建路径中找到的scala库的版本与scala IDE提供的版本不兼容。

我正在写一个spark流媒体应用程序,我使用spark汇编jar.但我得到以下错误。在构建路径中找到的scala库的版本不兼容。

回答 1 投票 0

等价于withWatermark火花流的SQL查询。

我有一个方法,它需要spark sql查询作为参数,运行在流数据集上,我必须处理窗口函数和withWatermark。 窗口函数似乎是可能的,但我无法找到...

回答 0 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.