spark-streaming 相关问题

Spark Streaming是核心Apache Spark API的扩展,可实现实时数据流的高吞吐量,容错流处理。从版本1.3.0开始,即使遇到故障,它也支持一次性处理语义。

什么时候广播变量会发生变化?

我被告知广播变量应该是不可变的。然而,我看到了一个代码片段,其中广播变量用作标志。公共类TestBroadcast {private static ...

回答 1 投票 3

Java Spark:com.mongodb.spark.config.writeconfig问题

我试图通过java spark连接器与MongoDB连接,当我提交jar并在spark shell中运行jar时,我收到错误“com.mongodb.spark.config.writeconfig”。这里错误......

回答 1 投票 1

当使用spark-streaming时,如何通过自己保存多个分区的Kafka偏移量

我使用spark-streaming来读取kafka数据,并处理我在下面使用的每一行来创建一个流:lines = KafkaUtils.createDirectStream(jssc,LocationStrategies ....

回答 1 投票 3

如何将每个DStream保存/插入永久表

我一直面临着关于将输出Dstream插入永久SQL表的“Spark Streaming”的问题。我想插入每个输出DStream(来自单个批处理,引发...

回答 2 投票 4

如何从JavaStreamingContext生成JavaPairInputDStream?

我正在学习Apache Spark流,并尝试从JavaStreamingContext生成JavaPairInputDStream。下面是我的代码:import java.util.ArrayList; import java.util.Arrays; import java.util ....

回答 1 投票 0

Spark Streaming Kafka Receivers API - numPartitions

我们正在使用spark-streaming-kafka-0-8接收器。我们无法通过增加numPartitions来增加消耗事件的数量。似乎增加numPartitions不会影响......

回答 1 投票 0

为什么启动StreamingContext失败并出现“IllegalArgumentException:要求失败:没有注册输出操作,所以无需执行”?

我正在尝试使用Twitter作为源执行Spark Streaming示例,如下所示:public static void main(String .. args){SparkConf conf = new SparkConf()。setAppName(“...

回答 3 投票 20

火花结构流:镶木地板分区名称唯一性

在使用Spark Structured stream 2.1从Kafka流式传输时,使用partitionBy一个字符串列(包含yyyy-mm-dd格式的日期字符串),我期望一个interval27e / _spark_metadata ...

回答 1 投票 1

为什么启动StreamingContext失败并出现“IllegalArgumentException:要求失败:没有注册输出操作,所以无需执行”? [重复]

我在火花上部署了主人和工人。当我尝试使用SparkStreaming进行一些计算时,它会失败。我在sbt控制台中创建了StreamingContext。请看下面的错误信息,示例代码,......

回答 1 投票 3

替代递归运行Spark-submit作业

下面是我需要建议的场景,场景:数据摄取是通过Nifi进入Hive表。 Spark程序必须对数据执行ETL操作和复杂连接...

回答 1 投票 -2

updatestatebykey - Pyspark - Spark流媒体

我是新手来激发流媒体。试图了解UpdateStateByKey操作的重要性?有什么用?存储仲裁国家的必要性是什么?这个怎么运作?

回答 1 投票 0

如何阅读从主题到Spark Streaming的Kafka gzip压缩消息

我确实看到我们需要在生产者端进行更改以使用Gzip压缩,但我不确定如何在阅读消息时解压缩。请了解一下从哪里开始。 ...

回答 1 投票 2

如何在火花流中抛出异常

我们有一个火花流程序,它从kafka中提取消息并使用forEachPartiton转换处理每个消息。如果处理中存在特定错误的情况......

回答 1 投票 0

Spark配对rdd按键和组配对RDD并从每组中选择最新组

新的火花和斯卡拉。试着在下面实现。我的消息看起来如下(key,id,version,dataObject)val transformedRDD = processedMessages.flatMap(message => {message.isProcessed ...

回答 1 投票 0

为什么Kafka Direct Stream会为每条消息创建一个新的解码器?

我有一个用Java编写并使用Spark 2.1的Spark流媒体应用程序。我正在使用KafkaUtils.createDirectStream来读取来自Kafka的消息。我正在使用kryo编码器/解码器用于kafka消息。我指定了......

回答 1 投票 6

Apache Spark与Scala合并流文本

我想合并流数据val contents = tweets.map(status => status.getText())val tSender = tweets.map(status => status.getUser()。getScreenName())tSender.print()contents.print ()......

回答 1 投票 0

Apache Spark流媒体 - Timeout长期运行批处理

我正在设置Apache Spark长时间运行的流作业,以使用InputDStream执行(非并行化)流式传输。我想要实现的是当队列中的批处理花费太长时间时(...

回答 1 投票 1

如何处理来自Kafka的avro格式的消息? [重复]

我正在尝试使用spark streaming以程序的形式实现下面的kafka-console-consumer命令(运行良好并输出预期的json数据)功能。 kafka-console-consumer.sh --...

回答 1 投票 1

如何在Scala中访问嵌套或分层的Map结构

我有嵌套/分层Map格式的大量数据。我正在使用Scala和spark streaming,我很新。让我们说样本流数据实例/行看起来像 - Map(nd - > 1,du - > 870,...

回答 1 投票 0

给定messageId的流数据中的缓冲消息

使用案例:我有消息有messageId,多条消息可以有相同的消息ID,这些消息存在于由messageId分区的流管道(如kafka)中,所以我确保所有...

回答 1 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.