我正在运行一个测试系统,该系统产生一个Kinesis生产商,该生产商开始将消息(例如:1到100)写入带有两个分片的流中。
在此周期中,消费者开始从流中读取消息。我注意到,使用者在运行后仅读取流中的LATEST
消息。因此,例如,它开始读取消息43。我试图修改Worker.class以使用TRIM_HORIZON
策略,但这似乎不起作用。
KinesisClientLibConfiguration c = new KinesisClientLibConfiguration("MediaPlan", "randeepstream",
DefaultAWSCredentialsProviderChain.getInstance(),
"consumer1")
.withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON);
final Worker w = new Worker.Builder()
.recordProcessorFactory(rpf)
.config(kinesisConfig)
.build();
new Thread(() -> w.run()).start();
我的消费者的处理器设置为:
public class ConsumerRecordProcessorImpl implements IRecordProcessor {
public void initialize(InitializationInput initializationInput) {
log.info("Setting up consumer with shard {} starting at {}", initializationInput.getShardId(),
initializationInput.getExtendedSequenceNumber());
}
public void processRecords(ProcessRecordsInput processRecordsInput) {
...
}
}
我希望看到这样的消息:Setting up consumer with shard shardId-000000000000 starting at TRIM_HORIZON 0
但是我得到了:Setting up consumer with shard shardId-000000000000 starting at LATEST 0
如何让我的消费者停止阅读最新消息并阅读所有未处理的消息?
这里是使用amazon-kinesis-client lib v2的示例您将必须使用Schedular(software.amazon.kinesis.coordinator),它会在后台读取记录并为该计划提供Retival配置,如下所示
RetrievalConfig retrievalConfig = setRetrivalConfig();
Scheduler scheduler = new Scheduler(
configsBuilder.checkpointConfig(),
configsBuilder.coordinatorConfig(),
configsBuilder.leaseManagementConfig(),
configsBuilder.lifecycleConfig(),
configsBuilder.metricsConfig(),
configsBuilder.processorConfig(),
retrievalConfig);
private RetrievalConfig setRetrivalConfig(){
InitialPositionInStreamExtended initialPositionInStreamExtended = InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON);
RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient));
retrievalConfig.initialPositionInStreamExtended(initialPositionInStreamExtended);
return retrievalConfig;
}
注意InitialPositionInStream.TRIM_HORIZON
,这将通知时间表在上一个已知位置之后开始消耗记录。因此,即使消费者关闭并且生产者仍在运行,在消费者停机期间产生的所有记录也将被消耗。
注意:configBuilder是ConfigsBuilder(software.amazon.kinesis.common)的对象