Apache Flink-无法将本地Kinesis用于FlinkKinesisConsumer

问题描述 投票:7回答:1

到目前为止,我已经按照Flink的kinesis连接器记录的说明使用本地Kinesis。

Using Non-AWS Kinesis Endpoints for Testing

Properties producerConfig = new Properties();
producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
producerConfig.put(AWSConfigConstants.AWS_ENDPOINT, "http://localhost:4567");

[对于Flink生产者,这些说明适用于局部运动(我使用Kinesalite)。

但是,对于Flink使用者,我得到一个例外,不允许aws.regionaws.endpoint both。但是区域是必需的,这意味着无法覆盖端点。

org.apache.flink.client.program.ProgramInvocationException:导致错误的主要方法:对于FlinkKinesisConsumer,必须在配置中设置AWS区域('aws.region')或AWS端点('aws.endpoint')。

这是连接器中的错误吗?我看到一个相关的PR:https://github.com/apache/flink/pull/6045

[我找到了workaround on Flink's mailing list,但他们将其描述为生产者而非消费者的问题,而我认为相反(我认为),因此不确定。真是令人困惑。

java apache-flink amazon-kinesis
1个回答
-1
投票

问题与验证检查中的XOR条件有关。如您所见,validateConsumerConfiguration方法在if语句中执行XOR验证。因此,您只能指定选中的两个参数之一。enter image description here

要设置自定义URL,您需要删除AWSConfigConstants.AWS_REGION属性并仅使用链接。

// Set the given URL
consumerConfig.put(AWSConfigConstants.AWS_ENDPOINT, URL);
// Remove the region
consumerConfig.remove(AWSConfigConstants.AWS_REGION);

此解决方案,修复与以下StackTrace相关的错误:

java.lang.IllegalArgumentException: For FlinkKinesisConsumer either AWS region ('aws.region') or AWS endpoint ('aws.endpoint') must be set in the config.

at org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil.validateConsumerConfiguration(KinesisConfigUtil.java:92)

© www.soinside.com 2019 - 2024. All rights reserved.