Spark Streaming是核心Apache Spark API的扩展,可实现实时数据流的高吞吐量,容错流处理。从版本1.3.0开始,即使遇到故障,它也支持一次性处理语义。
Spark RDD记录计数和Spark Streaming Web UI不协调
我正在尝试在Spark中创建一个从Kafka获取数据的流。当我检查RDD中的记录计数时,计数似乎与Web UI不同。我为DStream中的所有RDD执行函数(...
使用Java代码将Azure HDInsight Spark应用程序链接到Azure EventHub
[如何使用JAVA代码在Azure HDInsight Spark群集应用程序中进行某些处理之后,编写代码以将消息从1个EventHub读取到另一个消息?]
kafka kafka-consumer-groups.sh --describe不返回消费者组的输出
kafka版本1.1 --list可以获取使用者组bin / kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list --command-config config / client_security.properties注意:这不会显示。 ..
从pyspark.py中导入SparkContext,从pyspark.streaming中导入SparkContext。pyspark.streaming.kafka中导入StreamingContext。
如何使用spark-cassandra-connector将数据集写入Cassandra表中?
我正在尝试将结构化蒸汽数据集保存到给定的Cassandra表中。我正在使用datastax cassandra连接器版本spark-cassandra-connector_2-11.jar在尝试保存dataSet时,如下所示...
使用自定义接收器[Spark Streaming]从数据集中接收流数据
我是Spark的新手,它是小菜一碟!我有一种情况,我需要将数据集发送到自定义接收器的store(),然后从此自定义接收器流式传输数据。我正在以这种方式获取数据集...
我是Spark的新手,想了解是否存在额外的开销/延迟来持久存储和取消持久化内存中的数据帧。据我到目前为止所知,没有发生数据移动...
最近几个月,我一直在使用大量结构化流来实施流作业(在大量使用Kafka之后)。读完《用Apache Spark进行流处理》一书后,我有了这个...
Spark-不会停止消耗Kafka主题的Spark Stream
[我正在尝试编写一个测试示例,以测试使用kafka中的数据的Spark Streaming示例。我为此使用EmbeddedKafka。隐式val config = EmbeddedKafkaConfig(kafkaPort = 12345)...
我有一个如下数据框:val df = spark.sql(“ select * from table”)row1 | row2 | row3 A1,B1,C1 A2,B2,C2 A3,B3,C3我想迭代循环得到这样的值:val value1 =“ A1” val ...
Spark + Scala:如何在build.sbt中添加外部依赖项
我是Spark的新手(使用v2.4.5),并且仍在尝试找出添加外部依赖项的正确方法。尝试将Kafka流添加到我的项目时,我的build.sbt看起来像这样:name ...
Drools Spark Streaming集成用于有状态的绑定
我正在处理流口水和火花流。我想在Spark Streaming上下文中的整个工作中维护KieSession。每个工作节点的每个会话都在执行spark。我了解...
无法通过使用scala在Spark Streaming中从kafka主题获取数据来写入csv文件
我正在尝试使用以下代码从Kafka主题中读取数据:object Main {def main(args:Array [String]){val sparkSession = createSparkSession()val df = sparkSession...。
mapGroupsWithState引发错误,原因:java.lang.NoClassDefFoundError:无法初始化
我正在尝试使用mapGroupsWithState读取csv并获取事件状态,并将其写入kafka。如果我注释掉mapGroupsWithState peice,则下面的代码有效。使用spark版本2.3.1 val event = spark ....
[我们正在使用Spark结构化流,并使用mapGroupWithState在一段时间内聚合数据。随着超时的发生,我们使用state.remove()删除状态。我们正在使用...
我建立了一个从Twitter接收DStream的应用程序,停止Streaming上下文的唯一方法是停止执行。我想知道是否有办法设置时间并终止...
如何确保在Spark流中使用Elasticsearch-Hadoop Connector将所有文档写入Elasticsearch集成
我正在使用Elasticsearch-Hadoop连接器将DStream写入Elasticsearch。它是您可以找到连接器的链接https://www.elastic.co/guide/zh-cn/elasticsearch/hadoop/5.6/spark.html我需要...
我正在运行一个spark应用程序,其中每1分钟发送一次数据。我正在执行的分区数量为48。它在12个执行器上运行,执行器内存为4G,执行器核心数= 4。以下是...
具有SASL_SSL身份验证的Kafka Spark结构化流式传输
我一直在尝试使用Spark结构化流API通过SASL_SSL连接到Kafka集群。我已将jaas.conf文件传递给执行者。看来我无法设置密钥库的值...
Java中的火花流:使用JavaInputDStream使用一个使用者从两个Kafka主题中读取
我有一个spark应用程序,需要使用一个使用Spark Java的使用者从两个不同的主题中读取它们。两个主题的kafka消息键和值模式相同。以下是...