将流数据帧写入kafka

问题描述 投票:1回答:1

我正在通过火花结构化流媒体读取kafka主题的日志行,分隔日志行的字段,对字段执行一些操作,并将其存储在数据帧中,每个字段都有单独的列。我想把这个数据帧写到kafka

下面是我将示例数据帧和写入流写入kafka

 val dfStructuredWrite = dfProcessedLogs.select(
    dfProcessedLogs("result").getItem("_1").as("col1"),
    dfProcessedLogs("result").getItem("_2").as("col2"),
    dfProcessedLogs("result").getItem("_17").as("col3"))

dfStructuredWrite
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic1")
.start()

上面的代码给出了我的错误

Required attribute 'value' not found

我相信这是因为我没有键/值格式的数据帧。如何以最有效的方式将现有数据帧写入kafka?

apache-kafka spark-structured-streaming
1个回答
0
投票

写入Kafka的Dataframe在模式中应该包含以下列:

  • key(可选)(类型:字符串或二进制)
  • value(必需)(类型:字符串或二进制)
  • 主题(可选)(类型:字符串)

在您的情况下,没有value列,并抛出异常。

您必须修改它以至少添加值列,例如:

import org.apache.spark.sql.functions.{concat, lit}

dfStructuredWrite.select(concat($"col1", lit(" "), $"col2", lit(" "), $"col3").alias("value"))

有关详细信息,请查看:https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#writing-data-to-kafka

© www.soinside.com 2019 - 2024. All rights reserved.