这是使用 spring 控制器向 Kafka 发出 Web 请求的示例:
@RestController
class KafkaProducerController(val kafkaTemplate: KafkaTemplate<String, String>) {
@PutMapping("/track")
fun sendTopicCallback(@RequestBody body: String): CompletableFuture<ResponseEntity<String>> {
return kafkaTemplate.send("mytopic", body.toString())
.thenApply { ResponseEntity.ok(it.toString()) }
}
}
问题是 - 在这个例子中如何使用线程?
据我了解,spring 将为每个 HTTP 请求固定一个线程。每个线程都会将 HTTP 正文放入 Kafka 模板。一段时间后(或消息量?)“某人”(例如另一个线程)会将消息批量刷新到 Kafka 主题。但是发送消息的请求和线程过多是怎么回事呢?可以设置多线程批量发送消息吗?
是的,事实上
KafkaProducer
javadocs 建议使用单个生产者。
但是,手动冲洗会导致全部阻塞;最好让生产者内部管理批次以避免此类问题。