将已处理的文件从 flink 中的一个文件夹移动到另一个文件夹

问题描述 投票:0回答:3

我是 flink 的新手,在解决以下用例方面面临一些挑战

用例描述:

我将在输入的某个文件夹中每天收到一个带有时间戳的 csv 文件。文件格式为 file_name_dd-mm-yy-hh-mm-ss.csv。

现在我的 flink 管道将逐行读取此 csv 文件,并将其写入我的 Kafka 主题。

数据读取完成后,需要立即将此文件移至另一个文件夹历史文件夹。

为什么我需要这个是因为:假设你的ververica服务器突然或手动停止,并且如果所有已处理的文件都位于同一位置,那么在ververica重新启动后,flink将重新读取它之前处理过的所有文件。因此,为了防止这种情况,这些文件需要立即将已读取的文件移动到另一个位置。

我用谷歌搜索了很多,但没有找到任何东西,所以你能指导我实现这一目标吗?

如果还需要什么,请告诉我。

java apache-flink batch-processing flink-streaming flink-batch
3个回答
2
投票

开箱即用的 Flink 提供了监视目录中新文件并读取它们的功能 - 通过

StreamExecutionEnvironment.getExecutionEnvironment.readFile
(请参阅类似的堆栈溢出线程示例 - 如何读取 Flink 目录中新添加的文件 / 监视目录使用 Flink 用于数据流的新文件等)

查看

readFile
函数的源代码,它调用了createFileInput()方法,该方法简单地实例化
ContinuousFileMonitoringFunction
ContinuousFileReaderOperatorFactory
并配置源 -

addSource(monitoringFunction, sourceName, null, boundedness)
                        .transform("Split Reader: " + sourceName, typeInfo, factory);

ContinouslyFileMonitoringFunction实际上是大部分逻辑发生的地方。

因此,如果我要实现您的要求,我会用我自己的逻辑扩展

ContinuousFileMonitoringFunction
的功能,将处理后的文件移动到历史文件夹中,并从此函数构建源代码。

鉴于

run
方法在
checkpointLock
-

内部执行读取和转发
synchronized (checkpointLock) {
    monitorDirAndForwardSplits(fileSystem, context);
}

我想说,移动到检查点完成文件上的历史文件夹是安全的,这些文件的修改日期早于

globalModificationTime
,它在
monitorDirAndForwardSplits
中在分割收集时更新。

也就是说,我将扩展

ContinuousFileMonitoringFunction
类并实现
CheckpointListener
接口,并在
notifyCheckpointComplete
中将已处理的文件移动到历史文件夹:

public class ArchivingContinuousFileMonitoringFunction<OUT> extends ContinuousFileMonitoringFunction<OUT> implements CheckpointListener {
  ...

   @Override
   public void notifyCheckpointComplete(long checkpointId) throws Exception {
          Map<Path, FileStatus> eligibleFiles = listEligibleForArchiveFiles(fs, new Path(path));
        // do move logic
     }

   /**
     * Returns the paths of the files already processed.
     *
     * @param fileSystem The filesystem where the monitored directory resides.
     */
    private Map<Path, FileStatus> listEligibleForArchiveFiles(FileSystem fileSystem, Path path) {

        final FileStatus[] statuses;
        try {
            statuses = fileSystem.listStatus(path);
        } catch (IOException e) {
            // we may run into an IOException if files are moved while listing their status
            // delay the check for eligible files in this case
            return Collections.emptyMap();
        }

        if (statuses == null) {
            LOG.warn("Path does not exist: {}", path);
            return Collections.emptyMap();
        } else {
            Map<Path, FileStatus> files = new HashMap<>();
            // handle the new files
            for (FileStatus status : statuses) {
                if (!status.isDir()) {
                    Path filePath = status.getPath();
                    long modificationTime = status.getModificationTime();
                    if (shouldIgnore(filePath, modificationTime)) {
                        files.put(filePath, status);
                    }
                } else if (format.getNestedFileEnumeration() && format.acceptFile(status)) {
                    files.putAll(listEligibleForArchiveFiles(fileSystem, status.getPath()));
                }
            }
            return files;
        }
    }
}

然后使用自定义函数手动定义数据流:

ContinuousFileMonitoringFunction<OUT> monitoringFunction =
                new ArchivingContinuousFileMonitoringFunction <>(
                        inputFormat, monitoringMode, getParallelism(), interval);

ContinuousFileReaderOperatorFactory<OUT, TimestampedFileInputSplit> factory = new ContinuousFileReaderOperatorFactory<>(inputFormat);

final Boundedness boundedness = Boundedness.CONTINUOUS_UNBOUNDED;

env.addSource(monitoringFunction, sourceName, null, boundedness)
                        .transform("Split Reader: " + sourceName, typeInfo, factory);

0
投票

Flink 本身并没有提供这样做的解决方案。您可能需要自己构建一些东西,或者找到一个可以配置为处理此问题的工作流程工具。

您可以在 flink 用户邮件列表上询问此问题。我知道其他人已经编写了脚本来执行此操作;也许有人可以分享解决方案。


0
投票

嗨,米卡莱·卢什奇茨基,

如果您能提供执行相同用例的 Flink 代码,我们将不胜感激。

© www.soinside.com 2019 - 2024. All rights reserved.