我需要使用Kafka主题中的JSON并将其转换为实体,因此可以将Postgres保存到数据库中,以防万一
消费者配置:
@Configuration
@EnableKafka
public class ConsumerConfiguration {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> properties = new HashMap<>();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
properties.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "Kafka");
return properties;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs(),
new StringDeserializer(),
new JsonDeserializer<>(String.class));
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
@Bean
public EventConsumer consumer() {
return new EventConsumer();
}
}
消费类:
@Slf4j
@Data
public class EventConsumer {
private EntidadeComercialRepository entidadeComercialRepository;
private CountDownLatch latch = new CountDownLatch(1);
@KafkaListener(topics = "${topic.entidades_comerciais}")
public void kafkaConsumer(@Payload EntidadesComerciais entidadesComerciais) {
entidadeComercialRepository.save(entidadesComerciais);
log.info("received payload='{}'", entidadesComerciais);
latch.countDown();
}
}
部分实体类
@lombok.Data
@NoArgsConstructor
@AllArgsConstructor
@Entity(name = "ENTIDADES_COMERCIAIS")
@Builder
public class EntidadesComerciais {
@Id
@Column(name = "CODIGO_DA_ENTIDADE_COMERCIAL")
private Long codigoDaEntidadeComercial;
@Column(name = "NOME_DA_ENTIDADE_COMERCIAL")
private String nomeDaEntidadeComercial;
@Column(name = "NOME_COMERCIAL")
private String nomeComercial;
@Column(name = "TIPO_DA_ENTIDADE_COMERCIAL")
private String tipoDaEntidadeComercial;
...
我正在使用Spring Kafka和Postgres。程序启动时,我收到此异常:
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void br.com.dchristofolli.kafka.EventConsumer.kafkaConsumer(br.com.dchristofolli.kafka.entity.EntidadesComerciais)]
Bean [EventConsumer(entidadeComercialRepository=null, latch=java.util.concurrent.CountDownLatch@144e36ae[Count = 1])]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [br.com.dchristofolli.kafka.entity.EntidadesComerciais] for GenericMessage [payload={"topic":"ENTIDADES_COMERCIAIS"...
我需要知道我做错了什么,或者是否需要修改我的代码的任何部分以能够转换实体ENTIDADES_COMERCIAIS
中收到的有效载荷,谢谢任何能帮助我的人
我的猜测是您收到一个EntidadesComerciais
的JSONArray。在这种情况下,您可能需要更改
public void kafkaConsumer(@Payload EntidadesComerciais entidadesComerciais)
到
public void kafkaConsumer(@Payload List<EntidadesComerciais> entidadesComerciais)