我正在尝试将Flink作业(在EMR上运行v1.8)从使用BucketingSink过渡到较新的StreamingFileSink。
我已经运行了新代码,几乎所有内容都看起来不错。文件被写入S3并转换为完整文件。唯一的问题是S3的ACL设置不像旧代码那样。
我的core-site.xml
设置如下:>
<configuration> <property> <name>fs.s3a.acl.default</name> <value>BucketOwnerFullControl</value> </property> </configuration>
我还将
s3a://
用作StreamingFileSink构建器的forRowFormat()
参数中路径的前缀。
而且,当切换到StreamingFileSink时,我必须向build.gradle添加新的依赖项
flinkShadowJar "org.apache.flink:flink-s3-fs-hadoop:${flinkVersion}"
我对使用BucketingSink api时不使用此jar的s3a://前缀如何写S3不清楚。我现在正在以某种方式写S3的方式不尊重我的core-site.xml设置。
我正在尝试将Flink作业(在EMR上运行v1.8)从使用BucketingSink过渡到较新的StreamingFileSink。我已经运行了新代码,几乎所有内容都看起来不错。文件是...