以下是我的Kafka代理配置
broker.id=1
port=9092
host.name=127.0.0.1
advertised.listeners=PLAINTEXT://127.0.0.1:9092
listeners=PLAINTEXT://127.0.0.1:9092
控制台生产者和消费者工作完美,但是当我尝试通过 java 连接时,它抛出代理不可用错误。但 Kafka Broker 正在运行,并且能够通过控制台生成和消费消息。
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<String,String>(props);
producer.send(new ProducerRecord<String, String>("Sample","Hey","From java program"));
producer.close();
[kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Connection to node -1 (/127.0.0.1:9092) could not be established. Broker may not be available.
[kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Bootstrap broker 127.0.0.1:9092 (id: -1 rack: null) disconnected
我正在使用STS IDE编写java程序并使用JDK 1.8,kafka 2.8.1 windows 操作系统并使用 ubuntu 20.04.4 LTS 来执行控制台生产者和消费者。
提前致谢!
这个回复为时已晚,但希望这对某人有帮助。
使用 ifconfig 或 ip addr 或主机名 -I 识别 WSL2 的 IP 地址
从 Windows 命令提示符(以管理员身份)运行以下命令
> netsh interface portproxy add v4tov4 listenport=9092 listenaddress=0.0.0.0 connectport=9092 connectaddress=172.X.X.X
172.X.X.X 是 WSL2 的 IP
通过此更改,我能够访问 Kafka 集群。
参考链接
我已经在这里回答了: https://stackoverflow.com/a/76325645/4594452
这是该链接的答案:
这是对我有用的 Apache Kafka 3.4.0:
server.properties
步骤1
在 WSL2 中
config/server.properties
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://[::1]:9092
listener.security.protocol.map=PLAINTEXT:PLAINTEXT
第2步
您还必须告诉 WSL2 更喜欢 IPV4 而不是 IPV6[1],如下所示:
$ sudo vi /etc/gai.conf
#
# scopev4 <mask> <value>
# Add another rule to the RFC 6724 scope table for IPv4 addresses.
# By default the scope IDs described in section 3.2 in RFC 6724 are
# used. Changing these defaults should hardly ever be necessary.
# The defaults are equivalent to:
#
scopev4 ::ffff:169.254.0.0/112 2
scopev4 ::ffff:127.0.0.0/104 2
scopev4 ::ffff:0.0.0.0/96 14
第3步
在 Windows 10 Java 代码中:
hostname -I | cut -d' ' -f1
Properties props = new Properties();
// WSL2 IP = 172.xx.xxx.xx:9092
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.xx.xxx.xx:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
ProducerRecord<String, String> message = new ProducerRecord<>("test-topic", "Hello, World!");
producer.send(message);
} catch (Exception e) {
e.printStackTrace();
}