Kafka consumer cannot connect to the server

Problem description

When using SASL/SCRAM-SHA-256, my Kafka consumer cannot connect to the server. The consumer configuration and debug log are below. I am using spring-kafka 2.1.11.RELEASE, so the kafka-clients version is 1.0.2. I have opened my firewall to the remote server's IP, and the remote side has added my IP to their whitelist. Consumer configuration:

auto.commit.interval.ms = 1000
auto.offset.reset = earliest
bootstrap.servers = [114.xxx.xxx.xxx:9092]
check.crcs = true
client.id =
connections.max.idle.ms = 540000
enable.auto.commit = true
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = xxx
heartbeat.interval.ms = 100
interceptor.classes = null
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 10
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 305000
retry.backoff.ms = 100
sasl.jaas.config = [hidden]
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = SCRAM-SHA-256
security.protocol = SASL_PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
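For reference, `sasl.jaas.config` (shown as `[hidden]` in the listing above) is typically set along these lines for SCRAM-SHA-256; the username and password here are placeholders, not values from this setup:

```properties
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
    username="<user>" \
    password="<password>";
```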

Debug log:

2023-12-22 23:54:58.051 - - [DEBUG] org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 NetworkClient  [Consumer clientId=consumer-1, groupId=xxx] Give up sending metadata request since no node is available
2023-12-22 23:54:58.102 - - [DEBUG] org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 NetworkClient  [Consumer clientId=consumer-1, groupId=xxx] Initialize connection to node 114.xxx.xxx.xxx:9092 (id: -1 rack: null) for sending metadata request
2023-12-22 23:54:58.102 - - [DEBUG] org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 NetworkClient  [Consumer clientId=consumer-1, groupId=xxx] Initiating connection to node 114.xxx.xxx.xxx:9092 (id: -1 rack: null)
2023-12-22 23:54:58.102 - - [DEBUG] org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 SaslClientAuthenticator  Set SASL client state to SEND_APIVERSIONS_REQUEST
2023-12-22 23:54:58.102 - - [DEBUG] org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 SaslClientAuthenticator  Creating SaslClient: client=null;service=kafka;serviceHostname=kafka154;mechs=[SCRAM-SHA-256]
2023-12-22 23:54:58.103 - - [DEBUG] org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 ScramSaslClient  Setting SASL/SCRAM_SHA_256 client state to SEND_CLIENT_FIRST_MESSAGE
2023-12-22 23:54:58.109 - - [DEBUG] org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 Selector  [Consumer clientId=consumer-1, groupId=xxx] Created socket with SO_RCVBUF = 65536, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node -1
2023-12-22 23:54:58.109 - - [DEBUG] org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 SaslClientAuthenticator  Set SASL client state to RECEIVE_APIVERSIONS_RESPONSE
2023-12-22 23:54:58.109 - - [DEBUG] org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 NetworkClient  [Consumer clientId=consumer-1, groupId=xxx] Completed connection to node -1. Fetching API versions.
2023-12-22 23:55:02.115 - - [DEBUG] org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 Selector  [Consumer clientId=consumer-1, groupId=xxx] Connection with kafka154/114.xxx.xxx.xxx disconnected
java.io.IOException: Connection reset by peer
        at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
        at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
        at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
        at sun.nio.ch.IOUtil.read(IOUtil.java:197)
        at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
        at org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:104)
        at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:122)
        at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:93)
        at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.receiveResponseOrToken(SaslClientAuthenticator.java:306)
        at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.receiveKafkaResponse(SaslClientAuthenticator.java:392)
        at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.authenticate(SaslClientAuthenticator.java:180)
        at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:81)
        at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:474)
        at org.apache.kafka.common.network.Selector.poll(Selector.java:412)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:460)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:258)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:230)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:221)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:153)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:228)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:205)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:284)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1146)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1111)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:712)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.lang.Thread.run(Thread.java:748)
2023-12-22 23:55:02.116 - - [DEBUG] org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 NetworkClient  [Consumer clientId=consumer-1, groupId=xxx] Node -1 disconnected.
2023-12-22 23:55:02.116 - - [DEBUG] org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 NetworkClient  [Consumer clientId=consumer-1, groupId=xxx] Give up sending metadata request since no node is available

Any idea what is causing this?

apache-kafka spring-kafka sasl
1 Answer

The problem has been identified. The data center's egress IP was not in the remote Kafka cluster's whitelist. Although I had previously asked the other party to add a whitelist entry, I had not noticed that this machine's egress IP was missing from it. Because I was able to establish a TCP connection to their broker, I did not initially consider an IP restriction. It turns out that the whitelist they added is not a firewall-style whitelist (otherwise the connection should never have been established in the first place). If a similar problem comes up in the future, I recommend ruling out network-level issues first.
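The key symptom is easy to reproduce locally: a TCP handshake can succeed even when the peer rejects the client at the application layer, which is exactly why the whitelist was easy to overlook. A minimal sketch with plain sockets (no Kafka involved; the "broker" here is a hypothetical local server that closes every accepted connection, standing in for an application-level IP whitelist):

```python
import socket
import threading

def run_strict_server(host="127.0.0.1"):
    """Simulates a broker whose IP whitelist is enforced at the application
    layer: the TCP handshake succeeds, but the server drops the connection
    as soon as the client starts the protocol exchange."""
    srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    srv.bind((host, 0))
    srv.listen(1)
    port = srv.getsockname()[1]

    def serve():
        conn, _ = srv.accept()
        conn.close()          # rejected at the application level, not by a firewall
        srv.close()

    threading.Thread(target=serve, daemon=True).start()
    return port

def probe(host, port):
    """Returns (tcp_ok, app_ok): whether TCP connect succeeded vs. whether
    the peer kept the connection open for the protocol exchange."""
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        s.connect((host, port))       # TCP handshake succeeds
    except OSError:
        return False, False
    try:
        s.sendall(b"APIVERSIONS")     # stand-in for the client's first request
        data = s.recv(1024)           # peer already closed: empty read or reset
        return True, bool(data)
    except OSError:
        return True, False            # "Connection reset by peer"
    finally:
        s.close()

port = run_strict_server()
tcp_ok, app_ok = probe("127.0.0.1", port)
print(tcp_ok, app_ok)   # -> True False
```

This mirrors the debug log above: "Completed connection to node -1" (TCP succeeded) followed a few seconds later by "Connection reset by peer" during the SASL exchange, so a successful `connect` alone does not rule out an IP restriction.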
