我使用 Apache Flink 进行批处理模式文件处理。最初,我将 CSV 文件读入自定义对象 DataSet
现在,我的目标是使用条件语句来确定所有 CSV 文件记录是否有效。但是,由于某些限制:
例如。
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
String fileName = myFile.csv
DataSet<MyObject> readCsvData = readAndProcessCsvData(env, fileName);
DataSet<MyObject> validateFile = validateFile(readCsvData);
if(toCheckIsValidFlag){
//********** process further ***********
generateAckFile();
}
else
{
generateNckFile();
}
env.execute();
MyObject 是
public class MyObject{
private FileDetails fileDetails;
private String fileName;
public boolean isValid ;
public String invalidReason;
}
我在执行环境(批处理模式)中操作,限制使用 .collect() 或 .count() 等急切函数。 直接设置标志是不可行的,因为自定义代码在作业运行后执行。 如果不使用 .collect() 或 Collector() 等运算符,则无法从 DataSet 中检索 MyObject。 因此,我正在寻找一种方法来检查 readCsvData 中的所有记录是否有效(readCsvData.isValid == true)。基于这个条件,我打算进行进一步的转换并接收数据,或者将所有数据定向到文件接收器。
使用ager函数时显示错误。
例外:
Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Job was submitted in detached mode. Results of job execution, such as accumulators, runtime, etc. are not available. Please make sure your program doesn't call an eager execution function [collect, print, printToErr, count].
由于您需要处理所有记录来确定下游操作将执行的操作,因此您有两种选择...
请注意,如果您只需要分离数据,则可以使用带有存储桶的 DataStream API 将有效记录与无效记录写入不同位置。