我正在使用Winscp和Putty在Linux服务器上运行Confluent 5.0。我在Windows中有Kafka(Java / Eclipse)应用程序。
当我运行Java应用程序时,它没有识别在Linux上运行的Confluent中的Kafka代理。
我已经测试了我的Java应用程序,它通过在MAC终端中运行Confluent 5.0将数据发送到MACBook中的Kafka主题。现在我试图在Windows中实现相同的Kafka应用程序。由于Windows不支持Confluent,因此我在Linux服务器上运行。
我正在使用Confluent而不是Apache Kafka,因为我在我的应用程序中使用了Schema-registry。
通过使用netstat -tupln&curl -v http:/ localhost:port no。想出了Kafka在8082上的运行和8081 details of ports上的架构注册表。下面是我在Java应用程序中的Kafka属性。
public static Properties producerProperties() {
// normal producer
properties.setProperty("bootstrap.servers", "127.0.0.1:8082");
properties.setProperty("acks", "all");
properties.setProperty("retries", "10");
// avro part
properties.setProperty("key.serializer", StringSerializer .class.getName());
properties.setProperty("value.serializer", KafkaAvroSerializer .class.getName());
properties.setProperty("schema.registry.url", "http://127.0.0.1:8081");
return properties;
}
public static Properties consumerProperties() {
// Properties properties = new Properties();
// normal consumer
properties.setProperty("bootstrap.servers", "127.0.0.1:8082");
//different for consumer
properties.setProperty("group.id", "Avro-consumer");
properties.setProperty("enable.auto.commit", "false");
properties.setProperty("auto.offset.reset", "earliest");
// avro part
properties.setProperty("key.deserializer", StringDeserializer.class.getName());
properties.setProperty("value.deserializer", KafkaAvroDeserializer.class.getName());
properties.setProperty("schema.registry.url", "http://127.0.0.1:8081");
properties.setProperty("specific.avro.reader", "true");
return properties;
}
public static Properties streamsProperties() {
// normal consumer
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "com.github.ptn006");
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:8082");
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
return properties;
}
预期:写入Kafka主题的数据。
实际:WARN无法建立与节点-1的连接。经纪人可能无法使用。 (org.apache.kafka.clients.NetworkClient:589)
你需要确保Windows机器的advertised.listeners
of the server.properties
file in Kafka is resolvable。还要确保防火墙允许访问(netstat -tupln | grep LIST
),并查找例如在0.0.0.0
上收听的Kafka端口。