How to programmatically load and stream a Kafka topic as a PySpark DataFrame

Question description  Votes: -1  Answers: 1

There are many ways to read and write Spark DataFrames with Kafka. I am trying to read messages from a Kafka topic and build a DataFrame from them. I can pull the messages from the topic, but I am unable to convert them into a DataFrame. Any suggestions would be helpful.

import pyspark
from pyspark.sql import SparkSession, Row
from pyspark.context import SparkContext
from kafka import KafkaConsumer

sc = SparkContext.getOrCreate()
spark = SparkSession(sc)

consumer = KafkaConsumer('Jim_Topic')

for message in consumer:
    data = message
    print(data) # Printing the messages properly
    df = data.map # am unable to convert it to a dataframe.
apache-spark pyspark apache-kafka kafka-consumer-api
1 Answer
0 votes

Depending on your use case, you can either

  1. create a Kafka source for streaming queries, or
  2. create a Kafka source for batch queries

For streaming queries:

df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "Jim_Topic") \
  .load()

# Cast the binary key and value columns to readable strings
df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
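
A streaming DataFrame cannot be printed directly; it has to be started as a query against a sink. A minimal sketch of inspecting the incoming messages on the console (the append output mode and console sink here are just one possible choice for debugging):

query = df.writeStream \
  .outputMode("append") \
  .format("console") \
  .start()

query.awaitTermination()  # keep the query running until it is stopped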

For batch queries:

df = spark \
  .read \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "Jim_Topic") \
  .load()

# Cast the binary key and value columns to readable strings
df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
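
In both cases the Kafka source has to be on Spark's classpath; it ships as a separate connector package rather than with Spark itself. One way to pull it in when building the session (the Scala/Spark version suffix below is an assumption, adjust it to match your installation):

from pyspark.sql import SparkSession

spark = SparkSession.builder \
  .appName("kafka-to-dataframe") \
  .config("spark.jars.packages",
          "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0") \
  .getOrCreate()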
    