Databricks Autoloader writeStream


I have multiple tables loaded into Azure Data Lake (one csv file per table) and want to use Auto Loader to load each of them into a Databricks Delta table.

I have a Python script in which I use a for loop to create each table's schema, create a df, and then writeStream that df.

I also have a function, update_insert, in which I do some data manipulation; it also contains the merge call that upserts the Delta table.

Here is the code of my function:

import re
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from delta.tables import DeltaTable

def update_insert(df, epochId, cdm):
    # keep only one copy of 100% identical rows
    print("-------------------   " + cdm)
    df = df.dropDuplicates()

    # keep only the latest record per Id, based on modifiedon
    w = Window.partitionBy("Id").orderBy(F.col("modifiedon").desc())
    df = df.withWatermark("modifiedon", "1 day").withColumn("rn", F.row_number().over(w)).where(F.col("rn") == 1).drop('rn')
    # final = df.join(agg, on=["id", "modifiedon"], how="right")
    dfUpdates = df.withColumnRenamed("id", "BK_id")

    p = re.compile('^BK_')
    list_of_columns = dfUpdates.columns
    list_of_BK_columns = [s for s in dfUpdates.columns if p.match(s)]

    # build the merge condition on the BK_ columns and strip the trailing " and "
    string = ''
    for column in list_of_BK_columns:
        string += f'table.{column} = newData.{column} and '
    string = string[:-5]

    # map every column to its newData counterpart for the update/insert clauses
    dictionary = {}
    for key in list_of_columns:
        dictionary[key] = f'newData.{key}'

    print("printing " + cdm + " columns")
    print(dfUpdates.columns)

    deltaTable = DeltaTable.forPath(spark, f"abfss://[email protected]/D365/{cdm}" + "_autoloader_nodups")
    deltaTable.alias('table') \
        .merge(dfUpdates.alias("newData"), string) \
        .whenMatchedUpdate(set=dictionary) \
        .whenNotMatchedInsert(values=dictionary) \
        .execute()

The function above is used in the Auto Loader foreachBatch as follows:

from pyspark.sql.types import (StructType, StructField, IntegerType, TimestampType,
                               StringType, LongType, DecimalType)

for entity in manifest.collect()[0]['entities']:
    cdm = entity.asDict()['name']
    print(cdm)
    schema = StructType()
    length = len(entity.asDict()['attributes']) - 1
    for index1, attribute in enumerate(entity.asDict()['attributes']):
        # the last attribute is always read as a string; all others are mapped by dataType
        if (attribute.asDict()['dataType'] in ('int32', 'time')) and (index1 != length):
            field = StructField(attribute.asDict()['name'], IntegerType(), True)
            schema.add(field)
        elif attribute.asDict()['dataType'] == 'dateTime' and (index1 != length):
            field = StructField(attribute.asDict()['name'], TimestampType(), True)
            schema.add(field)
        elif attribute.asDict()['dataType'] == 'string' and (index1 != length):
            field = StructField(attribute.asDict()['name'], StringType(), True)
            schema.add(field)
        elif attribute.asDict()['dataType'] == 'int64' and (index1 != length):
            field = StructField(attribute.asDict()['name'], LongType(), True)
            schema.add(field)
        elif attribute.asDict()['dataType'] == 'decimal' and (index1 != length):
            field = StructField(attribute.asDict()['name'], DecimalType(38, 20), True)
            schema.add(field)
        elif index1 == length:
            field = StructField(attribute.asDict()['name'], StringType(), True)
            schema.add(field)
            LastColumnName = attribute.asDict()['name']
            LastColumnDataType = attribute.asDict()['dataType']
        else:
            field = StructField(attribute.asDict()['name'], StringType(), True)
            schema.add(field)

    # Define variables
    checkpoint_directory = f"abfss://[email protected]/D365/checkpoints/{cdm}"
    data_source = f"abfss://[email protected]/*/{cdm}/*.csv"
    source_format = "csv"
    # Configure Auto Loader to ingest csv data to a Delta table
    print("schema for " + cdm)
    # print(schema)
    df = (
        spark.readStream
        .option("delimiter", ",")
        .option("quote", '"')
        .option("mode", "permissive")
        .option("lineSep", "\r\n")
        .option("multiLine", "true")
        .format("cloudFiles")
        .option("cloudFiles.format", source_format)
        # .option("cloudFiles.schemaLocation", checkpoint_directory)
        .option("cloudFiles.inferColumnTypes","true")
        .option("header", "false")
        .option("escape", '"')
        .schema(schema)
        .load(data_source)
    )
    print("writing " + cdm)
    # print(df.columns)
    df.writeStream.format("delta").foreachBatch(lambda df, epochId: update_insert(df, epochId, cdm)).option("checkpointLocation", checkpoint_directory).trigger(availableNow=True).start()

The problem is that the loop does not behave as expected. I added print statements to the code to see which df is created for which table.
For example:

  1. First print(cdm) runs (cdm is the name of the table), and the output is msdyn_workorder.
  2. Then print("schema for " + cdm) should run, and the output is schema for msdyn_workorder.
  3. The next print is print("writing " + cdm), and the output is writing msdyn_workorder.

This is where it goes wrong, because the next print should be the one inside the function, print("-------------------   " + cdm). Instead, it prints the next table name via print(cdm), i.e. nrq_customerassetproperty, so the for loop starts over (I only have two tables, so the for loop should run twice).

It then continues with the same sequence of print statements:

  1. print("schema for " + cdm), with output schema for nrq_customerassetproperty
  2. The next print is print("writing " + cdm), and the output is writing nrq_customerassetproperty

Only at this point does it start printing what is inside the def, i.e. print("-------------------   " + cdm) and print("printing " + cdm + " columns"), with output printing nrq_customerassetproperty columns.

With the next print, print(dfUpdates.columns), which should show the df being read in that foreachBatch call, it prints the columns of the previous df, in this case the columns of msdyn_workorder.

I don't know what is going wrong here. Is there some problem with streaming data and for loops?

Screenshot of the print statements: note that it prints printing nrq_customerassetproperty columns, but the columns actually correspond to the msdyn_workorder table.

python for-loop spark-streaming azure-databricks autoload
1 Answer

Pass cdm into the foreachBatch function as shown below:

lambda df, epochId, cdm=cdm: update_insert(df, epochId, cdm)

When you use cdm inside the lambda without binding it, the lambda looks the variable up in the enclosing scope at the time it is executed, not when it is created. By then the for loop has already moved on, so every batch sees the last value of cdm. Binding it as a default argument (cdm=cdm) freezes the value at the moment the lambda is defined.
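
A minimal, Spark-free sketch of this late-binding behaviour (the names below are purely illustrative):

callbacks = []
for name in ["msdyn_workorder", "nrq_customerassetproperty"]:
    # captures the variable name, not its current value
    callbacks.append(lambda: print(name))
    # with a default argument the value is frozen at definition time:
    # callbacks.append(lambda name=name: print(name))

for cb in callbacks:
    cb()
# the first variant prints "nrq_customerassetproperty" twice;
# the commented default-argument variant would print each table name once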

Below is the updated function I used:

def update_insert(df, epochId, cdm):
    print(epochId)
    df.show()
    print("------------------- " + cdm)
    print("printing " + cdm + " columns")
    print(df.columns)

I ran your writeStream code.



The print statements from the next loop iteration appear first because the foreachBatch function runs asynchronously on a stream execution thread, once per micro-batch of streaming data. The print statements outside foreachBatch run on the driver and are printed there right away, so the driver gets through the whole for loop before the batches are processed.
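
A small sketch of that asynchrony, assuming an existing spark session and using the built-in rate source (the function and option values here are purely illustrative):

def show_batch(batch_df, epoch_id):
    # runs on the stream execution thread, not in the driver's top-level flow
    print(f"inside foreachBatch, epoch {epoch_id}, rows = {batch_df.count()}")

q = (
    spark.readStream.format("rate").option("rowsPerSecond", 5).load()
    .writeStream.foreachBatch(show_batch)
    .start()
)
print("printed on the driver right after start(), before any batch output")
q.awaitTermination(20)   # wait up to 20 seconds for the demo stream
q.stop()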

Here is the text output from running your loop:

msdyn_workorder
schema for msdyn_workorder
writing msdyn_workorder
2023-09-05T09:03:10.400+0000: [GC (Allocation Failure) [PSYoungGen: 1857533K->64491K(1965056K)] 2088300K->295274K(6238720K), 0.0468967 secs] [Times: user=0.09 sys=0.02, real=0.05 secs] 
Next Df
nrq_customerassetproperty
schema for nrq_customerassetproperty
writing nrq_customerassetproperty
Next Df
2023-09-05 09:03:16,220 stream execution thread for [id = c25505ea-7b7f-4c93-a8f0-e3ba1b04336f, runId = e9c861ab-e425-499d-bdca-7940a72a3164] WARN RollingFileAppender 'publicFile.rolling': The bufferSize is set to 8192 but bufferedIO is not true
2023-09-05 09:03:16,222 stream execution thread for [id = c25505ea-7b7f-4c93-a8f0-e3ba1b04336f, runId = e9c861ab-e425-499d-bdca-7940a72a3164] WARN RollingFileAppender 'privateFile.rolling': The bufferSize is set to 8192 but bufferedIO is not true
2023-09-05 09:03:16,223 stream execution thread for [id = c25505ea-7b7f-4c93-a8f0-e3ba1b04336f, runId = e9c861ab-e425-499d-bdca-7940a72a3164] WARN RollingFileAppender 'com.databricks.UsageLogging.appender': The bufferSize is set to 8192 but bufferedIO is not true
2023-09-05 09:03:16,224 stream execution thread for [id = c25505ea-7b7f-4c93-a8f0-e3ba1b04336f, runId = e9c861ab-e425-499d-bdca-7940a72a3164] WARN RollingFileAppender 'com.databricks.ProductLogging.appender': The bufferSize is set to 8192 but bufferedIO is not true
2023-09-05 09:03:16,225 stream execution thread for [id = c25505ea-7b7f-4c93-a8f0-e3ba1b04336f, runId = e9c861ab-e425-499d-bdca-7940a72a3164] WARN RollingFileAppender 'com.databricks.LineageLogging.appender': The bufferSize is set to 8192 but bufferedIO is not true
2023-09-05 09:03:16,226 stream execution thread for [id = c25505ea-7b7f-4c93-a8f0-e3ba1b04336f, runId = e9c861ab-e425-499d-bdca-7940a72a3164] WARN RollingFileAppender 'com.databricks.MetricsLogging.appender': The bufferSize is set to 8192 but bufferedIO is not true
2023-09-05 09:03:16,227 stream execution thread for [id = c25505ea-7b7f-4c93-a8f0-e3ba1b04336f, runId = e9c861ab-e425-499d-bdca-7940a72a3164] WARN RollingFileAppender 'dltExecution.rolling': The bufferSize is set to 8192 but bufferedIO is not true
2023-09-05T09:03:16.237+0000: [GC (Metadata GC Threshold) [PSYoungGen: 782632K->62922K(1994240K)] 1013415K->293722K(6267904K), 0.0367179 secs] [Times: user=0.09 sys=0.01, real=0.04 secs] 
2023-09-05T09:03:16.274+0000: [Full GC (Metadata GC Threshold) [PSYoungGen: 62922K->0K(1994240K)] [ParOldGen: 230799K->105180K(4273664K)] 293722K->105180K(6267904K), [Metaspace: 254605K->254253K(1290240K)], 0.2592507 secs] [Times: user=0.56 sys=0.01, real=0.25 secs] 
2023-09-05T09:03:21.380+0000: [GC (Allocation Failure) [PSYoungGen: 1843200K->28324K(1985536K)] 1948380K->133525K(6259200K), 0.0179690 secs] [Times: user=0.04 sys=0.00, real=0.02 secs] 
0
-------------------   nrq_customerassetproperty
printing nrq_customerassetproperty columns
['Col4', 'Col5', 'Col6']
0
-------------------   msdyn_workorder
printing msdyn_workorder columns
['Col1', 'Col2', 'Col3']

To make it synchronous, you need to use awaitTermination:

df.writeStream.format("delta").foreachBatch(lambda df, epochId, cdm=cdm: update_insert(df, epochId, cdm)).option("checkpointLocation", checkpoint_directory).trigger(availableNow=True).start().awaitTermination()

