spark-streaming 相关问题

Spark Streaming是核心Apache Spark API的扩展,可实现实时数据流的高吞吐量,容错流处理。从版本1.3.0开始,即使遇到故障,它也支持一次性处理语义。

在SparkStreaming中暂停和恢复KafkaConsumer的工作。

:) 我已经结束了自己在一个(奇怪的)情况下,简单地说,我不想从Kafka消费任何新的记录,所以暂停sparkStreaming消费(InputDStream[ConsumerRecord])的所有......

回答 1 投票 0

我们如何在Spark结构化流中管理偏移量?

背景:我编写了一个简单的Spark结构化蒸汽应用程序,用于将数据从Kafka移至S3。发现为了支持一次保证,spark创建了_spark_metadata文件夹,该文件夹以...

回答 1 投票 0

将Spark结构化的流数据帧传递给函数

我从源卡夫卡读取了火花结构化的流数据帧。我想将此数据帧传递给函数,并将该函数的结果写入某个目标。案例类JsonSchema(...

回答 1 投票 0

[sbt项目使用kafka进行流式传输

我陷入了下面的问题,我能够从Kafka主题中提取数据以引发数据流,但是当我将RDD流连接到Dataset [String]并转储结果数据(经过一些处理之后……)>

回答 1 投票 1

使用Apache Spark流的实时日志处理

我想创建一个可以实时读取日志并使用apache spark处理它的系统。我是否应该使用类似kafka或水槽的东西将日志传递到火花流,还是应该...

回答 3 投票 9

将流XML转换为Spark中的JSON

我是Spark的新手,正在研究一个简单的应用程序,将从Kafka接收的XML流转换为JSON格式,使用:Spark 2.4.5 Scala 2.11.12在我的用例中,kafka流是xml格式)。 ...

回答 1 投票 0

PySpark:将Spark数据帧写入Kafka主题

正在尝试将数据帧加载到Kafka主题。选择键和值时出错。任何建议都会有所帮助。下面是我的代码,data = spark.sql('select * from job')kafka = data ....

回答 1 投票 0

找不到参考'Kafka'导入KafkaUtils

[正在将Kafka主题流式传输到Spark中,但是在从pyspark导入KafkaUtils导入sys时遇到问题,从pyspark导入SparkContext从pyspark.streaming导入...导入SparkConf ...

回答 1 投票 0

我该如何进行转换?

env:spark2.4.5 source.json:{“ a_key”:“ 1”,“ a_pro”:“ 2”,“ a_con”:“ 3”,“ b_key”:“ 4”,“ b_pro”:“ 5 “,” b_con“:” 6“,” c_key“:” 7“,” c_pro“:” 8“,” c_con“:” 9“,.....

回答 1 投票 0

使用Spark SQL流时缺少Avro自定义标题

在将Avro GenericRecord发送到Kafka之前,像这样插入标头。 ProducerRecord record = new ProducerRecord <>(topicName,key,message); record.headers()。add(“ ...

回答 1 投票 1

无法使用spark将结果写入kafka主题

我的最终目标是在处理的批次中写出汇总数据并将其读取到新的Kafka主题。我遵循了官方文档和其他几篇文章,但是没有运气。我会...

回答 1 投票 0

如何重命名现有列在数组中添加新列?

env:saprk-2.4.5 source.json {“ group”:“ 1”,“ name”:“ badboi”,“ rank”:“ 3”,“ fellows”:[{“ name”:“ David” ,“年龄”:“ 25”,“爱好”:“代码” ...

回答 1 投票 0

如何解决java.lang.NoSuchMethodError:org.apache.kafka.clients.producer.KafkaProducer.flush()V错误在pyspark中

我从Kafka主题中读取了一些消息,并且对于每个rdd,都会执行proccess_rdds函数。 def spark_streaming_online():conf = SparkConf()。setMaster(“ spark:// antonis:7077”).setAppName(“ ...

回答 1 投票 0

如何将Spark Stream数据保存到文件中

我是Spark的新手,目前正在解决与在上下文时间过后将Spark Stream的结果保存到文件有关的问题。所以问题是:我希望查询运行60秒并保存所有输入...

回答 1 投票 0

将writeStream放电到kafka-awaitTermination()与awaitAnyTermination()之间的差异

根据官方文档,我使用下面的代码段来编写kafka主题,但未将其写入kafka。 finalStream = final \ .writeStream \ .format(“ kafka”)\ .option(“ ...

回答 1 投票 0

检查HDFS路径[Spark Scala]中是否存在文件

如何在给定基本路径的情况下检查文件是否存在。我正在为该方法提供文件列表,例如:file1.snappy,file2,snappy,...我需要检查文件是否存在于给定的任何一个中...

回答 2 投票 0

火花流-过滤大型数据帧中不存在键的行

假设我有一个流数据帧A和一个较大的静态数据帧B。假定A的大小通常<10000个记录。但是,B是一个更大的数据帧,大小在数百万范围内。 ...

回答 1 投票 0

计算流数据集中事件之间的时间差

在我的应用程序中,将为用户执行的每个操作生成事件,并使用以下格式的数据生成事件-user_id | step_num | event_timestamp这些命令的顺序...

回答 1 投票 0

如何在一个微批量的Spark结构化流中设置批量大小

我正在从Kafka源中读取流数据,但是来自kafka的所有数据都是在一个微型批次中读取的。 spark.readStream.format(“ kafka”)。option(“ kafka.bootstrap.servers”,bootstrap_servers).option(...

回答 1 投票 0


© www.soinside.com 2019 - 2024. All rights reserved.