Kafka Connect 提交连接器后任务无法启动

问题描述 投票:0回答:1

我正在尝试在 k8s 集群上运行 Kafka Connect,问题出现在提交连接器的过程中。我需要针对 4 个不同的主题运行任务,并把数据存储到 S3。

我创建了 4 个不同的连接器。提交这些连接器时,我发现有时它们运行良好,但大多数时候连接器虽然启动了,并且状态显示为正在运行,却没有任何任务在运行。查看错误日志时,我发现了以下错误:

org.apache.kafka.connect.runtime.rest.errors.ConnectRestException: Internal request missing required signature
 at org.apache.kafka.connect.runtime.rest.RestClient.httpRequest(RestClient.java:162)
 at org.apache.kafka.connect.runtime.rest.RestClient.httpRequest(RestClient.java:116)
 at org.apache.kafka.connect.runtime.rest.RestClient.httpRequest(RestClient.java:80)
 at org.apache.kafka.connect.runtime.rest.HerderRequestHandler.completeOrForwardRequest(HerderRequestHandler.java:112)
 at org.apache.kafka.connect.runtime.rest.HerderRequestHandler.completeOrForwardRequest(HerderRequestHandler.java:127)
 at org.apache.kafka.connect.runtime.rest.HerderRequestHandler.completeOrForwardRequest(HerderRequestHandler.java:137)
 at org.apache.kafka.connect.runtime.rest.resources.InternalClusterResource.putTaskConfigs(InternalClusterResource.java:85)
 at jdk.internal.reflect.GeneratedMethodAccessor65.invoke(Unknown Source)
 at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.base/java.lang.reflect.Method.invoke(Method.java:566)
 at org.glassfish.jersey.server.model.internal.ResourceMethodInvocationHandlerFactory.lambda$static$0(ResourceMethodInvocationHandlerFactory.java:52)
 at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher$1.run(AbstractJavaResourceMethodDispatcher.java:134)
 at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.invoke(AbstractJavaResourceMethodDispatcher.java:177)
 at org.glassfish.jersey.server.model.internal.JavaResourceMethodDispatcherProvider$VoidOutInvoker.doDispatch(JavaResourceMethodDispatcherProvider.java:159)
 at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.dispatch(AbstractJavaResourceMethodDispatcher.java:81)
 at org.glassfish.jersey.server.model.ResourceMethodInvoker.invoke(ResourceMethodInvoker.java:478)
 at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:400)
 at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:81)
 at org.glassfish.jersey.server.ServerRuntime$1.run(ServerRuntime.java:256)
 at org.glassfish.jersey.internal.Errors$1.call(Errors.java:248)
 at org.glassfish.jersey.internal.Errors$1.call(Errors.java:244)
 at org.glassfish.jersey.internal.Errors.process(Errors.java:292)
 at org.glassfish.jersey.internal.Errors.process(Errors.java:274)
 at org.glassfish.jersey.internal.Errors.process(Errors.java:244)
 at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:265)
 at org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:235)
 at org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:684)
 at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:394)
 at org.glassfish.jersey.servlet.WebComponent.service(WebComponent.java:346)
 at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:358)
 at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:311)
 at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:205)
 at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:799)
 at org.eclipse.jetty.servlet.ServletHandler$ChainEnd.doFilter(ServletHandler.java:1656)
 at org.eclipse.jetty.servlets.CrossOriginFilter.handle(CrossOriginFilter.java:319)
 at org.eclipse.jetty.servlets.CrossOriginFilter.doFilter(CrossOriginFilter.java:273)
 at org.eclipse.jetty.servlet.FilterHolder.doFilter(FilterHolder.java:193)
 at org.eclipse.jetty.servlet.ServletHandler$Chain.doFilter(ServletHandler.java:1626)
 at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:552)
 at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233)
 at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1624)
 at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233)
 at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1440)
 at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:188)
 at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:505)
 at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1594)
 at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:186)
 at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1355)
 at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
 at org.eclipse.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:191)
 at org.eclipse.jetty.server.handler.StatisticsHandler.handle(StatisticsHandler.java:181)
 at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127)
 at org.eclipse.jetty.server.Server.handle(Server.java:516)
 at org.eclipse.jetty.server.HttpChannel.lambda$handle$1(HttpChannel.java:487)
 at org.eclipse.jetty.server.HttpChannel.dispatch(HttpChannel.java:732)
 at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:479)
 at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:277)
 at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:311)
 at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:105)
 at org.eclipse.jetty.io.ChannelEndPoint$1.run(ChannelEndPoint.java:104)
 at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:338)
 at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:315)
 at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:173)
 at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:131)
 at org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:409)
 at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:883)
 at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:1034)
 at java.base/java.lang.Thread.run(Thread.java:829)
[2024-02-27 21:16:32,648] INFO 127.0.0.1 - - [27/Feb/2024:21:16:32 +0000] "POST /connectors/SessionizerOutS3SinkConnector-H/tasks HTTP/1.1" 400 74 "-" "kafka-connect" 12 (org.apache.kafka.connect.runtime.rest.RestServer)
[2024-02-27 21:16:32,648] ERROR Error forwarding REST request (org.apache.kafka.connect.runtime.rest.RestClient)
org.apache.kafka.connect.runtime.rest.errors.ConnectRestException: Internal request missing required signature
 at org.apache.kafka.connect.runtime.rest.RestClient.httpRequest(RestClient.java:162)
 at org.apache.kafka.connect.runtime.rest.RestClient.httpRequest(RestClient.java:116)
 at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$publishConnectorTaskConfigs$42(DistributedHerder.java:2054)
 at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
 at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
 at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
 at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
 at java.base/java.lang.Thread.run(Thread.java:829)
[2024-02-27 21:16:32,650] ERROR [Worker clientId=connect-1, groupId=kafka-connect-master-group] Request to leader to reconfigure connector tasks failed (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
org.apache.kafka.connect.runtime.rest.errors.ConnectRestException: Internal request missing required signature
 at org.apache.kafka.connect.runtime.rest.RestClient.httpRequest(RestClient.java:162)
 at org.apache.kafka.connect.runtime.rest.RestClient.httpRequest(RestClient.java:116)
 at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$publishConnectorTaskConfigs$42(DistributedHerder.java:2054)
 at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
 at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
 at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
 at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
 at java.base/java.lang.Thread.run(Thread.java:829)
[2024-02-27 21:16:32,650] ERROR [Worker clientId=connect-1, groupId=kafka-connect-master-group] Failed to reconfigure connector's tasks (SessionizerOutS3SinkConnector-H), retrying after backoff. (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
org.apache.kafka.connect.runtime.rest.errors.ConnectRestException: Internal request missing required signature
 at org.apache.kafka.connect.runtime.rest.RestClient.httpRequest(RestClient.java:162)
 at org.apache.kafka.connect.runtime.rest.RestClient.httpRequest(RestClient.java:116)
 at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$publishConnectorTaskConfigs$42(DistributedHerder.java:2054)
 at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
 at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
 at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
 at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
 at java.base/java.lang.Thread.run(Thread.java:829)

我要读取的主题有 50 个分区。我在连接器中尝试过设置不同的任务数量,但问题依旧。

我的Kafka-connect版本是7.5.1。

我多次重启集群,但都没有效果。我原以为可能是内存方面的问题,还尝试过调整不同的参数值,但事实证明并非如此。

有人可以帮我吗?

谢谢

apache-kafka kafka-consumer-api apache-kafka-connect s3-kafka-connector
1个回答
0
投票

您的问题根源在于 Kafka Connect 集群的 REST API 的安全配置。请对

Kafka Connect
集群中的每个 worker(工作节点)执行以下操作:

  1. 检查 worker 配置文件中的属性
    connect.protocol
    。如果存在,则将此属性的值设置为
    sessioned
    。或者直接从 worker 配置文件中删除此属性(删除后将使用默认值 sessioned)。
  2. 重新启动该 worker。

这样就会使用默认的密钥签名与验证算法,正确配置 worker 与内部 REST API 端点之间通信的请求验证。

更多参考:Apache 论坛有关保护 REST 端点安全的讨论

© www.soinside.com 2019 - 2024. All rights reserved.