在 java 应用程序中使用 kafka 生产者时获得太多 TCP-ESTABLISHED 连接。为什么?有人能告诉我吗?

问题描述 投票:0回答:0

我正在使用 apache kafka-client-3.2.0(也使用最新的 Apache kafka-client-3.4.0 jar)在 java 中创建生产者。我有 5 个主题,每个主题的复制因子 = 1 和分区 = 1,因为我只有 1 个代理。在这 5 个主题中,1 个由 log4j2 中的 KafkaAppender 使用,4 个由计时器任务使用,该任务定期调用生产者以转储这些主题。当我启动应用程序时,tcp 连接数开始增加。发现已经建立了300多个连接,而且还在增加中

kafka-version kafka-2.13_2.8.0, kafka-2.13_3.3.1 and kafka-2.13_3.4.0

以下是我使用的生产者配置:

```Java
***private KafkaProducer<String, String> producer;
    public KafkaProducer<String, String> getProducer() {
            return producer;
        }
    private KafkaProducer<String, String> 
    createAndGetProducer(String acksConfig, int retryConfig){
            Properties props = new Properties();
         
   props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
            props.put(ProducerConfig.ACKS_CONFIG, acksConfig);
            props.put(ProducerConfig.RETRIES_CONFIG, retryConfig);
         
   props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer");
         
   props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer");
            KafkaProducer<String, String> producer = new 
    KafkaProducer<>(props);
        
        return producer;
    }***
```

下面是我从定时器任务调用来生成数据的方法:


    ***public static void writeToTopic(String topicName, String value){
            logger.info("Writing into topic " + topicName);
            
            ProducerRecord <String, String> producerData = new ProducerRecord <String, String> (topicName, value);
            KafkaProducer<String, String> producer = KafkaConnectionManager.getConnection().getProducer();
            
            logger.info(producer.toString());
            KafkaConnectionManager.getConnection().getProducer().send(producerData);
            KafkaConnectionManager.getConnection().getProducer().flush();
        }***

以下是连接:

tcp6       0      0  X.X.X.X:9092       X.X.X.X:59604      ESTABLISHED 4369/java
tcp6       0      0  X.X.X.X:9092       X.X.X.X:48358      ESTABLISHED 4369/java
tcp6       0      0  X.X.X.X:9092       X.X.X.X:50668      ESTABLISHED 4369/java
tcp6       0      0  X.X.X.X:9092       X.X.X.X:59898      ESTABLISHED 4369/java
tcp6       0      0  X.X.X.X:9092       X.X.X.X:50626      ESTABLISHED 4369/java
tcp6       0      0  X.X.X.X:9092       X.X.X.X:59444      ESTABLISHED 4369/java
tcp6       0      0  X.X.X.X:9092       X.X.X.X:49716      ESTABLISHED 4369/java
tcp6       0      0  X.X.X.X:9092       X.X.X.X:61049      ESTABLISHED 4369/java
tcp6       0      0  X.X.X.X:9092       X.X.X.X:58516      ESTABLISHED 4369/java
tcp6       0      0  X.X.X.X:9092       X.X.X.X:51514      ESTABLISHED 4369/java
tcp6       0      0  X.X.X.X:9092       X.X.X.X:51892      ESTABLISHED 4369/java
tcp6       0      0  X.X.X.X:9092       X.X.X.X:50802      ESTABLISHED 4369/java
tcp6       0      0  X.X.X.X:9092       X.X.X.X:47106      ESTABLISHED 4369/java
tcp6       0      0  X.X.X.X:9092       X.X.X.X:60084      ESTABLISHED 4369/java
tcp6       0      0  X.X.X.X:9092       X.X.X.X:50588      ESTABLISHED 4369/java
tcp6       0      0  X.X.X.X:9092       X.X.X.X:50682      ESTABLISHED 4369/java
tcp6       0      0  X.X.X.X:9092       X.X.X.X:59814      ESTABLISHED 4369/java
tcp6       0      0  X.X.X.X:9092       X.X.X.X:50788      ESTABLISHED 4369/java
tcp6       0      0  X.X.X.X:9092       X.X.X.X:48554      ESTABLISHED 4369/java
tcp6       0      0  X.X.X.X:9092       X.X.X.X:51390      ESTABLISHED 4369/java
tcp6       0      0  X.X.X.X:9092       X.X.X.X:58576      ESTABLISHED 4369/java
tcp6       0      0  X.X.X.X:9092       X.X.X.X:50974      ESTABLISHED 4369/java
tcp6       0      0  X.X.X.X:9092       X.X.X.X:51504      ESTABLISHED 4369/java
tcp6       0      0  X.X.X.X:9092       X.X.X.X:47262      ESTABLISHED 4369/java
tcp6       0      0  X.X.X.X:9092       X.X.X.X:60022      ESTABLISHED 4369/java
tcp6       0      0  X.X.X.X:9092       X.X.X.X:50558      ESTABLISHED 4369/java
tcp6       0      0  X.X.X.X:9092       X.X.X.X:56118      ESTABLISHED 4369/java
tcp6       0      0  X.X.X.X:9092       X.X.X.X:51844      ESTABLISHED 4369/java
tcp6       0      0  X.X.X.X:9092       X.X.X.X:59712      ESTABLISHED 4369/java
tcp6       0      0  X.X.X.X:9092       X.X.X.X:51834      ESTABLISHED 4369/java
tcp6       0      0  X.X.X.X:9092       X.X.X.X:50626      ESTABLISHED 4369/java
tcp6       0      0  X.X.X.X:9092       X.X.X.X:59668      ESTABLISHED 4369/java

下面是连接管理器代码


    ***private static KafkaConnectionManager kafkaConnectionManager = null;
    
    public synchronized static KafkaConnectionManager getConnection() {     
            return kafkaConnectionManager;
        }
    
    public synchronized static KafkaConnectionManager initialize(Properties kafkaProps, Logger logger, String hostIp) throws KafkaConnectionManagerException {      
            if (kafkaConnectionManager == null) {
                synchronized (KafkaConnectionManager.class) {
                    if (kafkaConnectionManager == null) {
                        kafkaConnectionManager = new KafkaConnectionManager(kafkaProps, logger, hostIp);
                    }
                }
            }
            return kafkaConnectionManager;
        }
    
    private KafkaConnectionManager(Properties kafkaProps, Logger logger, String hostIp) throws KafkaConnectionManagerException{
            if(kafkaProps == null)
            {
                throw new KafkaConnectionManagerException("kafkaProps property is null");
            }
            else if(logger == null)
            {
                throw new KafkaConnectionManagerException("logger is null");
            }
            else if(hostIp == null)
            {
                throw new KafkaConnectionManagerException("hostIp is null, set the hostIp");
            }
            KafkaConnectionManager.kafkaProps = kafkaProps;
            this.logger = logger;
            this.hostIp = hostIp;
            brokers = kafkaProps.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG).trim();
            logger.info("KAFKA brokers!!! = "+brokers);
            this.acksConfig = kafkaProps.getProperty(ProducerConfig.ACKS_CONFIG).trim();
            this.retryConfig = Integer.parseInt(kafkaProps.getProperty(ProducerConfig.RETRIES_CONFIG).trim());
            
            this.producer = createAndGetProducer(acksConfig, retryConfig);
    logger.info("Kafka Producer has been Initialized successfully");
            
        }***

在上面的代码片段中,方法“public synchronized static KafkaConnectionManager initialize”是从 main 方法调用的。


以下是我正在使用的代理配置(上述所有版本都相同)

############################# 服务器基础 ################## ########### broker.id=1 ############################# 套接字服务器设置################## ########### listeners=PLAINTEXT://:9092 advertised.listeners=PLAINTEXT://X.X.X.X:9092 listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 #############################日志基础################### ########## log.dirs=/tmp/kafka-logs num.partitions=1 num.recovery.threads.per.data.dir=1 #############################内部主题设置################## ########### offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 事务.state.log.min.isr=1 #############################日志刷新策略################## ########### log.flush.interval.messages=10000 log.flush.interval.ms=1000 #############################日志保留策略################## ########### log.retention.hours=168 log.retention.check.interval.ms=300000 ########################### 动物园管理员#################### ######### zookeeper.connect=10.64.223.70:2181 zookeeper.connection.timeout.ms=18000 ############################# 组协调器设置 ################## ########### group.initial.rebalance.delay.ms=0 max.connection.per.ip=100 max.connections=100 listener.name.internal.max.connections=100 request.timeout.ms=180000 connections.max.idle.ms=300000


我也在使用 AdminAPI 创建主题。它也是从 main 方法调用的。下面是使用 Admin API 创建主题的代码

***public void createTopics(Collection<NewTopic> newTopics, CreateTopicsOptions createTopicsOptions) {

        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
        AdminClient adminClient = KafkaAdminClient.create(props);

        adminClient.createTopics(newTopics, createTopicsOptions);
    }***

我尝试了kafka官方网站中提到的所有配置,用于代理配置和生产者配置期望用于身份验证我现在没有使用任何身份验证。

格式不好请见谅

任何建议表示赞赏。

谢谢

apache-kafka kafka-producer-api
© www.soinside.com 2019 - 2024. All rights reserved.