如何使嵌入式Kafka规则不显示两个测试产生的数据

问题描述 投票:0回答:1

我正在尝试在Junit中设置集成测试。它正在使用Kafka作为Spring,它是EmbeddedKafkaRule。

我在Junit中有两个测试,每个测试将产生不同的数据集。

但是,当我在Idea IDE中读取两个测试时,我注意到第一个测试仅运行第一个测试的数据,而第二个测试同时运行第一个和第二个数据。无论测试顺序如何,都会发生这种情况,并且单独运行每个测试时不会发生这种情况。

这是他在设置嵌入式kafka规则时所做的配置。

private KafkaTemplate<Integer, String> producer;

public static final String TEST_TOPIC = "test-topic";

KafkaMessageListenerContainer container;

    @ClassRule
    public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true,
        TEST_TOPIC);
    private LinkedBlockingQueue<Object> records;


@Before
public void setupKafka(){
    Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka.getEmbeddedKafka());
    ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<Integer, String>(senderProps);
    producer = new KafkaTemplate<>(pf);

    producer.setDefaultTopic(TEST_TOPIC);

    waitForConsumerToStart();
}

private void waitForConsumerToStart() {

    Map<String, Object> consumerProperties =
        KafkaTestUtils.consumerProps("test", "false",
            embeddedKafka.getEmbeddedKafka());
    // create a Kafka consumer factory
    DefaultKafkaConsumerFactory<String, String> consumerFactory =
        new DefaultKafkaConsumerFactory<String, String>(
            consumerProperties);
    // set the topic that needs to be consumed
    ContainerProperties containerProperties =
        new ContainerProperties(TEST_TOPIC);

    // create a Kafka MessageListenerContainer
    container = new KafkaMessageListenerContainer<>(consumerFactory,
        containerProperties);

    // create a thread safe queue to store the received message
    records = new LinkedBlockingQueue<>();

    // setup a Kafka message listener
    container
        .setupMessageListener(new MessageListener<String, String>() {
            @Override
            public void onMessage(
                ConsumerRecord<String, String> record) {
                System.out.println("test-listener received message='{}' " +
                    record.toString());
                records.add(record);
            }
        });

    // start the container and underlying message listener
    container.start();


    // wait until the container has the required number of assigned partitions
    ContainerTestUtils.waitForAssignment(container,
        embeddedKafka.getEmbeddedKafka().getPartitionsPerTopic());

}
intellij-idea apache-kafka spring-kafka
1个回答
0
投票

最简单的解决方案是对每个测试使用不同的主题

© www.soinside.com 2019 - 2024. All rights reserved.