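I've been struggling with this exception from Spring Data Redis Streams when I try to convert messages from a stream into my entity. I believe it is caused by a problem with the default deserializer for Redis Streams, but I'm not sure how to fix it.
When I send this message to the Redis stream:
XADD my-stream * from john to smith type request
I get this exception from my Spring Boot service: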
org.springframework.core.convert.ConversionFailedException: Failed to convert from type [org.springframework.data.redis.connection.stream.StreamRecords$ByteMapBackedRecord] to type [com.example.messaging.Dto.NotificationMessage] for value 'MapBackedRecord{recordId=1639826917707-0, kvMap={[B@55ab97aa=[B@510458ff, [B@5b7e99c1=[B@4141b49e, [B@12840469=[B@6aa14d0}}'; nested exception is java.lang.IllegalArgumentException: Value must not be null!
at org.springframework.data.redis.stream.StreamPollTask.convertRecord(StreamPollTask.java:198) ~[spring-data-redis-2.5.6.jar:2.5.6]
at org.springframework.data.redis.stream.StreamPollTask.deserializeAndEmitRecords(StreamPollTask.java:176) ~[spring-data-redis-2.5.6.jar:2.5.6]
at org.springframework.data.redis.stream.StreamPollTask.doLoop(StreamPollTask.java:148) ~[spring-data-redis-2.5.6.jar:2.5.6]
at org.springframework.data.redis.stream.StreamPollTask.run(StreamPollTask.java:132) ~[spring-data-redis-2.5.6.jar:2.5.6]
at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
Caused by: java.lang.IllegalArgumentException: Value must not be null!
at org.springframework.util.Assert.notNull(Assert.java:201) ~[spring-core-5.3.12.jar:5.3.12]
at org.springframework.data.redis.connection.stream.Record.of(Record.java:81) ~[spring-data-redis-2.5.6.jar:2.5.6]
at org.springframework.data.redis.connection.stream.MapRecord.toObjectRecord(MapRecord.java:147) ~[spring-data-redis-2.5.6.jar:2.5.6]
at org.springframework.data.redis.core.StreamObjectMapper.toObjectRecord(StreamObjectMapper.java:138) ~[spring-data-redis-2.5.6.jar:2.5.6]
at org.springframework.data.redis.core.StreamOperations.map(StreamOperations.java:577) ~[spring-data-redis-2.5.6.jar:2.5.6]
at org.springframework.data.redis.stream.DefaultStreamMessageListenerContainer.lambda$getDeserializer$2(DefaultStreamMessageListenerContainer.java:240) ~[spring-data-redis-2.5.6.jar:2.5.6]
at org.springframework.data.redis.stream.StreamPollTask.convertRecord(StreamPollTask.java:196) ~[spring-data-redis-2.5.6.jar:2.5.6]
... 4 common frames omitted
Configuration class:
@Configuration
public class RedisConfig {

    @Autowired
    private StreamListener<String, ObjectRecord<String, NotificationMessage>> streamListener;

    @Bean
    public Subscription subscription(RedisConnectionFactory redisConnectionFactory) throws UnknownHostException {
        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, NotificationMessage>> options
                = StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()
                .pollTimeout(Duration.ofMillis(1000))
                .targetType(NotificationMessage.class)
                .build();
        StreamMessageListenerContainer<String, ObjectRecord<String, NotificationMessage>> listenerContainer =
                StreamMessageListenerContainer.create(redisConnectionFactory, options);
        Subscription subscription = listenerContainer.receive(StreamOffset.latest("my-stream"), streamListener);
        listenerContainer.start();
        return subscription;
    }
}
NotificationMessage DTO:
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class NotificationMessage {
    private String id;
    private String from;
    private String to;
    private String type;
}
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.data.annotation.TypeAlias;
@Data
@NoArgsConstructor
@AllArgsConstructor
@TypeAlias("com.pet.streams.User")
public class User {
    private String name;
    private Integer age;
}
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.RedisSystemException;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.time.Duration;
@Slf4j
@Configuration
public class StreamsConfig {
    private final RedisTemplate<String, Object> redisTemplate;

    public StreamsConfig(RedisTemplate<String, Object> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    @Bean
    public StreamListener<String, ObjectRecord<String, User>> userStreamListener() {
        // handle message from stream
        return message -> log.info(message.toString());
    }

    @Bean
    public Subscription subscription(RedisConnectionFactory redisConnectionFactory, StreamListener<String, ObjectRecord<String, User>> userStreamListener) throws UnknownHostException {
        String streamKey = "streams:users";
        String groupName = "cg";
        createConsumerGroupIfNotExists(streamKey, groupName);
        StreamOffset<String> streamOffset = StreamOffset.create(streamKey, ReadOffset.lastConsumed());
        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, User>> options = StreamMessageListenerContainer
                .StreamMessageListenerContainerOptions
                .builder()
                .pollTimeout(Duration.ofMillis(100))
                .targetType(User.class)
                .build();
        StreamMessageListenerContainer<String, ObjectRecord<String, User>> listenerContainer = StreamMessageListenerContainer
                .create(redisConnectionFactory, options);
        StreamMessageListenerContainer.StreamReadRequest<String> readRequest = StreamMessageListenerContainer.StreamReadRequest.builder(streamOffset)
                .consumer(Consumer.from(groupName, InetAddress.getLocalHost().getHostName()))
                .cancelOnError((err) -> false) // do not stop consuming after error
                .errorHandler((err) -> log.error(err.getMessage()))
                .build();
        Subscription subscription = listenerContainer.register(readRequest, userStreamListener);
        listenerContainer.start();
        return subscription;
    }

    private void createConsumerGroupIfNotExists(String streamKey, String groupName) {
        try {
            redisTemplate.opsForStream().createGroup(streamKey, groupName);
        } catch (RedisSystemException ex) {
            // creating the group fails if it already exists; log and continue
            log.error(ex.getMessage());
        }
    }
}
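In redis-cli, run
XADD streams:users * _class com.pet.streams.User name Bob age 30
to add a record to the stream. The running application should consume (log) it correctly.
Then run
XADD streams:users * name Bob age 30
without the _class field. The exception described above should be logged.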
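Spring Data sets the _class field automatically when it publishes a new record to the stream. If an external system publishes messages to the Redis stream, make sure it includes the _class field in each record, or override the default mapper.
Take a look at the DefaultRedisTypeMapper class and replace it with a custom mapper. I haven't tried this, but it looks possible.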
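For a producer that lives outside Spring, the point above comes down to sending one extra field. The following is only a sketch in plain Java: the class `XaddCommandSketch` and the method `xaddWithTypeHint` are made-up names for illustration, and the helper just assembles the same command line typed into redis-cli above. It does not quote values that contain whitespace, so treat it as an illustration of the field layout rather than a client.

```java
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical helper; not part of Redis or Spring Data Redis. */
final class XaddCommandSketch {

    private XaddCommandSketch() {
    }

    /**
     * Builds an XADD command line whose first field is the _class type hint,
     * followed by the payload fields in the map's iteration order.
     */
    static String xaddWithTypeHint(String streamKey, String typeHint, Map<String, String> fields) {
        StringJoiner command = new StringJoiner(" ");
        command.add("XADD").add(streamKey).add("*");
        // The type hint must match the consumer's class name or its @TypeAlias value.
        command.add("_class").add(typeHint);
        fields.forEach((key, value) -> command.add(key).add(value));
        return command.toString();
    }
}
```

Passing a `LinkedHashMap` keeps the payload fields in insertion order, which makes the generated command easy to compare with the one entered by hand.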
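I ran into the same problem: I serialize messages into Redis from one service and deserialize them in another, and it complains that it cannot convert the type even though the objects are identical (apart from the package path). I had to give my Redis listener a model with exactly the same fully qualified class name as in the source service, and then it worked.
Someone here ran into trouble trying to solve the same problem with a type alias: https://github.com/spring-projects/spring-data-redis/issues/1095
I'm still looking for a more elegant solution, perhaps using the @TypeAlias annotation in my case.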
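When you instantiate the StreamMessageListenerContainerOptions object, don't specify the target type. That is, remove
.targetType(NotificationMessage.class)
The problem is caused by the way the internal classes of Redis Streams are serialized.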
Instead of passing an ObjectRecord<S,V>, you can use the default MapRecord<String, String, String>. The implementation would then look like this:
public Subscription subscription(RedisConnectionFactory redisConnectionFactory, StreamListener<String, MapRecord<String, String, String>> userStreamListener) throws UnknownHostException {
    String streamKey = "streams:users";
    String groupName = "cg";
    createConsumerGroupIfNotExists(streamKey, groupName);
    StreamOffset<String> streamOffset = StreamOffset.create(streamKey, ReadOffset.lastConsumed());
    // removed the StreamMessageListenerContainerOptions; the default container delivers MapRecord<String, String, String>
    StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer = StreamMessageListenerContainer
            .create(redisConnectionFactory);
    StreamMessageListenerContainer.StreamReadRequest<String> readRequest = StreamMessageListenerContainer.StreamReadRequest.builder(streamOffset)
            .consumer(Consumer.from(groupName, InetAddress.getLocalHost().getHostName()))
            .cancelOnError((err) -> false) // do not stop consuming after error
            .errorHandler((err) -> log.error(err.getMessage()))
            .build();
    Subscription subscription = listenerContainer.register(readRequest, userStreamListener);
    listenerContainer.start();
    return subscription;
}
In the handler, Record.getValue() gives you the fields as a Map<String,String>, which you can map to your object manually. A library such as MapStruct makes it easy to write the mapper.
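As a rough illustration of the manual mapping step, here is a minimal hand-written mapper in plain Java. `UserRecordMapper` and its nested `User` are hypothetical stand-ins (the nested class only mirrors the name and age fields of the User class shown earlier), and the field names "name" and "age" are taken from the XADD example above. Treat it as a sketch, not as the way Spring Data Redis or MapStruct does it.

```java
import java.util.Map;

/** Hypothetical hand-written mapper; not part of Spring Data Redis. */
final class UserRecordMapper {

    /** Minimal stand-in for the User class shown earlier (name, age). */
    static final class User {
        final String name;
        final Integer age;

        User(String name, Integer age) {
            this.name = name;
            this.age = age;
        }
    }

    private UserRecordMapper() {
    }

    /**
     * Builds a User from the field map of one stream entry.
     * Throws IllegalArgumentException if a field is missing or age is not an integer.
     */
    static User fromFields(Map<String, String> fields) {
        String name = fields.get("name");
        String age = fields.get("age");
        if (name == null || age == null) {
            throw new IllegalArgumentException("Missing 'name' or 'age' in " + fields);
        }
        try {
            return new User(name, Integer.valueOf(age));
        } catch (NumberFormatException ex) {
            throw new IllegalArgumentException("Field 'age' is not an integer: " + age, ex);
        }
    }
}
```

Inside a listener that receives MapRecord<String, String, String>, this would be called as `UserRecordMapper.fromFields(message.getValue())`. Because no type hint is involved, entries published without a _class field can be read this way as well.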