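I have two classes for serialization and deserialization in Kafka. Serialization works fine, but I am having trouble with deserialization.
I have found many solutions, but none of them worked.
Deserializer with generic class T: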
public class DeserializerU<T> implements Deserializer<T> {
@Override
public void configure(Map map, boolean bln) {
}
@Override
public void close() {
}
@Override
public T deserialize(String string, byte[] bytes) {
ObjectMapper mapper = new ObjectMapper();
T object = null;
try {
object = mapper.readValue(bytes, new TypeReference<T>() {});
} catch (Exception e) {
e.printStackTrace();
}
return object;
}
}
Serializer:
public class MyObjectSerializer implements Serializer {
@Override
public void configure(Map map, boolean bln) {
}
@Override
public byte[] serialize(String string, Object t) {
byte[] retVal = null;
ObjectMapper objectMapper = new ObjectMapper();
try {
retVal = objectMapper.writeValueAsString(t).getBytes();
} catch (Exception e) {
e.printStackTrace();
}
return retVal;
}
@Override
public void close() {
}
}
Properties setting the deserializer:
Properties props = new Properties();
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, new DeserializerU<MyOwnObject>().getClass());
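If I replace new TypeReference<T>() {} with a specific type, the deserializer works, but I need deserializers for many objects. I also tried convertValue instead of readValue, but everything comes back as a LinkedHashMap, which cannot be cast to my object. Any suggestions on how to do this? Thanks for your help.
My answer may be a bit late, but it may help others.
With this approach, I don't need to create a ConsumerFactory for each topic, and I can still parse the JSON if needed.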
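For context on the LinkedHashMap result: inside a generic class, T is only a type variable, so an anonymous subclass created there cannot see the caller's actual type argument. As far as I know, Jackson then treats T as Object and binds JSON objects to LinkedHashMap. The sketch below reproduces the capture mechanism with the JDK only; TypeCapture and GenericHolder are hypothetical stand-ins written for illustration, not Jackson classes.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;

// Hypothetical stand-in for a TypeReference-style class: it reads its type
// argument from the generic superclass of the anonymous subclass.
abstract class TypeCapture<T> {
    final Type type;

    protected TypeCapture() {
        ParameterizedType superType = (ParameterizedType) getClass().getGenericSuperclass();
        this.type = superType.getActualTypeArguments()[0];
    }
}

class GenericHolder<T> {
    // Here T is a type variable; the caller's type argument has been erased.
    Type captured() {
        return new TypeCapture<T>() {}.type;
    }
}

class ErasureDemo {
    public static void main(String[] args) {
        Type concrete = new TypeCapture<String>() {}.type;
        Type generic = new GenericHolder<String>().captured();

        System.out.println(concrete);                         // class java.lang.String
        System.out.println(generic instanceof TypeVariable);  // true
    }
}
```

This is why the deserializer needs the target Class (or a fully resolved type) handed to it at runtime, for example through configure or a constructor argument.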
@KafkaListener(topics = "${topic...}")
public void consume(MyObject message) { ... }
There is no need to create a JsonDeserializer for each object.
My custom deserializer class:
@Component
public class CustomJsonDeserializer implements Deserializer<Object> {
private final ObjectMapper mapper;
// This MAP maps the topic to the class that I need to convert.
private final Map<String, Class<?>> maps;
public CustomJsonDeserializer(
// I can access the application properties that were defined
final Environment environment,
final ObjectMapper mapper
) {
this.mapper = mapper;
maps = new HashMap<>(2);
maps.put(environment.getProperty("my-topic-1"), MyClass1.class);
maps.put(environment.getProperty("my-topic-2"), MyClass2.class);
}
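@Override
public Object deserialize(String topic, byte[] data) {
if (Objects.isNull(data) || data.length == 0) {
return null;
}
// Look up the class registered for this topic.
final Class<?> target = maps.get(topic);
if (target == null) {
throw new IllegalStateException("No target class mapped for topic: " + topic);
}
try {
return mapper.readValue(data, target);
} catch (IOException e) {
// TODO
e.printStackTrace();
throw new RuntimeException(e);
}
}
}
My KafkaConfiguration class: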
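The topic-to-class lookup that this deserializer relies on can be sketched on its own, without Kafka or Jackson. TopicTypeRegistry is a hypothetical helper name, and the topic names and classes are placeholders; the point is only that an unmapped topic should fail loudly instead of handing null to the mapper.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: resolves the target class for a topic and rejects
// topics that were never registered.
class TopicTypeRegistry {
    private final Map<String, Class<?>> targets = new HashMap<>();

    TopicTypeRegistry register(String topic, Class<?> target) {
        targets.put(topic, target);
        return this;
    }

    Class<?> resolve(String topic) {
        Class<?> target = targets.get(topic);
        if (target == null) {
            throw new IllegalArgumentException("No target class registered for topic: " + topic);
        }
        return target;
    }
}

class TopicTypeRegistryDemo {
    public static void main(String[] args) {
        // Placeholder topic names and classes, for illustration only.
        TopicTypeRegistry registry = new TopicTypeRegistry()
                .register("orders", String.class)
                .register("payments", Integer.class);
        System.out.println(registry.resolve("orders").getSimpleName()); // String
    }
}
```

In the deserializer, the resolved class would then be passed as the second argument to mapper.readValue.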
@EnableKafka
@Configuration
public class KafkaConfiguration {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.consumer.group-id}")
private String groupId;
// An ObjectMapper bean must exist so that Spring can inject it into CustomJsonDeserializer.
@Autowired
private CustomJsonDeserializer customJsonDeserializer;
@Bean
public ConsumerFactory<String, Object> consumerFactory() {
final var properties = new HashMap<String, Object>(6);
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
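// This entry is not what gets used: the deserializer instances passed to the factory below take precedence.
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);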
properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, false);
return new DefaultKafkaConsumerFactory<String, Object>(properties, new StringDeserializer(), customJsonDeserializer);
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Object>> kafkaListenerContainerFactory() {
final var factory = new ConcurrentKafkaListenerContainerFactory<String, Object>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
Then, in my consumer, I can write:
MyConsumer.java
@KafkaListener(topics = "${my-topic-1}")
public void consume(final MyClass1 item) {
// Do whatever you want
}
@KafkaListener(topics = "${my-topic-2}")
public void consume(final MyClass2 item) {
// Do whatever you want
}
Set a message converter on the KafkaListenerContainerFactory using JsonMessageConverter. Note that my configuration does not specify any custom class.
@Configuration
@EnableKafka
public class KafkaConfig {
@Value("${spring.kafka.consumer.bootstrap-servers}")
private String bootstrapServer;
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setMessageConverter(new JsonMessageConverter());
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
Sorry I'm late. I hope my answer helps.
I assume your Kafka property looks like this:
example.kafka.deserializer.mapper=com.example.consumer.model.Example
public class DeserializerU<T> implements Deserializer<T> {
private Class<T> type;
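@Override
@SuppressWarnings("unchecked")
public void configure(Map<String, ?> map, boolean bln) {
// The value may arrive as a Class or, from a properties file, as a class name String.
Object value = map.get("example.kafka.deserializer.mapper");
if (value == null) {
throw new IllegalArgumentException("Missing property example.kafka.deserializer.mapper");
}
try {
this.type = value instanceof Class ? (Class<T>) value : (Class<T>) Class.forName(value.toString().trim());
} catch (ClassNotFoundException e) {
throw new IllegalArgumentException("Unknown class: " + value, e);
}
}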
@Override
public void close() {
}
@Override
public T deserialize(String string, byte[] bytes) {
ObjectMapper mapper = new ObjectMapper();
T object = null;
try {
object = mapper.readValue(bytes, type);
} catch (Exception e) {
e.printStackTrace();
}
return object;
}
}
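One thing to watch with this approach: a value read from a properties file is a String, so a plain cast to Class fails at runtime. Below is a minimal, JDK-only sketch of resolving the configured value whether it arrives as a Class or as a class name. TargetTypeResolver is a hypothetical helper, and java.lang.String stands in for the real model class.

```java
import java.util.Map;

// Hypothetical helper: turns a config value into the target Class.
class TargetTypeResolver {
    // Accepts either a Class instance or a fully qualified class name.
    @SuppressWarnings("unchecked")
    static <T> Class<T> resolve(Map<String, ?> configs, String key) {
        Object value = configs.get(key);
        if (value instanceof Class) {
            return (Class<T>) value;
        }
        if (value instanceof String) {
            try {
                return (Class<T>) Class.forName(((String) value).trim());
            } catch (ClassNotFoundException e) {
                throw new IllegalArgumentException("Unknown class for " + key + ": " + value, e);
            }
        }
        throw new IllegalArgumentException("Missing or unsupported value for " + key + ": " + value);
    }
}

class TargetTypeResolverDemo {
    public static void main(String[] args) {
        // java.lang.String is a placeholder for the real model class.
        Map<String, Object> configs = Map.of("example.kafka.deserializer.mapper", "java.lang.String");
        Class<String> type = TargetTypeResolver.resolve(configs, "example.kafka.deserializer.mapper");
        System.out.println(type.getName()); // java.lang.String
    }
}
```

The same check could live directly in configure; pulling it into a helper just keeps the deserializer short.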