kafka-connect-bigquery 连接器故障

问题描述 投票:0回答:1

我正在利用 wepay/kakfa-connect-bigquery:2.5.0 并使用它将表复制到 BigQuery。对于某些主题,写入 BigQuery 时经常会遇到错误,然后数据复制完全中断。 如果我从连接器的配置中删除该主题,其他表的复制将起作用。

我无法复制主题,因为每当我将其添加回配置时,都会出现

DEBUG ProcessingContext is already in failed state. Ignoring requested operation.
的添加调试日志,然后连接器失败并出现以下错误。

如何要求 kafka-connect-bigquery 重置其对错误主题的处理并再次开始复制?

[2023-08-28 08:13:29,103] ERROR WorkerSinkTask{id=bigquery-loanservice-sink-prod-3-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: A write thread has failed with an unrecoverable error (org.apache.kafka.connect.runtime.WorkerSinkTask)
com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException: A write thread has failed with an unrecoverable error
Caused by: Failed to write to table
    at com.wepay.kafka.connect.bigquery.write.batch.KCBQThreadPoolExecutor.lambda$maybeThrowEncounteredError$0(KCBQThreadPoolExecutor.java:101)
    at java.base/java.util.Optional.ifPresent(Optional.java:183)
    at com.wepay.kafka.connect.bigquery.write.batch.KCBQThreadPoolExecutor.maybeThrowEncounteredError(KCBQThreadPoolExecutor.java:100)
    at com.wepay.kafka.connect.bigquery.BigQuerySinkTask.writeSinkRecords(BigQuerySinkTask.java:259)
    at com.wepay.kafka.connect.bigquery.BigQuerySinkTask.put(BigQuerySinkTask.java:321)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:587)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:336)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:237)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)
    at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException: Failed to write to table
Caused by: java.lang.IllegalArgumentException
    at com.wepay.kafka.connect.bigquery.write.batch.TableWriter.run(TableWriter.java:106)
    ... 3 more
Caused by: com.google.cloud.bigquery.BigQueryException: java.lang.IllegalArgumentException
    at com.google.cloud.bigquery.BigQueryException.translateAndThrow(BigQueryException.java:122)
    at com.google.cloud.bigquery.BigQueryImpl.insertAll(BigQueryImpl.java:1076)
    at com.wepay.kafka.connect.bigquery.write.row.AdaptiveBigQueryWriter.performWriteRequest(AdaptiveBigQueryWriter.java:96)
    at com.wepay.kafka.connect.bigquery.write.row.BigQueryWriter.writeRows(BigQueryWriter.java:116)
    at com.wepay.kafka.connect.bigquery.write.batch.TableWriter.run(TableWriter.java:93)
    ... 3 more
Caused by: java.lang.IllegalArgumentException
    at com.google.common.base.Preconditions.checkArgument(Preconditions.java:131)
    at com.google.api.client.util.Preconditions.checkArgument(Preconditions.java:35)
    at com.google.api.client.json.JsonGenerator.serialize(JsonGenerator.java:128)
    at com.google.api.client.json.JsonGenerator.serialize(JsonGenerator.java:173)
    at com.google.api.client.json.JsonGenerator.serialize(JsonGenerator.java:173)
    at com.google.api.client.json.JsonGenerator.serialize(JsonGenerator.java:173)
    at com.google.api.client.json.JsonGenerator.serialize(JsonGenerator.java:146)
    at com.google.api.client.json.JsonGenerator.serialize(JsonGenerator.java:173)
    at com.google.api.client.json.JsonGenerator.serialize(JsonGenerator.java:105)
    at com.google.api.client.http.json.JsonHttpContent.writeTo(JsonHttpContent.java:73)
    at com.google.api.client.http.GZipEncoding.encode(GZipEncoding.java:53)
    at com.google.api.client.http.HttpEncodingStreamingContent.writeTo(HttpEncodingStreamingContent.java:48)
    at com.google.api.client.http.javanet.NetHttpRequest$DefaultOutputWriter.write(NetHttpRequest.java:76)
    at com.google.api.client.http.javanet.NetHttpRequest.writeContentToOutputStream(NetHttpRequest.java:174)
    at com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:117)
    at com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:84)
    at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1012)
    at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:514)
    at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:455)
    at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:565)
    at com.google.cloud.bigquery.spi.v2.HttpBigQueryRpc.insertAll(HttpBigQueryRpc.java:492)
    at com.google.cloud.bigquery.BigQueryImpl$28.call(BigQueryImpl.java:1068)
    at com.google.cloud.bigquery.BigQueryImpl$28.call(BigQueryImpl.java:1065)
    at com.google.api.gax.retrying.DirectRetryingExecutor.submit(DirectRetryingExecutor.java:103)
    at com.google.cloud.RetryHelper.run(RetryHelper.java:76)
    at com.google.cloud.RetryHelper.runWithRetries(RetryHelper.java:50)
    at com.google.cloud.bigquery.BigQueryImpl.insertAll(BigQueryImpl.java:1064)
    ... 6 more
apache-kafka apache-kafka-connect
1个回答
0
投票

重置错误主题的处理

我建议先查看您的

errors.tolerance
设置。

否则,您可以使用

kafka-consumer-groups
命令向前查找接收器连接器组中特定主题的偏移量

顺便说一下,连接器的最新版本是2.5.2,所以也许升级已经解决了这个问题

© www.soinside.com 2019 - 2024. All rights reserved.