Databricks:来自 Kafka 的 Spark 结构化流卡在“流初始化”

问题描述 投票:0回答:1

我想使用 kafka 源在 databricks 中创建结构化流。 我按照此处所述的说明进行操作。我的脚本似乎已启动,但是我无法在 databricks 笔记本中打印/输出某些内容。当我使用

confluent_kafka
时,流本身工作正常,并产生结果和工作(在数据块中),因此我似乎缺少一个不同的问题:

脚本似乎“卡在”“运行命令”/“流初始化”处。

任何意见都将受到高度赞赏!

from pyspark.sql import functions as F
from pyspark.sql.types import *

# Define a data schema
schema = StructType() \
           .add('PARAMETERS_TEXTVALUES_070_VALUES', StringType())\
           .add('ID', StringType())\
           .add('PARAMETERS_TEXTVALUES_001_VALUES', StringType())\
           .add('TIMESTAMP', TimestampType())


df = spark \
    .readStream \
    .format("kafka") \
    .option("host", "stream.xxx.com") \
    .option("port", 12345)\
    .option('kafka.bootstrap.servers', 'stream.xxx.com:12345') \
    .option('subscribe', 'stream_test.json') \
    .option("startingOffset", "earliest") \
    .load()

df_word = df.select(F.col('key').cast('string'),
                    F.from_json(F.col('value').cast('string'), schema).alias("parsed_value"))
  
# Group by id and count
df_group = df_word.select('parsed_value.*')\
                  .groupBy('ID').count()

query = df_group \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

query.awaitTermination()

我的流输出数据如下所示:

"PARAMETERS_TEXTVALUES_070_VALUES":'something'
"ID":"47575963333908"
"PARAMETERS_TEXTVALUES_001_VALUES":12345
"TIMESTAMP": "2020-10-22T15:06:42.507+02:00"

澄清一下:我正在尝试将

query
的组件打印到笔记本上以测试连接。此单元格之后或以上单元格之前没有单元格。

谢谢,注意安全。

python apache-spark streaming databricks
1个回答
1
投票

这是已弃用的 Kafka 偏移读取器的一个已知问题,当传入无效配置(例如错误的 IP 地址或端口)时。这是由于 Kafka 客户端在获取元数据时出现问题造成的。

默认情况下启用了新的偏移读取器,这将在这种情况下引发 Kafka

TimeoutException
。如果您使用的是旧版本,请设置 Spark 配置:

spark.sql.streaming.kafka.useDeprecatedOffsetFetchingfalse

参见:https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#offset-fetching

© www.soinside.com 2019 - 2024. All rights reserved.