如何将Windows中的Kafka(Java)应用程序连接到Linux中的Confluent

问题描述 投票:0回答:1

我正在使用Winscp和Putty在Linux服务器上运行Confluent 5.0。我在Windows中有Kafka(Java / Eclipse)应用程序。

当我运行Java应用程序时,它没有识别在Linux上运行的Confluent中的Kafka代理。

我已经测试了我的Java应用程序,它通过在MAC终端中运行Confluent 5.0将数据发送到MACBook中的Kafka主题。现在我试图在Windows中实现相同的Kafka应用程序。由于Windows不支持Confluent,因此我在Linux服务器上运行。

我正在使用Confluent而不是Apache Kafka,因为我在我的应用程序中使用了Schema-registry。

通过使用netstat -tupln&curl -v http:/ localhost:port no。想出了Kafka在8082上的运行和8081 details of ports上的架构注册表。下面是我在Java应用程序中的Kafka属性。

public static Properties producerProperties() {

    // normal producer
    properties.setProperty("bootstrap.servers", "127.0.0.1:8082");
    properties.setProperty("acks", "all");
    properties.setProperty("retries", "10");
    // avro part
    properties.setProperty("key.serializer", StringSerializer .class.getName());
    properties.setProperty("value.serializer", KafkaAvroSerializer .class.getName());
    properties.setProperty("schema.registry.url", "http://127.0.0.1:8081");

    return properties;

}

public static Properties consumerProperties() {

   // Properties properties = new Properties();
    // normal consumer
    properties.setProperty("bootstrap.servers", "127.0.0.1:8082");
    //different for consumer
    properties.setProperty("group.id", "Avro-consumer");
    properties.setProperty("enable.auto.commit", "false");
    properties.setProperty("auto.offset.reset", "earliest");

    // avro part
    properties.setProperty("key.deserializer", StringDeserializer.class.getName());
    properties.setProperty("value.deserializer", KafkaAvroDeserializer.class.getName());
    properties.setProperty("schema.registry.url", "http://127.0.0.1:8081");
    properties.setProperty("specific.avro.reader", "true");

    return properties;
}

public static Properties streamsProperties() {

    // normal consumer
    properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "com.github.ptn006");
    properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:8082");
    properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());

    return properties;
}

预期:写入Kafka主题的数据。

实际:WARN无法建立与节点-1的连接。经纪人可能无法使用。 (org.apache.kafka.clients.NetworkClient:589)

java apache-kafka confluent
1个回答
1
投票

你需要确保Windows机器的advertised.listeners of the server.properties file in Kafka is resolvable。还要确保防火墙允许访问(netstat -tupln | grep LIST),并查找例如在0.0.0.0上收听的Kafka端口。

© www.soinside.com 2019 - 2024. All rights reserved.