我能够成功运行我的flink工作,使用./bin/flink run ...
保存到S3。
为了实现这一点,我必须将flink-s3-fs-presto jar复制到我的$FLINK_HOME/lib
文件夹中,我还必须在我的flink-conf.yaml
中配置我的S3连接细节:
你需要在Flink的flink-conf.yaml中配置s3.access-key和s3.secret-key:
s3.access-key: your-access-key s3.secret-key: your-secret-key
资料来源:flink aws docs
我还必须设置属性s3.endpoint
,因为我正在使用IBM Cloud中的S3。
当我使用./bin/flink run
运行时,一切正常。
但是,当我尝试从我的IDE(IntelliJ)运行我的工作时,我收到以下错误:
org.apache.flink.runtime.client.JobExecutionException:无法初始化任务'DataSink(TextOutputFormat(s3:// xxxx / folder) - UTF-8)':无法从服务端点加载凭据
我在IDE运行作业中设置了一个环境变量,FLINK_CONF_DIR
指向我的flink-conf.yaml,我可以看到我的配置属性被选中:
11:04:39,487 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: s3.access-key, ****
11:04:39,487 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: s3.secret-key, ****
11:04:39,487 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: s3.endpoint, s3-api.us-geo.objectstorage.softlayer.net
但是,当我从IDE运行时,我收到一条错误,表明这些属性没有传递给presto库:
Caused by: org.apache.flink.fs.s3presto.shaded.com.amazonaws.SdkClientException: Unable to load credentials from service endpoint
另外,为了验证这个理论,如果我在从IDE运行时单步执行代码,我可以看到我的端点属性未应用:
...并钻进Hadoop配置,我可以看到flink配置是一个空映射:
深入挖掘,我可以看到org.apache.flink.core.fs.FileSystem#getUnguardedFileSystem()
正在创建一个新的空配置:
// this "default" initialization makes sure that the FileSystem class works
// even when not configured with an explicit Flink configuration, like on
// JobManager or TaskManager setup
if (FS_FACTORIES.isEmpty()) {
initialize(new Configuration());
}
从IDE运行时,如何配置s3.access-key
,s3.secret-key
和s3.endpoint
属性?
创建core-site.xml
<configuration>
<property>
<name>fs.s3.impl</name>
<value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
</property>
<property>
<name>fs.s3a.access.key</name>
<value>xxxx</value>
</property>
<property>
<name>fs.s3a.secret.key</name>
<value>xxxxx</value>
</property>
</configuration>
并添加
Map par = new HashMap();par.put("fs.hdfs.hadoopconf", "path to core-site.xml";
ParameterTool pt = ParameterTool.fromMap(par);
env.getConfig().setGlobalJobParameters(pt);
只需致电
FileSystem.initialize(GlobalConfiguration.loadConfiguration(System.getenv("FLINK_CONF_DIR")));
之前
env.execute()
将解决问题。
请记住,您仍然必须将您的密钥和访问密钥放在flink-conf.yaml中。