我正在尝试在 k8s 集群上运行 Kafka Connect,但在提交连接器时遇到了问题。我要为 4 个不同的主题运行任务,并希望将数据存储到 S3。
我创建了 4 个不同的连接器。提交之后我发现,连接器有时运行正常,但大多数时候连接器虽然启动了并且显示为正在运行,却没有任何任务在运行。查看错误日志时,我发现了以下错误:
org.apache.kafka.connect.runtime.rest.errors.ConnectRestException: Internal request missing required signature
at org.apache.kafka.connect.runtime.rest.RestClient.httpRequest(RestClient.java:162)
at org.apache.kafka.connect.runtime.rest.RestClient.httpRequest(RestClient.java:116)
at org.apache.kafka.connect.runtime.rest.RestClient.httpRequest(RestClient.java:80)
at org.apache.kafka.connect.runtime.rest.HerderRequestHandler.completeOrForwardRequest(HerderRequestHandler.java:112)
at org.apache.kafka.connect.runtime.rest.HerderRequestHandler.completeOrForwardRequest(HerderRequestHandler.java:127)
at org.apache.kafka.connect.runtime.rest.HerderRequestHandler.completeOrForwardRequest(HerderRequestHandler.java:137)
at org.apache.kafka.connect.runtime.rest.resources.InternalClusterResource.putTaskConfigs(InternalClusterResource.java:85)
at jdk.internal.reflect.GeneratedMethodAccessor65.invoke(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.glassfish.jersey.server.model.internal.ResourceMethodInvocationHandlerFactory.lambda$static$0(ResourceMethodInvocationHandlerFactory.java:52)
at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher$1.run(AbstractJavaResourceMethodDispatcher.java:134)
at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.invoke(AbstractJavaResourceMethodDispatcher.java:177)
at org.glassfish.jersey.server.model.internal.JavaResourceMethodDispatcherProvider$VoidOutInvoker.doDispatch(JavaResourceMethodDispatcherProvider.java:159)
at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.dispatch(AbstractJavaResourceMethodDispatcher.java:81)
at org.glassfish.jersey.server.model.ResourceMethodInvoker.invoke(ResourceMethodInvoker.java:478)
at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:400)
at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:81)
at org.glassfish.jersey.server.ServerRuntime$1.run(ServerRuntime.java:256)
at org.glassfish.jersey.internal.Errors$1.call(Errors.java:248)
at org.glassfish.jersey.internal.Errors$1.call(Errors.java:244)
at org.glassfish.jersey.internal.Errors.process(Errors.java:292)
at org.glassfish.jersey.internal.Errors.process(Errors.java:274)
at org.glassfish.jersey.internal.Errors.process(Errors.java:244)
at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:265)
at org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:235)
at org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:684)
at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:394)
at org.glassfish.jersey.servlet.WebComponent.service(WebComponent.java:346)
at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:358)
at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:311)
at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:205)
at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:799)
at org.eclipse.jetty.servlet.ServletHandler$ChainEnd.doFilter(ServletHandler.java:1656)
at org.eclipse.jetty.servlets.CrossOriginFilter.handle(CrossOriginFilter.java:319)
at org.eclipse.jetty.servlets.CrossOriginFilter.doFilter(CrossOriginFilter.java:273)
at org.eclipse.jetty.servlet.FilterHolder.doFilter(FilterHolder.java:193)
at org.eclipse.jetty.servlet.ServletHandler$Chain.doFilter(ServletHandler.java:1626)
at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:552)
at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233)
at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1624)
at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233)
at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1440)
at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:188)
at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:505)
at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1594)
at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:186)
at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1355)
at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
at org.eclipse.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:191)
at org.eclipse.jetty.server.handler.StatisticsHandler.handle(StatisticsHandler.java:181)
at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127)
at org.eclipse.jetty.server.Server.handle(Server.java:516)
at org.eclipse.jetty.server.HttpChannel.lambda$handle$1(HttpChannel.java:487)
at org.eclipse.jetty.server.HttpChannel.dispatch(HttpChannel.java:732)
at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:479)
at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:277)
at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:311)
at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:105)
at org.eclipse.jetty.io.ChannelEndPoint$1.run(ChannelEndPoint.java:104)
at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:338)
at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:315)
at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:173)
at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:131)
at org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:409)
at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:883)
at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:1034)
at java.base/java.lang.Thread.run(Thread.java:829)
[2024-02-27 21:16:32,648] INFO 127.0.0.1 - - [27/Feb/2024:21:16:32 +0000] "POST /connectors/SessionizerOutS3SinkConnector-H/tasks HTTP/1.1" 400 74 "-" "kafka-connect" 12 (org.apache.kafka.connect.runtime.rest.RestServer)
[2024-02-27 21:16:32,648] ERROR Error forwarding REST request (org.apache.kafka.connect.runtime.rest.RestClient)
org.apache.kafka.connect.runtime.rest.errors.ConnectRestException: Internal request missing required signature
at org.apache.kafka.connect.runtime.rest.RestClient.httpRequest(RestClient.java:162)
at org.apache.kafka.connect.runtime.rest.RestClient.httpRequest(RestClient.java:116)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$publishConnectorTaskConfigs$42(DistributedHerder.java:2054)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
[2024-02-27 21:16:32,650] ERROR [Worker clientId=connect-1, groupId=kafka-connect-master-group] Request to leader to reconfigure connector tasks failed (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
org.apache.kafka.connect.runtime.rest.errors.ConnectRestException: Internal request missing required signature
at org.apache.kafka.connect.runtime.rest.RestClient.httpRequest(RestClient.java:162)
at org.apache.kafka.connect.runtime.rest.RestClient.httpRequest(RestClient.java:116)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$publishConnectorTaskConfigs$42(DistributedHerder.java:2054)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
[2024-02-27 21:16:32,650] ERROR [Worker clientId=connect-1, groupId=kafka-connect-master-group] Failed to reconfigure connector's tasks (SessionizerOutS3SinkConnector-H), retrying after backoff. (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
org.apache.kafka.connect.runtime.rest.errors.ConnectRestException: Internal request missing required signature
at org.apache.kafka.connect.runtime.rest.RestClient.httpRequest(RestClient.java:162)
at org.apache.kafka.connect.runtime.rest.RestClient.httpRequest(RestClient.java:116)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$publishConnectorTaskConfigs$42(DistributedHerder.java:2054)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
我要读取的主题有 50 个分区,我在连接器中尝试过不同的任务数量,但都遇到了同样的问题。
我的Kafka-connect版本是7.5.1。
我多次尝试重启集群,但都没有效果;我还尝试过调整不同的参数值,以为可能是内存问题,但事实证明并不是。
有人可以帮我吗?
谢谢
您的问题根源在于 Kafka Connect 集群 REST API 的安全配置。对于 Kafka Connect 集群中的每个 worker(工作节点),请执行以下操作:
在 worker 配置文件中检查是否存在 `connect.protocol` 属性。如果存在,则将此属性的值设置为 `sessioned`;或者直接从 worker 配置文件中删除此属性。
这样将使用默认的密钥验证算法正确配置请求验证,用于 worker 进程与 REST API 端点之间的通信。