为什么不单独使用火花流

问题描述 投票:0回答:4

我对 Kafka/Spark-Streaming 没有太多经验,但我读过很多关于该组合在构建用于分析/仪表板的实时系统方面有多么出色的文章。有人可以向我解释为什么火花流不能单独完成吗?换句话说,为什么 Kafka 处于数据源和 Spark-Streaming 之间?

谢谢

real-time apache-kafka spark-streaming lambda-architecture
4个回答
2
投票

其实这个问题有一个简单的解释。

Spark Streaming 和其他流环境旨在即时读取数据。 在读取过程之后,他们没有太多能力保持数据存活。(其中一些有,但效率不高)。顺便说一句,需要像 Kafka 这样的消息代理来使数据在特定时间内保持活动状态。 因此,其他工具可以通过使用消费者随时轻松地从消息代理(Kafka)获取数据。 划分责任会给你带来有效的结果。


0
投票

为了使用Spark处理数据,我们需要通过Spark支持的不同数据源来提供数据。 (或者我们需要编写自己的自定义数据源)

如果是静态数据,spark提供

  sc.textFile("FILE PATH") //For reading text file
  sc.wholeTextFiles("DIRECTORY PATH") //For reading whole text files in a directory
  sqlContext.read.parquet("FILE PATH")
  sqlContext.read.json("FILE PATH")
  1. 将你的逻辑应用到生成的 RDD 上。

在流媒体情况下,火花支持来自不同来源的数据,例如

Kafka、Flume、Kinesis、Twitter、ZeroMQ、MQTT 等

Spark 也支持简单的套接字流,

val 行 = ssc.socketTextStream("localhost", 9999)

了解更多

Kafka是一个高吞吐量的分布式消息系统。 Kafka 的分布式行为、可扩展性和容错能力比其他消息系统具有优势。 (MQTT、ZMQ 等)

那么问题是这些数据源中哪一个是您的? 您可以用自己的数据源替换kafka数据源。我们使用 MQTT 作为默认源。


0
投票

有人可以向我解释一下为什么 Spark-streaming 不能单独完成吗?

Spark 流适用于实时数据,并且需要从某个地方提取数据。例如 Kafka、Flume、Kinesis 或 TCP 套接字。甚至您可以从文件中读取数据。

https://spark.apache.org/docs/latest/streaming-programming-guide.html

如果您的用例足够简单,可以从文件中读取,我会说选择 apache nifi。

https://www.youtube.com/watch?v=gqV_63a0ABo&list=PLzmB162Wvzr05Pe-lobQEZkk0zXuzms56

也就是说,为什么Kafka处于数据源和数据源之间? 火花流?

根据场景,Kafka通常是合适的选择来存储数据,然后从不同方面进行消费。


0
投票

我也是这个领域的新手,我正在寻找完全相同的东西,我通过练习找到了这个简单的解释。 在流上下文中使用 Spark 时,您通常会将 Spark 实例连接到已建立的套接字,如下面的代码所示。

这就需要消息代理来打开此套接字并使其保持活动状态,即使数据没有到来。

当然,使用像 Kafka 这样的消息代理比仅打开套接字连接有更多优势,特别是在弹性和容错方面。

from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext

spark = SparkSession.builder \
    .appName("SocketConsumer") \
    .getOrCreate()

ssc = StreamingContext(spark.sparkContext, 1)  # 1-second batch interval

# Create DStream from socket
lines = ssc.socketTextStream("localhost", 9999)  # Change host and port as needed

# Process each RDD
def process_data(rdd):
    rdd.foreach(lambda record: process(record))

def process(record):
    # Implement logic to process the received data
    print(record)  # Example: Print received data

lines.foreachRDD(process_data)

# Start streaming context
ssc.start()
ssc.awaitTermination()
© www.soinside.com 2019 - 2024. All rights reserved.