WebFlux with a reactive Redis cache


I have a CacheConfig in which I have set up a ReactiveRedisConnectionFactory, and I can connect to Redis. However, when I try to save anything to the cache, it fails with:

Caused by: java.lang.IllegalArgumentException: DefaultSerializer requires a Serializable payload but received an object of type [reactor.core.publisher.MonoFlatMapMany]
    at org.springframework.core.serializer.DefaultSerializer.serialize(DefaultSerializer.java:43) ~[spring-core-6.0.7.jar:6.0.7]
    at org.springframework.core.serializer.Serializer.serializeToByteArray(Serializer.java:56) ~[spring-core-6.0.7.jar:6.0.7]
    at org.springframework.core.serializer.support.SerializingConverter.convert(SerializingConverter.java:60) ~[spring-core-6.0.7.jar:6.0.7]

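I think the problem is that Flux/Mono objects cannot be serialized, so I tried writing a custom serializer to handle this manually. However, I don't think I can do that without calling .block() on the Flux objects so that I can put their contents into a byte[].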
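To illustrate the point about serialization, here is a minimal, standard-library-only sketch. It uses CompletableFuture as a stand-in for Mono (an assumption made so the example runs without Reactor), and the class and method names are hypothetical. It shows that plain Java serialization, the kind of mechanism the DefaultSerializer in the stack trace relies on, rejects the pipeline object itself, while the value it eventually produces can be serialized inside the pipeline without blocking.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.concurrent.CompletableFuture;

// Hypothetical demo class; CompletableFuture stands in for Mono.
public class SerializeInsidePipeline {

    // Plain Java serialization of a single object to bytes.
    static byte[] toBytes(Object value) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(value);
        }
        return buffer.toByteArray();
    }

    // Returns false when the object does not implement Serializable.
    static boolean canJavaSerialize(Object value) {
        try {
            toBytes(value);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Serializes the value once it is available, as a step of the pipeline.
    // No get()/join()/block() is needed to produce the byte[].
    static CompletableFuture<byte[]> serializeWhenReady(CompletableFuture<String> source) {
        return source.thenApply(value -> {
            try {
                return toBytes(value);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
    }

    public static void main(String[] args) {
        CompletableFuture<String> source = new CompletableFuture<>();

        // The pipeline object itself is not Serializable; the plain value is.
        System.out.println("pipeline serializable: " + canJavaSerialize(source));
        System.out.println("value serializable: " + canJavaSerialize("hello"));

        // Attach serialization before the value exists, then complete the source.
        CompletableFuture<byte[]> bytes = serializeWhenReady(source);
        source.complete("hello");
        System.out.println("bytes produced: " + bytes.join().length);
    }
}
```

The design point is that a RedisSerializer is a synchronous byte[] conversion, so it only ever makes sense for already-emitted values; turning a Mono or Flux into bytes has to happen as an operator in the reactive chain rather than inside the serializer.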

My cache configuration:

@EnableCaching
@Configuration
@RequiredArgsConstructor
public class CacheConfig implements CachingConfigurer {

    private final ObjectMapper objectMapper;

    @Bean
    public LettuceConnectionFactory redisConnectionFactory() {
        LettuceClientConfiguration clientConfig = LettuceClientConfiguration.builder()
                .commandTimeout(Duration.ofSeconds(2))
                .shutdownTimeout(Duration.ZERO)
                .build();

        RedisStandaloneConfiguration serverConfig = new RedisStandaloneConfiguration("localhost", 6379);

        return new LettuceConnectionFactory(serverConfig, clientConfig);
    }

    @Bean
    public CacheManager cacheManager() {
        RedisCacheConfiguration cacheConfig = RedisCacheConfiguration.defaultCacheConfig()
                .entryTtl(Duration.ofMinutes(5))
                .serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(new ReactiveRedisSerializer<>(objectMapper)));

        return RedisCacheManager.builder(redisConnectionFactory())
                .cacheDefaults(cacheConfig)
                .build();
    }

    @Override
    public CacheResolver cacheResolver() {
        return new SimpleCacheResolver(cacheManager());
    }
}

Here is the ReactiveRedisSerializer I ended up with; it blocks, and it throws an exception at runtime:

@Slf4j
@RequiredArgsConstructor
public class ReactiveRedisSerializer<T> implements RedisSerializer<T> {

    private final ObjectMapper objectMapper;

    @Override
    public byte[] serialize(T t) throws SerializationException {
        if (t == null) {
            return null;
        }

        if (t instanceof Mono<?> mono) {
            try {
                return objectMapper.writeValueAsBytes(mono.toFuture().get());
            } catch (JsonProcessingException | InterruptedException | ExecutionException e) {
                throw new SerializationException("Failed to serialize Mono", e);
            }
        }

        if (t instanceof Flux<?> flux) {
            try {
                List<?> list = flux.collectList().block(); // How to avoid doing this?
                return objectMapper.writeValueAsBytes(list);
            } catch (JsonProcessingException e) {
                throw new SerializationException("Failed to serialize Flux", e);
            }
        }

        try {
            return objectMapper.writeValueAsString(t).getBytes(StandardCharsets.UTF_8);
        } catch (JsonProcessingException e) {
            throw new SerializationException("Failed to serialize object", e);
        }
    }


    @Override
    public T deserialize(byte[] bytes) throws SerializationException {
        if (bytes == null) {
            return null;
        }

        try {
            return (T) objectMapper.readValue(bytes, Object.class);
        } catch (Exception e) {
            throw new SerializationException("Failed to deserialize object", e);
        }
    }
}

This doesn't feel like the right approach, so is there a supported "out of the box" way to do this?
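One possible direction, offered as a sketch rather than a confirmed answer: with spring-core 6.0.7, as in the stack trace above, the cache abstraction appears to hand the returned Mono or Flux itself to the cache, which is what triggers the serialization error. A commonly used workaround is to skip the annotation-based cache for reactive methods and apply a cache-aside pattern with ReactiveRedisTemplate, so that only emitted values are serialized. The class name ReactiveCacheSketch, the helper getOrLoad, and its key, loader, and ttl parameters are hypothetical; storing values as Object with a Jackson JSON serializer is also an assumption.

```java
import java.time.Duration;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
import org.springframework.data.redis.core.ReactiveRedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializationContext;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import reactor.core.publisher.Mono;

// Hypothetical configuration class, not part of the original question.
@Configuration
public class ReactiveCacheSketch {

    // String keys, JSON values. LettuceConnectionFactory already implements
    // ReactiveRedisConnectionFactory, so the existing bean can be injected here.
    @Bean
    public ReactiveRedisTemplate<String, Object> reactiveRedisTemplate(
            ReactiveRedisConnectionFactory connectionFactory, ObjectMapper objectMapper) {
        RedisSerializationContext<String, Object> context = RedisSerializationContext
                .<String, Object>newSerializationContext(new StringRedisSerializer())
                .value(new Jackson2JsonRedisSerializer<>(objectMapper, Object.class))
                .build();
        return new ReactiveRedisTemplate<>(connectionFactory, context);
    }

    // Cache-aside: read from Redis first; on a miss, subscribe to the loader,
    // store the emitted value with a TTL, then return it. Nothing blocks.
    public static Mono<Object> getOrLoad(ReactiveRedisTemplate<String, Object> template,
                                         String key, Mono<Object> loader, Duration ttl) {
        return template.opsForValue().get(key)
                .switchIfEmpty(loader.flatMap(value ->
                        template.opsForValue().set(key, value, ttl).thenReturn(value)));
    }
}
```

A call might look like getOrLoad(template, "user:42", loadUser(42).cast(Object.class), Duration.ofMinutes(5)), where loadUser is a hypothetical method returning a Mono. Note that with Object.class as the target type, Jackson returns generic maps and lists on read, so a template typed to a concrete class is usually the better fit when the cached type is known.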

java redis spring-webflux project-reactor