Any idea what I might be doing wrong? These are the classes I'm using:
@Configuration
public class KafkaConfig {

    @Bean
    public NewTopic javaTopic() {
        return TopicBuilder.name("java").build();
    }
}
@Service
public class KafkaProducer {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducer.class);

    private KafkaTemplate<String, String> kafkaTemplate;

    public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessage(String message) {
        LOGGER.info(String.format("Message sent %s", message));
        kafkaTemplate.send("java", message);
    }
}
@Service
public class KafkaConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class);

    @KafkaListener(topics = "java", groupId = "myGroup")
    public void consume(String message) {
        try {
            LOGGER.info(String.format("Message received -> %s", message));
        } catch (Exception e) {
            LOGGER.error("Error while processing message", e);
        }
    }
}
@RestController
@RequestMapping(value = "/api/kafka", produces = MediaType.APPLICATION_JSON_VALUE)
public class KafkaController {

    private KafkaProducer kafkaProducer;

    public KafkaController(KafkaProducer kafkaProducer) {
        this.kafkaProducer = kafkaProducer;
    }

    @GetMapping("/publish")
    public ResponseEntity<String> publish(@RequestParam("message") String message) {
        kafkaProducer.sendMessage(message);
        return ResponseEntity.ok("Message sent to the topic");
    }
}
When I make a GET request to the endpoint, the only message I get in the console is the producer's:
2023-09-02T22:24:45.679-03:00 INFO 17424 --- [nio-8080-exec-3] c.bruno.musicrank.kafka.KafkaProducer : Message sent teste
Also, if I read the topic with the console consumer using the command below, I can see the values I sent to the endpoint just fine:
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic java --partition 0 --from-beginning
This is my application.yml:
kafka:
  bootstrap-servers: localhost:9092
  consumer:
    group-id: myGroup
    auto-offset-reset: earliest
    key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  producer:
    key-serializer: org.apache.kafka.common.serialization.StringSerializer
    value-serializer: org.apache.kafka.common.serialization.StringSerializer
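You need to flush the template's producer.
You should log after the flush. Otherwise, you have only "sent" the record to an in-memory buffer, which the consumer cannot read because the data is not actually in the broker yet.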
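
To illustrate the buffering point, here is a minimal toy model in plain Java. It is not the Kafka client: `BufferingProducer`, `visibleToConsumer` and `FlushDemo` are hypothetical names made up for this sketch. It only shows that a record accepted by `send` is not readable until `flush` has run. In the producer from the question, the corresponding change would presumably be to call `kafkaTemplate.flush()` after `kafkaTemplate.send("java", message)` and move the log line after it.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy stand-in for a buffering producer. Hypothetical; NOT the Kafka client API. */
class BufferingProducer {
    // Records accepted by send() but not yet delivered.
    private final List<String> buffer = new ArrayList<>();
    // Records that have reached the (simulated) broker and can be consumed.
    private final List<String> broker = new ArrayList<>();

    /** Queues the message in memory only. */
    void send(String message) {
        buffer.add(message);
    }

    /** Moves every buffered message to the simulated broker. */
    void flush() {
        broker.addAll(buffer);
        buffer.clear();
    }

    /** Returns a snapshot of what a consumer could read right now. */
    List<String> visibleToConsumer() {
        return List.copyOf(broker);
    }
}

class FlushDemo {
    public static void main(String[] args) {
        BufferingProducer producer = new BufferingProducer();

        producer.send("teste");
        // Logging here would claim "sent" while nothing is readable yet.
        System.out.println("after send:  " + producer.visibleToConsumer());

        producer.flush();
        // Logging after the flush reflects what a consumer can actually see.
        System.out.println("after flush: " + producer.visibleToConsumer());
    }
}
```

Running `FlushDemo` prints an empty list after `send` and `[teste]` after `flush`, which is why the log statement belongs after the flush.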