Databricks:结构化流因 TimeoutException 失败

问题描述 投票:0回答:1

我想使用 kafka 源在 databricks 中创建结构化流。 我按照此处所述的说明进行操作。我的脚本似乎已启动,但它因流的第一个元素而失败。当我使用

confluent_kafka
时,流本身工作正常,并产生结果和工作(在数据块中),因此我似乎缺少一个不同的问题:

处理初始流后,脚本超时:

java.util.concurrent.TimeoutException: Stream Execution thread for stream [id = 80afdeed-9266-4db4-85fa-66ccf261aee4, 
runId = b564c626-9c74-42a8-8066-f1f16c7ab53d] failed to stop within 36000 milliseconds (specified by spark.sql.streaming.stopTimeout). See the cause on what was being executed in the streaming query thread.`

我尝试过的:查看SO并找到this答案,我将其包括在内

spark.conf.set("spark.sql.streaming.stopTimeout",  36000)
进入我的设置 - 没有改变任何东西。

任何意见都将受到高度赞赏!

from pyspark.sql import functions as F
from pyspark.sql.types import *

# Define a data schema
schema = StructType() \
           .add('PARAMETERS_TEXTVALUES_070_VALUES', StringType())\
           .add('ID', StringType())\
           .add('PARAMETERS_TEXTVALUES_001_VALUES', StringType())\
           .add('TIMESTAMP', TimestampType())


df = spark \
    .readStream \
    .format("kafka") \
    .option("host", "stream.xxx.com") \
    .option("port", 12345)\
    .option('kafka.bootstrap.servers', 'stream.xxx.com:12345') \
    .option('subscribe', 'stream_test.json') \
    .option("startingOffset", "earliest") \
    .load()

df_word = df.select(F.col('key').cast('string'),
                    F.from_json(F.col('value').cast('string'), schema).alias("parsed_value"))
  

df_word \
      .writeStream \
      .format("parquet") \
      .option("path", "dbfs:/mnt/streamfolder/stream/") \
      .option("checkpointLocation", "dbfs:/mnt/streamfolder/check/") \
      .outputMode("append") \
      .start()

我的流输出数据如下所示:

"PARAMETERS_TEXTVALUES_070_VALUES":'something'
"ID":"47575963333908"
"PARAMETERS_TEXTVALUES_001_VALUES":12345
"TIMESTAMP": "2020-10-22T15:06:42.507+02:00"

此外,

stream
check
文件夹中充满了 0-b 文件,但
metadata
除外,其中包括上述错误中的 ìd。

谢谢,注意安全。

apache-spark pyspark apache-kafka streaming
1个回答
0
投票

我也有同样的问题。 我检查了驱动程序日志并在堆栈跟踪中发现了此异常:

org.apache.spark.SparkException: Failed to store executor broadcast spark_join_relation_3540_1455983219 (size = Some(67371008)) in BlockManager with storageLevel=StorageLevel(memory, deserialized, 1 replicas)

根据 此建议 我提高了驱动程序内存(在我的情况下为 16GB 至 32GB),它解决了问题。

StackOverflow 上的这个答案解释了它的工作原理。

© www.soinside.com 2019 - 2024. All rights reserved.