我正在使用 apache kafka-client-3.2.0(也使用最新的 Apache kafka-client-3.4.0 jar)在 java 中创建生产者。我有 5 个主题,每个主题的复制因子 = 1 和分区 = 1,因为我只有 1 个代理。在这 5 个主题中,1 个由 log4j2 中的 KafkaAppender 使用,4 个由计时器任务使用,该任务定期调用生产者以转储这些主题。当我启动应用程序时,tcp 连接数开始增加。发现已经建立了300多个连接,而且还在增加中
kafka-version kafka-2.13_2.8.0, kafka-2.13_3.3.1 and kafka-2.13_3.4.0
以下是我使用的生产者配置:
```Java
***private KafkaProducer<String, String> producer;
public KafkaProducer<String, String> getProducer() {
return producer;
}
private KafkaProducer<String, String>
createAndGetProducer(String acksConfig, int retryConfig){
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
props.put(ProducerConfig.ACKS_CONFIG, acksConfig);
props.put(ProducerConfig.RETRIES_CONFIG, retryConfig);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new
KafkaProducer<>(props);
return producer;
}***
```
下面是我从定时器任务调用来生成数据的方法:
***public static void writeToTopic(String topicName, String value){
logger.info("Writing into topic " + topicName);
ProducerRecord <String, String> producerData = new ProducerRecord <String, String> (topicName, value);
KafkaProducer<String, String> producer = KafkaConnectionManager.getConnection().getProducer();
logger.info(producer.toString());
KafkaConnectionManager.getConnection().getProducer().send(producerData);
KafkaConnectionManager.getConnection().getProducer().flush();
}***
以下是连接:
tcp6 0 0 X.X.X.X:9092 X.X.X.X:59604 ESTABLISHED 4369/java
tcp6 0 0 X.X.X.X:9092 X.X.X.X:48358 ESTABLISHED 4369/java
tcp6 0 0 X.X.X.X:9092 X.X.X.X:50668 ESTABLISHED 4369/java
tcp6 0 0 X.X.X.X:9092 X.X.X.X:59898 ESTABLISHED 4369/java
tcp6 0 0 X.X.X.X:9092 X.X.X.X:50626 ESTABLISHED 4369/java
tcp6 0 0 X.X.X.X:9092 X.X.X.X:59444 ESTABLISHED 4369/java
tcp6 0 0 X.X.X.X:9092 X.X.X.X:49716 ESTABLISHED 4369/java
tcp6 0 0 X.X.X.X:9092 X.X.X.X:61049 ESTABLISHED 4369/java
tcp6 0 0 X.X.X.X:9092 X.X.X.X:58516 ESTABLISHED 4369/java
tcp6 0 0 X.X.X.X:9092 X.X.X.X:51514 ESTABLISHED 4369/java
tcp6 0 0 X.X.X.X:9092 X.X.X.X:51892 ESTABLISHED 4369/java
tcp6 0 0 X.X.X.X:9092 X.X.X.X:50802 ESTABLISHED 4369/java
tcp6 0 0 X.X.X.X:9092 X.X.X.X:47106 ESTABLISHED 4369/java
tcp6 0 0 X.X.X.X:9092 X.X.X.X:60084 ESTABLISHED 4369/java
tcp6 0 0 X.X.X.X:9092 X.X.X.X:50588 ESTABLISHED 4369/java
tcp6 0 0 X.X.X.X:9092 X.X.X.X:50682 ESTABLISHED 4369/java
tcp6 0 0 X.X.X.X:9092 X.X.X.X:59814 ESTABLISHED 4369/java
tcp6 0 0 X.X.X.X:9092 X.X.X.X:50788 ESTABLISHED 4369/java
tcp6 0 0 X.X.X.X:9092 X.X.X.X:48554 ESTABLISHED 4369/java
tcp6 0 0 X.X.X.X:9092 X.X.X.X:51390 ESTABLISHED 4369/java
tcp6 0 0 X.X.X.X:9092 X.X.X.X:58576 ESTABLISHED 4369/java
tcp6 0 0 X.X.X.X:9092 X.X.X.X:50974 ESTABLISHED 4369/java
tcp6 0 0 X.X.X.X:9092 X.X.X.X:51504 ESTABLISHED 4369/java
tcp6 0 0 X.X.X.X:9092 X.X.X.X:47262 ESTABLISHED 4369/java
tcp6 0 0 X.X.X.X:9092 X.X.X.X:60022 ESTABLISHED 4369/java
tcp6 0 0 X.X.X.X:9092 X.X.X.X:50558 ESTABLISHED 4369/java
tcp6 0 0 X.X.X.X:9092 X.X.X.X:56118 ESTABLISHED 4369/java
tcp6 0 0 X.X.X.X:9092 X.X.X.X:51844 ESTABLISHED 4369/java
tcp6 0 0 X.X.X.X:9092 X.X.X.X:59712 ESTABLISHED 4369/java
tcp6 0 0 X.X.X.X:9092 X.X.X.X:51834 ESTABLISHED 4369/java
tcp6 0 0 X.X.X.X:9092 X.X.X.X:50626 ESTABLISHED 4369/java
tcp6 0 0 X.X.X.X:9092 X.X.X.X:59668 ESTABLISHED 4369/java
下面是连接管理器代码
***private static KafkaConnectionManager kafkaConnectionManager = null;
public synchronized static KafkaConnectionManager getConnection() {
return kafkaConnectionManager;
}
public synchronized static KafkaConnectionManager initialize(Properties kafkaProps, Logger logger, String hostIp) throws KafkaConnectionManagerException {
if (kafkaConnectionManager == null) {
synchronized (KafkaConnectionManager.class) {
if (kafkaConnectionManager == null) {
kafkaConnectionManager = new KafkaConnectionManager(kafkaProps, logger, hostIp);
}
}
}
return kafkaConnectionManager;
}
private KafkaConnectionManager(Properties kafkaProps, Logger logger, String hostIp) throws KafkaConnectionManagerException{
if(kafkaProps == null)
{
throw new KafkaConnectionManagerException("kafkaProps property is null");
}
else if(logger == null)
{
throw new KafkaConnectionManagerException("logger is null");
}
else if(hostIp == null)
{
throw new KafkaConnectionManagerException("hostIp is null, set the hostIp");
}
KafkaConnectionManager.kafkaProps = kafkaProps;
this.logger = logger;
this.hostIp = hostIp;
brokers = kafkaProps.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG).trim();
logger.info("KAFKA brokers!!! = "+brokers);
this.acksConfig = kafkaProps.getProperty(ProducerConfig.ACKS_CONFIG).trim();
this.retryConfig = Integer.parseInt(kafkaProps.getProperty(ProducerConfig.RETRIES_CONFIG).trim());
this.producer = createAndGetProducer(acksConfig, retryConfig);
logger.info("Kafka Producer has been Initialized successfully");
}***
在上面的代码片段中,方法“public synchronized static KafkaConnectionManager initialize”是从 main 方法调用的。
以下是我正在使用的代理配置(上述所有版本都相同)
############################# 服务器基础 ################## ########### broker.id=1 ############################# 套接字服务器设置################## ########### listeners=PLAINTEXT://:9092 advertised.listeners=PLAINTEXT://X.X.X.X:9092 listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 #############################日志基础################### ########## log.dirs=/tmp/kafka-logs num.partitions=1 num.recovery.threads.per.data.dir=1 #############################内部主题设置################## ########### offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 事务.state.log.min.isr=1 #############################日志刷新策略################## ########### log.flush.interval.messages=10000 log.flush.interval.ms=1000 #############################日志保留策略################## ########### log.retention.hours=168 log.retention.check.interval.ms=300000 ########################### 动物园管理员#################### ######### zookeeper.connect=10.64.223.70:2181 zookeeper.connection.timeout.ms=18000 ############################# 组协调器设置 ################## ########### group.initial.rebalance.delay.ms=0 max.connection.per.ip=100 max.connections=100 listener.name.internal.max.connections=100 request.timeout.ms=180000 connections.max.idle.ms=300000
我也在使用 AdminAPI 创建主题。它也是从 main 方法调用的。下面是使用 Admin API 创建主题的代码
***public void createTopics(Collection<NewTopic> newTopics, CreateTopicsOptions createTopicsOptions) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
AdminClient adminClient = KafkaAdminClient.create(props);
adminClient.createTopics(newTopics, createTopicsOptions);
}***
我尝试了kafka官方网站中提到的所有配置,用于代理配置和生产者配置期望用于身份验证我现在没有使用任何身份验证。
格式不好请见谅
任何建议表示赞赏。
谢谢