[当您首次连接到kafka节点时,它将返回您所有的kafka节点和要连接的网址。然后,您的应用程序将尝试直接连接到每个kafka。
问题始终是kafka将为您提供的url吗?这就是为什么卡夫卡会使用KAFKA_ADVERTISED_LISTENERS
来告诉世界如何访问它的原因。
[我在本地计算机上设置了一个单节点Kafka Docker容器,如the Confluent documentation中所述(步骤2-3)。
此外,我还公开了Zookeeper的端口2181和Kafka的端口9092,以便能够从在本地计算机上运行的客户端连接到它们:
$ docker run -d \
-p 2181:2181 \
--net=confluent \
--name=zookeeper \
-e ZOOKEEPER_CLIENT_PORT=2181 \
confluentinc/cp-zookeeper:4.1.0
$ docker run -d \
--net=confluent \
--name=kafka \
-p 9092:9092 \
-e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092 \
-e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
confluentinc/cp-kafka:4.1.0
问题:当我尝试从主机连接到Kafka时,连接失败,因为它是can't resolve address: kafka:9092
。
这是我的Java代码:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("client.id", "KafkaExampleProducer");
props.put("key.serializer", LongSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());
KafkaProducer<Long, String> producer = new KafkaProducer<>(props);
ProducerRecord<Long, String> record = new ProducerRecord<>("foo", 1L, "Test 1");
producer.send(record).get();
producer.flush();
例外:
java.io.IOException: Can't resolve address: kafka:9092
at org.apache.kafka.common.network.Selector.doConnect(Selector.java:235) ~[kafka-clients-2.0.0.jar:na]
at org.apache.kafka.common.network.Selector.connect(Selector.java:214) ~[kafka-clients-2.0.0.jar:na]
at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:864) [kafka-clients-2.0.0.jar:na]
at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:265) [kafka-clients-2.0.0.jar:na]
at org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:266) [kafka-clients-2.0.0.jar:na]
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:238) [kafka-clients-2.0.0.jar:na]
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:176) [kafka-clients-2.0.0.jar:na]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_144]
Caused by: java.nio.channels.UnresolvedAddressException: null
at sun.nio.ch.Net.checkAddress(Net.java:101) ~[na:1.8.0_144]
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:622) ~[na:1.8.0_144]
at org.apache.kafka.common.network.Selector.doConnect(Selector.java:233) ~[kafka-clients-2.0.0.jar:na]
... 7 common frames omitted
问题:如何连接到在Docker中运行的Kafka?我的代码是从主机而不是Docker运行的。
[注意:我知道从理论上讲我可以使用DNS设置和/etc/hosts
,但这是一种解决方法-它不应该那样。
也存在类似的问题here,但是它基于ches/kafka
图像。我使用基于confluentinc
的图像,但并不相同。
Disclaimer:以下使用
confluentinc
泊坞图像,而不是wurstmeister/kafka
,尽管there is a similar configuration我没有尝试过。如果使用该图像,则read their Connectivity wiki。没有任何内容与
wurstmeister
图像相对应,但它是由社区维护的,不是内置于自动CI / CD发行版中的。[
debezium/kafka
文档上的are mentioned here。注意:不推荐使用公告的主机和端口设置。广告听众涵盖了两个对于
bitnami
Kafka图像,refer their README一天结束时,都是相同的Apache Kafka
在容器中运行。您仅依赖它的配置方式。而哪个变量就是这样。
spotify/kafka
已弃用和过时。fast-data-dev
非常适合一个解决方案,但它很肿]]有关补充阅读和网络图,请参阅this blog by @rmoff
Confluent快速入门文档假定所有生产和消费请求都将在Docker网络内。
您可以通过在自己的容器中运行Kafka客户端代码来解决此问题,但否则,您需要添加更多环境变量以在外部公开该容器,同时使其仍在Docker网络中运行。
首先添加
PLAINTEXT_HOST:PLAINTEXT
的协议映射,它将侦听器协议映射到Kafka协议
-e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
然后在不同的端口上设置两个播发的侦听器。 (kafka:9092
这里是指Docker容器的名称)。注意协议匹配上面的映射的右侧值
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
运行容器时,为主机端口映射添加-p 29092:29092
tl; dr
(具有以上设置)在运行任何Kafka客户端外部
Docker网络时(包括您可能已在本地安装的CLI工具),对于引导服务器使用localhost:29092
,对于Zookeeper使用localhost:2181
[在Docker网络中运行应用程序在Docker网络中
,对于引导服务器使用kafka:9092
,对于Zookeeper使用zookeeper:2181
See the example Compose file for the full Confluent stack
对于对Kubernetes部署感兴趣的人:https://operatorhub.io/?keyword=Kafka
[当您首次连接到kafka节点时,它将返回您所有的kafka节点和要连接的网址。然后,您的应用程序将尝试直接连接到每个kafka。
问题始终是kafka将为您提供的url吗?这就是为什么卡夫卡会使用KAFKA_ADVERTISED_LISTENERS
来告诉世界如何访问它的原因。
现在为您的用例,需要考虑许多小问题:
假设您设置了plaintext://kafka:9092
kafka
的URL,该URL可通过docker网络解析。 kafka
名称无法解析。==>要解决此问题,您需要拥有一台特定的DNS服务器,例如服务发现服务器,但对于小巧的东西来说是一大麻烦。或者您在每个kafka
/etc/hosts
名称设置为容器ip如果设置plaintext://localhost:9092
==>如果您有此想法,并希望在另一个容器中使用kafka客户端,解决此问题的一种方法是为两个容器(相同ip)共享网络]
最后选项:在名称中设置IP:plaintext://x.y.z.a:9092
每个人都可以...但是如何获得x.y.z.a名称?
唯一的方法是在启动容器时对这个IP进行硬编码:docker run .... --net confluent --ip 10.x.y.z ...
。请注意,您需要使IP适应confluent
子网中的一个有效IP。
[当您首次连接到kafka节点时,它将返回您所有的kafka节点和要连接的网址。然后,您的应用程序将尝试直接连接到每个kafka。
问题始终是kafka将为您提供的url吗?这就是为什么卡夫卡会使用KAFKA_ADVERTISED_LISTENERS
来告诉世界如何访问它的原因。