Kafka: Too many open files


When I push more records to the queue, I get the exception below and my whole instance stops. Environment: AIX server, JBoss 7.2. Kafka producer code:

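import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;

    // Builds the producer settings for the given broker address.
    private Map<String, Object> producerConfig(String brokerAddress) {
        Map<String, Object> properties = new HashMap<>();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerAddress);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.LINGER_MS_CONFIG, "1");
        properties.put(ProducerConfig.RETRIES_CONFIG, "0");
        properties.put(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, "30000");
        return properties;
    }

    // Creates a new producer factory and a new template on every call.
    private KafkaTemplate<String, String> createTemplate(String brokerAddress) {
        return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(producerConfig(brokerAddress),
                new StringSerializer(), new StringSerializer()));
    }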

Exception:

java.net.SocketException: Too many open files
at sun.nio.ch.Net.socket0(Native Method)
at sun.nio.ch.Net.socket(Net.java:411)
at sun.nio.ch.Net.socket(Net.java:404)
at sun.nio.ch.SocketChannelImpl.<init>(SocketChannelImpl.java:105)
at sun.nio.ch.SelectorProviderImpl.openSocketChannel(SelectorProviderImpl.java:60)
at java.nio.channels.SocketChannel.open(SocketChannel.java:145)
at org.apache.kafka.common.network.Selector.connect(Selector.java:211)
at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:864)
at org.apache.kafka.clients.NetworkClient.access$700(NetworkClient.java:64)
at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1035)
at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:920)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:508)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
at java.lang.Thread.run(Thread.java:748)

As I keep pushing messages to the queue, I also receive the following warning.

Exception trace:

Error registering AppInfo mbean: javax.management.InstanceAlreadyExistsException: kafka.producer:type=app-info,id=producer-238
at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
at org.jboss.as.jmx.PluggableMBeanServerImpl$TcclMBeanServer.registerMBean(PluggableMBeanServerImpl.java:1499)
at org.jboss.as.jmx.PluggableMBeanServerImpl.registerMBean(PluggableMBeanServerImpl.java:871)
at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:62)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:451)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:289)
at org.springframework.kafka.core.DefaultKafkaProducerFactory.createKafkaProducer(DefaultKafkaProducerFactory.java:308)
at org.springframework.kafka.core.DefaultKafkaProducerFactory.createProducer(DefaultKafkaProducerFactory.java:295)
at org.springframework.kafka.core.KafkaTemplate.getTheProducer(KafkaTemplate.java:410)
at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:346)
at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:184)
at com.fss.cms.integration.api.dao.impl.APIIntegrationDaoImpl.sendToggleMessage(APIIntegrationDaoImpl.java:220)
at com.fss.cms.integration.api.dao.impl.APIIntegrationDaoImpl.getFailureMessage(APIIntegrationDaoImpl.java:131)
at com.fss.cms.integration.api.service.impl.APIIntegrationServiceImpl.reSendMessage(APIIntegrationServiceImpl.java:24)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.fss.cms.scheduler.job.SchedulerJobCall.execute(SchedulerJobCall.java:48)
at org.quartz.core.JobRunShell.run(JobRunShell.java:202)
at org.quartz.simpl.SimpleThreadPool$WorkerThread.run(SimpleThreadPool.java:573)
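
A note on what the trace above may indicate: KafkaProducer.<init> is reached from KafkaTemplate.send, and the client id has climbed to producer-238. That suggests a new producer, with its own sockets, is being created on many sends, possibly because createTemplate is called once per message. The calling code is not shown, so this is only a guess. If it is the case, one common remedy is to build one template per broker address and reuse it. Below is a minimal sketch in plain Java. PerBrokerCache is a hypothetical helper name, not part of Kafka or Spring, and the factory function stands in for createTemplate.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical helper (not a Kafka or Spring class): keeps one client
// object per broker address so that it is created once and then reused.
final class PerBrokerCache<T> {
    private final Map<String, T> cache = new ConcurrentHashMap<>();
    private final Function<String, T> factory;

    PerBrokerCache(Function<String, T> factory) {
        this.factory = factory;
    }

    // Returns the cached instance for this address; the factory is
    // invoked only the first time an address is requested.
    T get(String brokerAddress) {
        return cache.computeIfAbsent(brokerAddress, factory);
    }

    // Number of distinct broker addresses seen so far.
    int size() {
        return cache.size();
    }
}
```

With the code from the question, this could be held in a long-lived field as new PerBrokerCache<KafkaTemplate<String, String>>(this::createTemplate), with the send path calling get(brokerAddress).send(...), so that repeated sends to the same broker share one producer instead of opening new connections each time.
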
java spring apache-kafka jboss spring-kafka