我对使用已经在云上创建的 kafka 生产者和使用 new Producer
创建新生产者与调用现有生产者
连接到已在云上创建的 Kafka 生产者需要配置 Kafka 客户端以连接到远程 Kafka 集群。以下是在简单的 Java 类中使用 Kafka 生产者 API 连接到在云上创建的现有 Kafka 生产者的步骤,无需任何 Spring 等框架:
配置 Kafka Producer 属性:
从云提供商或管理员处获取现有 Kafka 生产者的配置详细信息(例如引导服务器、安全设置)。相应地更新您的属性。
Properties props = new Properties();
props.put("bootstrap.servers", "cloud-kafka-broker1:9092,cloud-kafka-broker2:9092");
// Add other required properties like security settings, serializers, etc.
创建Kafka生产者:
使用 KafkaProducer 类创建具有提供的属性的 Kafka 生产者实例。
Producer<String, String> producer = new KafkaProducer<>(props);
发送消息:
您可以使用
send
方法向云端Kafka集群中的主题发送消息。
ProducerRecord<String, String> record = new ProducerRecord<>("your-topic", "key", "value");
producer.send(record);
关闭生产者:
使用完生产者释放资源后,请确保将其关闭。
producer.close();
完整的 Java 类可能如下所示:
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class CloudKafkaProducer {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "cloud-kafka-broker1:9092,cloud-kafka-broker2:9092");
// Add other required properties like security settings, serializers, etc.
Producer<String, String> producer = new KafkaProducer<>(props);
try {
ProducerRecord<String, String> record = new ProducerRecord<>("your-topic", "key", "value");
producer.send(record);
} catch (Exception e) {
e.printStackTrace();
} finally {
producer.close();
}
}
}
确保将
"cloud-kafka-broker1:9092,cloud-kafka-broker2:9092"
和 "your-topic"
分别替换为实际的云 Kafka 代理地址和主题名称。此外,根据云提供商的规范配置其他必要的属性。