我正在学习 Pluralsight 课程“Apache Kafka 入门”,来到关于使用 Kafka 生产者生成消息的模块 4。然后,我想重现作者的演示,其中包括用 Java 创建和运行 Apache Kafka 生产者应用程序。这个想法是完全用 shell 程序完成的事情。集群设置:三个分区,三个代理,复制因子为 3。跨分区没有全局顺序。
从一开始,作者就设定了先决条件:安装了 Linux 操作系统、Java 8 JDK 和 Scala 2.11.x。获得的Kafka版本是2.11-0.10.0.1,这正是我正在使用的版本。此外,技术堆栈是相同的。使用虚拟机,一切顺利,如课程中所示。我从生产者那里编写消息,然后由消费者显示它们。
障碍来到了前面提到的章节。对于新的演示,它介绍并设置了 Apache Kafka 开发环境,添加了自己的依赖项并浏览了 API。因此,除了已经描述的先决条件之外,还需要 Maven 和对测试 Kafka 集群的访问。该项目采用IntelliJ IDEA 2016.2.2,Java 1.8 SDK。最后,应用程序运行并生成消息,并且与之前的演示一样,这些消息也打印在消费者控制台上。
我这边有什么变化?我使用 Windows 10 作为主机,使用 IntelliJ IDEA 2023.1.5 进行编码和执行。另一方面,其余模块在由以下Vagrantfile定义的来宾计算机中运行:
Vagrant.configure("2") do |config|
config.vm.box = "hashicorp/bionic64"
config.vm.network :forwarded_port, guest: 80, host: 8080, id: "http" # Map host's port 8080 to guest's port 80
config.vm.synced_folder "./practice", "/home/vagrant", type: "virtualbox"
end
尽管应用程序从我的主机端输出与 SLF4J 相同的内容,但我的来宾端没有发生任何影响,这意味着消费者仍在等待。这是可重现的代码:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class KafkaProducerApp {
public static void main(String[] args) {
// Create a properties dictionary for the required/optional Producer config settings:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092,localhost:9093");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> myProducer = new KafkaProducer<String, String>(props);
try {
for (int i = 0; i < 150; i++){
myProducer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i),
"MyMessage: " + Integer.toString(i)));
}
} catch (Exception e) {
e.printStackTrace();
} finally {
myProducer.close();
}
}
}
我做错了什么导致 Kafka 中没有可用消息?两个操作系统之间的有效通信缺少哪些 Vagrant 配置?我仍然尝试根据这篇文章的建议进行相应的修改,但没有成功。另外,我假设 OpenSSL 可能会带来与可执行源相关的安全风险。
在您的 Vagrant 配置中,您将端口转发 80 到 8080:
config.vm.network :forwarded_port, guest: 80, host: 8080, id: "http" # Map host's port 8080 to guest's port 80
但是,Kafka(默认情况下)为客户端公开端口 9092。
因此将 Vagrant 文件的该行更改为:
config.vm.network :forwarded_port, guest: 9092, host: 9092, id: "kafka" # Map host's port 9092 to guest's port 9092
此外,在 Java 代码中,您将
bootstrap.servers
设置为 localhost:9092,localhost:9093
,但端口 9093 并不适合客户端连接。 (更多信息这里)