Spark Streaming是核心Apache Spark API的扩展,可实现实时数据流的高吞吐量,容错流处理。从版本1.3.0开始,即使遇到故障,它也支持一次性处理语义。
我是Apache Spark Streaming的新手。我正在开发一个spark流媒体应用程序,以找到最短的路径,并再次发送路径回到客户端。我已经写了代码来获取数据和...
我有一个流式数据框架,我想计算一些每日计数器。到目前为止,我一直在使用带有水印的翻滚窗口,如下所示。.withWatermark("timestamp", "10分钟") \ .groupBy(... ...
我使用ubuntu,当我试图将一个数据帧保存到HDFS(spark scala)时:procesed.write.format("json").save("hdfs:/localhost:54310mydataenedisPOCprocessed.json")我得到了这个错误,原因是:org...。
在PySpark结构化流中,Kafka JSON数据与模式为空。新模式的输入不匹配
我正在尝试在Spark结构化流中读取JSON中的Kafka消息。Kafka中的消息示例如下。{ "_id": { "$oid": "5e58f86d5 "5e58f86d5afd84019c13540c" }, "Id": 8, "...
如何Intialize火花shell与特定的用户保存数据到hdfs的apache火花。
im使用ubuntu im使用spark依赖使用intellij命令 "spark "没有找到,但可以安装。...(当我在shell中输入spark)我有两个用户胺,和hadoop_amine(其中hadoop ...
如何在Spark中把输入的数据流保存到执行数据结构中进行sql查询?
I'm new in the word of bigdata. 我的目标是在某种数据结构中维护一个输入数据流,对其进行查询和聚合操作。有一个连续的数据作为输入...
我正在运行一个spark结构化流作业,其中包括创建一个空的数据框架,使用每个微批更新它,如下所示。随着每一个微批处理的执行,阶段数增加......。
什么是需要修复的数据。如何决定spark中Reparation的大小。修复的概念是否适用于spark流和结构化流。DF.Repartion(num)
试图流SQLServer表数据。所以,已经创建了一个简单的java程序与主类。创建了一个sparkconf,并使用,发起了一个JavaStreamingContext和检索SparkContext从它。...
每当我运行Scala对象时,都会出现scala.MatchError消息。
下面这段代码是我使用Spark Streaming的一个Twitter Streaming应用的一部分。
Spark Scala NoClassDefFoundError: orgapachesparkLogging
我检查了很多其他论坛和帖子 但我似乎无法找出问题所在。我所看到的都是人们说不要使用日志记录,以及它是如何被废弃的,但我甚至不知道我在哪里... ...
我收到来自Kafka的JSON字符串, 需要由PySpark处理. 字符串如下。{"_id": {"$oid": "5eb56a371af2d82e242d24ae"}, "Id": 7, "时间戳": {"$date": 1582889068586}, "Id": 7, "Timestamp": {"$date": 1582889068586},"...
我刚开始学习Spark,出现了不少让我惊恐的事情。其中最简单的一个就是,似乎有一些Spark流媒体属性,他们没有使 ...
在我的结构化流作业中,我在updateAcrossEvents方法中更新Spark Accumulators,但是当我试图在StreamingListener中打印它们时,它们总是0。下面是代码。....
Py4JJavaError: 在调用o25.sql时发生错误:org.apache.spark.sql.AnalysisException。表或视图未找到:table1
我正在学习火花流,当我执行下面的代码时,我得到一个错误,这是为了执行tweet分析:我正在使用jupyter-notebook。###可能会引起废弃警告,可以忽略,它们不是来自......的错误。
Hi I have 2 Dataframes df1 and df2 I'm joining these 2 dataframe based on id column then create one new column as result and check below test conditions. 1. 如果名字在两个案例中是相同的......
在现有列的基础上增加新的列,并在Spark数据框架中加入协整值。
我想根据下面的条件在我的数据框中新建一列。我的数据框是这样的: my_string 2020 test 2020 prod 2020 dev 我的条件: value1=从......减去空格后的字符串。
请告诉我如何使用RDD方法Persist()和Cache(),似乎对于我通常用java写的传统程序来说,比如说sparkStreaming,这是一个继续执行的DAG,其中......
如何将Spark Streaming检查点保存在谷歌云存储上?
我试图运行Spark结构化流作业,并将检查点保存到谷歌存储,我有一对夫妇的工作,一个和聚合工作完美,但第二个与聚合抛出异常。I ...
Spark Streaming找到了文件,但却声称找不到文件。
我有下面的东西--它可以监控一个目录& 每X秒拉入一次日志。我的问题是这样的。我设置脚本运行,然后在目录中创建一个文件(比如说testfile... ...