[我看到了一些示例,其中可以使用喷气机使用CSV文件。
BatchSource<SalesRecordLine> source = Sources.filesBuilder(sourceDir)
.glob("*.csv")
.build(path -> Files.lines(path).skip(1).map(SalesRecordLine::parse));
在多节点设置中,所有节点都将开始提取文件(例如共享NFS)还是采用智能锁定(例如Apache Camel的幂等文件使用方方法?)。 Jet在读取之前如何知道文件已完全刷新到磁盘?
谢谢
如果使用NFS,则将sharedFileSystem
属性设置为true
:
BatchSource<SalesRecordLine> source = Sources.filesBuilder(sourceDir)
.glob("*.csv")
.sharedFileSystem(true)
.build(path -> Files.lines(path).skip(1).map(SalesRecordLine::parse));
从方法javadoc:
设置文件是否在所有成员都可见的共享存储中。默认值是假的。如果sharedFileSystem为true,Jet将承担所有成员看到相同的文件。他们将分工,以便每个人成员将读取文件的一部分。如果sharedFileSystem为false,每个成员将读取目录中的所有文件,假设本地。
Jet没有办法知道何时由第三方清除文件。对于BatchSource,您需要在开始作业之前确保它们已经存在。
如果您想以仅追加方式运行作业时接收更新,则可能要使用FileSourceBuilder#buildWatcher
而不是build
,但这仅处理新行。