I have successfully deployed a standalone Connect worker, and now I want to scale out to three workers running on three different machines on my local network, with hostnames server0, server1 and server2.
So on each machine I set up the connect-distributed.properties file as follows:
bootstrap.servers=server0:9092,server1:9092,server2:9092
group.id=connect-group
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.replication.factor=3
offset.storage.partitions=25
config.storage.replication.factor=3
status.storage.replication.factor=3
status.storage.partitions=5
offset.flush.interval.ms=30000
listeners=HTTP://[hostname]:8083
plugin.path=[connectors_dir]
Most of the time only one server starts up correctly, while the other two just loop through an endless rebalance, which looks like this in the logs:
[timestamp] INFO [Worker clientId=connect-1, group=connect-group] Herder started (org.apache.kafka.connect.runtime.distributed.DistributedHerder:364)
...
[timestamp] INFO [Worker clientId=connect-1, group=connect-group] Discovered group coordinator server0:9092 (...)
[timestamp] INFO [Worker clientId=connect-1, group=connect-group] Rebalance started (...)
[timestamp] INFO [Worker clientId=connect-1, group=connect-group] (Re-)joining group (...)
[timestamp] INFO [Worker clientId=connect-1, group=connect-group] Successfully joined group with generation Generation{generationId= ...}
[timestamp] INFO [Worker clientId=connect-1, group=connect-group] Successfully synced group in generation Generation{generationId= ...}
[timestamp] INFO [Worker clientId=connect-1, group=connect-group] Joined group at generation 7 with protocol version 2 and got assignment: Assignment{error=0, leader= ...} with rebalance delay: 0 (...)
[timestamp] INFO [Worker clientId=connect-1, group=connect-group] Catching up to assignment's config offset. (...)
[timestamp] INFO [Worker clientId=connect-1, group=connect-group] Current config state offset -1 is behind group assignment 1, reading to end of config log (...)
[timestamp] INFO [Worker clientId=connect-1, group=connect-group] Finished reading to end of log and updated config snapshot, new config log offset: -1 (...)
[timestamp] INFO [Worker clientId=connect-1, group=connect-group] Current config state offset -1 is behind group assignment 1, reading to end of config log (...)
[timestamp] INFO [Worker clientId=connect-1, group=connect-group] Finished reading to end of log and updated config snapshot, new config offset: -1 (...)
[timestamp] INFO [Worker clientId=connect-1, group=connect-group] Current config state offset -1 is behind group assignment 1, reading to end of config log (...)
.....
ad infinitum
I checked that every connect-distributed.properties file has the same group.id.
I also tried putting each Connect worker's own hostname in its listeners property, but to no avail; it only caused other problems, and I still ran into the same endless-rebalance issue.
Finally, I tried reverting offset.storage.partitions and status.storage.partitions to their defaults, but of course that changed nothing.
In the end, after getting some help here, I found out that I had misread section 3 of the article, and that I had configured different internal topics (offset, config, status) for each Connect worker in the cluster.
Once I pointed every worker at the same topics, everything started working fine.
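For reference, the fix amounts to giving every worker identical values for Kafka Connect's three internal-topic settings in connect-distributed.properties. A minimal sketch (the topic names below are just the conventional examples; any names work as long as all three workers use the same ones):

```properties
# These three settings must be IDENTICAL on server0, server1 and server2.
# If each worker points at a different config topic, a follower never sees
# the config offset the leader advertises, and the group rebalances forever
# (the "config state offset -1 is behind group assignment 1" loop above).
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-status
```

Together with a shared group.id and the same bootstrap.servers, this is what makes the three processes form one Connect cluster rather than three competing ones.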