自定义Kafka连接器:连接器已出错,但任务仍在运行。

问题描述 投票:0回答:1

我正在构建一个自定义Kafka源连接器。这个连接器已经启动并运行。我使用Rest API来更新连接器的配置设置(例如Source端点和连接密钥)。我使用下面的Rest API

PUT /connectors/{name}/config

由于某些问题(例如,错误的密钥),API不成功,连接器失败。

API输出如下。请注意,连接器处于失败状态,但任务仍在运行。事实上,它们仍然 "连接 "到旧的端点和连接密钥,而不是优雅地终止任务。

{
    "name": "CustomSrcConnector_V1",
    "connector": {
        "state": "FAILED",
        "worker_id": "XXX.XX.XX.XX:8083",
        "trace": "org.apache.kafka.connect.errors.ConnectException: Exception happened at Create : Exception in getPartitions()\n\tat com.xxx.yyy.kafka.connect.eventhub.CustomSourceConnector.start(CustomSourceConnector.java:102)\n\tat org.apache.kafka.connect.runtime.WorkerConnector.doStart(WorkerConnector.java:111)\n\tat org.apache.kafka.connect.runtime.WorkerConnector.start(WorkerConnector.java:136)\n\tat org.apache.kafka.connect.runtime.WorkerConnector.transitionTo(WorkerConnector.java:196)\n\tat org.apache.kafka.connect.runtime.Worker.startConnector(Worker.java:242)\n\tat org.apache.kafka.connect.runtime.distributed.DistributedHerder.startConnector(DistributedHerder.java:908)\n\tat org.apache.kafka.connect.runtime.distributed.DistributedHerder.processConnectorConfigUpdates(DistributedHerder.java:345)\n\tat org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:317)\n\tat org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:219)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:834)\n"
    },
    "tasks": [
        {
            "id": 0,
            "state": "RUNNING",
            "worker_id": "XXX.XX.XX.XX:8083"
        },
        {
            "id": 1,
            "state": "RUNNING",
            "worker_id": "XXX.XX.XX.XX:8083"
        }
    ],
    "type": "source"
}

我知道任务和连接器通常是解耦的。然而,我想知道,如果连接器在Conenctor配置更新期间出错,是否有任何方法可以优雅地关闭任务。

apache-kafka task apache-kafka-connect connector
1个回答
0
投票

到目前为止,还没有办法在连接器失败时强制执行失败任务。

到目前为止,我发现的解决方案是在源连接器中捕获任何异常,然后不在源连接器中引发异常,而是允许 taskConfigs 方法运行并重新初始化任务。我们需要允许任务失败,所以结果是我们可以让连接器处于RUNNING状态,但任务为FAILED.这将与上面Iskuskov Alexander分享的例子一致。

https:/issues.apache.orgjirabrowseKAFKA-9639。

© www.soinside.com 2019 - 2024. All rights reserved.