KafkaTemplate.send() 既不返回也不抛出异常

问题描述 投票:0回答:2

我正在使用 SpringKafka(Springboot 版本 3.1.7)将消息发布到 Azure EventHub 命名空间。生产者应用程序也部署在 Azure 中。直到上周一切都运行良好,但上周应用程序突然停止发布消息。我在控制台中没有收到任何错误。我在控制台中拥有的全部内容如下。

2024-04-02T20:13:22.114622379Z: [INFO]  2024-04-02T20:13:22.114Z  INFO 44 --- [nPool-worker-17] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values: 
2024-04-02T20:13:22.114640679Z: [INFO]      acks = 1
2024-04-02T20:13:22.114646579Z: [INFO]      auto.include.jmx.reporter = true
2024-04-02T20:13:22.114843884Z: [INFO]      batch.size = 16384
2024-04-02T20:13:22.114850884Z: [INFO]      bootstrap.servers = [xxxxxxx-eastus.servicebus.windows.net:9093]
2024-04-02T20:13:22.114855484Z: [INFO]      buffer.memory = 33554432
2024-04-02T20:13:22.114860584Z: [INFO]      client.dns.lookup = use_all_dns_ips
2024-04-02T20:13:22.114864684Z: [INFO]      client.id = producer-1022
2024-04-02T20:13:22.114868784Z: [INFO]      compression.type = none
2024-04-02T20:13:22.114873085Z: [INFO]      connections.max.idle.ms = 180000
2024-04-02T20:13:22.114877185Z: [INFO]      delivery.timeout.ms = 120000
2024-04-02T20:13:22.114881385Z: [INFO]      enable.idempotence = false
2024-04-02T20:13:22.114885385Z: [INFO]      interceptor.classes = []
2024-04-02T20:13:22.114889585Z: [INFO]      key.serializer = class org.apache.kafka.common.serialization.StringSerializer
2024-04-02T20:13:22.114893585Z: [INFO]      linger.ms = 500
2024-04-02T20:13:22.114897585Z: [INFO]      max.block.ms = 60000
2024-04-02T20:13:22.114901485Z: [INFO]      max.in.flight.requests.per.connection = 5
2024-04-02T20:13:22.114905685Z: [INFO]      max.request.size = 1000000
2024-04-02T20:13:22.114909685Z: [INFO]      metadata.max.age.ms = 180000
2024-04-02T20:13:22.114913586Z: [INFO]      metadata.max.idle.ms = 300000
2024-04-02T20:13:22.114917486Z: [INFO]      metric.reporters = []
2024-04-02T20:13:22.114923886Z: [INFO]      metrics.num.samples = 2
2024-04-02T20:13:22.114940986Z: [INFO]      metrics.recording.level = INFO
2024-04-02T20:13:22.114945486Z: [INFO]      metrics.sample.window.ms = 30000
2024-04-02T20:13:22.114949586Z: [INFO]      partitioner.adaptive.partitioning.enable = true
2024-04-02T20:13:22.114953587Z: [INFO]      partitioner.availability.timeout.ms = 6000
2024-04-02T20:13:22.114957587Z: [INFO]      partitioner.class = null
2024-04-02T20:13:22.114961687Z: [INFO]      partitioner.ignore.keys = false
2024-04-02T20:13:22.114965687Z: [INFO]      receive.buffer.bytes = 32768
2024-04-02T20:13:22.114969587Z: [INFO]      reconnect.backoff.max.ms = 1000
2024-04-02T20:13:22.114973687Z: [INFO]      reconnect.backoff.ms = 50
2024-04-02T20:13:22.114977587Z: [INFO]      request.timeout.ms = 30000
2024-04-02T20:13:22.114981687Z: [INFO]      retries = 3
2024-04-02T20:13:22.114985987Z: [INFO]      retry.backoff.ms = 100
2024-04-02T20:13:22.114989987Z: [INFO]      sasl.client.callback.handler.class = null
2024-04-02T20:13:22.114994087Z: [INFO]      sasl.jaas.config = [hidden]
2024-04-02T20:13:22.114998188Z: [INFO]      sasl.kerberos.kinit.cmd = /usr/bin/kinit
2024-04-02T20:13:22.115002288Z: [INFO]      sasl.kerberos.min.time.before.relogin = 60000
2024-04-02T20:13:22.115006388Z: [INFO]      sasl.kerberos.service.name = null
2024-04-02T20:13:22.115010388Z: [INFO]      sasl.kerberos.ticket.renew.jitter = 0.05
2024-04-02T20:13:22.115014588Z: [INFO]      sasl.kerberos.ticket.renew.window.factor = 0.8
2024-04-02T20:13:22.115018688Z: [INFO]      sasl.login.callback.handler.class = class com.xxxxx.config.ProducerAuthCallbackHandler
2024-04-02T20:13:22.115022888Z: [INFO]      sasl.login.class = null
2024-04-02T20:13:22.115026888Z: [INFO]      sasl.login.connect.timeout.ms = null
2024-04-02T20:13:22.115031088Z: [INFO]      sasl.login.read.timeout.ms = null
2024-04-02T20:13:22.115035088Z: [INFO]      sasl.login.refresh.buffer.seconds = 300
2024-04-02T20:13:22.115039089Z: [INFO]      sasl.login.refresh.min.period.seconds = 60
2024-04-02T20:13:22.115043189Z: [INFO]      sasl.login.refresh.window.factor = 0.8
2024-04-02T20:13:22.115047189Z: [INFO]      sasl.login.refresh.window.jitter = 0.05
2024-04-02T20:13:22.115051289Z: [INFO]      sasl.login.retry.backoff.max.ms = 10000
2024-04-02T20:13:22.115055389Z: [INFO]      sasl.login.retry.backoff.ms = 100
2024-04-02T20:13:22.115059389Z: [INFO]      sasl.mechanism = OAUTHBEARER
2024-04-02T20:13:22.115063489Z: [INFO]      sasl.oauthbearer.clock.skew.seconds = 30
2024-04-02T20:13:22.115067489Z: [INFO]      sasl.oauthbearer.expected.audience = null
2024-04-02T20:13:22.115072889Z: [INFO]      sasl.oauthbearer.expected.issuer = null
2024-04-02T20:13:22.115077289Z: [INFO]      sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
2024-04-02T20:13:22.115081390Z: [INFO]      sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
2024-04-02T20:13:22.115085690Z: [INFO]      sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
2024-04-02T20:13:22.115098790Z: [INFO]      sasl.oauthbearer.jwks.endpoint.url = null
2024-04-02T20:13:22.115102990Z: [INFO]      sasl.oauthbearer.scope.claim.name = scope
2024-04-02T20:13:22.115107090Z: [INFO]      sasl.oauthbearer.sub.claim.name = sub
2024-04-02T20:13:22.115110890Z: [INFO]      sasl.oauthbearer.token.endpoint.url = null
2024-04-02T20:13:22.115114990Z: [INFO]      security.protocol = SASL_SSL
2024-04-02T20:13:22.115118790Z: [INFO]      security.providers = null
2024-04-02T20:13:22.115122791Z: [INFO]      send.buffer.bytes = 131072
2024-04-02T20:13:22.115126691Z: [INFO]      socket.connection.setup.timeout.max.ms = 30000
2024-04-02T20:13:22.115147991Z: [INFO]      socket.connection.setup.timeout.ms = 10000
2024-04-02T20:13:22.115152191Z: [INFO]      ssl.cipher.suites = null
2024-04-02T20:13:22.115156391Z: [INFO]      ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
2024-04-02T20:13:22.115160591Z: [INFO]      ssl.endpoint.identification.algorithm = https
2024-04-02T20:13:22.115164792Z: [INFO]      ssl.engine.factory.class = null
2024-04-02T20:13:22.115168992Z: [INFO]      ssl.key.password = null
2024-04-02T20:13:22.115173092Z: [INFO]      ssl.keymanager.algorithm = SunX509
2024-04-02T20:13:22.115177392Z: [INFO]      ssl.keystore.certificate.chain = null
2024-04-02T20:13:22.115386197Z: [INFO]      ssl.keystore.key = null
2024-04-02T20:13:22.115423798Z: [INFO]      ssl.keystore.location = null
2024-04-02T20:13:22.115429298Z: [INFO]      ssl.keystore.password = null
2024-04-02T20:13:22.116032312Z: [INFO]      ssl.keystore.type = JKS
2024-04-02T20:13:22.116052813Z: [INFO]      ssl.protocol = TLSv1.3
2024-04-02T20:13:22.116058413Z: [INFO]      ssl.provider = null
2024-04-02T20:13:22.116062513Z: [INFO]      ssl.secure.random.implementation = null
2024-04-02T20:13:22.116066813Z: [INFO]      ssl.trustmanager.algorithm = PKIX
2024-04-02T20:13:22.116070913Z: [INFO]      ssl.truststore.certificates = null
2024-04-02T20:13:22.116075313Z: [INFO]      ssl.truststore.location = null
2024-04-02T20:13:22.116079413Z: [INFO]      ssl.truststore.password = null
2024-04-02T20:13:22.116083413Z: [INFO]      ssl.truststore.type = JKS
2024-04-02T20:13:22.116087713Z: [INFO]      transaction.timeout.ms = 60000
2024-04-02T20:13:22.116091713Z: [INFO]      transactional.id = null
2024-04-02T20:13:22.116095814Z: [INFO]      value.serializer = class org.apache.kafka.common.serialization.StringSerializer
2024-04-02T20:13:22.116100114Z: [INFO]  
2024-04-02T20:13:22.120245412Z: [INFO]  2024-04-02T20:13:22.120Z  INFO 44 --- [nPool-worker-17] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=producer-1022] Closing the Kafka producer with timeoutMillis = 0 ms.
2024-04-02T20:13:22.120278713Z: [INFO]  2024-04-02T20:13:22.120Z  INFO 44 --- [nPool-worker-17] o.apache.kafka.common.metrics.Metrics    : Metrics scheduler closed
2024-04-02T20:13:22.120307313Z: [INFO]  2024-04-02T20:13:22.120Z  INFO 44 --- [nPool-worker-17] o.apache.kafka.common.metrics.Metrics    : Closing reporter org.apache.kafka.common.metrics.JmxReporter
2024-04-02T20:13:22.120332014Z: [INFO]  2024-04-02T20:13:22.120Z  INFO 44 --- [nPool-worker-17] o.apache.kafka.common.metrics.Metrics    : Metrics reporters closed
2024-04-02T20:13:22.120338014Z: [INFO]  2024-04-02T20:13:22.120Z  INFO 44 --- [nPool-worker-17] o.a.kafka.common.utils.AppInfoParser     : App info kafka.producer for producer-1022 unregistered

以下是发布者代码:

    public void publishOnToInternalTopic(EventMessageType eventMessageType, String message, String logMessage, String internalTopic) {
        
        kafkaTemplate.send(internalTopic, eventMessageType.name(), message)
                .whenComplete((result, ex) -> {

                    if (null == ex) {

                        log.info("### Published {} message");
                    } else {

                        log.error("### Unable to publish message");
                    }

                });

    }

任何人都可以提供一些解决方案/建议吗?

谢谢 比朱

我这边没有做任何改变。 看起来 kafkatemplate.send() 既没有返回控制权也没有抛出任何异常。

spring-kafka
2个回答
0
投票

如果一切正常,突然停止工作,那么您将需要关注基础设施。检查所有日志,如果没有重要的内容,则重新启动所有应用程序,包括 Kafka。

这应该可以解决您的问题,如果问题在一段时间后再次发生,则需要深入研究。


0
投票

当我更改应用服务计划的 SKU 时,它又开始工作了!

© www.soinside.com 2019 - 2024. All rights reserved.