Kafka producer in Eclipse does not send messages to the topic

Problem description (0 votes, 1 answer)

I am unable to send messages from a Java KafkaProducer on Windows (the host OS) to a Kafka topic running on the Hortonworks Sandbox. My Java code is below:

import java.util.Properties;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class Producer {

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "sandbox-hdp.hortonworks.com:6667");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Future<RecordMetadata> ck = null;
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
        try {
            for (int i = 0; i < 1; i++) {
                System.out.println(i);
                ck = kafkaProducer.send(
                        new ProducerRecord<String, String>("kafkatopic", Integer.toString(i), "test message - " + i));
                kafkaProducer.flush();
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            System.out.println(ck.toString());
            // System.out.println(ck.get().toString()); // -> gives null
            kafkaProducer.close();
        }
    }
}

When I run this Java code there are no errors; it just prints the index of the message (only 0 in this case) and terminates, and I cannot see that message in the console consumer on the Hortonworks Sandbox's command-line interface.

This is the pom.xml dependency:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.9.0.1</version>
</dependency>

I added a hosts entry pointing to the sandbox, as shown below. This is my hosts file on Windows (the host OS):

# Copyright (c) 1993-2009 Microsoft Corp.
#
# This is a sample HOSTS file used by Microsoft TCP/IP for Windows.
#
# This file contains the mappings of IP addresses to host names. Each
# entry should be kept on an individual line. The IP address should
# be placed in the first column followed by the corresponding host name.
# The IP address and the host name should be separated by at least one
# space.
#
# Additionally, comments (such as these) may be inserted on individual
# lines or following the machine name denoted by a '#' symbol.
#
# For example:
#
#      102.54.94.97     rhino.acme.com          # source server
#       38.25.63.10     x.acme.com              # x client host

# localhost name resolution is handled within DNS itself.
#   127.0.0.1       localhost
#   ::1             localhost
127.0.0.1 sandbox-hdp.hortonworks.com

I created a topic named kafkatopic with the command below:

/usr/hdp/3.0.1.0-187/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181  --replication-factor 1 --partitions 1 --topic kafkatopic                                                 

I can send messages from the console producer:

[root@sandbox-hdp ~]# /usr/hdp/3.0.1.0-187/kafka/bin/kafka-console-producer.sh --broker-list sandbox-hdp.hortonworks.com:6667 --topic kafkatopic                                                                   
>statement1
>statement2
>statement3
>statement4
>statement5

and I can also see the messages from the console consumer in another tab:

[root@sandbox-hdp ~]# /usr/hdp/3.0.1.0-187/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic kafkatopic --from-beginning                                                                      
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].                     
this is statement1                                                                                                                                                                                                 
this is statement2                                                                                                                                                                                                 
this is statement3                                                                                                                                                                                                 
this is statement4                                                                                                                                                                                                 
this is statement5

I can see messages go from the producer to the consumer of the topic on the command-line interface, but I cannot send messages to the Kafka topic on the Hortonworks Sandbox from Java running outside it on Windows (the host OS).

This is consumer.properties:

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
# 
#    http://www.apache.org/licenses/LICENSE-2.0
# 
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see org.apache.kafka.clients.consumer.ConsumerConfig for more details

# list of brokers used for bootstrapping knowledge about the rest of the cluster
# format: host1:port1,host2:port2 ...
bootstrap.servers=localhost:9092

# consumer group id
group.id=test-consumer-group

# What to do when there is no initial offset in Kafka or if the current
# offset does not exist any more on the server: latest, earliest, none
#auto.offset.reset=

This is producer.properties:

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see org.apache.kafka.clients.producer.ProducerConfig for more details

############################# Producer Basics #############################

# list of brokers used for bootstrapping knowledge about the rest of the cluster
# format: host1:port1,host2:port2 ...
bootstrap.servers=localhost:9092

# specify the compression codec for all data generated: none, gzip, snappy, lz4
compression.type=none

# name of the partitioner class for partitioning events; default partition spreads data randomly
#partitioner.class=

# the maximum amount of time the client will wait for the response of a request
#request.timeout.ms=

# how long `KafkaProducer.send` and `KafkaProducer.partitionsFor` will block for
#max.block.ms=

# the producer will wait for up to the given delay to allow other records to be sent so that the sends can be batched together
#linger.ms=

# the maximum size of a request in bytes
#max.request.size=

# the default batch size in bytes when batching multiple records sent to a partition
#batch.size=

# the total bytes of memory the producer can use to buffer records waiting to be sent to the server
#buffer.memory=

This is server.properties:

# Generated by Apache Ambari. Sun May  3 19:25:08 2020

auto.create.topics.enable=true
auto.leader.rebalance.enable=true
compression.type=producer
controlled.shutdown.enable=true
controlled.shutdown.max.retries=3
controlled.shutdown.retry.backoff.ms=5000
controller.message.queue.size=10
controller.socket.timeout.ms=30000
default.replication.factor=1
delete.topic.enable=true
external.kafka.metrics.exclude.prefix=kafka.network.RequestMetrics,kafka.server.DelayedOperationPurgatory,kafka.server.BrokerTopicMetrics.BytesRejectedPerSec
external.kafka.metrics.include.prefix=kafka.network.RequestMetrics.ResponseQueueTimeMs.request.OffsetCommit.98percentile,kafka.network.RequestMetrics.ResponseQueueTimeMs.request.Offsets.95percentile,kafka.network.RequestMetrics.ResponseSendTimeMs.request.Fetch.95percentile,kafka.network.RequestMetrics.RequestsPerSec.request
fetch.purgatory.purge.interval.requests=10000
kafka.ganglia.metrics.group=kafka
kafka.ganglia.metrics.host=localhost
kafka.ganglia.metrics.port=8671
kafka.ganglia.metrics.reporter.enabled=true
kafka.metrics.reporters=
kafka.timeline.metrics.host_in_memory_aggregation=
kafka.timeline.metrics.host_in_memory_aggregation_port=
kafka.timeline.metrics.host_in_memory_aggregation_protocol=
kafka.timeline.metrics.hosts=
kafka.timeline.metrics.maxRowCacheSize=10000
kafka.timeline.metrics.port=
kafka.timeline.metrics.protocol=
kafka.timeline.metrics.reporter.enabled=true
kafka.timeline.metrics.reporter.sendInterval=5900
kafka.timeline.metrics.truststore.password=
kafka.timeline.metrics.truststore.path=
kafka.timeline.metrics.truststore.type=
leader.imbalance.check.interval.seconds=300
leader.imbalance.per.broker.percentage=10
listeners=PLAINTEXT://sandbox-hdp.hortonworks.com:6667
log.cleanup.interval.mins=10
log.dirs=/kafka-logs
log.index.interval.bytes=4096
log.index.size.max.bytes=10485760
log.retention.bytes=-1
log.retention.check.interval.ms=600000
log.retention.hours=168
log.roll.hours=168
log.segment.bytes=1073741824
message.max.bytes=1000000
min.insync.replicas=1
num.io.threads=8
num.network.threads=3
num.partitions=1
num.recovery.threads.per.data.dir=1
num.replica.fetchers=1
offset.metadata.max.bytes=4096
offsets.commit.required.acks=-1
offsets.commit.timeout.ms=5000
offsets.load.buffer.size=5242880
offsets.retention.check.interval.ms=600000
offsets.retention.minutes=86400000
offsets.topic.compression.codec=0
offsets.topic.num.partitions=50
offsets.topic.replication.factor=1
offsets.topic.segment.bytes=104857600
port=6667
producer.metrics.enable=false
producer.purgatory.purge.interval.requests=10000
queued.max.requests=500
replica.fetch.max.bytes=1048576
replica.fetch.min.bytes=1
replica.fetch.wait.max.ms=500
replica.high.watermark.checkpoint.interval.ms=5000
replica.lag.max.messages=4000
replica.lag.time.max.ms=10000
replica.socket.receive.buffer.bytes=65536
replica.socket.timeout.ms=30000
sasl.enabled.mechanisms=GSSAPI
sasl.mechanism.inter.broker.protocol=GSSAPI
security.inter.broker.protocol=PLAINTEXT
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
socket.send.buffer.bytes=102400
ssl.client.auth=none
ssl.key.password=
ssl.keystore.location=
ssl.keystore.password=
ssl.truststore.location=
ssl.truststore.password=
zookeeper.connect=sandbox-hdp.hortonworks.com:2181
zookeeper.connection.timeout.ms=25000
zookeeper.session.timeout.ms=30000
zookeeper.sync.time.ms=2000

This is zookeeper.properties:

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
# 
#    http://www.apache.org/licenses/LICENSE-2.0
# 
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# the directory where the snapshot is stored.
dataDir=/tmp/zookeeper
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0

I have attached a screenshot of the Kafka ports, configs, and properties below.

[screenshot: Kafka ports, configs, and properties]

java apache-kafka kafka-producer-api hortonworks-data-platform hortonworks-sandbox
1 Answer

2 votes

kafkaProducer.send actually just adds the message to the buffer memory and returns immediately; afterwards, the producer sends the messages in batches for efficiency.

The producer consists of a pool of buffer space that holds records that haven't yet been transmitted to the server, as well as a background I/O thread that is responsible for turning these records into requests and transmitting them to the cluster. Failure to close the producer after use will leak these resources.

The send() method is asynchronous. When called, it adds the record to a buffer of pending record sends and immediately returns. This allows the producer to batch together individual records for efficiency.
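
In the question's code, send() is called, flush() runs, and the program exits without ever inspecting the returned Future, so any connection or metadata error is silently discarded. Below is a minimal sketch of a blocking check, reusing the topic name and bootstrap server from the question (the class name BlockingProducer and the 30-second timeout are illustrative choices, not part of the original code):

import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class BlockingProducer {

    public static void main(String[] args) throws Exception {
        Properties properties = new Properties();
        // Assumed to match the question's setup; adjust the broker address if needed.
        properties.put("bootstrap.servers", "sandbox-hdp.hortonworks.com:6667");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
            // send() only enqueues the record; get() blocks until the broker
            // acknowledges it or the timeout expires, so any connection problem
            // surfaces as an exception instead of a silent exit.
            RecordMetadata metadata = producer
                    .send(new ProducerRecord<>("kafkatopic", "0", "test message - 0"))
                    .get(30, TimeUnit.SECONDS);
            System.out.println("Written to partition " + metadata.partition()
                    + " at offset " + metadata.offset());
        }
    }
}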

kafkaProducer.flush: you can use flush to make the records in buffer memory immediately available to send.

Invoking this method makes all buffered records immediately available to send (even if linger.ms is greater than 0) and blocks on the completion of the requests associated with those records. The post-condition of flush() is that any previously sent record will have completed (e.g. Future.isDone() == true). A request is considered completed when it is successfully acknowledged according to the acks configuration you have specified, otherwise it results in an error.
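
If you would rather keep the asynchronous style, a callback passed to send() combined with flush() gives the same visibility: the callback receives either the record metadata or the exception, and flush() blocks until every buffered record has completed. A hedged sketch along the same lines, using the same assumed topic and broker address as above:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class CallbackProducer {

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "sandbox-hdp.hortonworks.com:6667");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        try {
            producer.send(
                    new ProducerRecord<>("kafkatopic", "0", "test message - 0"),
                    (metadata, exception) -> {
                        if (exception != null) {
                            // e.g. a TimeoutException if the broker never acknowledges the record
                            exception.printStackTrace();
                        } else {
                            System.out.println("Written to partition " + metadata.partition()
                                    + " at offset " + metadata.offset());
                        }
                    });
            // flush() blocks until every buffered record has completed
            // (successfully or with an error), so the callback fires before close().
            producer.flush();
        } finally {
            producer.close();
        }
    }
}

Either variant turns the silent exit into an explicit error (typically a TimeoutException when the broker cannot be reached or its metadata cannot be fetched), which is usually the quickest way to check whether sandbox-hdp.hortonworks.com:6667 is actually reachable from the host OS.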
