Generic deserialization in Kafka

Votes: 0 · Answers: 3

I have two classes in Kafka for serialization and deserialization. Serialization works fine, but I am running into problems with deserialization.

I have found many suggested solutions, but none of them worked.

Deserializer with a generic class T:

public class DeserializerU<T> implements Deserializer<T> {

    @Override
    public void configure(Map map, boolean bln) {
    }

    @Override
    public void close() {
    }

    @Override
    public T deserialize(String string, byte[] bytes) {
        ObjectMapper mapper = new ObjectMapper();
        T object = null;
        try {
            object = mapper.readValue(bytes, new TypeReference<T>() {});
        } catch (Exception e) {
            e.printStackTrace();
        }
        return object;
    }
}

Serializer:

public class MyObjectSerializer implements Serializer {

    @Override
    public void configure(Map map, boolean bln) {
    }

    @Override
    public byte[] serialize(String string, Object t) {
        byte[] retVal = null;
        ObjectMapper objectMapper = new ObjectMapper();
        try {
            retVal = objectMapper.writeValueAsString(t).getBytes();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return retVal;
    }

    @Override
    public void close() {
    }
}

Properties setting the deserializer:

Properties props = new Properties();
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, new DeserializerU<MyOwnObject>().getClass());

If I replace "new TypeReference<T>() {}" with a specific type, the deserializer works, but I need one deserializer for many object types. I also tried convertValue instead of readValue, but everything comes back as a LinkedHashMap, which cannot be cast to my object. Any suggestions on how to do this? Thanks for your help.
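The LinkedHashMap results come from Java type erasure: inside a generic class or method, "new TypeReference<T>() {}" captures only the type variable T, not the caller's concrete class, so Jackson falls back to its default mapping (LinkedHashMap for JSON objects). A minimal stand-alone sketch of the capture mechanism, using a hypothetical TypeRef class in place of Jackson's TypeReference:

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

// Hypothetical stand-in for Jackson's TypeReference: an anonymous subclass
// records its generic argument in the superclass signature.
abstract class TypeRef<T> {
    Type captured() {
        return ((ParameterizedType) getClass().getGenericSuperclass())
                .getActualTypeArguments()[0];
    }
}

public class ErasureDemo {
    // Mirrors what happens inside DeserializerU<T>: T here is a type
    // variable, so the anonymous subclass captures the literal "T",
    // not a usable class.
    static <T> Type insideGenericMethod() {
        return new TypeRef<T>() {}.captured();
    }

    public static void main(String[] args) {
        System.out.println(insideGenericMethod().getTypeName()); // prints "T"
        // At a concrete use site, the full type is still present:
        System.out.println(new TypeRef<String>() {}.captured().getTypeName());
        // prints "java.lang.String"
    }
}
```

This is why the answers below pass the concrete Class to the deserializer explicitly instead of relying on a generic TypeReference.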

java serialization apache-kafka deserialization
3 Answers
0 votes

My answer may be a bit late, but it might help someone else.

With this approach I don't need to create a ConsumerFactory per topic, and I can still parse the JSON when needed.

@KafkaListener(topics = "${topic...}")
public void consume(MyObject message) { ... }

There is no need to create a JsonDeserializer for every object.

My custom deserializer class:

@Component
public class CustomJsonDeserializer implements Deserializer<Object> {

    private final ObjectMapper mapper;

    // This MAP maps the topic to the class that I need to convert.
    private final Map<String, Class<?>> maps; 

    public CustomJsonDeserializer(
            // I can access the application properties that were defined
            final Environment environment,
            final ObjectMapper mapper
    ) {
        this.mapper = mapper;
        maps = new HashMap<>(2);
        maps.put(environment.getProperty("my-topic-1"), MyClass1.class);
        maps.put(environment.getProperty("my-topic-2"), MyClass2.class);

    }

    @Override
    public Object deserialize(String topic, byte[] data) {
        if (Objects.isNull(data) || data.length == 0) {
            return null;
        }
        try {
            // Look up the target class for this topic in the map built above.
            return mapper.readValue(data, maps.get(topic));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}

My KafkaConfiguration class:

@EnableKafka
@Configuration
public class KafkaConfiguration {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    // CustomJsonDeserializer is a @Component; an ObjectMapper bean must
    // exist so that Spring can inject it into the deserializer.
    @Autowired
    private CustomJsonDeserializer customJsonDeserializer;

    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        final var properties = new HashMap<String, Object>(6);
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // Note: this class config is overridden by the deserializer instance
        // passed to DefaultKafkaConsumerFactory below.
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        properties.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, false);
        return new DefaultKafkaConsumerFactory<String, Object>(properties, new StringDeserializer(), customJsonDeserializer);
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Object>> kafkaListenerContainerFactory() {
        final var factory = new ConcurrentKafkaListenerContainerFactory<String, Object>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

}

Then, in my consumer, I can write:

MyConsumer.java


    @KafkaListener(topics = "${my-topic-1}")
    public void consume(final MyClass1 item) {
        // Do whatever you want
    }

    @KafkaListener(topics = "${my-topic-2}")
    public void consume(final MyClass2 item) {
        // Do whatever you want
    }

0 votes

Set a message converter with JsonMessageConverter in the KafkaListenerContainerFactory. Note that no custom class is specified anywhere in my configuration.

@Configuration
@EnableKafka
public class KafkaConfig {

    @Value("${spring.kafka.consumer.bootstrap-servers}")
    private String bootstrapServer;

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setMessageConverter(new JsonMessageConverter());
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
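With the converter in place, the payload type is inferred from the listener method's parameter, so the listener just declares its target class (the MyObject class and topic name below are illustrative):

```java
@KafkaListener(topics = "my-topic")
public void consume(MyObject message) {
    // JsonMessageConverter converts the raw String payload into MyObject
    // based on this method's parameter type.
}
```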

0 votes

Sorry for being late. I hope my answer can help you.

I assume your Kafka properties look like this:

example.kafka.deserializer.mapper=com.example.consumer.model.Example

public class DeserializerU<T> implements Deserializer<T> {

    private Class<T> type;

    @Override
    @SuppressWarnings("unchecked")
    public void configure(Map<String, ?> map, boolean isKey) {
        // The property value may arrive as a Class object or as its
        // fully-qualified String name; handle both.
        Object value = map.get("example.kafka.deserializer.mapper");
        try {
            this.type = value instanceof Class
                    ? (Class<T>) value
                    : (Class<T>) Class.forName((String) value);
        } catch (ClassNotFoundException e) {
            throw new IllegalArgumentException("Unknown mapper class: " + value, e);
        }
    }

    @Override
    public void close() {
    }

    @Override
    public T deserialize(String string, byte[] bytes) {
        ObjectMapper mapper = new ObjectMapper();
        T object = null;
        try {
            object = mapper.readValue(bytes, type);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return object;
    }
}
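The configure() lookup can be exercised in isolation. A stand-alone sketch (the property key and class name here are illustrative) that resolves the target class whether the property holds a Class or its String name:

```java
import java.util.HashMap;
import java.util.Map;

public class ConfigureDemo {
    // Resolve the target class from consumer properties, accepting either
    // a Class object or its fully-qualified String name.
    static Class<?> resolveType(Map<String, ?> configs, String key) {
        Object value = configs.get(key);
        try {
            return value instanceof Class
                    ? (Class<?>) value
                    : Class.forName((String) value);
        } catch (ClassNotFoundException e) {
            throw new IllegalArgumentException("Unknown class: " + value, e);
        }
    }

    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        props.put("example.kafka.deserializer.mapper", "java.util.ArrayList");
        System.out.println(resolveType(props, "example.kafka.deserializer.mapper"));
        // prints: class java.util.ArrayList
    }
}
```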