spark-streaming 相关问题

Spark Streaming是核心Apache Spark API的扩展,可实现实时数据流的高吞吐量,容错流处理。从版本1.3.0开始,即使遇到故障,它也支持一次性处理语义。

将Spark Streaming数据发送回客户端

我是Apache Spark Streaming的新手。我正在开发一个spark流媒体应用程序,以找到最短的路径,并再次发送路径回到客户端。我已经写了代码来获取数据和...

回答 1 投票 0

火花流。每天进行一次汇总

我有一个流式数据框架,我想计算一些每日计数器。到目前为止,我一直在使用带有水印的翻滚窗口,如下所示。.withWatermark("timestamp", "10分钟") \ .groupBy(... ...

回答 1 投票 0

当试图将数据帧火花保存到一个hdfs文件时出现错误。

我使用ubuntu,当我试图将一个数据帧保存到HDFS(spark scala)时:procesed.write.format("json").save("hdfs:/localhost:54310mydataenedisPOCprocessed.json")我得到了这个错误,原因是:org...。

回答 1 投票 1

在PySpark结构化流中,Kafka JSON数据与模式为空。新模式的输入不匹配

我正在尝试在Spark结构化流中读取JSON中的Kafka消息。Kafka中的消息示例如下。{ "_id": { "$oid": "5e58f86d5 "5e58f86d5afd84019c13540c" }, "Id": 8, "...

回答 1 投票 -1

如何Intialize火花shell与特定的用户保存数据到hdfs的apache火花。

im使用ubuntu im使用spark依赖使用intellij命令 "spark "没有找到,但可以安装。...(当我在shell中输入spark)我有两个用户胺,和hadoop_amine(其中hadoop ...

回答 1 投票 0

如何在Spark中把输入的数据流保存到执行数据结构中进行sql查询?

I'm new in the word of bigdata. 我的目标是在某种数据结构中维护一个输入数据流,对其进行查询和聚合操作。有一个连续的数据作为输入...

回答 1 投票 0

跳过的阶段对Spark作业有什么性能影响吗?

我正在运行一个spark结构化流作业,其中包括创建一个空的数据框架,使用每个微批更新它,如下所示。随着每一个微批处理的执行,阶段数增加......。

回答 1 投票 0

火花重新分区

什么是需要修复的数据。如何决定spark中Reparation的大小。修复的概念是否适用于spark流和结构化流。DF.Repartion(num)

回答 1 投票 0

是否可以使用spark流媒体来流式传输数据库表数据?

试图流SQLServer表数据。所以,已经创建了一个简单的java程序与主类。创建了一个sparkconf,并使用,发起了一个JavaStreamingContext和检索SparkContext从它。...

回答 1 投票 0

每当我运行Scala对象时,都会出现scala.MatchError消息。

下面这段代码是我使用Spark Streaming的一个Twitter Streaming应用的一部分。

回答 1 投票 0

Spark Scala NoClassDefFoundError: orgapachesparkLogging

我检查了很多其他论坛和帖子 但我似乎无法找出问题所在。我所看到的都是人们说不要使用日志记录,以及它是如何被废弃的,但我甚至不知道我在哪里... ...

回答 1 投票 0

从Kafka解析嵌套的json的方案。

我收到来自Kafka的JSON字符串, 需要由PySpark处理. 字符串如下。{"_id": {"$oid": "5eb56a371af2d82e242d24ae"}, "Id": 7, "时间戳": {"$date": 1582889068586}, "Id": 7, "Timestamp": {"$date": 1582889068586},"...

回答 1 投票 0

为什么会有隐藏的Spark流媒体属性?

我刚开始学习Spark,出现了不少让我惊恐的事情。其中最简单的一个就是,似乎有一些Spark流媒体属性,他们没有使 ...

回答 1 投票 0

使用Spark Accumulators与结构化流的关系

在我的结构化流作业中,我在updateAcrossEvents方法中更新Spark Accumulators,但是当我试图在StreamingListener中打印它们时,它们总是0。下面是代码。....

回答 1 投票 0

Py4JJavaError: 在调用o25.sql时发生错误:org.apache.spark.sql.AnalysisException。表或视图未找到:table1

我正在学习火花流,当我执行下面的代码时,我得到一个错误,这是为了执行tweet分析:我正在使用jupyter-notebook。###可能会引起废弃警告,可以忽略,它们不是来自......的错误。

回答 1 投票 0

在火花数据框架中找不到值&&。当比较空值时

Hi I have 2 Dataframes df1 and df2 I'm joining these 2 dataframe based on id column then create one new column as result and check below test conditions. 1. 如果名字在两个案例中是相同的......

回答 1 投票 0

在现有列的基础上增加新的列,并在Spark数据框架中加入协整值。

我想根据下面的条件在我的数据框中新建一列。我的数据框是这样的: my_string 2020 test 2020 prod 2020 dev 我的条件: value1=从......减去空格后的字符串。

回答 1 投票 0

如何使用RDD的persist和cache?

请告诉我如何使用RDD方法Persist()和Cache(),似乎对于我通常用java写的传统程序来说,比如说sparkStreaming,这是一个继续执行的DAG,其中......

回答 2 投票 2

如何将Spark Streaming检查点保存在谷歌云存储上?

我试图运行Spark结构化流作业,并将检查点保存到谷歌存储,我有一对夫妇的工作,一个和聚合工作完美,但第二个与聚合抛出异常。I ...

回答 1 投票 4

Spark Streaming找到了文件,但却声称找不到文件。

我有下面的东西--它可以监控一个目录& 每X秒拉入一次日志。我的问题是这样的。我设置脚本运行,然后在目录中创建一个文件(比如说testfile... ...

回答 1 投票 -1

© www.soinside.com 2019 - 2024. All rights reserved.