我正在使用以下代码使用Spring Data Redis使用者组来使用Redis流,但是即使我已注释掉accept命令,但在服务器重新启动后我的消息也不会重新读取。
我希望,如果我不确认该消息,则应在服务器被终止并重新启动后重新读取该消息。我在这里想念什么?
@Bean
@Autowired
public StreamMessageListenerContainer eventStreamPersistenceListenerContainerTwo(RedisConnectionFactory streamRedisConnectionFactory, RedisTemplate streamRedisTemplate) {
StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> containerOptions = StreamMessageListenerContainer.StreamMessageListenerContainerOptions
.builder().pollTimeout(Duration.ofMillis(100)).build();
StreamMessageListenerContainer<String, MapRecord<String, String, String>> container = StreamMessageListenerContainer.create(streamRedisConnectionFactory,
containerOptions);
container.receive(Consumer.from("my-group", "my-consumer"),
StreamOffset.create("event-stream", ReadOffset.latest()),
message -> {
System.out.println("MessageId: " + message.getId());
System.out.println("Stream: " + message.getStream());
System.out.println("Body: " + message.getValue());
//streamRedisTemplate.opsForStream().acknowledge("my-group", message);
});
container.start();
return container;
}
阅读有关流的工作方式的Redis文档之后,我想到了以下内容来自动处理所有未确认但先前传递给使用者的消息: