spark-streaming 相关问题

Spark Streaming是核心Apache Spark API的扩展,可实现实时数据流的高吞吐量,容错流处理。从版本1.3.0开始,即使遇到故障,它也支持一次性处理语义。

Spark动态分配

我有一个在启用了动态资源分配的集群上运行的spark作业。我使用num executors和executor内存属性提交spark作业..这里优先考虑的是什么?会......

回答 1 投票 1

pyspark流媒体与卡夫卡错误

我在MapR环境中使用带有kafka 0.9的spark 2.1.0版本。我正在尝试从Kafka主题读入spark spark。但是,当我运行Kafkautils时,我面临如下错误...

回答 1 投票 1

使用spark.readStream .format(“s3-sqs”)获取空值以获取SQS消息

我正在尝试从Amazon SQS队列中读取消息。权限正常,我可以看到记录计数 - 但所有记录都是空的。无法弄清楚为什么我得到空值。我可以看到......

回答 1 投票 0

如何在结构化流媒体下通过spark从kafka行中提取值?

鉴于我从Kafka中提取的数据框。如何通过模式匹配从中提取值?数据帧:df = spark \ .readStream \ .format(“kafka”)\ .option(“kafka.bootstrap.servers”...

回答 1 投票 2

是否可以使用Spark Streaming SQL实时解析Kafka主题中的JSON字符串?

我有一个连接到kafka经纪人的Pyspark笔记本,并创建一个名为temp的spark writeStream。 Kafka主题中的数据值是json格式,但我不知道如何创建一个火花......

回答 1 投票 1

Azure Iot Hub将Spark结构化流式传输到COSMOS DB(带和wo Kafka)

对于第一个场景,我想在入口和输出中的Cosmos DB中使用带有Iot Hub的Spark Structured Streaming。我使用了以下连接器:azure-eventhubs-spark_2.11-2.3.2 azure -...

回答 2 投票 1

SparkStreaming:DirectStream RDD到数据帧[重复]

我正在研究火花流式传输上下文,它从avaf序列化中获取kafka主题的数据,如下所示。 val kafkaParams = Map [String,Object](“bootstrap.servers” - >“localhost:9092”,“...

回答 1 投票 0

Spark Streaming:排长队/活跃批次

有谁可以请指出这个活跃批次在那里停留数周并且从未被处理过的原因是什么?非常感谢。我的猜测不是执行者,而是更多的工人/ ......

回答 1 投票 4

如果我有LOCALLY执行的函数,则在foreachRdd中使用kafka代码进行Spart流式传输不会执行

我已在本地设置spark 2.2并且使用scala spark会话配置在val sparkSession = SparkSession .builder()。appName(“我的应用程序”).config(“es.nodes”,“localhost:9200”)...

回答 1 投票 0

如何使用广播变量集成spark streaming和kafka?

我在整合kafka和spark streaming时遇到广播变量的问题。当我没有使用spark广播时,kafka和spark流整合没有问题,那么我用广播,它是......

回答 2 投票 0

SBT测试错误:java.lang.NoSuchMethodError:net.jpountz.lz4.LZ4BlockInputStream

获得以下异常,当我尝试使用scalatest在SBT窗口上执行我的火花流代码的单元测试时。 sbt testOnly < > * * * * * * 2018-06 -...

回答 1 投票 10

循环与rdd上的累加器

我想循环n次,其中n是一个累加器,在同一个rdd上,假设n = 10,所以我希望下面的代码循环5次(因为累加器增加了2)val key = keyAcm.value.toInt ...

回答 1 投票 0

Spark Structured Streaming将mapGroupWithState输出到镶木地板

我有一个Spark Structured Streaming应用程序,它使用mapGroupWithState进行事件合并。它与控制台接收器完美配合,但在生产中我需要以镶木地板格式写入数据,但我...

回答 1 投票 0

使用Spark .setMaster(“local [*]”)与.setMaster(“local [3]”)[复制]的不同输出

我正在研究火花一段时间。最近我遇到了一些奇怪的情况,我试图找出根本原因。我怀疑.setMaster(“local [*]”)和....的不同输出

回答 2 投票 2

如何将AWS Kinesis流用于多个不同的数据源

我们有一个传统的批处理应用程序,我们从多个来源(Oracle,Salesforce,FTP文件,Web日志等)中提取数据。我们将传入的数据存储在S3存储桶中并在EMR上运行Spark以...

回答 1 投票 0

如何验证pyspark流媒体到安全的sasl普通卡夫卡?

我试图从火花流到安全的kafka经纪人(使用SASL PLAINTEXT机制)。即时通讯使用Pyspark,使用spark-streaming-kafka-0-8版本,并使用kafka broker版本0.10.2。 ...

回答 1 投票 1

如何计算来自Spark Streaming DStream的每个Key或Grouping记录的平均值?

我有一个Spark Scala的问题,我想从dstream数据计算平均值,我从kafka到dstream得到数据,[(2,110),(2,130),(2,120),(3,200),(3,206),( 3,206),(4150),(4160),(...

回答 1 投票 0

在rdd中将元素组合成数组

如何将RDD [(Int,Int)]转换为RDD [Array [(Int,Int)]],其中我将元素与其键组合在一起。让我们说(0,0),(1,0),(1,1),(0,1),我希望它是一个数组arr1 =((0,0),(1,0))和一个...

回答 1 投票 0

如何在Scala中将时间戳大小调整为毫秒?

我有一个包含列列表的数据框;其中一列是具有不同长度的key_time。我需要在第二部分中将所有key_time长度保持为毫秒,如:Original key_time ...

回答 1 投票 0

如何在本地模式下更改执行程序的数量?

是否可以使用某些Spark Conf设置在本地模式下为Spark Streaming应用程序设置多个执行程序?目前,我无法看到Spark UI在性能方面的任何变化或......

回答 3 投票 2

© www.soinside.com 2019 - 2024. All rights reserved.