Spring Data Redis: Failed to convert from type [org.springframework.data.redis.connection.stream.StreamRecords$ByteMapBackedRecord] to type

Question (votes: 0, answers: 4)

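I have been struggling with this exception from Spring Data Redis Streams when I try to convert a message from the stream into my entity. I believe it is caused by a problem with the default deserializer for Redis Streams, but I'm not sure how to fix it.

When I send this message to the Redis stream

XADD my-stream * from john to smith type request

I get this exception from my Spring Boot service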

org.springframework.core.convert.ConversionFailedException: Failed to convert from type [org.springframework.data.redis.connection.stream.StreamRecords$ByteMapBackedRecord] to type [com.example.messaging.Dto.NotificationMessage] for value 'MapBackedRecord{recordId=1639826917707-0, kvMap={[B@55ab97aa=[B@510458ff, [B@5b7e99c1=[B@4141b49e, [B@12840469=[B@6aa14d0}}'; nested exception is java.lang.IllegalArgumentException: Value must not be null!
    at org.springframework.data.redis.stream.StreamPollTask.convertRecord(StreamPollTask.java:198) ~[spring-data-redis-2.5.6.jar:2.5.6]
    at org.springframework.data.redis.stream.StreamPollTask.deserializeAndEmitRecords(StreamPollTask.java:176) ~[spring-data-redis-2.5.6.jar:2.5.6]
    at org.springframework.data.redis.stream.StreamPollTask.doLoop(StreamPollTask.java:148) ~[spring-data-redis-2.5.6.jar:2.5.6]
    at org.springframework.data.redis.stream.StreamPollTask.run(StreamPollTask.java:132) ~[spring-data-redis-2.5.6.jar:2.5.6]
    at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
Caused by: java.lang.IllegalArgumentException: Value must not be null!
    at org.springframework.util.Assert.notNull(Assert.java:201) ~[spring-core-5.3.12.jar:5.3.12]
    at org.springframework.data.redis.connection.stream.Record.of(Record.java:81) ~[spring-data-redis-2.5.6.jar:2.5.6]
    at org.springframework.data.redis.connection.stream.MapRecord.toObjectRecord(MapRecord.java:147) ~[spring-data-redis-2.5.6.jar:2.5.6]
    at org.springframework.data.redis.core.StreamObjectMapper.toObjectRecord(StreamObjectMapper.java:138) ~[spring-data-redis-2.5.6.jar:2.5.6]
    at org.springframework.data.redis.core.StreamOperations.map(StreamOperations.java:577) ~[spring-data-redis-2.5.6.jar:2.5.6]
    at org.springframework.data.redis.stream.DefaultStreamMessageListenerContainer.lambda$getDeserializer$2(DefaultStreamMessageListenerContainer.java:240) ~[spring-data-redis-2.5.6.jar:2.5.6]
    at org.springframework.data.redis.stream.StreamPollTask.convertRecord(StreamPollTask.java:196) ~[spring-data-redis-2.5.6.jar:2.5.6]
    ... 4 common frames omitted

Configuration class:

@Configuration
public class RedisConfig {

    @Autowired
    private StreamListener<String, ObjectRecord<String, NotificationMessage>> streamListener;

    @Bean
    public Subscription subscription(RedisConnectionFactory redisConnectionFactory) throws UnknownHostException {
        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, NotificationMessage>> options
                = StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()
                .pollTimeout(Duration.ofMillis(1000))
                .targetType(NotificationMessage.class)
                .build();

        StreamMessageListenerContainer<String, ObjectRecord<String, NotificationMessage>> listenerContainer =
                StreamMessageListenerContainer.create(redisConnectionFactory, options);

        Subscription subscription = listenerContainer.receive(StreamOffset.latest("my-stream"), streamListener);

        listenerContainer.start();
        return subscription;
    }
}

NotificationMessage DTO:

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class NotificationMessage {
    private String id;
    private String from;
    private String to;
    private String type;
}
java spring spring-boot redis deserialization
4 answers

2 votes

A simple code example (spring-data-redis:2.6.4):

Stream record:
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.data.annotation.TypeAlias;

@Data
@NoArgsConstructor
@AllArgsConstructor
@TypeAlias("com.pet.streams.User")
public class User {
    private String name;
    private Integer age;
}
Stream configuration:
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.RedisSystemException;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.time.Duration;

@Slf4j
@Configuration
public class StreamsConfig {
    private final RedisTemplate<String, Object> redisTemplate;

    public StreamsConfig(RedisTemplate<String, Object> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    @Bean
    public StreamListener<String, ObjectRecord<String, User>> userStreamListener() {
        // handle message from stream
        return message -> log.info(message.toString());
    }

    @Bean
    public Subscription subscription(RedisConnectionFactory redisConnectionFactory, StreamListener<String, ObjectRecord<String, User>> userStreamListener) throws UnknownHostException {
        String streamKey = "streams:users";
        String groupName = "cg";
        createConsumerGroupIfNotExists(streamKey, groupName);
        StreamOffset<String> streamOffset = StreamOffset.create(streamKey, ReadOffset.lastConsumed());
        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, User>> options = StreamMessageListenerContainer
                .StreamMessageListenerContainerOptions
                .builder()
                .pollTimeout(Duration.ofMillis(100))
                .targetType(User.class)
                .build();
        StreamMessageListenerContainer<String, ObjectRecord<String, User>>  listenerContainer = StreamMessageListenerContainer
                .create(redisConnectionFactory, options);
        StreamMessageListenerContainer.StreamReadRequest<String> readRequest = StreamMessageListenerContainer.StreamReadRequest.builder(streamOffset)
                .consumer(Consumer.from(groupName, InetAddress.getLocalHost().getHostName()))
                .cancelOnError((err) -> false)  // do not stop consuming after error
                .errorHandler((err) -> log.error(err.getMessage()))
                .build();
        Subscription subscription = listenerContainer.register(readRequest, userStreamListener);
        listenerContainer.start();
        return subscription;
    }

    private void createConsumerGroupIfNotExists(String streamKey, String groupName){
        try {
            redisTemplate.opsForStream().createGroup(streamKey, groupName);
        }
        catch (RedisSystemException ex){
            log.error(ex.getMessage());
        }
    }
}

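Testing:

Run

XADD streams:users * _class com.pet.streams.User name Bob age 30

in redis-cli to add a record to the stream. If the application is running correctly, the record should be consumed (logged).

Then run

XADD streams:users * name Bob age 30

without the _class field. This time the exception should be logged (as described above).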

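Conclusion:

Spring Data sets the _class field automatically when it publishes a new record to the stream. If an external system publishes messages to the Redis stream, make sure it includes the _class field in each record, or override the default mapper.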
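
For an external producer, the point above boils down to always sending the _class field along with the payload fields. A minimal sketch in plain Java, assuming the producer assembles the XADD command as text; the class XaddCommandSketch and its buildXadd method are hypothetical helpers, not part of Spring Data Redis or any Redis client:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: builds the XADD command an external producer would send.
class XaddCommandSketch {

    // Name of the type-hint field, as used in the test commands of this answer.
    static final String TYPE_FIELD = "_class";

    // Puts the type hint first, then the payload fields in insertion order.
    // Assumption: keys and values contain no whitespace, so no quoting is done.
    static String buildXadd(String streamKey, String typeHint, Map<String, String> fields) {
        StringBuilder cmd = new StringBuilder("XADD ").append(streamKey).append(" *");
        cmd.append(' ').append(TYPE_FIELD).append(' ').append(typeHint);
        for (Map.Entry<String, String> entry : fields.entrySet()) {
            cmd.append(' ').append(entry.getKey()).append(' ').append(entry.getValue());
        }
        return cmd.toString();
    }

    public static void main(String[] args) {
        Map<String, String> fields = new LinkedHashMap<>();
        fields.put("name", "Bob");
        fields.put("age", "30");
        // prints: XADD streams:users * _class com.pet.streams.User name Bob age 30
        System.out.println(buildXadd("streams:users", "com.pet.streams.User", fields));
    }
}
```

The value passed as the type hint has to match what the consumer expects: here, the @TypeAlias value of the User class.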

Extra:

Take a look at the DefaultRedisTypeMapper class, which could be replaced with a custom mapper. I haven't tried this, but it looks possible.


0 votes

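I ran into the same problem: I serialized some messages into Redis from one service and deserialized them in another, and it complained that it could not convert the type even though the objects were identical (apart from the class path). I had to give my Redis listener a model with exactly the same class path as in the source service, and then it worked.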
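
To illustrate the class-path point: unless a @TypeAlias is set, the type hint is the fully qualified class name, so the consumer can only resolve it if a class with exactly that name exists on its side. The sketch below is a simplified model using plain Class.forName, not Spring Data's actual lookup code; TypeHintLookupSketch and the class name com.producer.dto.NotificationMessage are made up for illustration.

```java
import java.util.Optional;

// Simplified model of resolving a type hint on the consumer side.
class TypeHintLookupSketch {

    // Returns the class named by the hint, or empty if the consumer does not have it.
    static Optional<Class<?>> resolve(String typeHint) {
        try {
            Class<?> type = Class.forName(typeHint);
            return Optional.of(type);
        } catch (ClassNotFoundException e) {
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        // A class that exists in this JVM resolves.
        System.out.println(resolve("java.lang.String").isPresent()); // prints: true
        // A hypothetical producer-side class that the consumer lacks does not.
        System.out.println(resolve("com.producer.dto.NotificationMessage").isPresent()); // prints: false
    }
}
```

This is why copying the model into the same package as in the source service made the conversion work.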

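Someone here ran into trouble when trying to solve the same problem with a type alias: https://github.com/spring-projects/spring-data-redis/issues/1095

I'm still looking for a more elegant solution, perhaps using the TypeAlias annotation in my case.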


0 votes

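When instantiating the StreamMessageListenerContainerOptions object, do not specify the target type; that is, remove

.targetType(NotificationMessage.class)

The problem comes from the serialization of the Redis stream's internal classes.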


0 votes

Instead of passing ObjectRecord<S,V>, you can use the default MapRecord<String, String, String>. The implementation would then look like this:

    public Subscription subscription(RedisConnectionFactory redisConnectionFactory, StreamListener<String, MapRecord<String, String, String>> userStreamListener) throws UnknownHostException {
        String streamKey = "streams:users";
        String groupName = "cg";
        createConsumerGroupIfNotExists(streamKey, groupName);
        StreamOffset<String> streamOffset = StreamOffset.create(streamKey, ReadOffset.lastConsumed());

        // StreamMessageListenerContainerOptions removed: create(connectionFactory) uses the
        // defaults, so the container delivers MapRecord<String, String, String>
        StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer = StreamMessageListenerContainer
                .create(redisConnectionFactory);
        StreamMessageListenerContainer.StreamReadRequest<String> readRequest = StreamMessageListenerContainer.StreamReadRequest.builder(streamOffset)
                .consumer(Consumer.from(groupName, InetAddress.getLocalHost().getHostName()))
                .cancelOnError((err) -> false)  // do not stop consuming after error
                .errorHandler((err) -> log.error(err.getMessage()))
                .build();
        Subscription subscription = listenerContainer.register(readRequest, userStreamListener);
        listenerContainer.start();
        return subscription;
    }

In the handler, Record.getValue() gives you the values as a Map<String,String>, which you can then map to your object manually. A library such as MapStruct makes it easy to write the mapper.
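
A minimal sketch of that manual mapping in plain Java, without MapStruct. The names NotificationMapperSketch and Notification are hypothetical: Notification stands in for the question's NotificationMessage (written without Lombok), and using the stream entry ID as the id field is an assumption, since the XADD command in the question does not send an id.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: maps the field/value pairs of a stream entry onto a plain object.
class NotificationMapperSketch {

    // Stand-in for the question's NotificationMessage DTO.
    static final class Notification {
        final String id;
        final String from;
        final String to;
        final String type;

        Notification(String id, String from, String to, String type) {
            this.id = id;
            this.from = from;
            this.to = to;
            this.type = type;
        }
    }

    // recordId is the stream entry ID; fields is the map returned by getValue().
    // Fields missing from the map end up as null.
    static Notification fromFields(String recordId, Map<String, String> fields) {
        return new Notification(recordId, fields.get("from"), fields.get("to"), fields.get("type"));
    }

    public static void main(String[] args) {
        Map<String, String> fields = new LinkedHashMap<>();
        fields.put("from", "john");
        fields.put("to", "smith");
        fields.put("type", "request");
        Notification n = fromFields("1639826917707-0", fields);
        // prints: john -> smith (request)
        System.out.println(n.from + " -> " + n.to + " (" + n.type + ")");
    }
}
```

In the listener, this would be called with the record's ID as a string and the map from getValue(); no _class field is needed because the target type is fixed in code.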
