如何使用Spring XML触发Kafka Consumer?

问题描述 投票:0回答:1

我开发了一个 Spring XML + Kafka 示例。在这个例子中,我想了解如何触发它的消费者端。我不想为消费者使用 main 方法。

MainApp.java

public class MainApp {
    private static final Faker FAKER = Faker.instance().instance();


    public static void main(String[] args) throws InterruptedException {
        ApplicationContext context = new ClassPathXmlApplicationContext("/context.xml");
        EmployeeProducer employeeProducer = (EmployeeProducer) context.getBean("employeeProducer");

        Employee employee = Employee.builder()
                .empId(ThreadLocalRandom.current().nextInt(1,100))
                .firstName(FAKER.name().firstName())
                .lastName(FAKER.name().lastName())
                .gender(getGender())
                .build();
        System.out.println("Employee : "+ employee);

        employeeProducer.sendMessage("t-employee", employee);


        Thread.sleep(500_000_000);
    }

    private static String getGender(){
        int ramdomN = ThreadLocalRandom.current().nextInt(0,1);
        String sex;
        if (ramdomN == 0) {
            sex = "M";
        } else {
            sex = "F";
        }
        return sex;
    }

KafkaProducer.java

public class EmployeeProducer {

    public void sendMessage(String topicName, Employee employee) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.CLIENT_ID_CONFIG, AppConfigs.applicationID);
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, AppConfigs.bootstrapServers);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName());

        KafkaProducer<String, Employee> kafkaProducer = new KafkaProducer<>(properties);

        kafkaProducer.send(new ProducerRecord<>(topicName, employee));
        kafkaProducer.close();
    }
}

KafkaConsumer.java

public class EmployeeConsumer {

    public void consumeMessage() {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, AppConfigs.bootstrapServers);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class.getName());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        properties.put(JsonDeserializer.VALUE_CLASS_NAME_CONFIG, Employee.class);

        KafkaConsumer<String, Employee> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(List.of(AppConfigs.topicName));

        while (true) {
            ConsumerRecords<String, Employee> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, Employee> record : records) {
                System.out.println("Key: " + record.key() + ", Value:" + record.value());
                System.out.println("Partition:" + record.partition() + ",Offset:" + record.offset());
            }
        }
    }
}

上下文.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
    http://www.springframework.org/schema/context
    http://www.springframework.org/schema/context/spring-context-3.0.xsd">

    <bean id="employeeProducer" class="com.example.EmployeeProducer" />
    <bean id="employeeConsumer" class="com.example.EmployeeConsumer" />

<!--    <bean class="org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor" />

    <bean class="org.springframework.kafka.config.KafkaListenerEndpointRegistry"/>-->
</beans>

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>spring-conflient-demo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.6.2</version>
        </dependency>
        <dependency>
            <groupId>com.github.javafaker</groupId>
            <artifactId>javafaker</artifactId>
            <version>1.0.2</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.datatype</groupId>
            <artifactId>jackson-datatype-jsr310</artifactId>
            <version>2.15.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>2.19.0</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.28</version>
        </dependency>
        <dependency>
            <groupId>commons-lang</groupId>
            <artifactId>commons-lang</artifactId>
            <version>2.6</version>
        </dependency>

        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.13.4</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-core</artifactId>
            <version>6.0.18</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-web</artifactId>
            <version>6.0.18</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>6.0.18</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>3.1.4</version>
        </dependency>
    </dependencies>

</project>
spring apache-kafka spring-context
1个回答
0
投票

好像没有调用consumeMessage方法,尝试在sendMessage后面加上

new Thread(() -> {employeeConsumer.consumeMessage(statuses);}).start();

© www.soinside.com 2019 - 2024. All rights reserved.