MirrorMaker 2 - Event Hubs connection terminated during authentication

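Goal

We are trying to use MirrorMaker 2 to mirror topics from a source Kafka cluster (the open-source variant) to Azure Event Hubs. I have gone through both the Kafka and the Event Hubs documentation. MirrorMaker 2 fails to connect to Azure Event Hubs when using a SAS token created on the Event Hubs side. We start MirrorMaker with the following command: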

bin/connect-mirror-maker.sh config/connct-mirror-maker.properties

Output

        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
        ssl.endpoint.identification.algorithm = https
        ssl.engine.factory.class = null
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.certificate.chain = null
        ssl.keystore.key = null
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLSv1.3
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.certificates = null
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
 (org.apache.kafka.clients.admin.AdminClientConfig:369)
[2023-08-01 15:49:05,445] INFO Successfully logged in. (org.apache.kafka.common.security.authenticator.AbstractLogin:61)
[2023-08-01 15:49:05,536] INFO These configurations '[producer.sasl.jaas.config, producer.bootstrap.servers, group.id, consumer.sasl.mechanism, admin.security.protocol, status.storage.replication.factor, consumer.security.protocol, admin.sasl.mechanism, offset.storage.topic, value.converter, key.converter, consumer.sasl.jaas.config, producer.security.protocol, config.storage.topic, status.storage.topic, header.converter, consumer.bootstrap.servers, producer.sasl.mechanism, config.storage.replication.factor, offset.storage.replication.factor, admin.sasl.jaas.config, admin.bootstrap.servers]' were supplied but are not used yet. (org.apache.kafka.clients.admin.AdminClientConfig:378)
[2023-08-01 15:49:05,536] INFO Kafka version: 3.5.0 (org.apache.kafka.common.utils.AppInfoParser:119)
[2023-08-01 15:49:05,536] INFO Kafka commitId: c97b88d5db4de28d (org.apache.kafka.common.utils.AppInfoParser:120)
[2023-08-01 15:49:05,536] INFO Kafka startTimeMs: 1690897745536 (org.apache.kafka.common.utils.AppInfoParser:121)
[2023-08-01 15:49:05,778] INFO [AdminClient clientId=source->destination] Node -1 disconnected. (org.apache.kafka.clients.NetworkClient:977)
[2023-08-01 15:49:05,780] WARN [AdminClient clientId=source->destination] Connection to node -1 (<eventhub-dns-name>/<IP address>:9093) terminated during authentication. This may happen due to any of the following reasons: (1) Authentication failed due to invalid credentials with brokers older than 1.0.0, (2) Firewall blocking Kafka TLS traffic (eg it may only allow HTTPS traffic), (3) Transient network issue. (org.apache.kafka.clients.NetworkClient:807)
[2023-08-01 15:49:05,903] INFO [AdminClient clientId=source->destination] Node -1 disconnected. (org.apache.kafka.clients.NetworkClient:977)
[2023-08-01 15:49:05,903] WARN [AdminClient clientId=source->destination] Connection to node -1 (<eventhub-dns-name>/<IP address>:9093) terminated during authentication. This may happen due to any of the following reasons: (1) Authentication failed due to invalid credentials with brokers older than 1.0.0, (2) Firewall blocking Kafka TLS traffic (eg it may only allow HTTPS traffic), (3) Transient network issue. (org.apache.kafka.clients.NetworkClient:807)
[2023-08-01 15:49:06,130] INFO [AdminClient clientId=source->destination] Node -1 disconnected. (org.apache.kafka.clients.NetworkClient:977)
[2023-08-01 15:49:06,130] WARN [AdminClient clientId=source->destination] Connection to node -1 (<eventhub-dns-name>/<IP address>:9093) terminated during authentication. This may happen due to any of the following reasons: (1) Authentication failed due to invalid credentials with brokers older than 1.0.0, (2) Firewall blocking Kafka TLS traffic (eg it may only allow HTTPS traffic), (3) Transient network issue. (org.apache.kafka.clients.NetworkClient:807)
[2023-08-01 15:49:06,356] INFO [AdminClient clientId=source->destination] Node -1 disconnected. (org.apache.kafka.clients.NetworkClient:977)
[2023-08-01 15:49:06,356] WARN [AdminClient clientId=source->destination] Connection to node -1 (<eventhub-dns-name>/<IP address>:9093) terminated during authentication. This may happen due to any of the following reasons: (1) Authentication failed due to invalid credentials with brokers older than 1.0.0, (2) Firewall blocking Kafka TLS traffic (eg it may only allow HTTPS traffic), (3) Transient network issue. (org.apache.kafka.clients.NetworkClient:807)

...

[2023-08-01 15:58:07,440] INFO [AdminClient clientId=source->destination] Metadata update failed (org.apache.kafka.clients.admin.internals.AdminMetadataManager:235)
org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: fetchMetadata
[2023-08-01 15:58:07,441] INFO App info kafka.admin.client for source->destination unregistered (org.apache.kafka.common.utils.AppInfoParser:83)
[2023-08-01 15:58:07,442] INFO [AdminClient clientId=source->destination] Metadata update failed (org.apache.kafka.clients.admin.internals.AdminMetadataManager:235)
org.apache.kafka.common.errors.TimeoutException: The AdminClient thread has exited. Call: fetchMetadata
[2023-08-01 15:58:07,442] INFO [AdminClient clientId=source->destination] Timed out 1 remaining operation(s) during close. (org.apache.kafka.clients.admin.KafkaAdminClient:1359)
[2023-08-01 15:58:07,446] INFO Metrics scheduler closed (org.apache.kafka.common.metrics.Metrics:693)
[2023-08-01 15:58:07,446] INFO Closing reporter org.apache.kafka.common.metrics.JmxReporter (org.apache.kafka.common.metrics.Metrics:697)
[2023-08-01 15:58:07,447] INFO Metrics reporters closed (org.apache.kafka.common.metrics.Metrics:703)
[2023-08-01 15:58:07,447] ERROR Stopping due to error (org.apache.kafka.connect.mirror.MirrorMaker:369)
org.apache.kafka.connect.errors.ConnectException: Failed to connect to and describe Kafka cluster. Check worker's broker connection and security properties.
        at org.apache.kafka.connect.runtime.WorkerConfig.lookupKafkaClusterId(WorkerConfig.java:283)
        at org.apache.kafka.connect.runtime.WorkerConfig.lookupKafkaClusterId(WorkerConfig.java:263)
        at org.apache.kafka.connect.runtime.WorkerConfig.kafkaClusterId(WorkerConfig.java:393)
        at org.apache.kafka.connect.mirror.MirrorMaker.addHerder(MirrorMaker.java:271)
        at java.base/java.lang.Iterable.forEach(Iterable.java:75)
        at org.apache.kafka.connect.mirror.MirrorMaker.<init>(MirrorMaker.java:156)
        at org.apache.kafka.connect.mirror.MirrorMaker.<init>(MirrorMaker.java:168)
        at org.apache.kafka.connect.mirror.MirrorMaker.<init>(MirrorMaker.java:172)
        at org.apache.kafka.connect.mirror.MirrorMaker.main(MirrorMaker.java:356)
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: listNodes
        at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
        at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)
        at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
        at org.apache.kafka.connect.runtime.WorkerConfig.lookupKafkaClusterId(WorkerConfig.java:277)
        ... 8 more
Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: listNodes

Our configuration file

clusters = source, destination
source.bootstrap.servers = localhost:9092
destination.bootstrap.servers = <eventhub-dns-name>:9093
destination.security.protocol=SASL_SSL
destination.sasl.mechanism=PLAIN
destination.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://<eventhub-dns-name>/;SharedAccessKeyName=<keyname>;SharedAccessKey=<primary-key>;EntityPath=<topic>";

source->destination.enabled = true
source->destination.topics = .*

replication.factor=3

checkpoints.topic.replication.factor=3
heartbeats.topic.replication.factor=3
offset-syncs.topic.replication.factor=3

offset.storage.replication.factor=3
status.storage.replication.factor=3
config.storage.replication.factor=3

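The firewall is not blocking the traffic: we verified network connectivity with telnet. In addition, using a fake IP address produces a different connection error.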
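A telnet session only shows that TCP port 9093 is reachable; it does not exercise the TLS handshake that the warning in the log lists as a possible failure point. A minimal sketch of a handshake probe using Python's standard library follows. The function name `probe_tls` and the 5-second default timeout are illustrative choices, not part of MirrorMaker or Event Hubs.

```python
import socket
import ssl


def probe_tls(host, port, timeout=5.0):
    """Open a TCP connection and attempt a TLS handshake.

    Returns (True, negotiated_tls_version) on success, or
    (False, error_description) if the connection or handshake fails.
    """
    # Default context: verifies the server certificate and hostname,
    # comparable to ssl.endpoint.identification.algorithm = https.
    context = ssl.create_default_context()
    try:
        with socket.create_connection((host, port), timeout=timeout) as raw:
            with context.wrap_socket(raw, server_hostname=host) as tls:
                return True, tls.version() or "unknown"
    except OSError as exc:
        # Covers refused connections, timeouts, DNS failures and ssl.SSLError.
        return False, f"{type(exc).__name__}: {exc}"
```

Calling `probe_tls("<eventhub-dns-name>", 9093)` from the MirrorMaker VM would return `(True, <negotiated TLS version>)` if the handshake completes. A success would only rule out reason (2) in the warning; it says nothing about the SASL credentials.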

We have double-checked the SAS token.
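The credentials can also be checked mechanically. The sketch below assumes the connection string has the `Key=Value;...` layout shown in the configuration file, and that the host in `Endpoint` should match the host in `destination.bootstrap.servers`. The helper names `parse_connection_string` and `find_problems` are hypothetical. It catches only formatting mistakes, not an invalid or expired key.

```python
from urllib.parse import urlparse


def parse_connection_string(conn):
    """Split 'Key=Value;Key=Value' into a dict.

    Each segment is split on the first '=' only, because base64 keys
    usually end in '=' padding.
    """
    parts = {}
    for item in conn.strip().strip(";").split(";"):
        if not item:
            continue
        key, sep, value = item.partition("=")
        if not sep:
            raise ValueError(f"malformed segment: {item!r}")
        parts[key.strip()] = value.strip()
    return parts


def find_problems(conn, bootstrap):
    """Return a list of obvious inconsistencies; an empty list means none found."""
    parts = parse_connection_string(conn)
    problems = [
        f"missing {name}"
        for name in ("Endpoint", "SharedAccessKeyName", "SharedAccessKey")
        if not parts.get(name)
    ]
    # Endpoint looks like sb://<host>/ ; urlparse extracts the host part.
    endpoint_host = urlparse(parts.get("Endpoint", "")).hostname
    bootstrap_host, _, port = bootstrap.strip().rpartition(":")
    if endpoint_host and endpoint_host != bootstrap_host.lower():
        problems.append(
            f"Endpoint host {endpoint_host!r} differs from bootstrap host {bootstrap_host!r}"
        )
    if port != "9093":
        problems.append(f"bootstrap port is {port!r}, expected '9093'")
    return problems
```

Passing the `password` value from `destination.sasl.jaas.config` and the value of `destination.bootstrap.servers` to `find_problems` returns a list of findings to review by hand.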

We tried MirrorMaker 2 on the same VM with two local open-source single-node Kafka clusters (without SASL). That works.

We tried enabling the DEBUG log level, but it did not produce any more detailed logging.

We are not sure what is going wrong. Any ideas?

apache-kafka apache-kafka-connect azure-eventhub apache-kafka-mirrormaker
1 Answer

I am running into the same problem. Did you find a solution?
