How do I convert JSON received by my Spring Kafka consumer into an entity ready to be saved in the database?


I need to consume JSON from a Kafka topic and convert it into an entity so that it can be saved to a database, Postgres in this case.

Consumer configuration:

@Configuration
@EnableKafka
public class ConsumerConfiguration {
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> properties = new HashMap<>();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        properties.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "Kafka");
        return properties;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(),
                new StringDeserializer(),
                new JsonDeserializer<>(String.class));
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>>
    kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    @Bean
    public EventConsumer consumer() {
        return new EventConsumer();
    }
}

Consumer class:

@Slf4j
@Data
public class EventConsumer {
    private EntidadeComercialRepository entidadeComercialRepository;
    private CountDownLatch latch = new CountDownLatch(1);

    @KafkaListener(topics = "${topic.entidades_comerciais}")
    public void kafkaConsumer(@Payload EntidadesComerciais entidadesComerciais) {
        entidadeComercialRepository.save(entidadesComerciais);
        log.info("received payload='{}'", entidadesComerciais);
        latch.countDown();
    }
}

Part of the entity class:

@lombok.Data
@NoArgsConstructor
@AllArgsConstructor
@Entity(name = "ENTIDADES_COMERCIAIS")
@Builder
public class EntidadesComerciais {
    @Id
    @Column(name = "CODIGO_DA_ENTIDADE_COMERCIAL")
    private Long codigoDaEntidadeComercial;

    @Column(name = "NOME_DA_ENTIDADE_COMERCIAL")
    private String nomeDaEntidadeComercial;

    @Column(name = "NOME_COMERCIAL")
    private String nomeComercial;

    @Column(name = "TIPO_DA_ENTIDADE_COMERCIAL")
    private String tipoDaEntidadeComercial;
    ...

I am using Spring Kafka and Postgres. When the program starts, I get this exception:

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void br.com.dchristofolli.kafka.EventConsumer.kafkaConsumer(br.com.dchristofolli.kafka.entity.EntidadesComerciais)]
Bean [EventConsumer(entidadeComercialRepository=null, latch=java.util.concurrent.CountDownLatch@144e36ae[Count = 1])]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [br.com.dchristofolli.kafka.entity.EntidadesComerciais] for GenericMessage [payload={"topic":"ENTIDADES_COMERCIAIS"...

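I need to know what I am doing wrong, or whether I need to change some part of my code to be able to convert the received payload into the ENTIDADES_COMERCIAIS entity. Thanks to anyone who can help.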

java apache-kafka spring-kafka
1 Answer

My guess is that you are receiving a JSON array of EntidadesComerciais. In that case, you may need to change

public void kafkaConsumer(@Payload EntidadesComerciais entidadesComerciais)

to

public void kafkaConsumer(@Payload List<EntidadesComerciais> entidadesComerciais)
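
Separately from the shape of the payload, the exception says the conversion failed "from [java.lang.String]", and the question's consumerFactory() builds its value deserializer with JsonDeserializer<>(String.class), so the listener is handed a String rather than an entity. A minimal sketch of one possible adjustment follows, in which the consumer factory deserializes straight into EntidadesComerciais. It assumes the JSON fields map directly onto the entity's fields, which the truncated payload in the stack trace does not confirm (it begins with a "topic" field, so a wrapper type may be needed instead). The class name TypedConsumerConfiguration is hypothetical, and the sketch is meant to stand in for the consumerFactory() and kafkaListenerContainerFactory() beans of the question, not to run alongside them.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;

// Package taken from the stack trace in the question.
import br.com.dchristofolli.kafka.entity.EntidadesComerciais;

// Hypothetical class name; intended as a variant of the question's ConsumerConfiguration.
@Configuration
@EnableKafka
public class TypedConsumerConfiguration {
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public ConsumerFactory<String, EntidadesComerciais> consumerFactory() {
        Map<String, Object> properties = new HashMap<>();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "Kafka");

        // Target the entity type instead of String, so the record value
        // reaches the listener already converted.
        // Assumption: the JSON on the topic matches the entity's fields.
        JsonDeserializer<EntidadesComerciais> valueDeserializer =
                new JsonDeserializer<>(EntidadesComerciais.class);
        valueDeserializer.addTrustedPackages("br.com.dchristofolli.kafka.entity");

        // Deserializer instances are passed explicitly, so the
        // *_DESERIALIZER_CLASS_CONFIG entries are not needed in the map.
        return new DefaultKafkaConsumerFactory<>(
                properties, new StringDeserializer(), valueDeserializer);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, EntidadesComerciais>
    kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, EntidadesComerciais> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
```

With a configuration along these lines, the original listener signature taking a single EntidadesComerciais could stay unchanged. The exception output also prints entidadeComercialRepository=null, so the repository probably still has to be injected into EventConsumer (for example through its constructor) before save() can work.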