spark-streaming 相关问题

Spark Streaming是核心Apache Spark API的扩展,可实现实时数据流的高吞吐量,容错流处理。从版本1.3.0开始,即使遇到故障,它也支持一次性处理语义。

Spark RDD记录计数和Spark Streaming Web UI不协调

我正在尝试在Spark中创建一个从Kafka获取数据的流。当我检查RDD中的记录计数时,计数似乎与Web UI不同。我为DStream中的所有RDD执行函数(...

回答 1 投票 0

使用Java代码将Azure HDInsight Spark应用程序链接到Azure EventHub

[如何使用JAVA代码在Azure HDInsight Spark群集应用程序中进行某些处理之后,编写代码以将消息从1个EventHub读取到另一个消息?]

回答 1 投票 0

kafka kafka-consumer-groups.sh --describe不返回消费者组的输出

kafka版本1.1 --list可以获取使用者组bin / kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list --command-config config / client_security.properties注意:这不会显示。 ..

回答 1 投票 1

如何将Spark Streaming与Kafka连接

从pyspark.py中导入SparkContext,从pyspark.streaming中导入SparkContext。pyspark.streaming.kafka中导入StreamingContext。

回答 1 投票 0

如何使用spark-cassandra-connector将数据集写入Cassandra表中?

我正在尝试将结构化蒸汽数据集保存到给定的Cassandra表中。我正在使用datastax cassandra连接器版本spark-cassandra-connector_2-11.jar在尝试保存dataSet时,如下所示...

回答 1 投票 1

使用自定义接收器[Spark Streaming]从数据集中接收流数据

我是Spark的新手,它是小菜一碟!我有一种情况,我需要将数据集发送到自定义接收器的store(),然后从此自定义接收器流式传输数据。我正在以这种方式获取数据集...

回答 1 投票 2

是否有额外的开销将Spark数据帧缓存在内存中?

我是Spark的新手,想了解是否存在额外的开销/延迟来持久存储和取消持久化内存中的数据帧。据我到目前为止所知,没有发生数据移动...

回答 1 投票 1

火花流与结构化流

最近几个月,我一直在使用大量结构化流来实施流作业(在大量使用Kafka之后)。读完《用Apache Spark进行流处理》一书后,我有了这个...

回答 1 投票 1

Spark-不会停止消耗Kafka主题的Spark Stream

[我正在尝试编写一个测试示例,以测试使用kafka中的数据的Spark Streaming示例。我为此使用EmbeddedKafka。隐式val config = EmbeddedKafkaConfig(kafkaPort = 12345)...

回答 1 投票 2

Spark scala数据框获取每一行的值并分配给变量

我有一个如下数据框:val df = spark.sql(“ select * from table”)row1 | row2 | row3 A1,B1,C1 A2,B2,C2 A3,B3,C3我想迭代循环得到这样的值:val value1 =“ A1” val ...

回答 1 投票 -3

Spark + Scala:如何在build.sbt中添加外部依赖项

我是Spark的新手(使用v2.4.5),并且仍在尝试找出添加外部依赖项的正确方法。尝试将Kafka流添加到我的项目时,我的build.sbt看起来像这样:name ...

回答 1 投票 -1

Drools Spark Streaming集成用于有状态的绑定

我正在处理流口水和火花流。我想在Spark Streaming上下文中的整个工作中维护KieSession。每个工作节点的每个会话都在执行spark。我了解...

回答 1 投票 3

无法通过使用scala在Spark Streaming中从kafka主题获取数据来写入csv文件

我正在尝试使用以下代码从Kafka主题中读取数据:object Main {def main(args:Array [String]){val sparkSession = createSparkSession()val df = sparkSession...。

回答 2 投票 1

mapGroupsWithState引发错误,原因:java.lang.NoClassDefFoundError:无法初始化

我正在尝试使用mapGroupsWithState读取csv并获取事件状态,并将其写入kafka。如果我注释掉mapGroupsWithState peice,则下面的代码有效。使用spark版本2.3.1 val event = spark ....

回答 2 投票 0

Spark有状态流随着时间的推移增加内存数量

[我们正在使用Spark结构化流,并使用mapGroupWithState在一段时间内聚合数据。随着超时的发生,我们使用state.remove()删除状态。我们正在使用...

回答 1 投票 0

一段时间后在Spark Streaming中停止流上下文

我建立了一个从Twitter接收DStream的应用程序,停止Streaming上下文的唯一方法是停止执行。我想知道是否有办法设置时间并终止...

回答 2 投票 11

如何确保在Spark流中使用Elasticsearch-Hadoop Connector将所有文档写入Elasticsearch集成

我正在使用Elasticsearch-Hadoop连接器将DStream写入Elasticsearch。它是您可以找到连接器的链接https://www.elastic.co/guide/zh-cn/elasticsearch/hadoop/5.6/spark.html我需要...

回答 1 投票 0

火花流中的分区需要更多时间?

我正在运行一个spark应用程序,其中每1分钟发送一次数据。我正在执行的分区数量为48。它在12个执行器上运行,执行器内存为4G,执行器核心数= 4。以下是...

回答 1 投票 -1

具有SASL_SSL身份验证的Kafka Spark结构化流式传输

我一直在尝试使用Spark结构化流API通过SASL_SSL连接到Kafka集群。我已将jaas.conf文件传递给执行者。看来我无法设置密钥库的值...

回答 1 投票 0

Java中的火花流:使用JavaInputDStream使用一个使用者从两个Kafka主题中读取

我有一个spark应用程序,需要使用一个使用Spark Java的使用者从两个不同的主题中读取它们。两个主题的kafka消息键和值模式相同。以下是...

回答 1 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.