我正在尝试设置Confluent Kafka平台的docker环境,并与Debezium SQL Server源连接器集成。
我遵循了Kafka平台的this Confluent指南,然后是SQL Server Source Connector的this Debezium教程。
我的经纪人容器被命名为broker
,该容器与其余的其他容器(包括connect
容器)位于同一网络中,并且确保它们可以ping通和telnet。
在Debezium教程中,我陷入了Start The Debezium SQL Server Connector的步骤,因为我收到一条错误消息,该错误指示连接器正在尝试通过localhost:9092
而不是broker:9092
访问Kafka代理:
[2020-03-29 10:46:30,907] WARN [Consumer clientId=server1-dbhistory, groupId=server1-dbhistory] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2020-03-29 10:46:32,114] WARN [Consumer clientId=server1-dbhistory, groupId=server1-dbhistory] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2020-03-29 10:46:32,969] WARN [Consumer clientId=server1-dbhistory, groupId=server1-dbhistory] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2020-03-29 10:46:34,127] WARN [Consumer clientId=server1-dbhistory, groupId=server1-dbhistory] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2020-03-29 10:46:35,333] WARN [Consumer clientId=server1-dbhistory, groupId=server1-dbhistory] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2020-03-29 10:46:36,238] WARN [Consumer clientId=server1-dbhistory, groupId=server1-dbhistory] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
直到最终超时:
[2020-03-29 10:46:38,664] ERROR WorkerSourceTask{id=inventory-connector-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.common.errors.TimeoutException: Failed to get offsets by times in 60000ms
有趣的是,我可以看到在日志开头成功接收到我的配置(查找broker:9092
):
[2020-03-29 10:45:38,618] INFO database.history.kafka.bootstrap.servers = broker:9092 (io.debezium.connector.common.BaseSourceTask)
...
[2020-03-29 10:45:38,655] INFO ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [broker:9092]
check.crcs = true
client.dns.lookup = default
client.id = server1-dbhistory
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = server1-dbhistory
group.instance.id = null
这是我的配置文件:register-sqlserver.json
:
{
"name": "inventory-connector",
"config": {
"connector.class" : "io.debezium.connector.sqlserver.SqlServerConnector",
"tasks.max" : "1",
"database.server.name" : "server1",
"database.hostname" : "sqlserver_1",
"database.port" : "1433",
"database.user" : "sa",
"database.password" : "Password!",
"database.dbname" : "testDB",
"database.history.kafka.bootstrap.servers" : "broker:9092",
"database.history.kafka.topic": "schema-changes.inventory"
}
}
我通过主机添加连接器,如下所示(与指南一样):
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-sqlserver.json
我显示的日志是connect
容器日志的输出。
没有其他localhost
词出现在我的完整日志中,因此不必担心默认值localhost
的其他配置,我可能会错过它。
将非常感谢您的帮助:)
问题归结于公告的听众。
您正在连接到9092
上的代理,每个the config用于将其主机广告为localhost
的侦听器
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
这意味着客户端(在本例中为Debezium)最初将连接到您为其提供的引导服务器(broker:9092
),但是代理将把已播发的主机(localhost
)移交给客户端。然后客户端将尝试连接到该客户端。由于它们位于单独的实例上,因此Debeziu的localhost
连接器不是代理,并且连接失败。
参考:https://rmoff.net/2018/08/02/kafka-listeners-explained/
解决方案:
使用端口29092
,根据上面的配置,该端口绑定到broker
通告的主机,该主机将从Debezium容器中正确解析]
"database.history.kafka.bootstrap.servers" : "broker:29092"