@SpringBootApplication
public class SpringKafkaApplication {
public static void main(String[] args) {
SpringApplication.run(SpringKafkaApplication.class, args);
}
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
private String ms = "Hello Kafka";
public void sendMessage(String msg) {
kafkaTemplate.send("main", msg);
}
}
@Configuration
public class KafkaTopicConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public KafkaAdmin kafkaAdmin() {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
return new KafkaAdmin(configs);
}
@Bean
public NewTopic topic1() {
return new NewTopic("main", 1, (short) 1);
}
}
@Configuration
public class KafkaProducerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
public Map<String,Object> producerConfig() {
Map<String,Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
@Bean
ProducerFactory<String, String> producerFactory(){
return new DefaultKafkaProducerFactory<>(producerConfig());
}
@Bean
KafkaTemplate<String, String> kafkaTemplate(){
return new KafkaTemplate<>(producerFactory());
}
}
application.properties
spring.kafka.bootstrap-servers=localhost:9092
当我启动应用程序时,我无法从 Producer 向 Kafka Broker 发送消息。管理客户端未注册。请帮我解决这个问题。 日志文件如下所示: 卡夫卡提交 ID:5e3c2b738d253ff5 Kafka startTimeMs: 1712071898984 adminclient-1 的应用程序信息 kafka.admin.client 未注册
我编写了一个 Springboot Kafka 集成应用程序,用于将消息从 Springboot 发送到 Kafka 代理,但无法按照未注册的日志中所述发送,并且无法发送消息。我想从 Spring boot 向 Kafka 代理发送消息。
针对本地 Apache Kafka 的最小工作代码应该是这样的:
@SpringBootApplication
public class SpringKafkaApplication {
public static void main(String[] args) {
SpringApplication.run(SpringKafkaApplication.class, args);
}
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
private String ms = "Hello Kafka";
public void sendMessage(String msg) {
kafkaTemplate.send("main", msg);
}
@Bean
public NewTopic topic1() {
return new NewTopic("main", 1, (short) 1);
}
}
不确定在哪里调用它
sendMessage()
,但必须在整个应用程序上下文准备就绪时使用它。