在Kafka-Connect中自动重新连接失败的任务

问题描述 投票:2回答:1

我正在通过Kafka-connect使用mongo-source插件。我检查了source任务状态,它正在运行并且正在监听mongo集合。

我手动停止了mongod服务,等待了大约1分钟,然后重新启动了它。

我检查了源任务,看是否有什么东西可以自行修复,并且在30分钟后似乎什么也没有用。

仅在重新启动连接器后,它才能再次开始工作。

由于mongo-source没有设置超时时重试+退避的选项,我搜索了一个适合简单情况的配置:使用Kafka-connect配置在X时间后重新启动失败的任务。找不到任何..:/我可以用一个简单的脚本来做到这一点,但在Kafka-connect中必须有一些可以管理失败任务的东西。甚至在mongo-source中...我都不希望它在1分钟后就这么快失败...:/

apache-kafka apache-kafka-connect mongodb-kafka-connector
1个回答
3
投票

除了使用REST API找不到失败的任务并提交重新启动请求外,没有其他方法-然后定期运行此请求。例如

curl -s "http://localhost:8083/connectors?expand=status" | \
  jq -c -M 'map({name: .status.name } +  {tasks: .status.tasks}) | .[] | {task: ((.tasks[]) + {name: .name})}  | select(.task.state=="FAILED") | {name: .task.name, task_id: .task.id|tostring} | ("/connectors/"+ .name + "/tasks/" + .task_id + "/restart")' | \
  xargs -I{connector_and_task} curl -v -X POST "http://localhost:8083"\{connector_and_task\}

来源:https://rmoff.net/2019/06/06/automatically-restarting-failed-kafka-connect-tasks/

© www.soinside.com 2019 - 2024. All rights reserved.