AWS Kinesis Client Java:在Stream中设置TRIM_HORIZON位置不起作用

问题描述 投票:0回答:1

我正在运行一个测试系统,该系统产生一个Kinesis生产商,该生产商开始将消息(例如:1到100)写入带有两个分片的流中。

在此周期中,消费者开始从流中读取消息。我注意到,使用者在运行后仅读取流中的LATEST消息。因此,例如,它开始读取消息43。我试图修改Worker.class以使用TRIM_HORIZON策略,但这似乎不起作用。

KinesisClientLibConfiguration c = new KinesisClientLibConfiguration("MediaPlan", "randeepstream",
    DefaultAWSCredentialsProviderChain.getInstance(),
    "consumer1")
    .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON);
final Worker w = new Worker.Builder()
    .recordProcessorFactory(rpf)
    .config(kinesisConfig)
    .build();
new Thread(() -> w.run()).start();

我的消费者的处理器设置为:

public class ConsumerRecordProcessorImpl implements IRecordProcessor {

    public void initialize(InitializationInput initializationInput) {
        log.info("Setting up consumer with shard {} starting at {}", initializationInput.getShardId(),
                initializationInput.getExtendedSequenceNumber());
    }

    public void processRecords(ProcessRecordsInput processRecordsInput) {
        ...
    }
}

我希望看到这样的消息:Setting up consumer with shard shardId-000000000000 starting at TRIM_HORIZON 0但是我得到了:Setting up consumer with shard shardId-000000000000 starting at LATEST 0

如何让我的消费者停止阅读最新消息并阅读所有未处理的消息?

java amazon-kinesis amazon-kinesis-client
1个回答
0
投票

这里是使用amazon-kinesis-client lib v2的示例您将必须使用Schedular(software.amazon.kinesis.coordinator),它会在后台读取记录并为该计划提供Retival配置,如下所示

RetrievalConfig retrievalConfig = setRetrivalConfig();

    Scheduler scheduler = new Scheduler(
        configsBuilder.checkpointConfig(),
        configsBuilder.coordinatorConfig(),
        configsBuilder.leaseManagementConfig(),
        configsBuilder.lifecycleConfig(),
        configsBuilder.metricsConfig(),
        configsBuilder.processorConfig(),
        retrievalConfig);

private RetrievalConfig setRetrivalConfig(){
    InitialPositionInStreamExtended initialPositionInStreamExtended = InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON);
    RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient));
    retrievalConfig.initialPositionInStreamExtended(initialPositionInStreamExtended);
    return retrievalConfig;
  }

注意InitialPositionInStream.TRIM_HORIZON,这将通知时间表在上一个已知位置之后开始消耗记录。因此,即使消费者关闭并且生产者仍在运行,在消费者停机期间产生的所有记录也将被消耗。

注意:configBuilder是ConfigsBuilder(software.amazon.kinesis.common)的对象

© www.soinside.com 2019 - 2024. All rights reserved.