使用数据集约束在 Apache Flink 的批处理模式下处理条件处理

问题描述 投票:0回答:1

我使用 Apache Flink 进行批处理模式文件处理。最初,我将 CSV 文件读入自定义对象 DataSet readCsvData。随后,我对数据执行了各种验证并更新了 readCsvData 中的 isValid 标志。例如,将 isValid 设置为 true 表示有效记录,设置 false 表示无效记录。

现在,我的目标是使用条件语句来确定所有 CSV 文件记录是否有效。但是,由于某些限制:

例如。

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
String fileName = myFile.csv
DataSet<MyObject> readCsvData = readAndProcessCsvData(env, fileName);

DataSet<MyObject> validateFile = validateFile(readCsvData);

if(toCheckIsValidFlag){
    //********** process further ***********
    generateAckFile();
}
else
{
    generateNckFile();
}

env.execute();

MyObject 是

public class MyObject{

    private FileDetails fileDetails;
    private String fileName;
    public boolean isValid ;
    public String invalidReason;

}

我在执行环境(批处理模式)中操作,限制使用 .collect() 或 .count() 等急切函数。 直接设置标志是不可行的,因为自定义代码在作业运行后执行。 如果不使用 .collect() 或 Collector() 等运算符,则无法从 DataSet 中检索 MyObject。 因此,我正在寻找一种方法来检查 readCsvData 中的所有记录是否有效(readCsvData.isValid == true)。基于这个条件,我打算进行进一步的转换并接收数据,或者将所有数据定向到文件接收器。

使用ager函数时显示错误。

例外:

Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Job was submitted in detached mode. Results of job execution, such as accumulators, runtime, etc. are not available. Please make sure your program doesn't call an eager execution function [collect, print, printToErr, count].
java apache-flink batch-processing
1个回答
0
投票

由于您需要处理所有记录来确定下游操作将执行的操作,因此您有两种选择...

  1. 将其作为两个作业运行,其中第一个作业有一个无效记录数计数器,并将所有结果写入临时存储(最好采用比 CSV 更有效的格式,例如 Parquet)。然后第二个作业可以检查计数器并决定如何处理数据。
  2. 缓冲所有数据,这(使用 DataSet API)意味着默认使用内存。然后,您可以根据所有记录的有效标志推迟要做什么的决定。但这显然会对任何一个文件的大小造成内存限制。

请注意,如果您只需要分离数据,则可以使用带有存储桶的 DataStream API 将有效记录与无效记录写入不同位置。

© www.soinside.com 2019 - 2024. All rights reserved.